/**
 * $RCSfile$
 * $Revision$
 * $Date$
 *
 * Copyright (C) 2004 Jive Software. All rights reserved.
 *
 * This software is published under the terms of the GNU Public License (GPL),
 * a copy of which is included in this distribution.
 */

package org.jivesoftware.messenger.net;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Writer;
import java.net.Socket;
import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.io.XPPPacketReader;
import org.jivesoftware.messenger.Connection;
import org.jivesoftware.messenger.PacketRouter;
import org.jivesoftware.messenger.Session;
import org.jivesoftware.messenger.audit.Auditor;
import org.jivesoftware.messenger.auth.UnauthorizedException;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException;
import org.xmlpull.v1.XmlPullParserFactory;
import org.xmpp.packet.IQ;
import org.xmpp.packet.Message;
import org.xmpp.packet.Presence;
import org.xmpp.packet.Roster;

/**
 * @author Derek DeMoro
 */
public class SocketReadThread extends Thread {

    private Socket sock;
    /**
     * The utf-8 charset for decoding and encoding Jabber packet streams.
     */
    private String charset = "UTF-8";

    private Session session;
    private Connection connection;

    private static final String ETHERX_NAMESPACE = "http://etherx.jabber.org/streams";

    private String serverName;
    /**
     * Router used to route incoming packets to the correct channels.
     */
    private PacketRouter router;

    /**
     * Audits incoming data
     */
    private Auditor auditor;

    private boolean clearSignout = false;
    XmlPullParserFactory factory = null;
    XPPPacketReader reader = null;


    /**
     * Create dedicated read thread for this socket.
     *
     * @param router     The router for sending packets that were read
     * @param serverName The name of the server this socket is working for
     * @param auditor    The audit manager that will audit incoming packets
     * @param sock       The socket to read from
     * @param session    The session being read
     */
    public SocketReadThread(PacketRouter router, String serverName, Auditor auditor, Socket sock,
                            Session session) {
        super("SRT reader");
        this.serverName = serverName;
        this.router = router;
        this.auditor = auditor;
        this.session = session;
        connection = session.getConnection();
        this.sock = sock;
    }

    /**
     * A dedicated thread loop for reading the stream and sending incoming
     * packets to the appropriate router.
     */
    public void run() {
        try {
            factory = XmlPullParserFactory.newInstance();
            // factory.setNamespaceAware(true);

            reader = new XPPPacketReader();
            reader.setXPPFactory(factory);


            reader.getXPPParser().setInput(new InputStreamReader(sock.getInputStream(),
                    charset));


            // Read in the opening tag and prepare for packet stream
            createSession();

            // Read the packet stream until it ends
            if (session != null) {
                readStream();
            }

        }
        catch (EOFException eof) {
            // Normal disconnect
        }
        catch (XmlPullParserException ie) {
            // Check if the user abruptly cut the connection without sending previously an
            // unavailable presence
            if (clearSignout == false) {
                if (session != null && session.getStatus() == Session.STATUS_AUTHENTICATED) {
                    Presence presence = session.getPresence();
                    if (presence != null) {
                        // Simulate an unavailable presence sent by the user.
                        Presence packet = presence.createCopy();
                        packet.setType(Presence.Type.unavailable);
                        packet.setFrom(session.getAddress());
                        router.route(packet);
                        clearSignout = true;
                    }
                }
            }
            // It is normal for clients to abruptly cut a connection
            // rather than closing the stream document
            // Since this is normal behavior, we won't log it as an error
//            Log.error(LocaleUtils.getLocalizedString("admin.disconnect"),ie);
        }
        catch (Exception e) {
            if (session != null) {
                Log.warn(LocaleUtils.getLocalizedString("admin.error.stream"), e);
            }
        }
        finally {
            if (session != null) {
                Log.debug("Logging off " + session.getAddress() + " on " + connection);
                try {
                    // Allow everything to settle down after a disconnect
                    // e.g. presence updates to avoid sending double
                    // presence unavailable's
                    sleep(3000);
                    session.getConnection().close();
                }
                catch (Exception e) {
                    Log.warn(LocaleUtils.getLocalizedString("admin.error.connection")
                            + "\n" + sock.toString());
                }
            }
            else {
                Log.error(LocaleUtils.getLocalizedString("admin.error.connection")
                        + "\n" + sock.toString());
            }
        }
    }

    /**
     * Read the incoming stream until it ends. Much of the reading
     * will actually be done in the channel handlers as they run the
     * XPP through the data. This method mostly handles the idle waiting
     * for incoming data. To prevent clients from stalling channel handlers,
     * a watch dog timer is used. Packets that take longer than the watch
     * dog limit to read will cause the session to be closed.
     */
    private void readStream() throws Exception {
        while (true) {
            Document document = reader.parseDocument();

            if (document != null) {
                Element doc = document.getRootElement();

                String tag = doc.getName();
                if ("message".equals(tag)) {
                    Message packet = new Message(doc);
                    packet.setFrom(session.getAddress());
                    auditor.audit(packet);
                    router.route(packet);
                    session.incrementClientPacketCount();
                }
                else if ("presence".equals(tag)) {
                    Presence packet = new Presence(doc);
                    packet.setFrom(session.getAddress());
                    auditor.audit(packet);
                    router.route(packet);
                    session.incrementClientPacketCount();
                    // Update the flag that indicates if the user made a clean sign out
                    clearSignout = (Presence.Type.unavailable == packet.getType() ? true : false);
                }
                else if ("iq".equals(tag)) {
                    IQ packet = getIQ(doc);
                    packet.setFrom(session.getAddress());
                    auditor.audit(packet);
                    router.route(packet);
                    session.incrementClientPacketCount();
                }
                else {
                    throw new XmlPullParserException(LocaleUtils.getLocalizedString("admin.error.packet.tag") + tag);
                }
            }
        }
    }

    private IQ getIQ(Element doc) {
        Element query = doc.element("query");
        if (query != null && "jabber:iq:roster".equals(query.getNamespaceURI())) {
            return new Roster(doc);
        }
        else {
            return new IQ(doc);
        }
    }

    /**
     * Uses the XPP to grab the opening stream tag and create
     * an active session object. In all cases, the method obtains the
     * opening stream tag, checks for errors, and either creates a session
     * or returns an error and kills the connection. If the connection
     * remains open, the XPP will be set to be ready for the first packet.
     * A call to next() should result in an START_TAG state with the first
     * packet in the stream.
     *
     * @throws UnauthorizedException If the caller did not have permission
     *                               to use this method.
     */
    private void createSession() throws UnauthorizedException, XmlPullParserException, IOException, Exception {
        XmlPullParser xpp = reader.getXPPParser();
        for (int eventType = xpp.getEventType();
             eventType != XmlPullParser.START_TAG;
             eventType = xpp.next()) {
        }

        // Conduct error checking, the opening tag should be 'stream'
        // in the 'etherx' namespace
        if (!xpp.getName().equals("stream")) {
            throw new XmlPullParserException(LocaleUtils.getLocalizedString("admin.error.bad-stream"));
        }
        if (!xpp.getNamespace(xpp.getPrefix()).equals(ETHERX_NAMESPACE)) {
            throw new XmlPullParserException(LocaleUtils.getLocalizedString("admin.error.bad-namespace"));
        }

        Writer writer = connection.getWriter();
        String startPacket = "<?xml version='1.0' encoding='"+charset+"'?><stream:stream xmlns:stream=\"http://etherx.jabber.org/streams\" xmlns=\"jabber:client\" from=\""+serverName+"\" id=\""+session.getStreamID().toString()+"\">";
        writer.write(startPacket);
        writer.flush();
        // TODO: check for SASL support in opening stream tag
    }


}