SocketReader.java 20.6 KB
Newer Older
1
/**
2
 * Copyright (C) 2005-2008 Jive Software. All rights reserved.
3
 *
4 5 6 7 8 9 10 11 12 13 14
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
15 16
 */

17
package org.jivesoftware.openfire.net;
18

19 20 21
import java.io.IOException;
import java.net.Socket;

22 23
import org.dom4j.Element;
import org.dom4j.io.XMPPPacketReader;
24 25 26
import org.jivesoftware.openfire.Connection;
import org.jivesoftware.openfire.PacketRouter;
import org.jivesoftware.openfire.RoutingTable;
27
import org.jivesoftware.openfire.StreamIDFactory;
28
import org.jivesoftware.openfire.auth.UnauthorizedException;
29
import org.jivesoftware.openfire.session.LocalSession;
30
import org.jivesoftware.openfire.session.Session;
31
import org.jivesoftware.openfire.spi.BasicStreamIDFactory;
Gaston Dombiak's avatar
Gaston Dombiak committed
32
import org.jivesoftware.util.LocaleUtils;
33 34
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
35 36 37
import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException;
import org.xmlpull.v1.XmlPullParserFactory;
38 39 40 41 42 43 44
import org.xmpp.packet.IQ;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import org.xmpp.packet.PacketError;
import org.xmpp.packet.Presence;
import org.xmpp.packet.Roster;
import org.xmpp.packet.StreamError;
45 46 47 48 49 50 51 52 53

/**
 * A SocketReader creates the appropriate {@link Session} based on the defined namespace in the
 * stream element and will then keep reading and routing the received packets.
 *
 * @author Gaston Dombiak
 */
public abstract class SocketReader implements Runnable {

54 55
	private static final Logger Log = LoggerFactory.getLogger(SocketReader.class);

    /**
     * Charset name written into the XML declaration of the stream headers that this
     * reader sends back.
     */
    private static final String CHARSET = "UTF-8";

    /**
     * Factory used to generate the IDs of the stream headers sent by this reader.
     */
    private static final StreamIDFactory STREAM_ID_FACTORY = new BasicStreamIDFactory();

    /**
     * Parser factory shared by all the readers. It stays {@code null} when the factory
     * could not be created while the class was being initialized (the failure is logged).
     */
    private static final XmlPullParserFactory factory;

    /**
     * Session associated with the socket reader.
     */
    protected LocalSession session;
    /**
     * Reference to the physical connection.
     */
    protected SocketConnection connection;
    /**
     * Server name for which we are attending clients.
     */
    protected String serverName;

    /**
     * Router used to route incoming packets to the correct channels.
     */
    private final PacketRouter router;
    /**
     * Routing table used for checking whether a domain is known or not.
     */
    private final RoutingTable routingTable;
    /**
     * Strategy object that performs the actual reading loop for this reader.
     */
    private final SocketReadingMode readingMode;
    XMPPPacketReader reader = null;
    protected boolean open;

    static {
        // Assign through a local so that the shared factory field can be final even
        // when the creation fails.
        XmlPullParserFactory parserFactory = null;
        try {
            parserFactory = XmlPullParserFactory.newInstance(MXParser.class.getName(), null);
        }
        catch (XmlPullParserException e) {
            Log.error("Error creating a parser factory", e);
        }
        factory = parserFactory;
    }

    /**
     * Creates a dedicated reader for a socket.
     *
     * @param router the router for sending packets that were read.
Gaston Dombiak's avatar
Gaston Dombiak committed
112
     * @param routingTable the table that keeps routes to registered services.
113 114 115
     * @param serverName the name of the server this socket is working for.
     * @param socket the socket to read from.
     * @param connection the connection being read.
116
     * @param useBlockingMode true means that the server will use a thread per connection.
117
     */
Gaston Dombiak's avatar
Gaston Dombiak committed
118 119
    public SocketReader(PacketRouter router, RoutingTable routingTable, String serverName,
            Socket socket, SocketConnection connection, boolean useBlockingMode) {
120 121
        this.serverName = serverName;
        this.router = router;
Gaston Dombiak's avatar
Gaston Dombiak committed
122
        this.routingTable = routingTable;
123 124 125
        this.connection = connection;

        connection.setSocketReader(this);
126 127 128 129 130 131

        // Reader is associated with a new XMPPPacketReader
        reader = new XMPPPacketReader();
        reader.setXPPFactory(factory);

        // Set the blocking reading mode to use
132
        readingMode = new BlockingReadingMode(socket, this);
133 134 135 136 137 138
    }

    /**
     * A dedicated thread loop for reading the stream and sending incoming
     * packets to the appropriate router.
     */
139
    @Override
140
    public void run() {
141 142
        readingMode.run();
    }
143

144 145 146 147
    protected void process(Element doc) throws Exception {
        if (doc == null) {
            return;
        }
148

149 150 151
        String tag = doc.getName();
        if ("message".equals(tag)) {
            Message packet;
152
            try {
153
                packet = new Message(doc);
154
            }
155
            catch(IllegalArgumentException e) {
156
                Log.debug("SocketReader: Rejecting packet. JID malformed", e);
157 158 159 160 161 162 163 164
                // The original packet contains a malformed JID so answer with an error.
                Message reply = new Message();
                reply.setID(doc.attributeValue("id"));
                reply.setTo(session.getAddress());
                reply.getElement().addAttribute("from", doc.attributeValue("to"));
                reply.setError(PacketError.Condition.jid_malformed);
                session.process(reply);
                return;
165
            }
166
            processMessage(packet);
167
        }
168 169 170 171
        else if ("presence".equals(tag)) {
            Presence packet;
            try {
                packet = new Presence(doc);
172
            }
173
            catch (IllegalArgumentException e) {
174
                Log.debug("SocketReader: Rejecting packet. JID malformed", e);
175 176 177 178 179 180 181
                // The original packet contains a malformed JID so answer an error
                Presence reply = new Presence();
                reply.setID(doc.attributeValue("id"));
                reply.setTo(session.getAddress());
                reply.getElement().addAttribute("from", doc.attributeValue("to"));
                reply.setError(PacketError.Condition.jid_malformed);
                session.process(reply);
182 183
                return;
            }
184 185 186
            // Check that the presence type is valid. If not then assume available type
            try {
                packet.getType();
187
            }
188
            catch (IllegalArgumentException e) {
189
                Log.debug("Invalid presence (type): " + packet);
190 191 192
                // The presence packet contains an invalid presence type so replace it with
                // an available presence type
                packet.setType(null);
193
            }
194 195 196
            // Check that the presence show is valid. If not then assume available show value
            try {
                packet.getShow();
197
            }
198
            catch (IllegalArgumentException e) {
199
                Log.debug("Invalid presence (show): " + packet);
200 201 202
                // The presence packet contains an invalid presence show so replace it with
                // an available presence show
                packet.setShow(null);
203
            }
204 205 206 207 208 209
            if (session.getStatus() == Session.STATUS_CLOSED && packet.isAvailable()) {
                // Ignore available presence packets sent from a closed session. A closed
                // session may have buffered data pending to be processes so we want to ignore
                // just Presences of type available
                Log.warn("Ignoring available presence packet of closed session: " + packet);
                return;
210
            }
211 212 213 214 215 216
            processPresence(packet);
        }
        else if ("iq".equals(tag)) {
            IQ packet;
            try {
                packet = getIQ(doc);
217
            }
218
            catch(IllegalArgumentException e) {
219
                Log.debug("SocketReader: Rejecting packet. JID malformed", e);
220 221 222 223
                // The original packet contains a malformed JID so answer an error
                IQ reply = new IQ();
                if (!doc.elements().isEmpty()) {
                    reply.setChildElement(((Element) doc.elements().get(0)).createCopy());
224
                }
225 226 227 228
                reply.setID(doc.attributeValue("id"));
                reply.setTo(session.getAddress());
                if (doc.attributeValue("to") != null) {
                    reply.getElement().addAttribute("from", doc.attributeValue("to"));
229
                }
230 231 232
                reply.setError(PacketError.Condition.jid_malformed);
                session.process(reply);
                return;
233
            }
234
            processIQ(packet);
235
        }
236 237 238 239 240 241
        else
        {
            if (!processUnknowPacket(doc)) {
                Log.warn(LocaleUtils.getLocalizedString("admin.error.packet.tag") +
                        doc.asXML());
                open = false;
242 243 244 245 246 247
            }
        }
    }

