Commit 29ccd90d authored by Dave Cridland's avatar Dave Cridland

Merge pull request #205 from surevine/dwd/OF-115

OF-115 Remove race during outgoing session creation
parents e9048188 d0667e21
......@@ -94,7 +94,7 @@ public interface RoutingTable {
* @param route the address associated to the route.
* @param destination the outgoing server session.
*/
void addServerRoute(JID route, LocalOutgoingServerSession destination);
void addServerRoute(JID route, RoutableChannelHandler destination);
/**
* Adds a route to the routing table for the specified internal or external component. <p>
......
......@@ -54,7 +54,6 @@ import org.jivesoftware.openfire.event.SessionEventDispatcher;
import org.jivesoftware.openfire.http.HttpConnection;
import org.jivesoftware.openfire.http.HttpSession;
import org.jivesoftware.openfire.multiplex.ConnectionMultiplexerManager;
import org.jivesoftware.openfire.server.OutgoingSessionPromise;
import org.jivesoftware.openfire.session.ClientSession;
import org.jivesoftware.openfire.session.ClientSessionInfo;
import org.jivesoftware.openfire.session.ComponentSession;
......@@ -1485,8 +1484,6 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
@Override
public void stop() {
Log.debug("SessionManager: Stopping server");
// Stop threads that are sending packets to remote servers
OutgoingSessionPromise.getInstance().shutdown();
if (JiveGlobals.getBooleanProperty("shutdownMessage.enabled")) {
sendServerMessage(null, LocaleUtils.getLocalizedString("admin.shutdown.now"));
}
......
/**
*
*/
package org.jivesoftware.openfire.server;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.jivesoftware.openfire.PacketException;
import org.jivesoftware.openfire.RoutableChannelHandler;
import org.jivesoftware.openfire.RoutingTable;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.session.ConnectionSettings;
import org.jivesoftware.openfire.session.LocalOutgoingServerSession;
import org.jivesoftware.openfire.session.LocalSession;
import org.jivesoftware.openfire.session.ServerSession;
import org.jivesoftware.openfire.spi.RoutingTableImpl;
import org.jivesoftware.util.JiveGlobals;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.packet.IQ;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import org.xmpp.packet.Packet;
import org.xmpp.packet.PacketError;
import org.xmpp.packet.Presence;
/**
* @author dwd
*
*/
public class LocalOutgoingServerProxy implements RoutableChannelHandler {
private static final Logger log = LoggerFactory.getLogger(LocalOutgoingServerProxy.class);
private JID domain;
private ServerSession session;
private Queue<Packet> packets;
private static ExecutorService pool = createPool();
private long failureTimestamp = -1;
private boolean isTrying;
private static ExecutorService createPool() {
// Create a pool of threads that will process queued session requests.
int maxThreads = JiveGlobals.getIntProperty(ConnectionSettings.Server.QUEUE_MAX_THREADS, 20);
if (maxThreads < 10) {
// Ensure that the max number of threads in the pool is at least 10
maxThreads = 10;
}
ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
return pool;
}
public LocalOutgoingServerProxy(final JID domain) {
this.domain = domain;
this.session = null;
this.packets = null;
}
public LocalOutgoingServerProxy(final String domain) {
this.domain = new JID(domain);
this.session = null;
this.packets = null;
}
public LocalOutgoingServerProxy(final JID domain, ServerSession session) {
this.domain = domain;
this.session = null;
this.packets = null;
}
public LocalOutgoingServerProxy(final String domain, ServerSession session) {
this.domain = new JID(domain);
this.session = null;
this.packets = null;
}
/* (non-Javadoc)
* @see org.jivesoftware.openfire.ChannelHandler#process(org.xmpp.packet.Packet)
*/
@Override
public synchronized void process(final Packet packet) throws UnauthorizedException,
PacketException {
if (this.session != null) {
this.session.process(packet);
return;
}
if (packets == null) {
packets = new LinkedBlockingQueue<Packet>();
log.info("Queued packet for {}.", domain.toString());
}
packets.add(packet.createCopy());
if (isTrying == false) {
final String fromDomain = packet.getFrom().getDomain().toString();
final String toDomain = packet.getTo().getDomain().toString();
if ((failureTimestamp == -1) || ((System.currentTimeMillis() - failureTimestamp) >= 5000)) {
isTrying = true;
log.debug("Spinning up new session to {}", domain.toString());
pool.execute(new Runnable() {
public void run() {
log.debug("Initiating connection thread for {} -> {} ({})", fromDomain, toDomain, domain.toString());
try {
ServerSession s = LocalOutgoingServerSession.authenticateDomain(fromDomain, toDomain); // Long-running.
if (s != null) {
sessionReady(s);
} else {
sessionFailed();
}
} catch(Exception e) {
log.debug("Session for {} failed with:", domain.toString(), e);
sessionFailed();
}
log.debug("Finished connection thread for {}", domain.toString());
return;
}
});
} else {
sessionFailed();
}
} else {
// Session creation in progress.
packets.add(packet);
}
}
protected synchronized void sessionReady(ServerSession session) {
isTrying = false;
log.debug("Spun up new session to {}", domain.toString());
int sent = 0;
this.session = session;
while (!this.packets.isEmpty()) {
Packet packet = this.packets.remove();
this.session.process(packet);
sent = sent + 1;
}
this.packets = null;
log.debug("Done, sent {} pending stanzas.", sent);
}
protected synchronized void sessionFailed() {
isTrying = false;
log.debug("Failed to spin up new session to {}", domain.toString());
while (!this.packets.isEmpty()) {
Packet packet = this.packets.remove();
LocalSession.returnErrorToSender(packet);
}
this.packets = null;
}
/* (non-Javadoc)
* @see org.jivesoftware.openfire.RoutableChannelHandler#getAddress()
*/
@Override
public JID getAddress() {
return this.domain;
}
public ServerSession getSession() {
return this.session;
}
}
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2005-2008 Jive Software. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.openfire.server;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import org.jivesoftware.openfire.RoutableChannelHandler;
import org.jivesoftware.openfire.RoutingTable;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.session.ConnectionSettings;
import org.jivesoftware.openfire.session.LocalOutgoingServerSession;
import org.jivesoftware.openfire.spi.RoutingTableImpl;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.packet.IQ;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import org.xmpp.packet.Packet;
import org.xmpp.packet.PacketError;
import org.xmpp.packet.Presence;
/**
* An OutgoingSessionPromise provides an asynchronic way for sending packets to remote servers.
* When looking for a route to a remote server that does not have an existing connection, a session
* promise is returned.
*
* This class will queue packets and process them in another thread. The processing thread will
* use a pool of thread that will actually do the hard work. The threads in the pool will try
* to connect to remote servers and deliver the packets. If an error occurred while establishing
* the connection or sending the packet an error will be returned to the sender of the packet.
*
* @author Gaston Dombiak
*/
public class OutgoingSessionPromise implements RoutableChannelHandler {
private static final Logger Log = LoggerFactory.getLogger(OutgoingSessionPromise.class);
private static OutgoingSessionPromise instance = new OutgoingSessionPromise();
/**
* Queue that holds the packets pending to be sent to remote servers.
*/
private BlockingQueue<Packet> packets = new LinkedBlockingQueue<>(10000);
/**
* Pool of threads that will create outgoing sessions to remote servers and send
* the queued packets.
*/
private ThreadPoolExecutor threadPool;
private Map<String, PacketsProcessor> packetsProcessors = new HashMap<>();
/**
* Cache (unlimited, never expire) that holds outgoing sessions to remote servers from this server.
* Key: server domain, Value: nodeID
*/
private Cache<String, byte[]> serversCache;
/**
* Flag that indicates if the process that consumed the queued packets should stop.
*/
private boolean shutdown = false;
private RoutingTable routingTable;
private OutgoingSessionPromise() {
super();
init();
}
private void init() {
serversCache = CacheFactory.createCache(RoutingTableImpl.S2S_CACHE_NAME);
routingTable = XMPPServer.getInstance().getRoutingTable();
// Create a pool of threads that will process queued packets.
int maxThreads = JiveGlobals.getIntProperty(ConnectionSettings.Server.QUEUE_MAX_THREADS, 20);
int queueSize = JiveGlobals.getIntProperty(ConnectionSettings.Server.QUEUE_SIZE, 50);
if (maxThreads < 10) {
// Ensure that the max number of threads in the pool is at least 10
maxThreads = 10;
}
threadPool =
new ThreadPoolExecutor(maxThreads/4, maxThreads, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(queueSize),
new ThreadPoolExecutor.CallerRunsPolicy());
// Start the thread that will consume the queued packets. Each pending packet will
// be actually processed by a thread of the pool (when available). If an error occurs
// while creating the remote session or sending the packet then a packet with error 502
// will be sent to the sender of the packet
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
while (!shutdown) {
try {
if (threadPool.getActiveCount() < threadPool.getMaximumPoolSize()) {
// Wait until a packet is available
final Packet packet = packets.take();
boolean newProcessor = false;
PacketsProcessor packetsProcessor;
String domain = packet.getTo().getDomain();
synchronized (domain.intern()) {
packetsProcessor = packetsProcessors.get(domain);
if (packetsProcessor == null) {
packetsProcessor =
new PacketsProcessor(OutgoingSessionPromise.this, domain);
packetsProcessors.put(domain, packetsProcessor);
newProcessor = true;
}
packetsProcessor.addPacket(packet);
}
if (newProcessor) {
// Process the packet in another thread
threadPool.execute(packetsProcessor);
}
}
else {
// No threads are available so take a nap :)
Thread.sleep(200);
}
}
catch (InterruptedException e) {
// Do nothing
}
catch (Exception e) {
Log.error(e.getMessage(), e);
}
}
}
}, "Queued Packets Processor");
thread.setDaemon(true);
thread.start();
}
public static OutgoingSessionPromise getInstance() {
return instance;
}
/**
* Shuts down the thread that consumes the queued packets and also stops the pool
* of threads that actually send the packets to the remote servers.
*/
public void shutdown() {
threadPool.shutdown();
shutdown = true;
}
@Override
public JID getAddress() {
// TODO Will somebody send this message to me????
return null;
}
@Override
public void process(Packet packet) {
// Queue the packet. Another process will process the queued packets.
packets.add(packet.createCopy());
}
private void processorDone(PacketsProcessor packetsProcessor) {
synchronized(packetsProcessor.getDomain().intern()) {
if (packetsProcessor.isDone()) {
packetsProcessors.remove(packetsProcessor.getDomain());
}
else {
threadPool.execute(packetsProcessor);
}
}
}
private class PacketsProcessor implements Runnable {
private OutgoingSessionPromise promise;
private String domain;
private Queue<Packet> packetQueue = new ConcurrentLinkedQueue<>();
/**
* Keep track of the last time s2s failed. Once a packet failed to be sent to a
* remote server this stamp will be used so that for the next 5 seconds future packets
* for the same domain will automatically fail. After 5 seconds a new attempt to
* establish a s2s connection and deliver pendings packets will be performed.
* This optimization is good when the server is receiving many packets per second for the
* same domain. This will help reduce high CPU consumption.
*/
private long failureTimestamp = -1;
public PacketsProcessor(OutgoingSessionPromise promise, String domain) {
this.promise = promise;
this.domain = domain;
}
@Override
public void run() {
while (!isDone()) {
Packet packet = packetQueue.poll();
if (packet != null) {
// Check if s2s already failed
if (failureTimestamp > 0) {
// Check if enough time has passed to attempt a new s2s
if (System.currentTimeMillis() - failureTimestamp < 5000) {
returnErrorToSender(packet);
Log.debug(
"OutgoingSessionPromise: Error sending packet to remote server (fast discard): " +
packet);
continue;
}
else {
// Reset timestamp of last failure since we are ready to try again doing a s2s
failureTimestamp = -1;
}
}
try {
sendPacket(packet);
}
catch (Exception e) {
returnErrorToSender(packet);
Log.debug(
"OutgoingSessionPromise: Error sending packet to remote server: " + packet,
e);
// Mark the time when s2s failed
failureTimestamp = System.currentTimeMillis();
}
}
}
promise.processorDone(this);
}
private void sendPacket(Packet packet) throws Exception {
// Create a connection to the remote server from the domain where the packet has been sent
boolean created;
// Make sure that only one cluster node is creating the outgoing connection
// TODO: Evaluate why removing the oss part causes nasty s2s and lockup issues.
Lock lock = CacheFactory.getLock(domain+"oss", serversCache);
try {
lock.lock();
created = LocalOutgoingServerSession
.authenticateDomain(packet.getFrom().getDomain(), packet.getTo().getDomain());
} finally {
lock.unlock();
}
if (created) {
if (!routingTable.hasServerRoute(packet.getTo())) {
throw new Exception("Route created but not found!!!");
}
// A connection to the remote server was created so get the route and send the packet
routingTable.routePacket(packet.getTo(), packet, false);
}
else {
throw new Exception("Failed to create connection to remote server");
}
}
private void returnErrorToSender(Packet packet) {
XMPPServer server = XMPPServer.getInstance();
JID from = packet.getFrom();
JID to = packet.getTo();
if (!server.isLocal(from) && !XMPPServer.getInstance().matchesComponent(from) &&
!server.isLocal(to) && !XMPPServer.getInstance().matchesComponent(to)) {
// Do nothing since the sender and receiver of the packet that failed to reach a remote
// server are not local users. This prevents endless loops if the FROM or TO address
// are non-existen addresses
return;
}
// TODO Send correct error condition: timeout or not_found depending on the real error
try {
if (packet instanceof IQ) {
IQ reply = new IQ();
reply.setID(packet.getID());
reply.setTo(from);
reply.setFrom(to);
reply.setChildElement(((IQ) packet).getChildElement().createCopy());
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply, true);
}
else if (packet instanceof Presence) {
// workaround for OF-23. "undo" the 'setFrom' to a bare JID
// by sending the error to all available resources.
final List<JID> routes = new ArrayList<>();
if (from.getResource() == null || from.getResource().trim().length() == 0) {
routes.addAll(routingTable.getRoutes(from, null));
} else {
routes.add(from);
}
for (JID route : routes) {
Presence reply = new Presence();
reply.setID(packet.getID());
reply.setTo(route);
reply.setFrom(to);
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply, true);
}
}
else if (packet instanceof Message) {
Message reply = new Message();
reply.setID(packet.getID());
reply.setTo(from);
reply.setFrom(to);
reply.setType(((Message)packet).getType());
reply.setThread(((Message)packet).getThread());
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply, true);
}
}
catch (Exception e) {
Log.warn("Error returning error to sender. Original packet: " + packet, e);
}
}
public void addPacket(Packet packet) {
packetQueue.add(packet);
}
public String getDomain() {
return domain;
}
public boolean isDone() {
return packetQueue.isEmpty();
}
}
}
......@@ -64,7 +64,6 @@ import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import org.xmpp.packet.Packet;
import org.xmpp.packet.PacketError;
import org.xmpp.packet.PacketExtension;
import org.xmpp.packet.Presence;
import com.jcraft.jzlib.JZlib;
......@@ -121,15 +120,15 @@ public class LocalOutgoingServerSession extends LocalServerSession implements Ou
* @param hostname the hostname of the remote server.
* @return True if the domain was authenticated by the remote server.
*/
public static boolean authenticateDomain(final String domain, final String hostname) {
public static OutgoingServerSession authenticateDomain(String domain, String hostname) {
if (hostname == null || hostname.length() == 0 || hostname.trim().indexOf(' ') > -1) {
// Do nothing if the target hostname is empty, null or contains whitespaces
return false;
return null;
}
try {
// Check if the remote hostname is in the blacklist
if (!RemoteServerManager.canAccess(hostname)) {
return false;
return null;
}
OutgoingServerSession session;
......@@ -139,7 +138,7 @@ public class LocalOutgoingServerSession extends LocalServerSession implements Ou
SessionManager sessionManager = SessionManager.getInstance();
if (sessionManager == null) {
// Server is shutting down while we are trying to create a new s2s connection
return false;
return null;
}
session = sessionManager.getOutgoingServerSession(hostname);
if (session == null) {
......@@ -169,25 +168,25 @@ public class LocalOutgoingServerSession extends LocalServerSession implements Ou
session.addHostname(hostname);
// Notify the SessionManager that a new session has been created
sessionManager.outgoingServerSessionCreated((LocalOutgoingServerSession) session);
return true;
return session;
} else {
Log.warn("Fail to connect to {} for {}", hostname, domain);
return false;
return null;
}
}
// A session already exists. The session was established using server dialback so
// it is possible to do piggybacking to authenticate more domains
if (session.getAuthenticatedDomains().contains(domain) && session.getHostnames().contains(hostname)) {
// Do nothing since the domain has already been authenticated
return true;
return session;
}
// A session already exists so authenticate the domain using that session
return session.authenticateSubdomain(domain, hostname);
if (session.authenticateSubdomain(domain, hostname)) return session;
}
catch (Exception e) {
Log.error("Error authenticating domain with remote server: " + hostname, e);
}
return false;
return null;
}
/**
......@@ -604,7 +603,7 @@ public class LocalOutgoingServerSession extends LocalServerSession implements Ou
if (!getAuthenticatedDomains().contains(senderDomain) &&
!authenticateSubdomain(senderDomain, packet.getTo().getDomain())) {
// Return error since sender domain was not validated by remote server
returnErrorToSender(packet);
LocalSession.returnErrorToSender(packet);
return false;
}
}
......@@ -638,59 +637,6 @@ public class LocalOutgoingServerSession extends LocalServerSession implements Ou
return false;
}
private void returnErrorToSender(Packet packet) {
RoutingTable routingTable = XMPPServer.getInstance().getRoutingTable();
if (packet.getError() != null) {
Log.debug("Possible double bounce: " + packet.toXML());
}
try {
if (packet instanceof IQ) {
if (((IQ) packet).isResponse()) {
Log.debug("XMPP specs forbid us to respond with an IQ error to: " + packet.toXML());
return;
}
IQ reply = new IQ();
reply.setID(packet.getID());
reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo());
reply.setChildElement(((IQ) packet).getChildElement().createCopy());
reply.setType(IQ.Type.error);
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply, true);
}
else if (packet instanceof Presence) {
if (((Presence)packet).getType() == Presence.Type.error) {
Log.debug("Double-bounce of presence: " + packet.toXML());
return;
}
Presence reply = new Presence();
reply.setID(packet.getID());
reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo());
reply.setType(Presence.Type.error);
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply, true);
}
else if (packet instanceof Message) {
if (((Message)packet).getType() == Message.Type.error){
Log.debug("Double-bounce of message: " + packet.toXML());
return;
}
Message reply = new Message();
reply.setID(packet.getID());
reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo());
reply.setType(Message.Type.error);
reply.setThread(((Message)packet).getThread());
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply, true);
}
}
catch (Exception e) {
Log.error("Error returning error to sender. Original packet: " + packet, e);
}
}
@Override
public Collection<String> getAuthenticatedDomains() {
return Collections.unmodifiableCollection(authenticatedDomains);
......
......@@ -27,6 +27,7 @@ import javax.net.ssl.SSLSession;
import org.dom4j.Element;
import org.jivesoftware.openfire.Connection;
import org.jivesoftware.openfire.RoutingTable;
import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.XMPPServer;
......@@ -35,6 +36,7 @@ import org.jivesoftware.openfire.interceptor.InterceptorManager;
import org.jivesoftware.openfire.interceptor.PacketRejectedException;
import org.jivesoftware.openfire.net.SocketConnection;
import org.jivesoftware.openfire.net.TLSStreamHandler;
import org.jivesoftware.openfire.spi.RoutingTableImpl;
import org.jivesoftware.openfire.streammanagement.StreamManager;
import org.jivesoftware.util.LocaleUtils;
import org.slf4j.Logger;
......@@ -479,4 +481,57 @@ public abstract class LocalSession implements Session {
public final Locale getLanguage() {
return language;
}
public static void returnErrorToSender(Packet packet) {
RoutingTable routingTable = XMPPServer.getInstance().getRoutingTable();
if (packet.getError() != null) {
Log.debug("Possible double bounce: " + packet.toXML());
}
try {
if (packet instanceof IQ) {
if (((IQ) packet).isResponse()) {
Log.debug("XMPP specs forbid us to respond with an IQ error to: " + packet.toXML());
return;
}
IQ reply = new IQ();
reply.setID(packet.getID());
reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo());
reply.setChildElement(((IQ) packet).getChildElement().createCopy());
reply.setType(IQ.Type.error);
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply, true);
}
else if (packet instanceof Presence) {
if (((Presence)packet).getType() == Presence.Type.error) {
Log.debug("Double-bounce of presence: " + packet.toXML());
return;
}
Presence reply = new Presence();
reply.setID(packet.getID());
reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo());
reply.setType(Presence.Type.error);
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply, true);
}
else if (packet instanceof Message) {
if (((Message)packet).getType() == Message.Type.error){
Log.debug("Double-bounce of message: " + packet.toXML());
return;
}
Message reply = new Message();
reply.setID(packet.getID());
reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo());
reply.setType(Message.Type.error);
reply.setThread(((Message)packet).getThread());
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply, true);
}
}
catch (Exception e) {
Log.error("Error returning error to sender. Original packet: " + packet, e);
}
}
}
......@@ -32,7 +32,7 @@ import org.jivesoftware.openfire.component.ExternalComponentManager;
import org.jivesoftware.openfire.container.BasicModule;
import org.jivesoftware.openfire.forward.Forwarded;
import org.jivesoftware.openfire.handler.PresenceUpdateHandler;
import org.jivesoftware.openfire.server.OutgoingSessionPromise;
import org.jivesoftware.openfire.server.LocalOutgoingServerProxy;
import org.jivesoftware.openfire.session.*;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.cache.Cache;
......@@ -118,8 +118,21 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
}
@Override
public void addServerRoute(JID route, LocalOutgoingServerSession destination) {
public void addServerRoute(JID route, RoutableChannelHandler destination) {
String address = route.getDomain();
try {
ServerSession s = (ServerSession)destination;
ServerSession old = this.getServerRoute(route);
if (s == old) {
return; // Already done.
}
if (old == null) {
return; // This will get added later.
}
destination = new LocalOutgoingServerProxy(route, s);
} catch(Exception e) {
// Just ignore this.
}
localRoutingTable.addRoute(address, destination);
Lock lock = CacheFactory.getLock(address, serversCache);
try {
......@@ -475,10 +488,31 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
}
}
else {
// Return a promise of a remote session. This object will queue packets pending
// to be sent to remote servers
OutgoingSessionPromise.getInstance().process(packet);
routed = true;
boolean retry = false;
// If we're here, it means we have no functional route. Sort it out.
final String domain = jid.getDomain();
synchronized (domain.intern()) { // Only create one route at a time.
// Retry routing, in case someone else beat us to it before we got the lock.
if (serversCache.get(jid.getDomain()) == null) {
RoutableChannelHandler route = localRoutingTable.getRoute(jid.getDomain());
if (route == null) {
LocalOutgoingServerProxy proxy = new LocalOutgoingServerProxy(jid.getDomain());
try {
proxy.process(packet); // Put ours in first.
addServerRoute(new JID(jid.getDomain()), proxy); // At this point it may receive additional packets.
} catch (UnauthorizedException e) {
Log.error("Unable to route packet through new route: {}", packet.toXML(), e);
}
}
routed = true;
} else {
retry = true;
}
}
if (retry) {
// Curses! Need to recurse.
routed = routeToRemoteDomain(jid, packet, routed);
}
}
return routed;
}
......@@ -741,7 +775,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
@Override
public OutgoingServerSession getServerRoute(JID jid) {
// Check if this session is hosted by this cluster node
OutgoingServerSession session = (OutgoingServerSession) localRoutingTable.getRoute(jid.getDomain());
RoutableChannelHandler session = localRoutingTable.getRoute(jid.getDomain());
if (session == null) {
// The session is not in this JVM so assume remote
RemoteSessionLocator locator = server.getRemoteSessionLocator();
......@@ -752,8 +786,12 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
session = locator.getOutgoingServerSession(nodeID, jid);
}
}
} else {
// Local ones are proxies.
LocalOutgoingServerProxy proxy = (LocalOutgoingServerProxy) session;
session = proxy.getSession();
}
return session;
return (OutgoingServerSession)session;
}
@Override
......@@ -1019,7 +1057,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
Lock clientLock = CacheFactory.getLock(nodeID, usersCache);
try {
clientLock.lock();
List<String> remoteClientRoutes = new ArrayList<>();
List<String> remoteClientRoutes = new ArrayList<String>();
for (Map.Entry<String, ClientRoute> entry : usersCache.entrySet()) {
if (entry.getValue().getNodeID().equals(nodeID)) {
remoteClientRoutes.add(entry.getKey());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment