OutgoingServerSocketReader.java 5.82 KB
Newer Older
1 2 3 4 5
/**
 * $RCSfile$
 * $Revision: 1530 $
 * $Date: 2005-06-17 18:38:27 -0300 (Fri, 17 Jun 2005) $
 *
6
 * Copyright (C) 2005-2008 Jive Software. All rights reserved.
7
 *
8 9 10 11 12 13 14 15 16 17 18
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
19 20
 */

21
package org.jivesoftware.openfire.server;
22 23 24 25 26 27

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

28 29 30 31 32 33
import org.dom4j.Element;
import org.dom4j.io.XMPPPacketReader;
import org.jivesoftware.openfire.session.OutgoingServerSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

34 35 36 37 38 39 40 41 42 43 44 45 46
/**
 * An OutgoingServerSocketReader is responsible for reading and queueing the DOM Element sent by
 * a remote server. Since the DOM Elements are received using the outgoing connection only special
 * stanzas may be sent by the remote server (eg. db:result stanzas for answering if the
 * Authoritative Server verified the key sent by this server).<p>
 *
 * This class is also responsible for closing the outgoing connection if the remote server sent
 * an end of the stream element.
 *
 * @author Gaston Dombiak
 */
public class OutgoingServerSocketReader {

47 48
	private static final Logger Log = LoggerFactory.getLogger(OutgoingServerSocketReader.class);

49 50 51 52 53 54
    private OutgoingServerSession session;
    private boolean open = true;
    private XMPPPacketReader reader = null;
    /**
     * Queue that holds the elements read by the XMPPPacketReader.
     */
guus's avatar
guus committed
55
    private BlockingQueue<Element> elements = new LinkedBlockingQueue<Element>(10000);
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99

    public OutgoingServerSocketReader(XMPPPacketReader reader) {
        this.reader = reader;
        init();
    }

    /**
     * Returns the OutgoingServerSession for which this reader is working for or <tt>null</tt> if
     * a OutgoingServerSession was not created yet. While the OutgoingServerSession is being
     * created it is possible to have a reader with no session.
     *
     * @return the OutgoingServerSession for which this reader is working for or <tt>null</tt> if
     *         a OutgoingServerSession was not created yet.
     */
    public OutgoingServerSession getSession() {
        return session;
    }

    /**
     * Sets the OutgoingServerSession for which this reader is working for.
     *
     * @param session the OutgoingServerSession for which this reader is working for
     */
    public void setSession(OutgoingServerSession session) {
        this.session = session;
    }

    /**
     * Retrieves and removes the first received element that was stored in the queue, waiting
     * if necessary up to the specified wait time if no elements are present on this queue.
     *
     * @param timeout how long to wait before giving up, in units of <tt>unit</tt>.
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the <tt>timeout</tt> parameter.
     * @return the head of this queue, or <tt>null</tt> if the specified waiting time elapses
     *         before an element is present.
     * @throws InterruptedException if interrupted while waiting.
     */
    public Element getElement(long timeout, TimeUnit unit) throws InterruptedException {
        return elements.poll(timeout, unit);
    }

    private void init() {
        // Create a thread that will read and store DOM Elements.
        Thread thread = new Thread("Outgoing Server Reader") {
100 101
            @Override
			public void run() {
102
                while (open) {
Gaston Dombiak's avatar
Gaston Dombiak committed
103
                    Element doc;
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
                    try {
                        doc = reader.parseDocument().getRootElement();

                        if (doc == null) {
                            // Stop reading the stream since the remote server has sent an end of
                            // stream element and probably closed the connection.
                            closeSession();
                        }
                        else {
                            elements.add(doc);
                        }
                    }
                    catch (IOException e) {
                        String message = "Finishing Outgoing Server Reader. ";
                        if (session != null) {
                            message = message + "Closing session: " + session.toString();
                        }
                        else {
                            message = message + "No session to close.";
                        }
124
                        Log.debug("OutgoingServerSocketReader: "+message, e);
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
                        closeSession();
                    }
                    catch (Exception e) {
                        String message = "Finishing Outgoing Server Reader. ";
                        if (session != null) {
                            message = message + "Closing session: " + session.toString();
                        }
                        else {
                            message = message + "No session to close.";
                        }
                        Log.error(message, e);
                        closeSession();
                    }
                }
            }
        };
        thread.setDaemon(true);
        thread.start();
    }

    private void closeSession() {
        open = false;
        if (session != null) {
148
            session.close();
149 150 151
        }
    }
}