Commit 4a3a693f authored by Ad Schellevis

(netflow) add rc script flowd_aggregate, finish initial flowd aggregator version

parent 9989097d
#!/bin/sh
#
# $FreeBSD$
#
# PROVIDE: flowd_aggregate
# REQUIRE: SERVERS
# KEYWORD: shutdown
#
. /etc/rc.subr
name=flowd_aggregate
rcvar=flowd_aggregate_enable
command=/usr/local/opnsense/scripts/netflow/flowd_aggregate.py
command_interpreter=/usr/local/bin/python2.7
pidfile="/var/run/${name}.pid"
load_rc_config $name
# Set defaults
: ${flowd_aggregate_enable:=NO}
stop_cmd=flowd_aggregate_stop
# graceful stop: ask flowd_aggregate to exit, kill it if it does not
flowd_aggregate_stop()
{
    if [ -z "$rc_pid" ]; then
        [ -n "$rc_fast" ] && return 0
        _run_rc_notrunning
        return 1
    fi
    echo -n "Stopping ${name}."
    # first ask gently to exit
    kill -15 ${rc_pid}
    # wait max 5 seconds for gentle exit
    for i in $(seq 1 50); do
        if [ -z "`/bin/ps -ax | /usr/bin/awk '{print $1;}' | /usr/bin/grep "^${rc_pid}"`" ]; then
            break
        fi
        sleep 0.1
    done
    # kill any remaining flowd_aggregate.py processes (if still running)
    for flowd_aggregate_pid in `/bin/ps -ax | /usr/bin/grep 'flowd_aggregate.py' | /usr/bin/awk '{print $1;}'`; do
        kill -9 $flowd_aggregate_pid >/dev/null 2>&1
    done
    echo "..done"
}
run_rc_command $1
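For context (not part of this commit): once the script above is installed under the rc.d directory, the service is enabled and driven through the standard FreeBSD rc tooling, along these lines:

sysrc flowd_aggregate_enable=YES   # set the rcvar declared above
service flowd_aggregate start      # launches the python daemon (pidfile /var/run/flowd_aggregate.pid)
service flowd_aggregate stop       # SIGTERM first, SIGKILL fallback via flowd_aggregate_stop()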
flowd_aggregate.py
@@ -31,23 +31,36 @@ import time
import datetime
import os
import sys
import signal
import glob
sys.path.insert(0, "/usr/local/opnsense/site-python")
from lib.parse import parse_flow
from lib.aggregate import BaseFlowAggregator, AggMetadata
from lib.aggregate import AggMetadata
import lib.aggregates
from daemonize import Daemonize
# init metadata (progress maintenance)
metadata = AggMetadata()
# register aggregate classes to stream data to
stream_agg_objects = list()
resolutions = [60, 60*5]
for agg_class in lib.aggregates.get_aggregators():
    for resolution in resolutions:
MAX_FILE_SIZE_MB=10
MAX_LOGS=10
def aggregate_flowd():
""" aggregate collected flowd data
:return: None
"""
# init metadata (progress maintenance)
metadata = AggMetadata()
# register aggregate classes to stream data to
stream_agg_objects = list()
resolutions = [60, 60*5]
for agg_class in lib.aggregates.get_aggregators():
for resolution in agg_class.resolutions():
stream_agg_objects.append(agg_class(resolution))
# parse flow data and stream to registered consumers
prev_recv=metadata.last_sync()
for flow_record in parse_flow(prev_recv):
# parse flow data and stream to registered consumers
prev_recv=metadata.last_sync()
for flow_record in parse_flow(prev_recv):
if flow_record is None or prev_recv != flow_record['recv']:
# commit data on receive timestamp change or last record
for stream_agg_object in stream_agg_objects:
@@ -58,3 +71,82 @@ for flow_record in parse_flow(prev_recv):
            for stream_agg_object in stream_agg_objects:
                stream_agg_object.add(flow_record)
            prev_recv = flow_record['recv']
    # expire old data
    for stream_agg_object in stream_agg_objects:
        stream_agg_object.cleanup()
        del stream_agg_object
    del metadata
def check_rotate():
""" Checks if flowd log needs to be rotated, if so perform rotate.
We keep [MAX_LOGS] number of logs containing approx. [MAX_FILE_SIZE_MB] data, the flowd data probably contains
more detailed data then the stored aggregates.
:return: None
"""
if os.path.getsize("/var/log/flowd.log")/1024/1024 > MAX_FILE_SIZE_MB:
# max filesize reached rotate
filenames = sorted(glob.glob('/var/log/flowd.log.*'), reverse=True)
file_sequence = len(filenames)
for filename in filenames:
sequence = filename.split('.')[-1]
if sequence.isdigit():
if file_sequence >= MAX_LOGS:
os.remove(filename)
elif int(sequence) != 0:
os.rename(filename, filename.replace('.%s' % sequence, '.%06d' % (int(sequence)+1)))
file_sequence -= 1
# rename /var/log/flowd.log
os.rename('/var/log/flowd.log', '/var/log/flowd.log.000001')
# signal flowd for new log file
if os.path.isfile('/var/run/flowd.pid'):
pid = open('/var/run/flowd.pid').read().strip()
if pid.isdigit():
try:
os.kill(int(pid), signal.SIGUSR1)
except OSError:
pass
class Main(object):
    def __init__(self):
        """ construct, hook signal handler and run aggregators
        :return: None
        """
        self.running = True
        signal.signal(signal.SIGTERM, self.signal_handler)
        self.run()
    def run(self):
        """ run, endless loop, until sigterm is received
        :return: None
        """
        while self.running:
            # run aggregate
            aggregate_flowd()
            # rotate if needed
            check_rotate()
            # wait for next pass, exit on sigterm
            for i in range(120):
                if self.running:
                    time.sleep(0.5)
                else:
                    break
    def signal_handler(self, sig, frame):
        """ end (run) loop on signal
        :param sig: signal
        :param frame: frame
        :return: None
        """
        self.running = False
if len(sys.argv) > 1 and 'console' in sys.argv[1:]:
    # command line start
    Main()
else:
    # Daemonize flowd aggregator
    daemon = Daemonize(app="flowd_aggregate", pid='/var/run/flowd_aggregate.pid', action=Main)
    daemon.start()
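Usage note (not part of the diff): the argument check above means the aggregator can also be run in the foreground for debugging; a plausible invocation would be:

/usr/local/opnsense/scripts/netflow/flowd_aggregate.py console

Without the console argument, Daemonize forks the process into the background and writes /var/run/flowd_aggregate.pid, matching the pidfile used by the rc script above.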
lib/aggregate.py
@@ -49,6 +49,13 @@ class AggMetadata(object):
        # cache known tables
        self._update_known_tables()
    def __del__(self):
        """ close database on destruct
        :return: None
        """
        if self._db_connection is not None:
            self._db_connection.close()
    def _update_known_tables(self):
        """ request known tables
        """
@@ -123,6 +130,13 @@ class BaseFlowAggregator(object):
        self._open_db()
        self._fetch_known_targets()
    def __del__(self):
        """ close database on destruct
        :return: None
        """
        if self._db_connection is not None:
            self._db_connection.close()
    def _fetch_known_targets(self):
        """ read known target table names from the sqlite db
        :return: None
......
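For illustration (not part of the commit): aggregate_flowd() only relies on a small duck-typed contract for the classes returned by lib.aggregates.get_aggregators(): a resolutions() class method, a constructor taking one resolution, add() and cleanup(), plus a commit-style flush when the receive timestamp changes (the actual call is truncated in the hunk above). The real consumers derive from BaseFlowAggregator and persist to sqlite; the standalone sketch below is hypothetical (the class name, the 'octets' field, and the bucketing logic are invented for the example) and only shows the shape of that contract.

class SampleAggregator(object):
    """ hypothetical stand-in for a lib.aggregates consumer, only to show the
        interface aggregate_flowd() uses; real classes derive from BaseFlowAggregator
        and write their buckets to sqlite targets
    """
    @classmethod
    def resolutions(cls):
        # resolutions (in seconds) this consumer wants to be instantiated for
        return [60, 60 * 5]

    def __init__(self, resolution):
        self.resolution = resolution
        self._pending = {}

    def add(self, flow_record):
        # bucket on the receive timestamp, truncated to this resolution
        # (assumes 'recv' is numeric and the record carries an 'octets' counter)
        time_slot = int(flow_record['recv'] / self.resolution) * self.resolution
        self._pending[time_slot] = self._pending.get(time_slot, 0) + flow_record.get('octets', 0)

    def commit(self):
        # flush pending buckets; method name assumed from the "commit data" step above
        self._pending = {}

    def cleanup(self):
        # expire data older than the retention window for this resolution
        pass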