/**
 * $RCSfile$
 * $Revision: $
 * $Date: $
 *
 * Copyright (C) 2006 Jive Software. All rights reserved.
 *
 * This software is published under the terms of the GNU Public License (GPL),
 * a copy of which is included in this distribution.
 */

package org.jivesoftware.wildfire.net;

import com.jcraft.jzlib.JZlib;
import com.jcraft.jzlib.ZInputStream;
import org.dom4j.Element;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.wildfire.SessionManager;
import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.net.Socket;
import java.net.SocketException;
import java.nio.channels.SocketChannel;

/**
 * Process incoming packets using a non-blocking model.
 *
 * @author Daniele Piras
 */
class NonBlockingReadingMode extends SocketReadingMode {

    // DANIELE: Socket read timeout in milliseconds
    private static int READ_TIMEOUT = 0;

    private static String STREAM_START = "<stream:stream";

    // DANIELE: Semaphore to avoid concurrent reading operation from different thread
    private boolean isReading;

    // DANIELE: lightweight xml parser.
    private XMLLightweightParser xmlLightWeightParser;

    // DANIELE: Channel for socket connection
    private SocketChannel socketChannel;

    // DANIELE: Indicate if the reading operations has been scheduled into the executor.
    // this is very important because if all reading thread are busy is used to avoid
    // to reinsert into the queue the reading operation.
    private boolean isScheduled = false;

    // DANIELE: Indicate if a session is already created
    private boolean sessionCreated = false;

    // DANIELE: Indicate if a stream:stream is arrived to complete a sals authentication
    private boolean awaytingSasl = false;

    // DANIELE: Indicate if a stream:stream is arrived to complete compression
    private boolean awaitingForCompleteCompression = false;

    private StreamReader streamReader;

    public NonBlockingReadingMode(Socket socket, SocketReader socketReader) {
        super(socket, socketReader);
        // DANIELE: Initialization
        // Setting timeout for reading operations.
        try {
            socket.setSoTimeout(READ_TIMEOUT);
        }
        catch (SocketException e) {
            // There is an exception...
            Log.warn(e);
        }

        socketChannel = socket.getChannel();

        // Initialize XML light weight parser
        xmlLightWeightParser = new XMLLightweightParser(socketChannel, CHARSET);


        isReading = false;
        socketReader.open = true;

        streamReader = new StreamReader();
    }

    /* DANIELE:
     * Method that verify if the client has data in the channel and in this case
     * call an executor to perform reading operations.
     */
    void run() {
        try {
            // Check if the socket is open
            if (socketReader.open) {
                // Verify semaphore and if there are data into the socket.
                if (!isReading && !isScheduled) {
                    try {
                        // Semaphore to avoid concurrent schedule of the same read operation.
                        isScheduled = true;
                        // Schedule execution with executor
                        IOExecutor.execute(streamReader);
                    }
                    catch (Exception e) {
                        if (socketReader.session != null) {
                            Log.warn(LocaleUtils.getLocalizedString("admin.error.stream") +
                                    ". Session: " +
                                    socketReader.session, e);
                        }
                    }
                }
            }
        }
        catch (Exception e) {
            socketReader.shutdown();
            // There is an exception...
            Log.error(e);
        }
        if (!socketReader.open) {
            socketReader.shutdown();
        }
    }

    protected void tlsNegotiated() throws XmlPullParserException, IOException {
        XmlPullParser xpp = socketReader.reader.getXPPParser();
        InputStream is = socketReader.connection.getTLSStreamHandler().getInputStream();
        xpp.setInput(new InputStreamReader(is, CHARSET));
        xmlLightWeightParser.setInput( is, CHARSET );
        super.tlsNegotiated();
    }

