package org.apache.mina.management;

import org.apache.mina.common.*;
import org.apache.mina.filter.executor.ExecutorFilter;

import java.util.Queue;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.net.SocketAddress;

/**
 * Collects statistics of an {@link org.apache.mina.common.IoService}. It's polling all the sessions of a given
 * IoService. It's attaching a {@link org.apache.mina.management.IoSessionStat} object to all the sessions polled
 * and filling the throughput values.
 *
 * Usage :
 * <pre>
 * IoService service = ...
 * MINAStatCollector collector = new MINAStatCollector( service );
 * collector.start();
 * </pre>
 *
 * By default the {@link org.apache.mina.management.MINAStatCollector} is polling the sessions every 5 seconds. You can
 * give a different polling time using a second constructor.<p>
 *
 * Note: This class is a spin-off from StatCollector present in
 * https://svn.apache.org/repos/asf/mina/branches/1.1/core/src/main/java/org/apache/mina/management.
 *
 * @author The Apache Directory Project (mina-dev@directory.apache.org)
 * @version $Rev: 477648 $, $Date: 2006-11-21 04:33:38 -0800 (Tue, 21 Nov 2006) $
 */
public class MINAStatCollector {
    /**
     * The session attribute key for {@link org.apache.mina.management.IoSessionStat}.
     */
    public static final String KEY = MINAStatCollector.class.getName() + ".stat";


    /**
     * @noinspection StaticNonFinalField
     */
    private static volatile int nextId = 0;
    private final int id = nextId ++;

    private final IoService service;
    private Worker worker;
    private int pollingInterval = 5000;
    private Queue<IoSession> polledSessions;

    // resume of session stats, for simplifying acces to the statistics
    private AtomicLong totalProcessedSessions = new AtomicLong();
    private AtomicLong totalMsgWritten = new AtomicLong();
    private AtomicLong totalMsgRead = new AtomicLong();
    private AtomicLong totalBytesWritten = new AtomicLong();
    private AtomicLong totalBytesRead = new AtomicLong();
    private AtomicLong totalScheduledWrites = new AtomicLong();
    private AtomicLong totalQueuedEvents = new AtomicLong();

    private final IoServiceListener serviceListener = new IoServiceListener()
    {
        public void serviceActivated( IoService service, SocketAddress serviceAddress, IoHandler handler,
            IoServiceConfig config )
        {
        }

        public void serviceDeactivated( IoService service, SocketAddress serviceAddress, IoHandler handler,
            IoServiceConfig config )
        {
        }

        public void sessionCreated( IoSession session )
        {
            addSession( session );
        }

        public void sessionDestroyed( IoSession session )
        {
            removeSession( session );
        }
    };

    /**
     * Create a stat collector for the given service with a default polling time of 5 seconds.
     * @param service the IoService to inspect
     */
    public MINAStatCollector( IoService service )
    {
        this( service,5000 );
    }

    /**
     * create a stat collector for the given given service
     * @param service the IoService to inspect
     * @param pollingInterval milliseconds
     */
    public MINAStatCollector( IoService service, int pollingInterval )
    {
        this.service = service;
        this.pollingInterval = pollingInterval;
    }

    /**
     * Start collecting stats for the {@link org.apache.mina.common.IoSession} of the service.
     * New sessions or destroyed will be automaticly added or removed.
     */
    public void start()
    {
        synchronized (this)
        {
            if ( worker != null && worker.isAlive() )
                throw new RuntimeException( "Stat collecting already started" );

            // add all current sessions

            polledSessions = new ConcurrentLinkedQueue<IoSession>();

            for ( Iterator iter = service.getManagedServiceAddresses().iterator(); iter.hasNext(); )
            {
                SocketAddress element = ( SocketAddress ) iter.next();

                for ( Iterator iter2 = service.getManagedSessions( element ).iterator(); iter2.hasNext(); )
                {
                    addSession( ( IoSession ) iter2.next() );

                }
            }

            // listen for new ones
            service.addListener( serviceListener );

            // start polling
            worker = new Worker();
            worker.start();

        }

    }

    /**
     * Stop collecting stats. all the {@link org.apache.mina.management.IoSessionStat} object will be removed of the
     * polled session attachements.
     */
    public void stop()
    {
        synchronized (this)
        {
            service.removeListener( serviceListener );

            // stop worker
            worker.stop = true;
            worker.interrupt();
            while( worker.isAlive() )
            {
                try
                {
                    worker.join();
                }
                catch( InterruptedException e )
                {
                    //ignore since this is shutdown time
                }
            }

            for (IoSession session : polledSessions) {
                session.removeAttribute(KEY);
            }
            polledSessions.clear();
        }
    }

    /**
     * is the stat collector started and polling the {@link org.apache.mina.common.IoSession} of the {@link org.apache.mina.common.IoService}
     * @return true if started
     */
    public boolean isRunning()
    {
        synchronized (this)
        {
            return worker != null && worker.stop != true;
        }
    }

    private void addSession( IoSession session )
    {
        IoSessionStat sessionStats = new IoSessionStat();
        sessionStats.lastPollingTime = System.currentTimeMillis();
        session.setAttribute( KEY, sessionStats );
        totalProcessedSessions.incrementAndGet();
        polledSessions.add( session );
    }

    private void removeSession( IoSession session )
    {
        // remove the session from the list of polled sessions
        polledSessions.remove( session );

        // add the bytes processed between last polling and session closing
        // prevent non seen byte with non-connected protocols like HTTP and datagrams
        IoSessionStat sessStat = ( IoSessionStat ) session.getAttribute( KEY );
        session.removeAttribute( KEY );

        totalMsgWritten.addAndGet(session.getWrittenMessages() - sessStat.lastMessageWrite);
        totalMsgRead.addAndGet(session.getReadMessages() - sessStat.lastMessageRead);
        totalBytesWritten.addAndGet(session.getWrittenBytes() - sessStat.lastByteWrite);
        totalBytesRead.addAndGet(session.getReadBytes() - sessStat.lastByteRead);
    }


    /**
     * total number of sessions processed by the stat collector
     * @return number of sessions
     */
    public long getTotalProcessedSessions()
    {
    	return totalProcessedSessions.longValue();
    }

	public long getBytesRead()
	{
		return totalBytesRead.get();
	}

	public long getBytesWritten()
	{
		return totalBytesWritten.get();
	}

	public long getMsgRead()
	{
		return totalMsgRead.get();
	}

	public long getMsgWritten()
	{
		return totalMsgWritten.get();
	}

    public long getScheduledWrites() {
        return totalScheduledWrites.get();
    }

    public long getQueuedEvents() {
        return totalQueuedEvents.get();
    }

    public long getSessionCount()
	{
		return polledSessions.size();
	}

    private class Worker extends Thread
    {

        boolean stop = false;

        private Worker()
        {
            super( "StatCollectorWorker-"+id );
        }

        public void run()
        {
            while ( !stop )
            {
                // wait polling time
                try
                {
                    Thread.sleep( pollingInterval );
                }
                catch ( InterruptedException e )
                {
                }

                long tmpMsgWritten = 0l;
                long tmpMsgRead = 0l;
                long tmpBytesWritten = 0l;
                long tmpBytesRead = 0l;
                long tmpScheduledWrites = 0l;
                long tmpQueuevedEvents = 0l;

                for (IoSession session : polledSessions)
                {
                	// upadating individual session statistics
                    IoSessionStat sessStat = ( IoSessionStat ) session.getAttribute( KEY );

                    long currentTimestamp = System.currentTimeMillis();
                    // Calculate delta
                    float pollDelta = (currentTimestamp - sessStat.lastPollingTime) / 1000f;
                    // Store last polling time of this session
                    sessStat.lastPollingTime = currentTimestamp;

                    long readBytes = session.getReadBytes();
                    long writtenBytes = session.getWrittenBytes();
                    long readMessages = session.getReadMessages();
                    long writtenMessages = session.getWrittenMessages();
                    sessStat.byteReadThroughput = (readBytes - sessStat.lastByteRead) / pollDelta;
                    sessStat.byteWrittenThroughput = (writtenBytes - sessStat.lastByteWrite) / pollDelta;
                    sessStat.messageReadThroughput = (readMessages - sessStat.lastMessageRead) / pollDelta;
                    sessStat.messageWrittenThroughput = (writtenMessages - sessStat.lastMessageWrite) / pollDelta;

                    tmpMsgWritten += (writtenMessages - sessStat.lastMessageWrite);
                    tmpMsgRead += (readMessages - sessStat.lastMessageRead);
                    tmpBytesWritten += (writtenBytes - sessStat.lastByteWrite);
                    tmpBytesRead += (readBytes - sessStat.lastByteRead);
                    tmpScheduledWrites += session.getScheduledWriteRequests();

                    ExecutorFilter executorFilter =
                            (ExecutorFilter) session.getFilterChain().get(ExecutorThreadModel.class.getName());
                    if (executorFilter != null) {
                        tmpQueuevedEvents += executorFilter.getEventQueueSize(session);
                    }

                    sessStat.lastByteRead = readBytes;
                    sessStat.lastByteWrite = writtenBytes;
                    sessStat.lastMessageRead = readMessages;
                    sessStat.lastMessageWrite = writtenMessages;

                }

                totalMsgWritten.addAndGet(tmpMsgWritten);
                totalMsgRead.addAndGet(tmpMsgRead);
                totalBytesWritten.addAndGet(tmpBytesWritten);
                totalBytesRead.addAndGet(tmpBytesRead);
                totalScheduledWrites.set(tmpScheduledWrites);
                totalQueuedEvents.set(tmpQueuevedEvents);
            }
        }
    }
}