    /**
     * Process the received IQ packet. Registered
248
     * {@link org.jivesoftware.openfire.interceptor.PacketInterceptor} will be invoked before
249 250 251 252 253 254 255
     * and after the packet was routed.<p>
     *
     * Subclasses may redefine this method for different reasons such as modifying the sender
     * of the packet to avoid spoofing, rejecting the packet or even process the packet in
     * another thread.
     *
     * @param packet the received packet.
256
     * @throws UnauthorizedException if the connection required security but was not secured.
257 258 259 260 261 262 263 264
     */
    protected void processIQ(IQ packet) throws UnauthorizedException {
        // Ensure that connection was secured if TLS was required
        if (connection.getTlsPolicy() == Connection.TLSPolicy.required &&
                !connection.isSecure()) {
            closeNeverSecuredConnection();
            return;
        }
265 266
        router.route(packet);
        session.incrementClientPacketCount();
267 268 269 270
    }

    /**
     * Process the received Presence packet. Registered
271
     * {@link org.jivesoftware.openfire.interceptor.PacketInterceptor} will be invoked before
272 273 274 275 276 277 278
     * and after the packet was routed.<p>
     *
     * Subclasses may redefine this method for different reasons such as modifying the sender
     * of the packet to avoid spoofing, rejecting the packet or even process the packet in
     * another thread.
     *
     * @param packet the received packet.
279
     * @throws UnauthorizedException if the connection required security but was not secured.
280 281 282 283 284 285 286 287
     */
    protected void processPresence(Presence packet) throws UnauthorizedException {
        // Ensure that connection was secured if TLS was required
        if (connection.getTlsPolicy() == Connection.TLSPolicy.required &&
                !connection.isSecure()) {
            closeNeverSecuredConnection();
            return;
        }
288 289
        router.route(packet);
        session.incrementClientPacketCount();
290 291 292 293
    }

    /**
     * Process the received Message packet. Registered
294
     * {@link org.jivesoftware.openfire.interceptor.PacketInterceptor} will be invoked before
295 296 297 298 299 300 301
     * and after the packet was routed.<p>
     *
     * Subclasses may redefine this method for different reasons such as modifying the sender
     * of the packet to avoid spoofing, rejecting the packet or even process the packet in
     * another thread.
     *
     * @param packet the received packet.
302
     * @throws UnauthorizedException if the connection required security but was not secured.
303 304 305 306 307 308 309 310
     */
    protected void processMessage(Message packet) throws UnauthorizedException {
        // Ensure that connection was secured if TLS was required
        if (connection.getTlsPolicy() == Connection.TLSPolicy.required &&
                !connection.isSecure()) {
            closeNeverSecuredConnection();
            return;
        }
311 312
        router.route(packet);
        session.incrementClientPacketCount();
313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334
    }

    /**
     * Handles a received element of an unknown type (i.e. not a Message, Presence or IQ)
     * and reports whether it was processed. When false is returned,
     * {@link #process(Element)} logs a warning that includes the XML of the element and
     * sets {@link #open} to false.
     *
     * @param doc the DOM element of an unknown type.
     * @return true if the received element has been processed.
     */
    abstract boolean processUnknowPacket(Element doc);

    /**
     * Returns the last-active value reported by the underlying packet reader.
     * NOTE(review): presumably the time in milliseconds at which the last full document
     * or heartbeat (whitespace received while no document is being parsed) was read;
     * confirm against XMPPPacketReader.
     *
     * @return the value of {@code reader.getLastActive()}.
     */
    long getLastActive() {
        return reader.getLastActive();
    }

335 336 337 338 339 340 341
    /**
     * Returns a name that identifies both the type of reader and this particular instance.
     * The name is supplied by the concrete subclass.
     *
     * @return a name that identifies the type of reader and the unique instance.
     */
    abstract String getName();

342 343 344 345
    /**
     * Close the connection since TLS was mandatory and the entity never negotiated TLS. Before
     * closing the connection a stream error will be sent to the entity.
     */
346
    void closeNeverSecuredConnection() {
347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375
        // Set the not_authorized error
        StreamError error = new StreamError(StreamError.Condition.not_authorized);
        // Deliver stanza
        connection.deliverRawText(error.toXML());
        // Close the underlying connection
        connection.close();
        // Log a warning so that admins can track this case from the server side
        Log.warn("TLS was required by the server and connection was never secured. " +
                "Closing connection : " + connection);
    }

    /**
     * Builds the IQ packet for the given element. A {@link Roster} is created when the
     * element has a {@code query} child in the {@code jabber:iq:roster} namespace; a plain
     * {@link IQ} is created otherwise.
     *
     * @param doc the {@code iq} element that was read.
     * @return the packet wrapping the element.
     */
    private IQ getIQ(Element doc) {
        Element query = doc.element("query");
        boolean rosterQuery =
                query != null && "jabber:iq:roster".equals(query.getNamespaceURI());
        return rosterQuery ? new Roster(doc) : new IQ(doc);
    }

    /**
     * Uses the XPP to grab the opening stream tag and create an active session
     * object. The session to create will depend on the sent namespace. In all
     * cases, the method obtains the opening stream tag, checks for errors, and
     * either creates a session or returns an error and kills the connection.
     * If the connection remains open, the XPP will be set to be ready for the
     * first packet. A call to next() should result in an START_TAG state with
     * the first packet in the stream.
376 377 378 379
     *
     * @throws UnauthorizedException if the connection required security but was not secured.
     * @throws XmlPullParserException if there was an XML error while creating the session.
     * @throws IOException if an IO error occured while creating the session.
380
     */
381 382
    protected void createSession()
            throws UnauthorizedException, XmlPullParserException, IOException {
383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398
        XmlPullParser xpp = reader.getXPPParser();
        for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) {
            eventType = xpp.next();
        }

        // Check that the TO attribute of the stream header matches the server name or a valid
        // subdomain. If the value of the 'to' attribute is not valid then return a host-unknown
        // error and close the underlying connection.
        String host = reader.getXPPParser().getAttributeValue("", "to");
        if (validateHost() && isHostUnknown(host)) {
            StringBuilder sb = new StringBuilder(250);
            sb.append("<?xml version='1.0' encoding='");
            sb.append(CHARSET);
            sb.append("'?>");
            // Append stream header
            sb.append("<stream:stream ");
399
            sb.append("from=\"").append(host).append("\" ");
400
            sb.append("id=\"").append( STREAM_ID_FACTORY.createStreamID() ).append( "\" " );
401 402 403 404 405 406 407 408 409 410 411
            sb.append("xmlns=\"").append(xpp.getNamespace(null)).append("\" ");
            sb.append("xmlns:stream=\"").append(xpp.getNamespace("stream")).append("\" ");
            sb.append("version=\"1.0\">");
            // Set the host_unknown error
            StreamError error = new StreamError(StreamError.Condition.host_unknown);
            sb.append(error.toXML());
            // Deliver stanza
            connection.deliverRawText(sb.toString());
            // Close the underlying connection
            connection.close();
            // Log a warning so that admins can track this cases from the server side
412 413
            Log.warn("Closing session due to incorrect hostname in stream header. Host: " + host +
                    ". Connection: " + connection);
414 415 416 417 418
        }

