Commit a818fb3f authored by Ad Schellevis

(netflow/flowd) validate connection in BaseFlowAggregator

parent 62362038
@@ -221,7 +221,8 @@ class BaseFlowAggregator(object):
         :param expire: cleanup table, remove data older than [expire] seconds
         :return: None
         """
-        if 'timeserie' in self._known_targets and self.resolution in self.history_per_resolution():
+        if self.is_db_open() and 'timeserie' in self._known_targets \
+                and self.resolution in self.history_per_resolution():
             self._update_cur.execute('select max(mtime) as "[timestamp]" from timeserie')
             last_timestamp = self._update_cur.fetchall()[0][0]
             if type(last_timestamp) == datetime.datetime:
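
The guard added above calls is_db_open(), whose implementation is not part of this diff. A minimal sketch of what such a check could look like — an assumption for illustration, not code from this commit — given that the class stores its sqlite3 handle in self._db_connection (the attribute used by the cursor calls in the next hunk):

    import sqlite3

    class BaseFlowAggregator(object):
        def __init__(self):
            # handle stays None until the database has been opened successfully
            self._db_connection = None

        def open_db(self, filename):
            # illustrative helper (hypothetical name), not part of this commit
            self._db_connection = sqlite3.connect(filename)

        def is_db_open(self):
            # return True when a usable database connection is available
            return self._db_connection is not None
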
@@ -238,43 +239,44 @@ class BaseFlowAggregator(object):
         :param fields: fields to retrieve
         :return: iterator returning dict records (start_time, end_time, [fields], octets, packets)
         """
-        # validate field list (can only select fields in self.agg_fields)
-        select_fields = list()
-        for field in self.agg_fields:
-            if field in fields:
-                select_fields.append(field)
-        if len(select_fields) == 0:
-            # select "none", add static null as field
-            select_fields.append('null')
-        sql_select = 'select mtime as "start_time [timestamp]", %s' % ','.join(select_fields)
-        sql_select += ', sum(octets) as octets, sum(packets) as packets\n'
-        sql_select += 'from timeserie \n'
-        sql_select += 'where mtime >= :start_time and mtime < :end_time\n'
-        sql_select += 'group by mtime, %s\n'% ','.join(select_fields)
-        # make sure start- and end time are of datetime.datetime type
-        if type(start_time) in (int, float):
-            start_time = datetime.datetime.fromtimestamp(start_time)
-        elif type(start_time) != datetime.datetime:
-            start_time = datetime.datetime.fromtimestamp(0)
+        if self.is_db_open() and 'timeserie' in self._known_targets:
+            # validate field list (can only select fields in self.agg_fields)
+            select_fields = list()
+            for field in self.agg_fields:
+                if field in fields:
+                    select_fields.append(field)
+            if len(select_fields) == 0:
+                # select "none", add static null as field
+                select_fields.append('null')
+            sql_select = 'select mtime as "start_time [timestamp]", %s' % ','.join(select_fields)
+            sql_select += ', sum(octets) as octets, sum(packets) as packets\n'
+            sql_select += 'from timeserie \n'
+            sql_select += 'where mtime >= :start_time and mtime < :end_time\n'
+            sql_select += 'group by mtime, %s\n'% ','.join(select_fields)
+            # make sure start- and end time are of datetime.datetime type
+            if type(start_time) in (int, float):
+                start_time = datetime.datetime.fromtimestamp(start_time)
+            elif type(start_time) != datetime.datetime:
+                start_time = datetime.datetime.fromtimestamp(0)
-        if type(end_time) in (int, float):
-            end_time = datetime.datetime.fromtimestamp(end_time)
-        elif type(end_time) != datetime.datetime:
-            end_time = datetime.datetime.fromtimestamp(0)
+            if type(end_time) in (int, float):
+                end_time = datetime.datetime.fromtimestamp(end_time)
+            elif type(end_time) != datetime.datetime:
+                end_time = datetime.datetime.fromtimestamp(0)
-        # execute select query
-        cur = self._db_connection.cursor()
-        cur.execute(sql_select, {'start_time': start_time, 'end_time': end_time})
-        #
-        field_names = (map(lambda x:x[0], cur.description))
-        for record in cur.fetchall():
-            result_record = dict()
-            for field_indx in range(len(field_names)):
-                if len(record) > field_indx:
-                    result_record[field_names[field_indx]] = record[field_indx]
-            if 'start_time' in result_record:
-                result_record['end_time'] = result_record['start_time'] + datetime.timedelta(seconds=self.resolution)
-            # send data
-            yield result_record
-        # close cursor
-        cur.close()
+            # execute select query
+            cur = self._db_connection.cursor()
+            cur.execute(sql_select, {'start_time': start_time, 'end_time': end_time})
+            #
+            field_names = (map(lambda x:x[0], cur.description))
+            for record in cur.fetchall():
+                result_record = dict()
+                for field_indx in range(len(field_names)):
+                    if len(record) > field_indx:
+                        result_record[field_names[field_indx]] = record[field_indx]
+                if 'start_time' in result_record:
+                    result_record['end_time'] = result_record['start_time'] + datetime.timedelta(seconds=self.resolution)
+                # send data
+                yield result_record
+            # close cursor
+            cur.close()
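
With the generator body wrapped in the same is_db_open() / _known_targets check, a call made before the database is opened now yields no records instead of raising AttributeError on self._db_connection.cursor(). A usage sketch — the aggregator subclass, method name and field name below are hypothetical, since none of them are visible inside this hunk:

    import datetime

    # SomeFlowAggregator and 'src_addr' are placeholders for one of the
    # concrete flowd aggregators and its aggregation fields
    aggregator = SomeFlowAggregator(resolution=300)

    start = datetime.datetime.now() - datetime.timedelta(hours=1)
    end = datetime.datetime.now()

    # int/float timestamps are also accepted; the method normalizes them
    # via datetime.datetime.fromtimestamp() before binding the parameters
    for record in aggregator.get_timeserie_data(start, end, ['src_addr']):
        # each record holds start_time, end_time (start_time + resolution),
        # the selected fields and the summed octets/packets counters
        print(record)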