Commit 9cc65c19 authored by Ad Schellevis's avatar Ad Schellevis

(netflow, flowd agg) lesser commits

parent aa863074
......@@ -62,9 +62,10 @@ def aggregate_flowd(do_vacuum=False):
stream_agg_objects.append(agg_class(resolution))
# parse flow data and stream to registered consumers
prev_recv=metadata.last_sync()
prev_recv = metadata.last_sync()
commit_record_count = 0
for flow_record in parse_flow(prev_recv):
if flow_record is None or prev_recv != flow_record['recv']:
if flow_record is None or (prev_recv != flow_record['recv'] and commit_record_count > 100000):
# commit data on receive timestamp change or last record
for stream_agg_object in stream_agg_objects:
stream_agg_object.commit()
......@@ -76,6 +77,7 @@ def aggregate_flowd(do_vacuum=False):
# paremeters here.
flow_record_cpy = copy.copy(flow_record)
stream_agg_object.add(flow_record_cpy)
commit_record_count += 1
prev_recv = flow_record['recv']
# expire old data
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment