/**
 * $RCSfile: $
 * $Revision: $
 * $Date: $
 *
 * Copyright (C) 2005-2008 Jive Software. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

21
package org.jivesoftware.openfire.server;
22

23
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

import org.jivesoftware.openfire.RoutableChannelHandler;
import org.jivesoftware.openfire.RoutingTable;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.session.ConnectionSettings;
import org.jivesoftware.openfire.session.LocalOutgoingServerSession;
import org.jivesoftware.openfire.spi.RoutingTableImpl;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.packet.IQ;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import org.xmpp.packet.Packet;
import org.xmpp.packet.PacketError;
import org.xmpp.packet.Presence;
52 53 54 55 56 57 58 59

/**
 * An OutgoingSessionPromise provides an asynchronic way for sending packets to remote servers.
 * When looking for a route to a remote server that does not have an existing connection, a session
 * promise is returned.
 *
 * This class will queue packets and process them in another thread. The processing thread will
 * use a pool of thread that will actually do the hard work. The threads in the pool will try
guus's avatar
guus committed
60
 * to connect to remote servers and deliver the packets. If an error occurred while establishing
61 62 63 64 65 66
 * the connection or sending the packet an error will be returned to the sender of the packet.
 *
 * @author Gaston Dombiak
 */
public class OutgoingSessionPromise implements RoutableChannelHandler {

67 68
	private static final Logger Log = LoggerFactory.getLogger(OutgoingSessionPromise.class);

69 70 71 72 73
    private static OutgoingSessionPromise instance = new OutgoingSessionPromise();

    /**
     * Queue that holds the packets pending to be sent to remote servers.
     */
guus's avatar
guus committed
74
    private BlockingQueue<Packet> packets = new LinkedBlockingQueue<Packet>(10000);
75 76 77 78 79 80 81

    /**
     * Pool of threads that will create outgoing sessions to remote servers and send
     * the queued packets.
     */
    private ThreadPoolExecutor threadPool;

82 83
    private Map<String, PacketsProcessor> packetsProcessors = new HashMap<String, PacketsProcessor>();

84 85 86 87 88
    /**
     * Cache (unlimited, never expire) that holds outgoing sessions to remote servers from this server.
     * Key: server domain, Value: nodeID
     */
    private Cache<String, byte[]> serversCache;
89 90 91 92 93 94 95 96 97 98 99 100
    /**
     * Flag that indicates if the process that consumed the queued packets should stop.
     */
    private boolean shutdown = false;
    private RoutingTable routingTable;

    private OutgoingSessionPromise() {
        super();
        init();
    }

    private void init() {
101
        serversCache = CacheFactory.createCache(RoutingTableImpl.S2S_CACHE_NAME);
102 103
        routingTable = XMPPServer.getInstance().getRoutingTable();
        // Create a pool of threads that will process queued packets.
104 105
        int maxThreads = JiveGlobals.getIntProperty(ConnectionSettings.Server.QUEUE_MAX_THREADS, 20);
        int queueSize = JiveGlobals.getIntProperty(ConnectionSettings.Server.QUEUE_SIZE, 50);
106 107 108 109 110
        if (maxThreads < 10) {
            // Ensure that the max number of threads in the pool is at least 10
            maxThreads = 10;
        }
        threadPool =
God Ly's avatar
God Ly committed
111
                new ThreadPoolExecutor(maxThreads/4, maxThreads, 60, TimeUnit.SECONDS,
112 113
                        new LinkedBlockingQueue<Runnable>(queueSize),
                        new ThreadPoolExecutor.CallerRunsPolicy());
114 115 116 117 118 119 120 121 122 123 124 125

        // Start the thread that will consume the queued packets. Each pending packet will
        // be actually processed by a thread of the pool (when available). If an error occurs
        // while creating the remote session or sending the packet then a packet with error 502
        // will be sent to the sender of the packet
        Thread thread = new Thread(new Runnable() {
            public void run() {
                while (!shutdown) {
                    try {
                        if (threadPool.getActiveCount() < threadPool.getMaximumPoolSize()) {
                            // Wait until a packet is available
                            final Packet packet = packets.take();
126 127 128 129 130 131 132 133

                            boolean newProcessor = false;
                            PacketsProcessor packetsProcessor;
                            String domain = packet.getTo().getDomain();
                            synchronized (domain.intern()) {
                                packetsProcessor = packetsProcessors.get(domain);
                                if (packetsProcessor == null) {
                                    packetsProcessor =
134
                                            new PacketsProcessor(OutgoingSessionPromise.this, domain);
135 136
                                    packetsProcessors.put(domain, packetsProcessor);
                                    newProcessor = true;
137
                                }
138 139 140 141 142 143 144
                                packetsProcessor.addPacket(packet);
                            }

                            if (newProcessor) {
                                // Process the packet in another thread
                                threadPool.execute(packetsProcessor);
                            }
145 146 147 148 149 150 151
                        }
                        else {
                            // No threads are available so take a nap :)
                            Thread.sleep(200);
                        }
                    }
                    catch (InterruptedException e) {
152
                        // Do nothing
153 154
                    }
                    catch (Exception e) {
155
                        Log.error(e.getMessage(), e);
156 157 158 159 160 161 162 163 164 165 166 167 168
                    }
                }
            }
        }, "Queued Packets Processor");
        thread.setDaemon(true);
        thread.start();

    }

    public static OutgoingSessionPromise getInstance() {
        return instance;
    }

169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
    /**
     * Shuts down the thread that consumes the queued packets and also stops the pool
     * of threads that actually send the packets to the remote servers.
     */
    public void shutdown() {
        threadPool.shutdown();
        shutdown = true;
    }

    public JID getAddress() {
        // TODO Will somebody send this message to me????
        return null;
    }

    public void process(Packet packet) {
        // Queue the packet. Another process will process the queued packets.
        packets.add(packet.createCopy());
    }

    private void processorDone(PacketsProcessor packetsProcessor) {
        synchronized(packetsProcessor.getDomain().intern()) {
            if (packetsProcessor.isDone()) {
                packetsProcessors.remove(packetsProcessor.getDomain());
192 193
            }
            else {
194
                threadPool.execute(packetsProcessor);
195 196 197 198
            }
        }
    }

199
    private class PacketsProcessor implements Runnable {
200 201 202

        private OutgoingSessionPromise promise;
        private String domain;
203
        private Queue<Packet> packetQueue = new ConcurrentLinkedQueue<Packet>();
204 205 206 207 208 209 210 211 212
        /**
         * Keep track of the last time s2s failed. Once a packet failed to be sent to a
         * remote server this stamp will be used so that for the next 5 seconds future packets
         * for the same domain will automatically fail. After 5 seconds a new attempt to
         * establish a s2s connection and deliver pendings packets will be performed.
         * This optimization is good when the server is receiving many packets per second for the
         * same domain. This will help reduce high CPU consumption.
         */
        private long failureTimestamp = -1;
213

214
        public PacketsProcessor(OutgoingSessionPromise promise, String domain) {
215 216
            this.promise = promise;
            this.domain = domain;
217 218
        }

219 220
        public void run() {
            while (!isDone()) {
221
                Packet packet = packetQueue.poll();
222
                if (packet != null) {
223 224 225 226 227 228 229 230 231 232 233 234 235 236 237
                    // Check if s2s already failed
                    if (failureTimestamp > 0) {
                        // Check if enough time has passed to attempt a new s2s
                        if (System.currentTimeMillis() - failureTimestamp < 5000) {
                            returnErrorToSender(packet);
                            Log.debug(
                                    "OutgoingSessionPromise: Error sending packet to remote server (fast discard): " +
                                            packet);
                            continue;
                        }
                        else {
                            // Reset timestamp of last failure since we are ready to try again doing a s2s
                            failureTimestamp = -1;
                        }
                    }
238 239 240 241 242 243
                    try {
                        sendPacket(packet);
                    }
                    catch (Exception e) {
                        returnErrorToSender(packet);
                        Log.debug(
244
                                "OutgoingSessionPromise: Error sending packet to remote server: " + packet,
245
                                e);
246 247
                        // Mark the time when s2s failed
                        failureTimestamp = System.currentTimeMillis();
248
                    }
249 250
                }
            }
251 252 253 254 255
            promise.processorDone(this);
        }

        private void sendPacket(Packet packet) throws Exception {
            // Create a connection to the remote server from the domain where the packet has been sent
256 257
            boolean created;
            // Make sure that only one cluster node is creating the outgoing connection
258 259
            // TODO: Evaluate why removing the oss part causes nasty s2s and lockup issues.
            Lock lock = CacheFactory.getLock(domain+"oss", serversCache);
260 261 262 263 264 265 266
            try {
                lock.lock();
                created = LocalOutgoingServerSession
                        .authenticateDomain(packet.getFrom().getDomain(), packet.getTo().getDomain());
            } finally {
                lock.unlock();
            }
267
            if (created) {
268 269 270
                if (!routingTable.hasServerRoute(packet.getTo())) {
                    throw new Exception("Route created but not found!!!");
                }
271
                // A connection to the remote server was created so get the route and send the packet
Gaston Dombiak's avatar
Gaston Dombiak committed
272
                routingTable.routePacket(packet.getTo(), packet, false);
273
            }
274 275 276
            else {
                throw new Exception("Failed to create connection to remote server");
            }
277
        }
278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294

        private void returnErrorToSender(Packet packet) {
            XMPPServer server = XMPPServer.getInstance();
            JID from = packet.getFrom();
            JID to = packet.getTo();
            if (!server.isLocal(from) && !XMPPServer.getInstance().matchesComponent(from) &&
                    !server.isLocal(to) && !XMPPServer.getInstance().matchesComponent(to)) {
                // Do nothing since the sender and receiver of the packet that failed to reach a remote
                // server are not local users. This prevents endless loops if the FROM or TO address
                // are non-existen addresses
                return;
            }

            // TODO Send correct error condition: timeout or not_found depending on the real error
            try {
                if (packet instanceof IQ) {
                    IQ reply = new IQ();
295
                    reply.setID(packet.getID());
296 297 298 299
                    reply.setTo(from);
                    reply.setFrom(to);
                    reply.setChildElement(((IQ) packet).getChildElement().createCopy());
                    reply.setError(PacketError.Condition.remote_server_not_found);
Gaston Dombiak's avatar
Gaston Dombiak committed
300
                    routingTable.routePacket(reply.getTo(), reply, true);
301 302
                }
                else if (packet instanceof Presence) {
303 304 305
                	// workaround for OF-23. "undo" the 'setFrom' to a bare JID 
                	// by sending the error to all available resources.
                	final List<JID> routes = new ArrayList<JID>(); 
guus's avatar
guus committed
306 307
                	if (from.getResource() == null || from.getResource().trim().length() == 0) {
                    	routes.addAll(routingTable.getRoutes(from, null));
308
                    } else {
guus's avatar
guus committed
309
                    	routes.add(from);
310 311 312 313 314
                    }
                	
                	for (JID route : routes) {
	                    Presence reply = new Presence();
	                    reply.setID(packet.getID());
guus's avatar
guus committed
315 316
	                    reply.setTo(route);
	                    reply.setFrom(to);
317 318 319
	                    reply.setError(PacketError.Condition.remote_server_not_found);
	                    routingTable.routePacket(reply.getTo(), reply, true);
                	}
320 321 322 323 324 325 326 327 328
                }
                else if (packet instanceof Message) {
                    Message reply = new Message();
                    reply.setID(packet.getID());
                    reply.setTo(from);
                    reply.setFrom(to);
                    reply.setType(((Message)packet).getType());
                    reply.setThread(((Message)packet).getThread());
                    reply.setError(PacketError.Condition.remote_server_not_found);
Gaston Dombiak's avatar
Gaston Dombiak committed
329
                    routingTable.routePacket(reply.getTo(), reply, true);
330 331 332 333 334
                }
            }
            catch (Exception e) {
                Log.warn("Error returning error to sender. Original packet: " + packet, e);
            }
Gaston Dombiak's avatar
Gaston Dombiak committed
335
        }
336

337
        public void addPacket(Packet packet) {
338
            packetQueue.add(packet);
339
        }
340

341 342 343
        public String getDomain() {
            return domain;
        }
344

345
        public boolean isDone() {
346
            return packetQueue.isEmpty();
347
        }
348 349
    }
}