/** * $RCSfile$ * $Revision$ * $Date$ * * Copyright (C) 2004 Jive Software. All rights reserved. * * This software is published under the terms of the GNU Public License (GPL), * a copy of which is included in this distribution. */ package org.jivesoftware.messenger.net; import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.Log; import org.jivesoftware.messenger.*; import org.jivesoftware.messenger.audit.Auditor; import org.jivesoftware.messenger.auth.UnauthorizedException; import java.io.EOFException; import java.io.InputStreamReader; import java.net.Socket; import javax.xml.stream.*; /** * @author Iain Shigeoka */ public class SocketReadThread extends Thread { private Socket sock; private XMLInputFactory xppFactory; private XMLStreamReader xpp; private Session session; private Connection connection; private static final String ETHERX_NAMESPACE = "http://etherx.jabber.org/streams"; private String serverName; /** * Router used to route incoming packets to the correct channels. */ private PacketRouter router; /** * Audits incoming data */ private Auditor auditor; private PacketFactory packetFactory; private boolean clearSignout = false; /** * Create dedicated read thread for this socket. * * @param router The router for sending packets that were read * @param serverName The name of the server this socket is working for * @param auditor The audit manager that will audit incoming packets * @param sock The socket to read from * @param session The session being read */ public SocketReadThread(PacketRouter router, PacketFactory packetFactory, String serverName, Auditor auditor, Socket sock, Session session) { super("SRT reader"); this.serverName = serverName; this.router = router; this.packetFactory = packetFactory; this.auditor = auditor; this.session = session; connection = session.getConnection(); this.sock = sock; xppFactory = XMLInputFactory.newInstance(); xppFactory.setProperty(XMLInputFactory.IS_COALESCING, Boolean.TRUE); } /** * A dedicated thread loop for reading the stream and sending incoming * packets to the appropriate router. */ public void run() { try { xpp = xppFactory.createXMLStreamReader(new InputStreamReader(sock.getInputStream())); // TODO: need to force to UTF-8 ? // Read in the opening tag and prepare for packet stream createSession(); // Read the packet stream until it ends if (session != null) { readStream(); } } catch (EOFException eof) { // Normal disconnect } catch (XMLStreamException ie) { // Check if the user abruptly cut the connection without sending previously an // unavailable presence if (clearSignout == false) { if (session != null && session.getStatus() == Session.STATUS_AUTHENTICATED) { Presence presence = session.getPresence(); if (presence != null) { // Simulate an unavailable presence sent by the user. Presence packet = (Presence) presence.createDeepCopy(); packet.setType(Presence.UNAVAILABLE); try { packet.setAvailable(false); packet.setVisible(false); } catch (UnauthorizedException e) {} packet.setOriginatingSession(session); packet.setSender(session.getAddress()); packet.setSending(false); router.route(packet); clearSignout = true; } } } // It is normal for clients to abruptly cut a connection // rather than closing the stream document // Since this is normal behavior, we won't log it as an error // Log.error(LocaleUtils.getLocalizedString("admin.disconnect"),ie); } catch (Exception e) { if (session != null) { Log.warn(LocaleUtils.getLocalizedString("admin.error.stream"), e); } } finally { if (session != null) { //Log.info("Logging off " + session.getAddress() + " on " + connection); try { // Allow everything to settle down after a disconnect // e.g. presence updates to avoid sending double // presence unavailable's sleep(3000); session.getConnection().close(); } catch (Exception e) { Log.warn(LocaleUtils.getLocalizedString("admin.error.connection") + "\n" + sock.toString()); } } else { Log.error(LocaleUtils.getLocalizedString("admin.error.connection") + "\n" + sock.toString()); } } } /** * Read the incoming stream until it ends. Much of the reading * will actually be done in the channel handlers as they run the * XPP through the data. This method mostly handles the idle waiting * for incoming data. To prevent clients from stalling channel handlers, * a watch dog timer is used. Packets that take longer than the watch * dog limit to read will cause the session to be closed. * * @throws XMLStreamException if there is trouble reading from the socket */ private void readStream() throws UnauthorizedException, XMLStreamException { while (true) { for (int eventType = xpp.next(); eventType != XMLStreamConstants.START_ELEMENT; eventType = xpp.next()) { if (eventType == XMLStreamConstants.CHARACTERS) { if (!xpp.isWhiteSpace()) { throw new XMLStreamException(LocaleUtils.getLocalizedString("admin.error.packet.text")); } } else if (eventType == XMLStreamConstants.END_DOCUMENT) { return; } } String tag = xpp.getLocalName(); if ("message".equals(tag)) { Message packet = packetFactory.getMessage(xpp); packet.setOriginatingSession(session); packet.setSender(session.getAddress()); packet.setSending(false); auditor.audit(packet); router.route(packet); session.incrementClientPacketCount(); } else if ("presence".equals(tag)) { Presence packet = packetFactory.getPresence(xpp); packet.setOriginatingSession(session); packet.setSender(session.getAddress()); packet.setSending(false); auditor.audit(packet); router.route(packet); session.incrementClientPacketCount(); // Update the flag that indicates if the user made a clean sign out clearSignout = (Presence.UNAVAILABLE == packet.getType() ? true : false); } else if ("iq".equals(tag)) { IQ packet = packetFactory.getIQ(xpp); packet.setOriginatingSession(session); packet.setSender(session.getAddress()); packet.setSending(false); auditor.audit(packet); router.route(packet); session.incrementClientPacketCount(); } else { throw new XMLStreamException(LocaleUtils.getLocalizedString("admin.error.packet.tag") + tag); } } } /** * Uses the XPP to grab the opening stream tag and create * an active session object. In all cases, the method obtains the * opening stream tag, checks for errors, and either creates a session * or returns an error and kills the connection. If the connection * remains open, the XPP will be set to be ready for the first packet. * A call to next() should result in an START_TAG state with the first * packet in the stream. * * @throws UnauthorizedException If the caller did not have permission * to use this method. * @throws XMLStreamException If the stream is not valid XML */ private void createSession() throws UnauthorizedException, XMLStreamException { for (int eventType = xpp.getEventType(); eventType != XMLStreamConstants.START_ELEMENT; eventType = xpp.next()) { } // Conduct error checking, the opening tag should be 'stream' // in the 'etherx' namespace if (!xpp.getLocalName().equals("stream")) { throw new XMLStreamException(LocaleUtils.getLocalizedString("admin.error.bad-stream")); } if (!xpp.getNamespaceURI(xpp.getPrefix()).equals(ETHERX_NAMESPACE)) { throw new XMLStreamException(LocaleUtils.getLocalizedString("admin.error.bad-namespace")); } XMLStreamWriter xser = connection.getSerializer(); xser.writeStartDocument(); xser.writeStartElement("stream", "stream", "http://etherx.jabber.org/streams"); xser.writeNamespace("stream", "http://etherx.jabber.org/streams"); xser.writeDefaultNamespace("jabber:client"); xser.writeAttribute("from", serverName); xser.writeAttribute("id", session.getStreamID().toString()); xser.writeCharacters(" "); xser.flush(); // TODO: check for SASL support in opening stream tag } }