Commit b34f61cb authored by Ad Schellevis's avatar Ad Schellevis

(netflow) style fixes

parent 8bdec9b4
...@@ -36,7 +36,7 @@ import ujson ...@@ -36,7 +36,7 @@ import ujson
if __name__ == '__main__': if __name__ == '__main__':
result = dict() result = dict()
netflow_nodes=list() netflow_nodes = list()
with tempfile.NamedTemporaryFile() as output_stream: with tempfile.NamedTemporaryFile() as output_stream:
subprocess.call(['/usr/sbin/ngctl', 'list'], stdout=output_stream, stderr=open(os.devnull, 'wb')) subprocess.call(['/usr/sbin/ngctl', 'list'], stdout=output_stream, stderr=open(os.devnull, 'wb'))
output_stream.seek(0) output_stream.seek(0)
...@@ -45,23 +45,23 @@ if __name__ == '__main__': ...@@ -45,23 +45,23 @@ if __name__ == '__main__':
netflow_nodes.append(line.split()[1]) netflow_nodes.append(line.split()[1])
for netflow_node in netflow_nodes: for netflow_node in netflow_nodes:
node_stats={'SrcIPaddress': list(), 'DstIPaddress': list(), 'Pkts': 0} node_stats = {'SrcIPaddress': list(), 'DstIPaddress': list(), 'Pkts': 0}
with tempfile.NamedTemporaryFile() as output_stream: with tempfile.NamedTemporaryFile() as output_stream:
subprocess.call(['/usr/sbin/flowctl','%s:'%netflow_node, 'show'], subprocess.call(['/usr/sbin/flowctl', '%s:' % netflow_node, 'show'],
stdout=output_stream, stderr=open(os.devnull, 'wb')) stdout=output_stream, stderr=open(os.devnull, 'wb'))
output_stream.seek(0) output_stream.seek(0)
for line in output_stream.read().split('\n'): for line in output_stream.read().split('\n'):
fields=line.split() fields = line.split()
if (len(fields) >= 8 and fields[0] != 'SrcIf'): if len(fields) >= 8 and fields[0] != 'SrcIf':
node_stats['Pkts'] += int(fields[7]) node_stats['Pkts'] += int(fields[7])
if fields[1] not in node_stats['SrcIPaddress']: if fields[1] not in node_stats['SrcIPaddress']:
node_stats['SrcIPaddress'].append(fields[1]) node_stats['SrcIPaddress'].append(fields[1])
if fields[3] not in node_stats['DstIPaddress']: if fields[3] not in node_stats['DstIPaddress']:
node_stats['DstIPaddress'].append(fields[3]) node_stats['DstIPaddress'].append(fields[3])
result[netflow_node]={'Pkts': node_stats['Pkts'], result[netflow_node] = {'Pkts': node_stats['Pkts'],
'if': netflow_node[8:], 'if': netflow_node[8:],
'SrcIPaddresses': len(node_stats['SrcIPaddress']), 'SrcIPaddresses': len(node_stats['SrcIPaddress']),
'DstIPaddresses': len(node_stats['DstIPaddress'])} 'DstIPaddresses': len(node_stats['DstIPaddress'])}
# handle command line argument (type selection) # handle command line argument (type selection)
if len(sys.argv) > 1 and 'json' in sys.argv: if len(sys.argv) > 1 and 'json' in sys.argv:
......
...@@ -31,6 +31,7 @@ import os ...@@ -31,6 +31,7 @@ import os
import datetime import datetime
import sqlite3 import sqlite3
class BaseFlowAggregator(object): class BaseFlowAggregator(object):
# target location ('/var/netflow/<store>.sqlite') # target location ('/var/netflow/<store>.sqlite')
target_filename = None target_filename = None
...@@ -48,20 +49,17 @@ class BaseFlowAggregator(object): ...@@ -48,20 +49,17 @@ class BaseFlowAggregator(object):
self._known_targets = list() self._known_targets = list()
# construct update and insert sql statements # construct update and insert sql statements
tmp = 'update %s set octets = octets + :octets_consumed, packets = packets + :packets_consumed ' tmp = 'update %s set octets = octets + :octets_consumed, packets = packets + :packets_consumed '
tmp = tmp + 'where mtime = :mtime and %s ' tmp += 'where mtime = :mtime and %s '
self._update_stmt = tmp % (self._target_table_name, self._update_stmt = tmp % (self._target_table_name,
' and '.join(map(lambda x: '%s = :%s'%(x,x), ' and '.join(map(lambda x: '%s = :%s' % (x, x), self.agg_fields)))
self.agg_fields)))
tmp = 'insert into %s (mtime, octets, packets, %s) values (:mtime, :octets_consumed, :packets_consumed, %s)' tmp = 'insert into %s (mtime, octets, packets, %s) values (:mtime, :octets_consumed, :packets_consumed, %s)'
self._insert_stmt = tmp % (self._target_table_name, self._insert_stmt = tmp % (self._target_table_name,
','.join(self.agg_fields), ','.join(self.agg_fields),
','.join(map(lambda x: ':%s'%(x), ','.join(map(lambda x: ':%s' % x, self.agg_fields)))
self.agg_fields)))
# open database # open database
self._open_db() self._open_db()
self._fetch_known_targets() self._fetch_known_targets()
def _fetch_known_targets(self): def _fetch_known_targets(self):
""" read known target table names from the sqlite db """ read known target table names from the sqlite db
""" """
...@@ -79,7 +77,7 @@ class BaseFlowAggregator(object): ...@@ -79,7 +77,7 @@ class BaseFlowAggregator(object):
if self._db_connection is not None: if self._db_connection is not None:
# construct new aggregate table # construct new aggregate table
sql_text = list() sql_text = list()
sql_text.append('create table %s ( ' % self._target_table_name ) sql_text.append('create table %s ( ' % self._target_table_name)
sql_text.append(' mtime timestamp') sql_text.append(' mtime timestamp')
for agg_field in self.agg_fields: for agg_field in self.agg_fields:
sql_text.append(', %s varchar(255)' % agg_field) sql_text.append(', %s varchar(255)' % agg_field)
...@@ -122,27 +120,28 @@ class BaseFlowAggregator(object): ...@@ -122,27 +120,28 @@ class BaseFlowAggregator(object):
def add(self, flow): def add(self, flow):
""" calculate timeslices per flow depending on sample resolution """ calculate timeslices per flow depending on sample resolution
:param flow: flow data (from parse.py)
""" """
# make sure target exists # make sure target exists
if self._target_table_name not in self._known_targets: if self._target_table_name not in self._known_targets:
self._create_target_table() self._create_target_table()
# push record(s) depending on resolution # push record(s) depending on resolution
start_time = int(flow['flow_start'] / self.resolution)*self.resolution start_time = int(flow['flow_start'] / self.resolution) * self.resolution
while start_time <= flow['flow_end']: while start_time <= flow['flow_end']:
consume_start_time = max(flow['flow_start'], start_time) consume_start_time = max(flow['flow_start'], start_time)
consume_end_time = min(start_time + self.resolution, flow['flow_end']) consume_end_time = min(start_time + self.resolution, flow['flow_end'])
if flow['duration_ms'] != 0: if flow['duration_ms'] != 0:
consume_perc = (consume_end_time - consume_start_time) / float(flow['duration_ms']/1000) consume_perc = (consume_end_time - consume_start_time) / float(flow['duration_ms'] / 1000)
else: else:
consume_perc = 1 consume_perc = 1
if self.is_db_open(): if self.is_db_open():
# upsert data # upsert data
flow['octets_consumed'] = consume_perc*flow['octets'] flow['octets_consumed'] = consume_perc * flow['octets']
flow['packets_consumed'] = consume_perc*flow['packets'] flow['packets_consumed'] = consume_perc * flow['packets']
flow['mtime'] = datetime.datetime.utcfromtimestamp(start_time) flow['mtime'] = datetime.datetime.utcfromtimestamp(start_time)
result = self._update_cur.execute(self._update_stmt, flow) self._update_cur.execute(self._update_stmt, flow)
if self._update_cur.rowcount == 0: if self._update_cur.rowcount == 0:
result = self._update_cur.execute(self._insert_stmt, flow) self._update_cur.execute(self._insert_stmt, flow)
# next start time # next start time
start_time += self.resolution start_time += self.resolution
...@@ -46,7 +46,8 @@ PARSE_FLOW_FIELDS = [ ...@@ -46,7 +46,8 @@ PARSE_FLOW_FIELDS = [
{'check': flowd.FIELD_FLOW_TIMES, 'target': 'netflow_ver'}] {'check': flowd.FIELD_FLOW_TIMES, 'target': 'netflow_ver'}]
# location of flowd logfiles to use # location of flowd logfiles to use
FLOWD_LOG_FILES='/var/log/flowd.log*' FLOWD_LOG_FILES = '/var/log/flowd.log*'
def parse_flow(recv_stamp): def parse_flow(recv_stamp):
""" parse flowd logs and yield records (dict type) """ parse flowd logs and yield records (dict type)
...@@ -63,7 +64,7 @@ def parse_flow(recv_stamp): ...@@ -63,7 +64,7 @@ def parse_flow(recv_stamp):
flow_record = dict() flow_record = dict()
if flow.has_field(flowd.FIELD_RECV_TIME): if flow.has_field(flowd.FIELD_RECV_TIME):
# receive timestamp # receive timestamp
flow_record['recv'] = flow.recv_sec + flow.recv_usec/1000.0 flow_record['recv'] = flow.recv_sec + flow.recv_usec / 1000.0
if flow_record['recv'] <= recv_stamp: if flow_record['recv'] <= recv_stamp:
# do not parse next flow archive (oldest reached) # do not parse next flow archive (oldest reached)
parse_done = True parse_done = True
...@@ -72,7 +73,7 @@ def parse_flow(recv_stamp): ...@@ -72,7 +73,7 @@ def parse_flow(recv_stamp):
# calculate flow start, end, duration in ms # calculate flow start, end, duration in ms
flow_record['flow_end'] = (flow.recv_sec - flow.flow_finish / 1000.0) flow_record['flow_end'] = (flow.recv_sec - flow.flow_finish / 1000.0)
flow_record['duration_ms'] = (flow.flow_finish - flow.flow_start) flow_record['duration_ms'] = (flow.flow_finish - flow.flow_start)
flow_record['flow_start'] = flow_record['flow_end'] - flow_record['duration_ms']/1000.0 flow_record['flow_start'] = flow_record['flow_end'] - flow_record['duration_ms'] / 1000.0
# handle source data # handle source data
for flow_field in PARSE_FLOW_FIELDS: for flow_field in PARSE_FLOW_FIELDS:
if flow.has_field(flow_field['check']): if flow.has_field(flow_field['check']):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment