Commit 158f0532 authored by Ad Schellevis

(netflow, flowd aggregation) add request for top(X) from given dataset, small cleanups

parent 31068301
@@ -70,7 +70,7 @@ if valid_params:
     for agg_class in lib.aggregates.get_aggregators():
         if app_params['provider'] == agg_class.__name__:
             obj = agg_class(resolution)
-            for record in obj.get_data(start_time, end_time, key_fields):
+            for record in obj.get_timeserie_data(start_time, end_time, key_fields):
                 record_key = []
                 for key_field in key_fields:
                     if key_field in record and record[key_field] != None:
#!/usr/local/bin/python2.7
"""
Copyright (c) 2016 Ad Schellevis
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
--------------------------------------------------------------------------------------
fetch top usage from data provider
"""
import time
import datetime
import os
import sys
import ujson
sys.path.insert(0, "/usr/local/opnsense/site-python")
from lib.parse import parse_flow
from lib.aggregate import BaseFlowAggregator
import lib.aggregates
import params
app_params = {'start_time': '',
              'end_time': '',
              'key_fields': '',
              'value_field': '',
              'filter': '',
              'max_hits': '',
              'provider': ''
              }
params.update_params(app_params)

# handle input parameters
valid_params = False
if app_params['start_time'].isdigit():
    start_time = int(app_params['start_time'])
    if app_params['end_time'].isdigit():
        end_time = int(app_params['end_time'])
        if app_params['max_hits'].isdigit():
            max_hits = int(app_params['max_hits'])
            if app_params['key_fields']:
                key_fields = app_params['key_fields'].split(',')
                if app_params['value_field']:
                    value_field = app_params['value_field']
                    valid_params = True

data_filter = app_params['filter']
timeseries = dict()
if valid_params:
    # collect requested top
    result = dict()
    for agg_class in lib.aggregates.get_aggregators():
        if app_params['provider'] == agg_class.__name__:
            # provider may specify multiple resolutions, we need to find the one most likely to serve the
            # beginning of our timeframe
            resolutions = sorted(agg_class.resolutions())
            history_per_resolution = agg_class.history_per_resolution()
            for resolution in resolutions:
                if (resolution in history_per_resolution \
                        and time.time() - history_per_resolution[resolution] <= start_time) \
                        or resolutions[-1] == resolution:
                    selected_resolution = resolution
                    break
            obj = agg_class(selected_resolution)
            result = obj.get_top_data(start_time, end_time, key_fields, value_field, data_filter, max_hits)
    print (ujson.dumps(result))
else:
    print ('missing parameters :')
    tmp = list()
    for key in app_params:
        tmp.append('/%s %s' % (key, app_params[key]))
    print (' %s %s' % (sys.argv[0], ' '.join(tmp)))
    print ('')
    print ('      start_time  : start time (seconds since epoch)')
    print ('      end_time    : end timestamp (seconds since epoch)')
    print ('      key_fields  : key field(s)')
    print ('      value_field : field to sum')
    print ('      filter      : apply filter <field>=value')
    print ('      provider    : data provider classname')
    print ('      max_hits    : maximum number of hits (+1 for rest of data)')
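For reference, a hypothetical invocation of the new script, following the /key value parameter syntax shown in the help output above. The script filename, provider classname, key field and epoch values are illustrative assumptions only, not taken from this commit:

    ./get_top_usage.py /start_time 1461020400 /end_time 1461106800 /key_fields src_addr /value_field octets /filter if=em0 /provider FlowSourceAddrTotals /max_hits 10

On success the script prints the result of get_top_data() as a JSON list via ujson.dumps(): at most max_hits records of the form {"src_addr": "...", "total": <summed octets>}, plus one trailing record that sums the remaining traffic.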
@@ -267,7 +267,32 @@ class BaseFlowAggregator(object):
             # vacuum database if requested
             self._update_cur.execute('vacuum')
 
-    def get_data(self, start_time, end_time, fields):
+    def _parse_timestamp(self, timestamp):
+        """ convert input to datetime.datetime, or return it unchanged if it already is of that type
+        :param timestamp: timestamp to convert
+        :return: datetime.datetime object
+        """
+        if type(timestamp) in (int, float):
+            return datetime.datetime.fromtimestamp(timestamp)
+        elif type(timestamp) != datetime.datetime:
+            return datetime.datetime.fromtimestamp(0)
+        else:
+            return timestamp
+
+    def _valid_fields(self, fields):
+        """ cleanse fields (return only valid ones)
+        :param fields: field list
+        :return: list
+        """
+        # validate field list (can only select fields in self.agg_fields)
+        select_fields = list()
+        for field in fields:
+            if field.strip() in self.agg_fields:
+                select_fields.append(field.strip())
+        return select_fields
+
+    def get_timeserie_data(self, start_time, end_time, fields):
         """ fetch data from aggregation source, groups by mtime and selected fields
         :param start_time: start timestamp
         :param end_time: end timestamp
@@ -276,10 +301,7 @@ class BaseFlowAggregator(object):
         """
         if self.is_db_open() and 'timeserie' in self._known_targets:
             # validate field list (can only select fields in self.agg_fields)
-            select_fields = list()
-            for field in self.agg_fields:
-                if field in fields:
-                    select_fields.append(field)
+            select_fields = self._valid_fields(fields)
             if len(select_fields) == 0:
                 # select "none", add static null as field
                 select_fields.append('null')
@@ -288,20 +310,11 @@ class BaseFlowAggregator(object):
             sql_select += 'from timeserie \n'
             sql_select += 'where mtime >= :start_time and mtime < :end_time\n'
             sql_select += 'group by mtime, %s\n' % ','.join(select_fields)
-            # make sure start- and end time are of datetime.datetime type
-            if type(start_time) in (int, float):
-                start_time = datetime.datetime.fromtimestamp(start_time)
-            elif type(start_time) != datetime.datetime:
-                start_time = datetime.datetime.fromtimestamp(0)
-            if type(end_time) in (int, float):
-                end_time = datetime.datetime.fromtimestamp(end_time)
-            elif type(end_time) != datetime.datetime:
-                end_time = datetime.datetime.fromtimestamp(0)
             # execute select query
             cur = self._db_connection.cursor()
-            cur.execute(sql_select, {'start_time': start_time, 'end_time': end_time})
+            cur.execute(sql_select, {'start_time': self._parse_timestamp(start_time),
+                                     'end_time': self._parse_timestamp(end_time)})
             #
             field_names = (map(lambda x:x[0], cur.description))
             for record in cur.fetchall():
@@ -315,3 +328,72 @@ class BaseFlowAggregator(object):
                 yield result_record
             # close cursor
             cur.close()
+
+    def get_top_data(self, start_time, end_time, fields, value_field, data_filter=None, max_hits=100):
+        """ Retrieve top (usage) from this aggregation.
+            Fetch data from the aggregation source, group by the selected fields, sort by value_field
+            descending and use data_filter to filter before grouping.
+        :param start_time: start timestamp
+        :param end_time: end timestamp
+        :param fields: fields to retrieve
+        :param value_field: field to sum
+        :param data_filter: filter data, use as field=value
+        :param max_hits: maximum number of results, rest is summed into an "other" record
+        :return: list of dict records ([fields], total)
+        """
+        result = list()
+        select_fields = self._valid_fields(fields)
+        filter_fields = []
+        query_params = {}
+        if value_field == 'octets':
+            value_sql = 'sum(octets)'
+        elif value_field == 'packets':
+            value_sql = 'sum(packets)'
+        else:
+            value_sql = '0'
+        # query filters
+        query_params['start_time'] = self._parse_timestamp(start_time)
+        query_params['end_time'] = self._parse_timestamp(end_time)
+        if data_filter:
+            tmp = data_filter.split('=')[0].strip()
+            if tmp in self.agg_fields and data_filter.find('=') > -1:
+                filter_fields.append(tmp)
+                query_params[tmp] = '='.join(data_filter.split('=')[1:])
+        if len(select_fields) > 0:
+            # construct sql query to filter and select data
+            sql_select = 'select %s' % ','.join(select_fields)
+            sql_select += ', %s as total \n' % value_sql
+            sql_select += 'from timeserie \n'
+            sql_select += 'where mtime >= :start_time and mtime < :end_time\n'
+            for filter_field in filter_fields:
+                sql_select += ' and %s = :%s \n' % (filter_field, filter_field)
+            sql_select += 'group by %s\n' % ','.join(select_fields)
+            sql_select += 'order by %s desc ' % value_sql
+            # execute select query
+            cur = self._db_connection.cursor()
+            cur.execute(sql_select, query_params)
+            # fetch all data, to a max of [max_hits] rows.
+            field_names = (map(lambda x: x[0], cur.description))
+            for record in cur.fetchall():
+                result_record = dict()
+                for field_indx in range(len(field_names)):
+                    if len(record) > field_indx:
+                        result_record[field_names[field_indx]] = record[field_indx]
+                if len(result) < max_hits:
+                    result.append(result_record)
+                else:
+                    if len(result) == max_hits:
+                        # generate row for "rest of data"
+                        result.append({'total': 0})
+                        for key in result_record:
+                            if key not in result[-1]:
+                                result[-1][key] = ""
+                    result[-1]['total'] += result_record['total']
+            # close cursor
+            cur.close()
+        return result
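As a usage sketch (not part of the commit): the new method can also be called directly on an aggregator obtained from lib.aggregates.get_aggregators(), which is what the new script does after picking a resolution. The provider classname, resolution choice, field names and filter value below are illustrative assumptions only; everything beyond max_hits is folded into one trailing record with empty key fields and a summed 'total'.

import time
import lib.aggregates

# pick a provider by classname, as the script above does (the classname is an assumption here)
for agg_class in lib.aggregates.get_aggregators():
    if agg_class.__name__ == 'FlowInterfaceTotals':
        # use the finest available resolution; the script instead selects one based on the requested history
        obj = agg_class(sorted(agg_class.resolutions())[0])
        # top 10 records by octets over the last hour (field names and filter are assumptions)
        top = obj.get_top_data(start_time=time.time() - 3600,
                               end_time=time.time(),
                               fields=['if', 'direction'],
                               value_field='octets',
                               data_filter='direction=in',
                               max_hits=10)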