    protected boolean compressClient(Element doc) throws IOException, XmlPullParserException {
        boolean answer = super.compressClient(doc);
        if (answer) {
            XmlPullParser xpp = socketReader.reader.getXPPParser();
            // Reset the parser since a new stream header has been sent from the client
            if (socketReader.connection.getTLSStreamHandler() == null) {
                InputStream is;
                if (socketChannel != null) {
                    // DANIELE: Create an inputstream using the utility class ChannelInputStream.
                    is = new ChannelInputStream(socketChannel);
                }
                else {
                    is = socket.getInputStream();
                }
                is = ServerTrafficCounter.wrapInputStream(is);

                ZInputStream in = new ZInputStream(is);
                in.setFlushMode(JZlib.Z_PARTIAL_FLUSH);
                xpp.setInput(new InputStreamReader(in, CHARSET));
            }
            else {
                ZInputStream in = new ZInputStream(
                        socketReader.connection.getTLSStreamHandler().getInputStream());
                in.setFlushMode(JZlib.Z_PARTIAL_FLUSH);
                xpp.setInput(new InputStreamReader(in, CHARSET));
                xmlLightWeightParser.setInput( in, CHARSET );
            }
        }
        return answer;
    }

    class StreamReader implements Runnable {

        /*
         * This method is invoked when client send data to the channel.
         *
         */
        public void run() {
            try {
                // If no other reading operations are perform
                if (!isReading) {
                    // Change the semaphore status
                    isReading = true;
                    // Call the XML light-wieght parser to read data...
                    xmlLightWeightParser.read();
                    // Check if the parser has found a complete message...
                    if (xmlLightWeightParser.areThereMsgs()) {
                        // Process every message found
                        String[] msgs = xmlLightWeightParser.getMsgs();
                        for (int i = 0; i < msgs.length; i++) {
                            //System.out.println( "Processing " + msgs[ i ] );
                            readStream(msgs[i]);
                        }
                    }
                }
            }
            catch (IOException e) {
                if (socketReader.session != null) {
                    // DANIELE: Remove session from SessionManager. I don't know if
                    // this is the easy way.
                    // TODO Review this. Closing the connection should be used???
                    SessionManager.getInstance().removeSession(
                            SessionManager.getInstance().getSession(
                                    socketReader.session.getAddress()));
                }
                try {
                    xmlLightWeightParser.getChannel().close();
                }
                catch (IOException e1) {
                }
                // System.out.println( "Client disconnecting" );
            }
            catch (Exception e) {
                if (socketReader.session != null) {
                    Log.warn(LocaleUtils.getLocalizedString("admin.error.stream") + ". Session: " +
                            socketReader.session, e);
                }
                e.printStackTrace();
            }
            finally {
                isReading = false;
                isScheduled = false;
            }
        }

        /**
         * Process a single message
         */
        private void readStream(String msg) throws Exception {

            if (msg.trim().startsWith(STREAM_START)) {
                // Found an stream:stream tag...
                if (!sessionCreated) {
                    sessionCreated = true;
                    socketReader.reader.getXPPParser().setInput(new StringReader(
                            msg + ((msg.indexOf("</stream:stream") == -1) ? "</stream:stream>" :
                                    "")));
                    socketReader.createSession();
                }
                else if (awaytingSasl) {
                    awaytingSasl = false;
                    saslSuccessful();
                }
                else if (awaitingForCompleteCompression) {
                    awaitingForCompleteCompression = false;
                    compressionSuccessful();
                }
                return;
            }

            // Create dom in base on the string.
            Element doc = socketReader.reader.parseDocument(msg).getRootElement();
            if (doc == null) {
                // No document found.
                return;
            }
            String tag = doc.getName();
            if ("starttls".equals(tag)) {
                // Negotiate TLS
                if (negotiateTLS()) {
                    tlsNegotiated();
                }
                else {
                    socketReader.open = false;
                    socketReader.session = null;
                }
            }
            else if ("auth".equals(tag)) {
                // User is trying to authenticate using SASL
                if (authenticateClient(doc)) {
                    // SASL authentication was successful so open a new stream and offer
                    // resource binding and session establishment (to client sessions only)
                    awaytingSasl = true;
                }
                else if (socketReader.connection.isClosed()) {
                    socketReader.open = false;
                    socketReader.session = null;
                }
            }
            else if ("compress".equals(tag))
            {
                // Client is trying to initiate compression
                if (compressClient(doc)) {
                    // Compression was successful so open a new stream and offer
                    // resource binding and session establishment (to client sessions only)
                    awaitingForCompleteCompression = true;
                }
            }
            else {
                socketReader.process(doc);
            }
        }
    }
}