ConnectionHandler.java 10 KB
Newer Older
1 2 3 4
/**
 * $Revision: $
 * $Date: $
 *
5
 * Copyright (C) 2005-2008 Jive Software. All rights reserved.
6
 *
7 8 9 10 11 12 13 14 15 16 17
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
18 19
 */

20
package org.jivesoftware.openfire.nio;
21

22 23 24 25
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

26 27 28
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
29
import org.apache.mina.filter.codec.ProtocolDecoderException;
30
import org.dom4j.io.XMPPPacketReader;
31 32 33 34
import org.jivesoftware.openfire.Connection;
import org.jivesoftware.openfire.net.MXParser;
import org.jivesoftware.openfire.net.ServerTrafficCounter;
import org.jivesoftware.openfire.net.StanzaHandler;
35 36
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
37 38
import org.xmlpull.v1.XmlPullParserException;
import org.xmlpull.v1.XmlPullParserFactory;
39
import org.xmpp.packet.StreamError;
40

41 42
import javax.net.ssl.SSLHandshakeException;

43 44 45 46 47 48 49 50
/**
 * A ConnectionHandler is responsible for creating new sessions, destroying sessions and delivering
 * received XML stanzas to the proper StanzaHandler.
 *
 * @author Gaston Dombiak
 */
public abstract class ConnectionHandler extends IoHandlerAdapter {

51 52
	private static final Logger Log = LoggerFactory.getLogger(ConnectionHandler.class);

53 54 55 56 57
    /**
     * The utf-8 charset for decoding and encoding Jabber packet streams.
     */
    static final String CHARSET = "UTF-8";
    static final String XML_PARSER = "XML-PARSER";
58 59
    protected static final String HANDLER = "HANDLER";
    protected static final String CONNECTION = "CONNECTION";
60 61

    protected String serverName;
62 63 64 65 66 67 68 69 70 71
    private static final ThreadLocal<XMPPPacketReader> PARSER_CACHE = new ThreadLocal<XMPPPacketReader>()
            {
               @Override
               protected XMPPPacketReader initialValue()
               {
                  final XMPPPacketReader parser = new XMPPPacketReader();
                  parser.setXPPFactory( factory );
                  return parser;
               }
            };
72 73 74 75
    /**
     * Reuse the same factory for all the connections.
     */
    private static XmlPullParserFactory factory = null;
76

77 78 79 80 81 82 83 84 85
    static {
        try {
            factory = XmlPullParserFactory.newInstance(MXParser.class.getName(), null);
            factory.setNamespaceAware(true);
        }
        catch (XmlPullParserException e) {
            Log.error("Error creating a parser factory", e);
        }
    }
86 87 88 89 90

    protected ConnectionHandler(String serverName) {
        this.serverName = serverName;
    }

91 92
    @Override
	public void sessionOpened(IoSession session) throws Exception {
93
        // Create a new XML parser for the new connection. The parser will be used by the XMPPDecoder filter.
94
        final XMLLightweightParser parser = new XMLLightweightParser(CHARSET);
95 96
        session.setAttribute(XML_PARSER, parser);
        // Create a new NIOConnection for the new session
97
        final NIOConnection connection = createNIOConnection(session);
98 99
        session.setAttribute(CONNECTION, connection);
        session.setAttribute(HANDLER, createStanzaHandler(connection));
100 101 102 103 104
        // Set the max time a connection can be idle before closing it. This amount of seconds
        // is divided in two, as Openfire will ping idle clients first (at 50% of the max idle time)
        // before disconnecting them (at 100% of the max idle time). This prevents Openfire from
        // removing connections without warning.
        final int idleTime = getMaxIdleTime() / 2;
105
        if (idleTime > 0) {
106
            session.getConfig().setIdleTime(IdleStatus.READER_IDLE, idleTime);
107 108 109
        }
    }

110 111
    @Override
	public void sessionClosed(IoSession session) throws Exception {
112 113 114 115 116 117
        // Get the connection for this session
        Connection connection = (Connection) session.getAttribute(CONNECTION);
        // Inform the connection that it was closed
        connection.close();
    }

118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
    /**
	 * Invoked when a MINA session has been idle for half of the allowed XMPP
	 * session idle time as specified by {@link #getMaxIdleTime()}. This method
	 * will be invoked each time that such a period passes (even if no IO has
	 * occurred in between).
	 * 
	 * Openfire will disconnect a session the second time this method is
	 * invoked, if no IO has occurred between the first and second invocation.
	 * This allows extensions of this class to use the first invocation to check
	 * for livelyness of the MINA session (e.g by polling the remote entity, as
	 * {@link ClientConnectionHandler} does).
	 * 
	 * @see org.apache.mina.common.IoHandlerAdapter#sessionIdle(org.apache.mina.common.IoSession,
	 *      org.apache.mina.common.IdleStatus)
	 */
133 134
    @Override
	public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
135 136 137 138 139 140 141 142
        if (session.getIdleCount(status) > 1) {
            // Get the connection for this session
            final Connection connection = (Connection) session.getAttribute(CONNECTION);
	        // Close idle connection
	        if (Log.isDebugEnabled()) {
	            Log.debug("ConnectionHandler: Closing connection that has been idle: " + connection);
	        }
	        connection.close();
143 144 145
        }
    }

146 147
    @Override
	public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
148 149
        if (cause instanceof IOException) {
            // TODO Verify if there were packets pending to be sent and decide what to do with them
150
            Log.info("ConnectionHandler reports IOException for session: " + session, cause);
151 152 153
            if (cause instanceof SSLHandshakeException) {
                session.close(true);
            }
154
        }
155 156
        else if (cause instanceof ProtocolDecoderException) {
            Log.warn("Closing session due to exception: " + session, cause);
157 158 159 160
            
            // PIO-524: Determine stream:error message.
            final StreamError error;
            if (cause.getCause() != null && cause.getCause() instanceof XMLNotWellFormedException) {
161
            	error = new StreamError(StreamError.Condition.not_well_formed);
162 163 164
            } else {
            	error = new StreamError(StreamError.Condition.internal_server_error);
            }
165

166 167
            final Connection connection = (Connection) session.getAttribute(CONNECTION);
            connection.deliverRawText(error.toXML());
168
            session.close(true);
169
        }
170
        else {
171
            Log.error("ConnectionHandler reports unexpected exception for session: " + session, cause);
172 173 174
        }
    }

175 176
    @Override
	public void messageReceived(IoSession session, Object message) throws Exception {
177 178
        // Get the stanza handler for this session
        StanzaHandler handler = (StanzaHandler) session.getAttribute(HANDLER);
179 180 181 182
        // Get the parser to use to process stanza. For optimization there is going
        // to be a parser for each running thread. Each Filter will be executed
        // by the Executor placed as the first Filter. So we can have a parser associated
        // to each Thread
183
        final XMPPPacketReader parser = PARSER_CACHE.get();
184 185
        // Update counter of read btyes
        updateReadBytesCounter(session);
186
        //System.out.println("RCVD: " + message);
187 188
        // Let the stanza handler process the received stanza
        try {
189
            handler.process((String) message, parser);
190 191 192 193 194 195 196
        } catch (Exception e) {
            Log.error("Closing connection due to error while processing message: " + message, e);
            Connection connection = (Connection) session.getAttribute(CONNECTION);
            connection.close();
        }
    }

197 198
    @Override
	public void messageSent(IoSession session, Object message) throws Exception {
199 200 201
        super.messageSent(session, message);
        // Update counter of written btyes
        updateWrittenBytesCounter(session);
202
        //System.out.println("SENT: " + Charset.forName("UTF-8").decode(((ByteBuffer)message).buf()));
203 204
    }

205 206 207 208 209 210 211 212 213 214 215
    abstract NIOConnection createNIOConnection(IoSession session);

    abstract StanzaHandler createStanzaHandler(NIOConnection connection);

    /**
     * Returns the max number of seconds a connection can be idle (both ways) before
     * being closed.<p>
     *
     * @return the max number of seconds a connection can be idle.
     */
    abstract int getMaxIdleTime();
216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255

    /**
     * Updates the system counter of read bytes. This information is used by the incoming
     * bytes statistic.
     *
     * @param session the session that read more bytes from the socket.
     */
    private void updateReadBytesCounter(IoSession session) {
        long currentBytes = session.getReadBytes();
        Long prevBytes = (Long) session.getAttribute("_read_bytes");
        long delta;
        if (prevBytes == null) {
            delta = currentBytes;
        }
        else {
            delta = currentBytes - prevBytes;
        }
        session.setAttribute("_read_bytes", currentBytes);
        ServerTrafficCounter.incrementIncomingCounter(delta);
    }

    /**
     * Updates the system counter of written bytes. This information is used by the outgoing
     * bytes statistic.
     *
     * @param session the session that wrote more bytes to the socket.
     */
    private void updateWrittenBytesCounter(IoSession session) {
        long currentBytes = session.getWrittenBytes();
        Long prevBytes = (Long) session.getAttribute("_written_bytes");
        long delta;
        if (prevBytes == null) {
            delta = currentBytes;
        }
        else {
            delta = currentBytes - prevBytes;
        }
        session.setAttribute("_written_bytes", currentBytes);
        ServerTrafficCounter.incrementOutgoingCounter(delta);
    }
256
}