Commit 13562e90 authored by Ad Schellevis's avatar Ad Schellevis

(netflow) document parse_flow

parent 68d3a415
...@@ -29,6 +29,7 @@ ...@@ -29,6 +29,7 @@
import flowd import flowd
import glob import glob
# define field
PARSE_FLOW_FIELDS = [ PARSE_FLOW_FIELDS = [
{'check': flowd.FIELD_OCTETS, 'target': 'octets'}, {'check': flowd.FIELD_OCTETS, 'target': 'octets'},
{'check': flowd.FIELD_PACKETS, 'target': 'packets'}, {'check': flowd.FIELD_PACKETS, 'target': 'packets'},
...@@ -44,30 +45,38 @@ PARSE_FLOW_FIELDS = [ ...@@ -44,30 +45,38 @@ PARSE_FLOW_FIELDS = [
{'check': flowd.FIELD_GATEWAY_ADDR, 'target': 'gateway_addr'}, {'check': flowd.FIELD_GATEWAY_ADDR, 'target': 'gateway_addr'},
{'check': flowd.FIELD_FLOW_TIMES, 'target': 'netflow_ver'}] {'check': flowd.FIELD_FLOW_TIMES, 'target': 'netflow_ver'}]
# location of flowd logfiles to use
FLOWD_LOG_FILES='/var/log/flowd.log*' FLOWD_LOG_FILES='/var/log/flowd.log*'
def parse_flow(recv_stamp): def parse_flow(recv_stamp):
""" parse flowd logs and yield records (dict type)
:param recv_stamp: last receive timestamp (recv)
:return: iterator flow details
"""
parse_done = False parse_done = False
for filename in sorted(glob.glob(FLOWD_LOG_FILES)): for filename in sorted(glob.glob(FLOWD_LOG_FILES)):
if parse_done: if parse_done:
# log file contains older data (recv_stamp), break
break break
flog = flowd.FlowLog(filename) flog = flowd.FlowLog(filename)
for flow in flog: for flow in flog:
flow_record = dict() flow_record = dict()
if flow.has_field(flowd.FIELD_RECV_TIME): if flow.has_field(flowd.FIELD_RECV_TIME):
# receive timestamp
flow_record['recv'] = flow.recv_sec + flow.recv_usec/1000.0 flow_record['recv'] = flow.recv_sec + flow.recv_usec/1000.0
if flow_record['recv'] <= recv_stamp: if flow_record['recv'] <= recv_stamp:
# do not parse next flow archive (oldest reached) # do not parse next flow archive (oldest reached)
parse_done = True parse_done = True
continue continue
if flow.has_field(flowd.FIELD_FLOW_TIMES): if flow.has_field(flowd.FIELD_FLOW_TIMES):
# calculate flow start, end, duration in ms
flow_record['flow_end'] = (flow.recv_sec - flow.flow_finish / 1000.0) flow_record['flow_end'] = (flow.recv_sec - flow.flow_finish / 1000.0)
flow_record['duration_ms'] = (flow.flow_finish - flow.flow_start) flow_record['duration_ms'] = (flow.flow_finish - flow.flow_start)
flow_record['flow_start'] = flow_record['flow_end'] - flow_record['duration_ms']/1000.0 flow_record['flow_start'] = flow_record['flow_end'] - flow_record['duration_ms']/1000.0
# handle source data
for flow_field in PARSE_FLOW_FIELDS: for flow_field in PARSE_FLOW_FIELDS:
if flow.has_field(flow_field['check']): if flow.has_field(flow_field['check']):
flow_record[flow_field['target']] = getattr(flow, flow_field['target']) flow_record[flow_field['target']] = getattr(flow, flow_field['target'])
else: else:
flow_record[flow_field['target']] = None flow_record[flow_field['target']] = None
yield flow_record yield flow_record
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment