Commit 62362038 authored by Ad Schellevis's avatar Ad Schellevis

(netflow/flowd) get data from aggregator

parent aac5b9ec
...@@ -96,6 +96,14 @@ class BaseFlowAggregator(object): ...@@ -96,6 +96,14 @@ class BaseFlowAggregator(object):
""" """
return dict() return dict()
@classmethod
def seconds_per_day(cls, days):
"""
:param days: number of days
:return: number of seconds
"""
return 60*60*24
def __init__(self, resolution): def __init__(self, resolution):
""" construct new flow sample class """ construct new flow sample class
:return: None :return: None
...@@ -222,3 +230,51 @@ class BaseFlowAggregator(object): ...@@ -222,3 +230,51 @@ class BaseFlowAggregator(object):
self._update_cur.execute('delete from timeserie where mtime < :expire', {'expire': expire_timestamp}) self._update_cur.execute('delete from timeserie where mtime < :expire', {'expire': expire_timestamp})
self.commit() self.commit()
# todo: might need vacuum at some point. # todo: might need vacuum at some point.
def get_data(self, start_time, end_time, fields):
""" fetch data from aggregation source, groups by mtime and selected fields
:param start_time: start timestamp
:param end_time: end timestamp
:param fields: fields to retrieve
:return: iterator returning dict records (start_time, end_time, [fields], octets, packets)
"""
# validate field list (can only select fields in self.agg_fields)
select_fields = list()
for field in self.agg_fields:
if field in fields:
select_fields.append(field)
if len(select_fields) == 0:
# select "none", add static null as field
select_fields.append('null')
sql_select = 'select mtime as "start_time [timestamp]", %s' % ','.join(select_fields)
sql_select += ', sum(octets) as octets, sum(packets) as packets\n'
sql_select += 'from timeserie \n'
sql_select += 'where mtime >= :start_time and mtime < :end_time\n'
sql_select += 'group by mtime, %s\n'% ','.join(select_fields)
# make sure start- and end time are of datetime.datetime type
if type(start_time) in (int, float):
start_time = datetime.datetime.fromtimestamp(start_time)
elif type(start_time) != datetime.datetime:
start_time = datetime.datetime.fromtimestamp(0)
if type(end_time) in (int, float):
end_time = datetime.datetime.fromtimestamp(end_time)
elif type(end_time) != datetime.datetime:
end_time = datetime.datetime.fromtimestamp(0)
# execute select query
cur = self._db_connection.cursor()
cur.execute(sql_select, {'start_time': start_time, 'end_time': end_time})
#
field_names = (map(lambda x:x[0], cur.description))
for record in cur.fetchall():
result_record = dict()
for field_indx in range(len(field_names)):
if len(record) > field_indx:
result_record[field_names[field_indx]] = record[field_indx]
if 'start_time' in result_record:
result_record['end_time'] = result_record['start_time'] + datetime.timedelta(seconds=self.resolution)
# send data
yield result_record
# close cursor
cur.close()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment