Commit 98119316 authored by Ad Schellevis's avatar Ad Schellevis

(netflow, flowd agg) add vacuum, change poll interval

parent 78069d8f
...@@ -44,8 +44,9 @@ MAX_FILE_SIZE_MB=10 ...@@ -44,8 +44,9 @@ MAX_FILE_SIZE_MB=10
MAX_LOGS=10 MAX_LOGS=10
def aggregate_flowd(): def aggregate_flowd(do_vacuum=False):
""" aggregate collected flowd data """ aggregate collected flowd data
:param do_vacuum: vacuum database after cleanup
:return: None :return: None
""" """
# init metadata (progress maintenance) # init metadata (progress maintenance)
...@@ -53,7 +54,6 @@ def aggregate_flowd(): ...@@ -53,7 +54,6 @@ def aggregate_flowd():
# register aggregate classes to stream data to # register aggregate classes to stream data to
stream_agg_objects = list() stream_agg_objects = list()
resolutions = [60, 60*5]
for agg_class in lib.aggregates.get_aggregators(): for agg_class in lib.aggregates.get_aggregators():
for resolution in agg_class.resolutions(): for resolution in agg_class.resolutions():
stream_agg_objects.append(agg_class(resolution)) stream_agg_objects.append(agg_class(resolution))
...@@ -74,7 +74,7 @@ def aggregate_flowd(): ...@@ -74,7 +74,7 @@ def aggregate_flowd():
# expire old data # expire old data
for stream_agg_object in stream_agg_objects: for stream_agg_object in stream_agg_objects:
stream_agg_object.cleanup() stream_agg_object.cleanup(do_vacuum)
del stream_agg_object del stream_agg_object
del metadata del metadata
...@@ -122,13 +122,21 @@ class Main(object): ...@@ -122,13 +122,21 @@ class Main(object):
""" run, endless loop, until sigterm is received """ run, endless loop, until sigterm is received
:return: None :return: None
""" """
vacuum_interval = (60*60*8) # 8 hour vacuum cycle
vacuum_countdown = None
while self.running: while self.running:
# should we perform a vacuum
if not vacuum_countdown or vacuum_countdown < time.time():
vacuum_countdown = time.time() + vacuum_interval
do_vacuum = True
else:
do_vacuum = False
# run aggregate # run aggregate
aggregate_flowd() aggregate_flowd(do_vacuum)
# rotate if needed # rotate if needed
check_rotate() check_rotate()
# wait for next pass, exit on sigterm # wait for next pass, exit on sigterm
for i in range(120): for i in range(30):
if self.running: if self.running:
time.sleep(0.5) time.sleep(0.5)
else: else:
......
...@@ -230,9 +230,10 @@ class BaseFlowAggregator(object): ...@@ -230,9 +230,10 @@ class BaseFlowAggregator(object):
# next start time # next start time
start_time += self.resolution start_time += self.resolution
def cleanup(self): def cleanup(self, do_vacuum = False):
""" cleanup timeserie table """ cleanup timeserie table
:param expire: cleanup table, remove data older then [expire] seconds :param expire: cleanup table, remove data older then [expire] seconds
:param do_vacuum: vacuum database
:return: None :return: None
""" """
if self.is_db_open() and 'timeserie' in self._known_targets \ if self.is_db_open() and 'timeserie' in self._known_targets \
...@@ -244,7 +245,9 @@ class BaseFlowAggregator(object): ...@@ -244,7 +245,9 @@ class BaseFlowAggregator(object):
expire_timestamp = last_timestamp - datetime.timedelta(seconds=expire) expire_timestamp = last_timestamp - datetime.timedelta(seconds=expire)
self._update_cur.execute('delete from timeserie where mtime < :expire', {'expire': expire_timestamp}) self._update_cur.execute('delete from timeserie where mtime < :expire', {'expire': expire_timestamp})
self.commit() self.commit()
# todo: might need vacuum at some point. if do_vacuum:
# vacuum database if requested
self._update_cur.execute('vacuum')
def get_data(self, start_time, end_time, fields): def get_data(self, start_time, end_time, fields):
""" fetch data from aggregation source, groups by mtime and selected fields """ fetch data from aggregation source, groups by mtime and selected fields
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment