#!/usr/local/bin/python2.7
"""
    Copyright (c) 2016 Ad Schellevis
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions are met:

    1. Redistributions of source code must retain the above copyright notice,
     this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
     notice, this list of conditions and the following disclaimer in the
     documentation and/or other materials provided with the distribution.

    THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
    INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
    AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
    OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
    SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
    INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
    CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
    ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.

    --------------------------------------------------------------------------------------
    Aggregate flowd data for reporting
"""
import time
import datetime
import os
import sys
import signal
import glob
import copy
import syslog
import traceback
sys.path.insert(0, "/usr/local/opnsense/site-python")
from lib.parse import parse_flow
from lib.aggregate import AggMetadata
import lib.aggregates
from daemonize import Daemonize


MAX_FILE_SIZE_MB = 10
MAX_LOGS = 10


def aggregate_flowd(do_vacuum=False):
    """ aggregate collected flowd data
    :param do_vacuum: vacuum database after cleanup
    :return: None
    """
    # init metadata (progress maintenance)
    metadata = AggMetadata()

    # register aggregate classes to stream data to
    stream_agg_objects = list()
    for agg_class in lib.aggregates.get_aggregators():
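        # one aggregate instance is created per (aggregator, resolution) combination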
        for resolution in agg_class.resolutions():
            stream_agg_objects.append(agg_class(resolution))

    # parse flow data and stream to registered consumers
    prev_recv = metadata.last_sync()
    commit_record_count = 0
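    # flows are committed in batches: a commit triggers when the receive timestamp
    # changes after a sizable number of records, or once the last record is seen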
    for flow_record in parse_flow(prev_recv):
        if flow_record is None or (prev_recv != flow_record['recv'] and commit_record_count > 100000):
            # commit data on receive timestamp change or last record
            for stream_agg_object in stream_agg_objects:
                stream_agg_object.commit()
            metadata.update_sync_time(prev_recv)
            # start counting a new batch after every commit
            commit_record_count = 0
        if flow_record is not None:
            # send to aggregator
            for stream_agg_object in stream_agg_objects:
                # an aggregator's add() may change the flow contents while processing,
                # so it's better to isolate the parameters here with a copy.
                flow_record_cpy = copy.copy(flow_record)
                stream_agg_object.add(flow_record_cpy)
            commit_record_count += 1
            prev_recv = flow_record['recv']

    # expire old data
    for stream_agg_object in stream_agg_objects:
        stream_agg_object.cleanup(do_vacuum)
        del stream_agg_object
    del metadata


def check_rotate():
    """ Checks if flowd log needs to be rotated, if so perform rotate.
        We keep [MAX_LOGS] number of logs containing approx. [MAX_FILE_SIZE_MB] data, the flowd data probably contains
        more detailed data then the stored aggregates.
    :return: None
    """
    if os.path.getsize("/var/log/flowd.log")/1024/1024 > MAX_FILE_SIZE_MB:
        # max file size reached, rotate
        filenames = sorted(glob.glob('/var/log/flowd.log.*'), reverse=True)
        file_sequence = len(filenames)
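        # walk the rotated logs from the highest sequence number down; drop files
        # beyond MAX_LOGS and shift the remaining sequence numbers up by one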
        for filename in filenames:
            sequence = filename.split('.')[-1]
            if sequence.isdigit():
                if file_sequence >= MAX_LOGS:
                    os.remove(filename)
                elif int(sequence) != 0:
                    os.rename(filename, filename.replace('.%s' % sequence, '.%06d' % (int(sequence)+1)))
            file_sequence -= 1
        # rename /var/log/flowd.log
        os.rename('/var/log/flowd.log', '/var/log/flowd.log.000001')
        # signal flowd (SIGUSR1) to start a new log file
        if os.path.isfile('/var/run/flowd.pid'):
            pid = open('/var/run/flowd.pid').read().strip()
            if pid.isdigit():
                try:
                    os.kill(int(pid), signal.SIGUSR1)
                except OSError:
                    pass


class Main(object):
    def __init__(self):
        """ construct, hook signal handler and run aggregators
        :return: None
        """
        self.running = True
        signal.signal(signal.SIGTERM, self.signal_handler)
        self.run()

    def run(self):
        """ run, endless loop, until sigterm is received
        :return: None
        """
        vacuum_interval = 60 * 60 * 8  # 8 hour vacuum cycle
        vacuum_countdown = None
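        # a countdown of None triggers a vacuum on the first pass of the loop below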
        while self.running:
            # should we perform a vacuum
            if not vacuum_countdown or vacuum_countdown < time.time():
                vacuum_countdown = time.time() + vacuum_interval
                do_vacuum = True
            else:
                do_vacuum = False

            # run aggregate
            try:
                aggregate_flowd(do_vacuum)
            except Exception:
                syslog.syslog(syslog.LOG_ERR, 'flowd aggregate died with message %s' % traceback.format_exc())
                return
            # rotate if needed
            check_rotate()
            # wait (up to 15 seconds) for the next pass, exit early on sigterm
            for _ in range(30):
                if self.running:
                    time.sleep(0.5)
                else:
                    break

    def signal_handler(self, sig, frame):
        """ end (run) loop on signal
        :param sig: signal
        :param frame: frame
        :return: None
        """
        self.running = False


if len(sys.argv) > 1 and 'console' in sys.argv[1:]:
    # command line start
    if 'profile' in sys.argv[1:]:
        # start with profiling
        import cProfile
        import StringIO
        import pstats

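        # profile the full run; builtins=False leaves builtin calls out of the report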
        pr = cProfile.Profile(builtins=False)
        pr.enable()
        Main()
        pr.disable()
        s = StringIO.StringIO()
        sortby = 'cumulative'
        ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
        ps.print_stats()
        print s.getvalue()
    else:
        Main()
else:
    # Daemonize flowd aggregator
    daemon = Daemonize(app="flowd_aggregate", pid='/var/run/flowd_aggregate.pid', action=Main)
    daemon.start()