        // Create the correct session based on the sent namespace. At this point the server
        // may offer the client to secure the connection. If the client decides to secure
        // the connection then a <starttls> stanza should be received
419
        else if (!createSession(xpp.getNamespace(null))) {
420 421 422 423 424 425 426 427
            // No session was created because of an invalid namespace prefix so answer a stream
            // error and close the underlying connection
            StringBuilder sb = new StringBuilder(250);
            sb.append("<?xml version='1.0' encoding='");
            sb.append(CHARSET);
            sb.append("'?>");
            // Append stream header
            sb.append("<stream:stream ");
428
            sb.append("from=\"").append(host).append("\" ");
429
            sb.append("id=\"").append( STREAM_ID_FACTORY.createStreamID() ).append( "\" " );
430 431 432 433 434 435 436 437 438 439
            sb.append("xmlns=\"").append(xpp.getNamespace(null)).append("\" ");
            sb.append("xmlns:stream=\"").append(xpp.getNamespace("stream")).append("\" ");
            sb.append("version=\"1.0\">");
            // Include the bad-namespace-prefix in the response
            StreamError error = new StreamError(StreamError.Condition.bad_namespace_prefix);
            sb.append(error.toXML());
            connection.deliverRawText(sb.toString());
            // Close the underlying connection
            connection.close();
            // Log a warning so that admins can track this cases from the server side
440 441
            Log.warn("Closing session due to bad_namespace_prefix in stream header. Prefix: " +
                    xpp.getNamespace(null) + ". Connection: " + connection);
442 443 444 445 446 447 448 449 450 451 452 453 454 455
        }
    }

    private boolean isHostUnknown(String host) {
        if (host == null) {
            // Answer false since when using server dialback the stream header will not
            // have a TO attribute
            return false;
        }
        if (serverName.equals(host)) {
            // requested host matched the server name
            return false;
        }
        // Check if the host matches a subdomain of this host
Gaston Dombiak's avatar
Gaston Dombiak committed
456
        return !routingTable.hasComponentRoute(new JID(host));
457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484
    }

    /**
     * Returns the stream namespace handled by the concrete reader (e.g. jabber:client,
     * jabber:server, etc.). This class does not call the method itself.
     *
     * @return the stream namespace.
     */
    abstract String getNamespace();

    /**
     * Returns true if the value of the 'to' attribute in the stream header should be
     * validated. {@link #createSession()} only checks the requested host when this method
     * returns true; if the host is then found to be unknown, a host-unknown error is
     * returned and the underlying connection is closed.
     *
     * @return true if the value of the 'to' attribute in the initial stream header should be
     *         validated.
     */
    abstract boolean validateHost();

    /**
     * Notification indicating that the SocketReader is shutting down. This base
     * implementation does nothing; subclasses may override the method to release any
     * resource they might hold.
     * NOTE(review): presumably invoked by the reading mode once it stops reading and
     * processing new requests; confirm against the callers.
     */
    protected void shutdown() {
    }

    /**
485
     * Creates the appropriate {@link org.jivesoftware.openfire.session.Session} subclass based on the specified namespace.
486 487 488
     *
     * @param namespace the namespace sent in the stream element. eg. jabber:client.
     * @return the created session or null.
489 490 491
     * @throws UnauthorizedException if the connection required security but was not secured.
     * @throws XmlPullParserException if there was an XML error while creating the session.
     * @throws IOException if an IO error occured while creating the session.
492
     */
493
    abstract boolean createSession(String namespace) throws UnauthorizedException,
494
            XmlPullParserException, IOException;
495 496 497 498

    /**
     * Returns null in this base implementation.
     * NOTE(review): presumably subclasses override this to supply additional namespace
     * declarations for the stream header; confirm against the callers.
     *
     * @return always null here.
     */
    public String getExtraNamespaces() {
        return null;
    }
499
}