Commit a72fda79 authored by Ad Schellevis's avatar Ad Schellevis

(netflow/flowd) move resolution to BaseFlowAggregator, add cleanup() to expire old data

parent 2e20c8d6
...@@ -41,7 +41,8 @@ class AggMetadata(object): ...@@ -41,7 +41,8 @@ class AggMetadata(object):
if not os.path.isdir(target_path): if not os.path.isdir(target_path):
os.makedirs(target_path) os.makedirs(target_path)
# open sqlite database and cursor # open sqlite database and cursor
self._db_connection = sqlite3.connect(self._filename) self._db_connection = sqlite3.connect(self._filename,
detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES)
self._db_cursor = self._db_connection.cursor() self._db_cursor = self._db_connection.cursor()
# known tables # known tables
self._tables = list() self._tables = list()
...@@ -81,8 +82,23 @@ class BaseFlowAggregator(object): ...@@ -81,8 +82,23 @@ class BaseFlowAggregator(object):
# list of fields to use in this aggregate # list of fields to use in this aggregate
agg_fields = None agg_fields = None
@classmethod
def resolutions(cls):
""" sample resolutions for this aggregation
:return: list of sample resolutions
"""
return list()
@classmethod
def history_per_resolution(cls):
""" history to keep in seconds per sample resolution
:return: dict sample resolution / expire time (seconds)
"""
return dict()
def __init__(self, resolution): def __init__(self, resolution):
""" construct new flow sample class """ construct new flow sample class
:return: None
""" """
self.resolution = resolution self.resolution = resolution
# target table name, data_<resolution in seconds> # target table name, data_<resolution in seconds>
...@@ -101,6 +117,7 @@ class BaseFlowAggregator(object): ...@@ -101,6 +117,7 @@ class BaseFlowAggregator(object):
def _fetch_known_targets(self): def _fetch_known_targets(self):
""" read known target table names from the sqlite db """ read known target table names from the sqlite db
:return: None
""" """
if self._db_connection is not None: if self._db_connection is not None:
self._known_targets = list() self._known_targets = list()
...@@ -112,6 +129,7 @@ class BaseFlowAggregator(object): ...@@ -112,6 +129,7 @@ class BaseFlowAggregator(object):
def _create_target_table(self): def _create_target_table(self):
""" construct target aggregate table, using resulution and list of agg_fields """ construct target aggregate table, using resulution and list of agg_fields
:return: None
""" """
if self._db_connection is not None: if self._db_connection is not None:
# construct new aggregate table # construct new aggregate table
...@@ -132,6 +150,7 @@ class BaseFlowAggregator(object): ...@@ -132,6 +150,7 @@ class BaseFlowAggregator(object):
def is_db_open(self): def is_db_open(self):
""" check if target database is open """ check if target database is open
:return: database connected (True/False)
""" """
if self._db_connection is not None: if self._db_connection is not None:
return True return True
...@@ -140,6 +159,7 @@ class BaseFlowAggregator(object): ...@@ -140,6 +159,7 @@ class BaseFlowAggregator(object):
def _open_db(self): def _open_db(self):
""" open / create database """ open / create database
:return: None
""" """
if self.target_filename is not None: if self.target_filename is not None:
# make sure the target directory exists # make sure the target directory exists
...@@ -147,12 +167,14 @@ class BaseFlowAggregator(object): ...@@ -147,12 +167,14 @@ class BaseFlowAggregator(object):
if not os.path.isdir(target_path): if not os.path.isdir(target_path):
os.makedirs(target_path) os.makedirs(target_path)
# open sqlite database # open sqlite database
self._db_connection = sqlite3.connect(self.target_filename % self.resolution) self._db_connection = sqlite3.connect(self.target_filename % self.resolution,
detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES)
# open update/insert cursor # open update/insert cursor
self._update_cur = self._db_connection.cursor() self._update_cur = self._db_connection.cursor()
def commit(self): def commit(self):
""" commit data """ commit data
:return: None
""" """
if self._db_connection is not None: if self._db_connection is not None:
self._db_connection.commit() self._db_connection.commit()
...@@ -160,6 +182,7 @@ class BaseFlowAggregator(object): ...@@ -160,6 +182,7 @@ class BaseFlowAggregator(object):
def add(self, flow): def add(self, flow):
""" calculate timeslices per flow depending on sample resolution """ calculate timeslices per flow depending on sample resolution
:param flow: flow data (from parse.py) :param flow: flow data (from parse.py)
:return: None
""" """
# make sure target exists # make sure target exists
if 'timeserie' not in self._known_targets: if 'timeserie' not in self._known_targets:
...@@ -184,3 +207,18 @@ class BaseFlowAggregator(object): ...@@ -184,3 +207,18 @@ class BaseFlowAggregator(object):
self._update_cur.execute(self._insert_stmt, flow) self._update_cur.execute(self._insert_stmt, flow)
# next start time # next start time
start_time += self.resolution start_time += self.resolution
def cleanup(self):
""" cleanup timeserie table
:param expire: cleanup table, remove data older then [expire] seconds
:return: None
"""
if 'timeserie' in self._known_targets and self.resolution in self.history_per_resolution():
self._update_cur.execute('select max(mtime) as "[timestamp]" from timeserie')
last_timestamp = self._update_cur.fetchall()[0][0]
if type(last_timestamp) == datetime.datetime:
expire = self.history_per_resolution()[self.resolution]
expire_timestamp = last_timestamp - datetime.timedelta(seconds=expire)
self._update_cur.execute('delete from timeserie where mtime < :expire', {'expire': expire_timestamp})
self.commit()
# todo: might need vacuum at some point.
...@@ -34,5 +34,23 @@ class FlowInterfaceTotals(BaseFlowAggregator): ...@@ -34,5 +34,23 @@ class FlowInterfaceTotals(BaseFlowAggregator):
target_filename = '/var/netflow/interface_%06d.sqlite' target_filename = '/var/netflow/interface_%06d.sqlite'
agg_fields = ['if_in', 'if_out'] agg_fields = ['if_in', 'if_out']
@classmethod
def resolutions(cls):
"""
:return: list of sample resolutions
"""
return [60, 60*5]
@classmethod
def history_per_resolution(cls):
"""
:return: dict sample resolution / expire time (seconds)
"""
return {60: 60*60}
def __init__(self, resolution): def __init__(self, resolution):
"""
:param resolution: sample resultion (seconds)
:return: None
"""
super(FlowInterfaceTotals, self).__init__(resolution) super(FlowInterfaceTotals, self).__init__(resolution)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment