/** * $RCSfile$ * $Revision$ * $Date$ * * Copyright (C) 2004 Jive Software. All rights reserved. * * This software is published under the terms of the GNU Public License (GPL), * a copy of which is included in this distribution. */ package org.jivesoftware.messenger.net; import org.dom4j.Element; import org.dom4j.io.XPPPacketReader; import org.jivesoftware.messenger.*; import org.jivesoftware.messenger.auth.UnauthorizedException; import org.jivesoftware.messenger.interceptor.InterceptorManager; import org.jivesoftware.messenger.interceptor.PacketRejectedException; import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.Log; import org.xmlpull.v1.XmlPullParser; import org.xmlpull.v1.XmlPullParserException; import org.xmlpull.v1.XmlPullParserFactory; import org.xmpp.packet.*; import java.io.EOFException; import java.io.IOException; import java.io.InputStreamReader; import java.io.Writer; import java.net.Socket; import java.net.SocketException; /** * Reads XMPP XML from a socket. * * @author Derek DeMoro */ public class SocketReadThread extends Thread { /** * The utf-8 charset for decoding and encoding Jabber packet streams. */ private static String CHARSET = "UTF-8"; /** * Reuse the same factory for all the connections. */ private static XmlPullParserFactory factory = null; private Socket sock; private Session session; private Connection connection; private String serverName; /** * Router used to route incoming packets to the correct channels. */ private PacketRouter router; private boolean clearSignout = false; XPPPacketReader reader = null; static { try { factory = XmlPullParserFactory.newInstance(); } catch (XmlPullParserException e) { Log.error("Error creating a parser factory", e); } } /** * Create dedicated read thread for this socket. * * @param router The router for sending packets that were read * @param serverName The name of the server this socket is working for * @param sock The socket to read from * @param conn The connection being read */ public SocketReadThread(PacketRouter router, String serverName, Socket sock, Connection conn) { super("SRT reader"); this.serverName = serverName; this.router = router; this.connection = conn; this.sock = sock; } /** * A dedicated thread loop for reading the stream and sending incoming * packets to the appropriate router. */ public void run() { try { reader = new XPPPacketReader(); reader.setXPPFactory(factory); reader.getXPPParser().setInput(new InputStreamReader(sock.getInputStream(), CHARSET)); // Read in the opening tag and prepare for packet stream createSession(); // Read the packet stream until it ends if (session != null) { readStream(); } } catch (EOFException eof) { // Normal disconnect } catch (SocketException se) { // The socket was closed. The server may close the connection for several reasons (e.g. // user requested to remove his account). Do nothing here. } catch (XmlPullParserException ie) { // Check if the user abruptly cut the connection without sending previously an // unavailable presence if (clearSignout == false) { if (session != null && session.getStatus() == Session.STATUS_AUTHENTICATED) { if (session instanceof ClientSession) { Presence presence = ((ClientSession) session).getPresence(); if (presence != null) { // Simulate an unavailable presence sent by the user. Presence packet = presence.createCopy(); packet.setType(Presence.Type.unavailable); packet.setFrom(session.getAddress()); router.route(packet); clearSignout = true; } } } } // It is normal for clients to abruptly cut a connection // rather than closing the stream document // Since this is normal behavior, we won't log it as an error // Log.error(LocaleUtils.getLocalizedString("admin.disconnect"),ie); } catch (Exception e) { if (session != null) { Log.warn(LocaleUtils.getLocalizedString("admin.error.stream"), e); } } finally { if (session != null) { Log.debug("Logging off " + session.getAddress() + " on " + connection); try { // Allow everything to settle down after a disconnect // e.g. presence updates to avoid sending double // presence unavailable's sleep(3000); session.getConnection().close(); } catch (Exception e) { Log.warn(LocaleUtils.getLocalizedString("admin.error.connection") + "\n" + sock.toString()); } } else { Log.error(LocaleUtils.getLocalizedString("admin.error.connection") + "\n" + sock.toString()); } } } /** * Read the incoming stream until it ends. Much of the reading * will actually be done in the channel handlers as they run the * XPP through the data. This method mostly handles the idle waiting * for incoming data. To prevent clients from stalling channel handlers, * a watch dog timer is used. Packets that take longer than the watch * dog limit to read will cause the session to be closed. */ private void readStream() throws Exception { while (true) { Element doc = reader.parseDocument().getRootElement(); if (doc == null) { // Stop reading the stream since the client has sent an end of stream element and // probably closed the connection return; } String tag = doc.getName(); if ("message".equals(tag)) { Message packet = null; try { packet = new Message(doc); } catch(IllegalArgumentException e) { // The original packet contains a malformed JID so answer an error Message reply = new Message(); reply.setID(doc.attributeValue("id")); reply.setTo(session.getAddress()); reply.getElement().addAttribute("from", doc.attributeValue("to")); reply.setError(PacketError.Condition.jid_malformed); session.process(reply); continue; } packet.setFrom(session.getAddress()); try { // Invoke the interceptors before we process the read packet InterceptorManager.getInstance().invokeInterceptors(packet, session, true, false); router.route(packet); // Invoke the interceptors after we have processed the read packet InterceptorManager.getInstance().invokeInterceptors(packet, session, true, true); session.incrementClientPacketCount(); } catch (PacketRejectedException e) { // An interceptor rejected this packet so answer a not_allowed error Message reply = new Message(); reply.setID(packet.getID()); reply.setTo(session.getAddress()); reply.setFrom(packet.getTo()); reply.setError(PacketError.Condition.not_allowed); session.process(reply); } } else if ("presence".equals(tag)) { Presence packet = null; try { packet = new Presence(doc); } catch(IllegalArgumentException e) { // The original packet contains a malformed JID so answer an error Presence reply = new Presence(); reply.setID(doc.attributeValue("id")); reply.setTo(session.getAddress()); reply.getElement().addAttribute("from", doc.attributeValue("to")); reply.setError(PacketError.Condition.jid_malformed); session.process(reply); continue; } packet.setFrom(session.getAddress()); try { // Invoke the interceptors before we process the read packet InterceptorManager.getInstance().invokeInterceptors(packet, session, true, false); router.route(packet); // Invoke the interceptors after we have processed the read packet InterceptorManager.getInstance().invokeInterceptors(packet, session, true, true); session.incrementClientPacketCount(); // Update the flag that indicates if the user made a clean sign out clearSignout = (Presence.Type.unavailable == packet.getType() ? true : false); } catch (PacketRejectedException e) { // An interceptor rejected this packet so answer a not_allowed error Presence reply = new Presence(); reply.setID(packet.getID()); reply.setTo(session.getAddress()); reply.setFrom(packet.getTo()); reply.setError(PacketError.Condition.not_allowed); session.process(reply); } } else if ("iq".equals(tag)) { IQ packet = null; try { packet = getIQ(doc); } catch(IllegalArgumentException e) { // The original packet contains a malformed JID so answer an error IQ reply = new IQ(); if (!doc.elements().isEmpty()) { reply.setChildElement(((Element) doc.elements().get(0)).createCopy()); } reply.setID(doc.attributeValue("id")); reply.setTo(session.getAddress()); if (doc.attributeValue("to") != null) { reply.getElement().addAttribute("from", doc.attributeValue("to")); } reply.setError(PacketError.Condition.jid_malformed); session.process(reply); continue; } packet.setFrom(session.getAddress()); try { // Invoke the interceptors before we process the read packet InterceptorManager.getInstance().invokeInterceptors(packet, session, true, false); router.route(packet); // Invoke the interceptors after we have processed the read packet InterceptorManager.getInstance().invokeInterceptors(packet, session, true, true); session.incrementClientPacketCount(); } catch (PacketRejectedException e) { // An interceptor rejected this packet so answer a not_allowed error IQ reply = new IQ(); reply.setChildElement(packet.getChildElement().createCopy()); reply.setID(packet.getID()); reply.setTo(session.getAddress()); reply.setFrom(packet.getTo()); reply.setError(PacketError.Condition.not_allowed); session.process(reply); } } else { throw new XmlPullParserException(LocaleUtils.getLocalizedString("admin.error.packet.tag") + tag); } } } private IQ getIQ(Element doc) { Element query = doc.element("query"); if (query != null && "jabber:iq:roster".equals(query.getNamespaceURI())) { return new Roster(doc); } else { return new IQ(doc); } } /** * Uses the XPP to grab the opening stream tag and create an active session * object. The session to create will depend on the sent namespace. In all * cases, the method obtains the opening stream tag, checks for errors, and * either creates a session or returns an error and kills the connection. * If the connection remains open, the XPP will be set to be ready for the * first packet. A call to next() should result in an START_TAG state with * the first packet in the stream. */ private void createSession() throws UnauthorizedException, XmlPullParserException, IOException { XmlPullParser xpp = reader.getXPPParser(); for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) { eventType = xpp.next(); } // Create the correct session based on the sent namespace if ("jabber:client".equals(xpp.getNamespace(null))) { // The connected client is a regular client so create a ClientSession session = ClientSession.createSession(serverName, reader, connection); } else if ("jabber:component:accept".equals(xpp.getNamespace(null))) { // The connected client is a component so create a ComponentSession session = ComponentSession.createSession(serverName, reader, connection); } else { Writer writer = connection.getWriter(); StringBuilder sb = new StringBuilder(); sb.append("<?xml version='1.0' encoding='"); sb.append(CHARSET); sb.append("'?>"); // Include the bad-namespace-prefix in the response sb.append("<stream:error>"); sb.append("<bad-namespace-prefix xmlns=\"urn:ietf:params:xml:ns:xmpp-streams\"/>"); sb.append("</stream:error>"); sb.append("</stream:stream>"); writer.write(sb.toString()); writer.flush(); // Close the underlying connection connection.close(); } } }