Commit d0667e21 authored by Dave Cridland's avatar Dave Cridland

OF-115 Remove race during outgoing session creation

This one is hard to duplicate in the lab, so this may not actually cure the
issue, however I suspect it does.

The existing code is based around a route of last resort, the
OutgoingSessionPromise, which builds a helper - the PacketsProcessor - and
creates the new session. The PacketsProcessors operate from within a thread
pool, and thus have contended execution time.

As new packets arrives from onward transmission during the session setup, the
PacketsProcessor queues them for retransmission. However, the act of session
creation also creates the outgoing route, meaning that later packets will
bypass the queue, and be transmitted out of order.

This should give the impression of a randomly reordered set of stanzas, but
would in fact be two consecutive sequences of ordered stanzas which are
interleaved, hence if the two sequences are A-E, F-J, one might see:

A, B, F, G, C, H, I, D, J, E

The fix here is two ensure that all packets are queued until the queue itself
is empty. In the solution provided, an additional helper class acts as the
route's RoutableChannelHandler, and stores packets in a queue until the real
ServerSession can catch up with the pending traffic. After that it simply
passes the process() call argument through.
parent 4d6a8055
...@@ -94,7 +94,7 @@ public interface RoutingTable { ...@@ -94,7 +94,7 @@ public interface RoutingTable {
* @param route the address associated to the route. * @param route the address associated to the route.
* @param destination the outgoing server session. * @param destination the outgoing server session.
*/ */
void addServerRoute(JID route, LocalOutgoingServerSession destination); void addServerRoute(JID route, RoutableChannelHandler destination);
/** /**
* Adds a route to the routing table for the specified internal or external component. <p> * Adds a route to the routing table for the specified internal or external component. <p>
......
...@@ -54,7 +54,6 @@ import org.jivesoftware.openfire.event.SessionEventDispatcher; ...@@ -54,7 +54,6 @@ import org.jivesoftware.openfire.event.SessionEventDispatcher;
import org.jivesoftware.openfire.http.HttpConnection; import org.jivesoftware.openfire.http.HttpConnection;
import org.jivesoftware.openfire.http.HttpSession; import org.jivesoftware.openfire.http.HttpSession;
import org.jivesoftware.openfire.multiplex.ConnectionMultiplexerManager; import org.jivesoftware.openfire.multiplex.ConnectionMultiplexerManager;
import org.jivesoftware.openfire.server.OutgoingSessionPromise;
import org.jivesoftware.openfire.session.ClientSession; import org.jivesoftware.openfire.session.ClientSession;
import org.jivesoftware.openfire.session.ClientSessionInfo; import org.jivesoftware.openfire.session.ClientSessionInfo;
import org.jivesoftware.openfire.session.ComponentSession; import org.jivesoftware.openfire.session.ComponentSession;
...@@ -1488,8 +1487,6 @@ public class SessionManager extends BasicModule implements ClusterEventListener/ ...@@ -1488,8 +1487,6 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
@Override @Override
public void stop() { public void stop() {
Log.debug("SessionManager: Stopping server"); Log.debug("SessionManager: Stopping server");
// Stop threads that are sending packets to remote servers
OutgoingSessionPromise.getInstance().shutdown();
if (JiveGlobals.getBooleanProperty("shutdownMessage.enabled")) { if (JiveGlobals.getBooleanProperty("shutdownMessage.enabled")) {
sendServerMessage(null, LocaleUtils.getLocalizedString("admin.shutdown.now")); sendServerMessage(null, LocaleUtils.getLocalizedString("admin.shutdown.now"));
} }
......
/**
*
*/
package org.jivesoftware.openfire.server;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.jivesoftware.openfire.PacketException;
import org.jivesoftware.openfire.RoutableChannelHandler;
import org.jivesoftware.openfire.RoutingTable;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.session.ConnectionSettings;
import org.jivesoftware.openfire.session.LocalOutgoingServerSession;
import org.jivesoftware.openfire.session.LocalSession;
import org.jivesoftware.openfire.session.ServerSession;
import org.jivesoftware.openfire.spi.RoutingTableImpl;
import org.jivesoftware.util.JiveGlobals;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.packet.IQ;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import org.xmpp.packet.Packet;
import org.xmpp.packet.PacketError;
import org.xmpp.packet.Presence;
/**
* @author dwd
*
*/
public class LocalOutgoingServerProxy implements RoutableChannelHandler {
private static final Logger log = LoggerFactory.getLogger(LocalOutgoingServerProxy.class);
private JID domain;
private ServerSession session;
private Queue<Packet> packets;
private static ExecutorService pool = createPool();
private long failureTimestamp = -1;
private boolean isTrying;
private static ExecutorService createPool() {
// Create a pool of threads that will process queued session requests.
int maxThreads = JiveGlobals.getIntProperty(ConnectionSettings.Server.QUEUE_MAX_THREADS, 20);
if (maxThreads < 10) {
// Ensure that the max number of threads in the pool is at least 10
maxThreads = 10;
}
ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
return pool;
}
public LocalOutgoingServerProxy(final JID domain) {
this.domain = domain;
this.session = null;
this.packets = null;
}
public LocalOutgoingServerProxy(final String domain) {
this.domain = new JID(domain);
this.session = null;
this.packets = null;
}
public LocalOutgoingServerProxy(final JID domain, ServerSession session) {
this.domain = domain;
this.session = null;
this.packets = null;
}
public LocalOutgoingServerProxy(final String domain, ServerSession session) {
this.domain = new JID(domain);
this.session = null;
this.packets = null;
}
/* (non-Javadoc)
* @see org.jivesoftware.openfire.ChannelHandler#process(org.xmpp.packet.Packet)
*/
@Override
public synchronized void process(final Packet packet) throws UnauthorizedException,
PacketException {
if (this.session != null) {
this.session.process(packet);
return;
}
if (packets == null) {
packets = new LinkedBlockingQueue<Packet>();
log.info("Queued packet for {}.", domain.toString());
}
packets.add(packet.createCopy());
if (isTrying == false) {
final String fromDomain = packet.getFrom().getDomain().toString();
final String toDomain = packet.getTo().getDomain().toString();
if ((failureTimestamp == -1) || ((System.currentTimeMillis() - failureTimestamp) >= 5000)) {
isTrying = true;
log.debug("Spinning up new session to {}", domain.toString());
pool.execute(new Runnable() {
public void run() {
log.debug("Initiating connection thread for {} -> {} ({})", fromDomain, toDomain, domain.toString());
try {
ServerSession s = LocalOutgoingServerSession.authenticateDomain(fromDomain, toDomain); // Long-running.
if (s != null) {
sessionReady(s);
} else {
sessionFailed();
}
} catch(Exception e) {
log.debug("Session for {} failed with:", domain.toString(), e);
sessionFailed();
}
log.debug("Finished connection thread for {}", domain.toString());
return;
}
});
} else {
sessionFailed();
}
} else {
// Session creation in progress.
packets.add(packet);
}
}
protected synchronized void sessionReady(ServerSession session) {
isTrying = false;
log.debug("Spun up new session to {}", domain.toString());
int sent = 0;
this.session = session;
while (!this.packets.isEmpty()) {
Packet packet = this.packets.remove();
this.session.process(packet);
sent = sent + 1;
}
this.packets = null;
log.debug("Done, sent {} pending stanzas.", sent);
}
protected synchronized void sessionFailed() {
isTrying = false;
log.debug("Failed to spin up new session to {}", domain.toString());
while (!this.packets.isEmpty()) {
Packet packet = this.packets.remove();
LocalSession.returnErrorToSender(packet);
}
this.packets = null;
}
/* (non-Javadoc)
* @see org.jivesoftware.openfire.RoutableChannelHandler#getAddress()
*/
@Override
public JID getAddress() {
return this.domain;
}
public ServerSession getSession() {
return this.session;
}
}
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2005-2008 Jive Software. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.openfire.server;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import org.jivesoftware.openfire.RoutableChannelHandler;
import org.jivesoftware.openfire.RoutingTable;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.session.ConnectionSettings;
import org.jivesoftware.openfire.session.LocalOutgoingServerSession;
import org.jivesoftware.openfire.spi.RoutingTableImpl;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.packet.IQ;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import org.xmpp.packet.Packet;
import org.xmpp.packet.PacketError;
import org.xmpp.packet.Presence;
/**
* An OutgoingSessionPromise provides an asynchronic way for sending packets to remote servers.
* When looking for a route to a remote server that does not have an existing connection, a session
* promise is returned.
*
* This class will queue packets and process them in another thread. The processing thread will
* use a pool of thread that will actually do the hard work. The threads in the pool will try
* to connect to remote servers and deliver the packets. If an error occurred while establishing
* the connection or sending the packet an error will be returned to the sender of the packet.
*
* @author Gaston Dombiak
*/
public class OutgoingSessionPromise implements RoutableChannelHandler {
private static final Logger Log = LoggerFactory.getLogger(OutgoingSessionPromise.class);
private static OutgoingSessionPromise instance = new OutgoingSessionPromise();
/**
* Queue that holds the packets pending to be sent to remote servers.
*/
private BlockingQueue<Packet> packets = new LinkedBlockingQueue<>(10000);
/**
* Pool of threads that will create outgoing sessions to remote servers and send
* the queued packets.
*/
private ThreadPoolExecutor threadPool;
private Map<String, PacketsProcessor> packetsProcessors = new HashMap<>();
/**
* Cache (unlimited, never expire) that holds outgoing sessions to remote servers from this server.
* Key: server domain, Value: nodeID
*/
private Cache<String, byte[]> serversCache;
/**
* Flag that indicates if the process that consumed the queued packets should stop.
*/
private boolean shutdown = false;
private RoutingTable routingTable;
private OutgoingSessionPromise() {
super();
init();
}
private void init() {
serversCache = CacheFactory.createCache(RoutingTableImpl.S2S_CACHE_NAME);
routingTable = XMPPServer.getInstance().getRoutingTable();
// Create a pool of threads that will process queued packets.
int maxThreads = JiveGlobals.getIntProperty(ConnectionSettings.Server.QUEUE_MAX_THREADS, 20);
int queueSize = JiveGlobals.getIntProperty(ConnectionSettings.Server.QUEUE_SIZE, 50);
if (maxThreads < 10) {
// Ensure that the max number of threads in the pool is at least 10
maxThreads = 10;
}
threadPool =
new ThreadPoolExecutor(maxThreads/4, maxThreads, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(queueSize),
new ThreadPoolExecutor.CallerRunsPolicy());
// Start the thread that will consume the queued packets. Each pending packet will
// be actually processed by a thread of the pool (when available). If an error occurs
// while creating the remote session or sending the packet then a packet with error 502
// will be sent to the sender of the packet
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
while (!shutdown) {
try {
if (threadPool.getActiveCount() < threadPool.getMaximumPoolSize()) {
// Wait until a packet is available
final Packet packet = packets.take();
boolean newProcessor = false;
PacketsProcessor packetsProcessor;
String domain = packet.getTo().getDomain();
synchronized (domain.intern()) {
packetsProcessor = packetsProcessors.get(domain);
if (packetsProcessor == null) {
packetsProcessor =
new PacketsProcessor(OutgoingSessionPromise.this, domain);
packetsProcessors.put(domain, packetsProcessor);
newProcessor = true;
}
packetsProcessor.addPacket(packet);
}
if (newProcessor) {
// Process the packet in another thread
threadPool.execute(packetsProcessor);
}
}
else {
// No threads are available so take a nap :)
Thread.sleep(200);
}
}
catch (InterruptedException e) {
// Do nothing
}
catch (Exception e) {
Log.error(e.getMessage(), e);
}
}
}
}, "Queued Packets Processor");
thread.setDaemon(true);
thread.start();
}
public static OutgoingSessionPromise getInstance() {
return instance;
}
/**
* Shuts down the thread that consumes the queued packets and also stops the pool
* of threads that actually send the packets to the remote servers.
*/
public void shutdown() {
threadPool.shutdown();
shutdown = true;
}
@Override
public JID getAddress() {
// TODO Will somebody send this message to me????
return null;
}
@Override
public void process(Packet packet) {
// Queue the packet. Another process will process the queued packets.
packets.add(packet.createCopy());
}
private void processorDone(PacketsProcessor packetsProcessor) {
synchronized(packetsProcessor.getDomain().intern()) {
if (packetsProcessor.isDone()) {
packetsProcessors.remove(packetsProcessor.getDomain());
}
else {
threadPool.execute(packetsProcessor);
}
}
}
private class PacketsProcessor implements Runnable {
private OutgoingSessionPromise promise;
private String domain;
private Queue<Packet> packetQueue = new ConcurrentLinkedQueue<>();
/**
* Keep track of the last time s2s failed. Once a packet failed to be sent to a
* remote server this stamp will be used so that for the next 5 seconds future packets
* for the same domain will automatically fail. After 5 seconds a new attempt to
* establish a s2s connection and deliver pendings packets will be performed.
* This optimization is good when the server is receiving many packets per second for the
* same domain. This will help reduce high CPU consumption.
*/
private long failureTimestamp = -1;
public PacketsProcessor(OutgoingSessionPromise promise, String domain) {
this.promise = promise;
this.domain = domain;
}
@Override
public void run() {
while (!isDone()) {
Packet packet = packetQueue.poll();
if (packet != null) {
// Check if s2s already failed
if (failureTimestamp > 0) {
// Check if enough time has passed to attempt a new s2s
if (System.currentTimeMillis() - failureTimestamp < 5000) {
returnErrorToSender(packet);
Log.debug(
"OutgoingSessionPromise: Error sending packet to remote server (fast discard): " +
packet);
continue;
}
else {
// Reset timestamp of last failure since we are ready to try again doing a s2s
failureTimestamp = -1;
}
}
try {
sendPacket(packet);
}
catch (Exception e) {
returnErrorToSender(packet);
Log.debug(
"OutgoingSessionPromise: Error sending packet to remote server: " + packet,
e);
// Mark the time when s2s failed
failureTimestamp = System.currentTimeMillis();
}
}
}
promise.processorDone(this);
}
private void sendPacket(Packet packet) throws Exception {
// Create a connection to the remote server from the domain where the packet has been sent
boolean created;
// Make sure that only one cluster node is creating the outgoing connection
// TODO: Evaluate why removing the oss part causes nasty s2s and lockup issues.
Lock lock = CacheFactory.getLock(domain+"oss", serversCache);
try {
lock.lock();
created = LocalOutgoingServerSession
.authenticateDomain(packet.getFrom().getDomain(), packet.getTo().getDomain());
} finally {
lock.unlock();
}
if (created) {
if (!routingTable.hasServerRoute(packet.getTo())) {
throw new Exception("Route created but not found!!!");
}
// A connection to the remote server was created so get the route and send the packet
routingTable.routePacket(packet.getTo(), packet, false);
}
else {
throw new Exception("Failed to create connection to remote server");
}
}
private void returnErrorToSender(Packet packet) {
XMPPServer server = XMPPServer.getInstance();
JID from = packet.getFrom();
JID to = packet.getTo();
if (!server.isLocal(from) && !XMPPServer.getInstance().matchesComponent(from) &&
!server.isLocal(to) && !XMPPServer.getInstance().matchesComponent(to)) {
// Do nothing since the sender and receiver of the packet that failed to reach a remote
// server are not local users. This prevents endless loops if the FROM or TO address
// are non-existen addresses
return;
}
// TODO Send correct error condition: timeout or not_found depending on the real error
try {
if (packet instanceof IQ) {
IQ reply = new IQ();
reply.setID(packet.getID());
reply.setTo(from);
reply.setFrom(to);
reply.setChildElement(((IQ) packet).getChildElement().createCopy());
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply, true);
}
else if (packet instanceof Presence) {
// workaround for OF-23. "undo" the 'setFrom' to a bare JID
// by sending the error to all available resources.
final List<JID> routes = new ArrayList<>();
if (from.getResource() == null || from.getResource().trim().length() == 0) {
routes.addAll(routingTable.getRoutes(from, null));
} else {
routes.add(from);
}
for (JID route : routes) {
Presence reply = new Presence();
reply.setID(packet.getID());
reply.setTo(route);
reply.setFrom(to);
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply, true);
}
}
else if (packet instanceof Message) {
Message reply = new Message();
reply.setID(packet.getID());
reply.setTo(from);
reply.setFrom(to);
reply.setType(((Message)packet).getType());
reply.setThread(((Message)packet).getThread());
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply, true);
}
}
catch (Exception e) {
Log.warn("Error returning error to sender. Original packet: " + packet, e);
}
}
public void addPacket(Packet packet) {
packetQueue.add(packet);
}
public String getDomain() {
return domain;
}
public boolean isDone() {
return packetQueue.isEmpty();
}
}
}
...@@ -64,7 +64,6 @@ import org.xmpp.packet.JID; ...@@ -64,7 +64,6 @@ import org.xmpp.packet.JID;
import org.xmpp.packet.Message; import org.xmpp.packet.Message;
import org.xmpp.packet.Packet; import org.xmpp.packet.Packet;
import org.xmpp.packet.PacketError; import org.xmpp.packet.PacketError;
import org.xmpp.packet.PacketExtension;
import org.xmpp.packet.Presence; import org.xmpp.packet.Presence;
import com.jcraft.jzlib.JZlib; import com.jcraft.jzlib.JZlib;
...@@ -121,15 +120,15 @@ public class LocalOutgoingServerSession extends LocalServerSession implements Ou ...@@ -121,15 +120,15 @@ public class LocalOutgoingServerSession extends LocalServerSession implements Ou
* @param hostname the hostname of the remote server. * @param hostname the hostname of the remote server.
* @return True if the domain was authenticated by the remote server. * @return True if the domain was authenticated by the remote server.
*/ */
public static boolean authenticateDomain(final String domain, final String hostname) { public static OutgoingServerSession authenticateDomain(String domain, String hostname) {
if (hostname == null || hostname.length() == 0 || hostname.trim().indexOf(' ') > -1) { if (hostname == null || hostname.length() == 0 || hostname.trim().indexOf(' ') > -1) {
// Do nothing if the target hostname is empty, null or contains whitespaces // Do nothing if the target hostname is empty, null or contains whitespaces
return false; return null;
} }
try { try {
// Check if the remote hostname is in the blacklist // Check if the remote hostname is in the blacklist
if (!RemoteServerManager.canAccess(hostname)) { if (!RemoteServerManager.canAccess(hostname)) {
return false; return null;
} }
OutgoingServerSession session; OutgoingServerSession session;
...@@ -139,7 +138,7 @@ public class LocalOutgoingServerSession extends LocalServerSession implements Ou ...@@ -139,7 +138,7 @@ public class LocalOutgoingServerSession extends LocalServerSession implements Ou
SessionManager sessionManager = SessionManager.getInstance(); SessionManager sessionManager = SessionManager.getInstance();
if (sessionManager == null) { if (sessionManager == null) {
// Server is shutting down while we are trying to create a new s2s connection // Server is shutting down while we are trying to create a new s2s connection
return false; return null;
} }
session = sessionManager.getOutgoingServerSession(hostname); session = sessionManager.getOutgoingServerSession(hostname);
if (session == null) { if (session == null) {
...@@ -169,25 +168,25 @@ public class LocalOutgoingServerSession extends LocalServerSession implements Ou ...@@ -169,25 +168,25 @@ public class LocalOutgoingServerSession extends LocalServerSession implements Ou
session.addHostname(hostname); session.addHostname(hostname);
// Notify the SessionManager that a new session has been created // Notify the SessionManager that a new session has been created
sessionManager.outgoingServerSessionCreated((LocalOutgoingServerSession) session); sessionManager.outgoingServerSessionCreated((LocalOutgoingServerSession) session);
return true; return session;
} else { } else {
Log.warn("Fail to connect to {} for {}", hostname, domain); Log.warn("Fail to connect to {} for {}", hostname, domain);
return false; return null;
} }
} }
// A session already exists. The session was established using server dialback so // A session already exists. The session was established using server dialback so
// it is possible to do piggybacking to authenticate more domains // it is possible to do piggybacking to authenticate more domains
if (session.getAuthenticatedDomains().contains(domain) && session.getHostnames().contains(hostname)) { if (session.getAuthenticatedDomains().contains(domain) && session.getHostnames().contains(hostname)) {
// Do nothing since the domain has already been authenticated // Do nothing since the domain has already been authenticated
return true; return session;
} }
// A session already exists so authenticate the domain using that session // A session already exists so authenticate the domain using that session
return session.authenticateSubdomain(domain, hostname); if (session.authenticateSubdomain(domain, hostname)) return session;
} }
catch (Exception e) { catch (Exception e) {
Log.error("Error authenticating domain with remote server: " + hostname, e); Log.error("Error authenticating domain with remote server: " + hostname, e);
} }
return false; return null;
} }
/** /**
...@@ -604,7 +603,7 @@ public class LocalOutgoingServerSession extends LocalServerSession implements Ou ...@@ -604,7 +603,7 @@ public class LocalOutgoingServerSession extends LocalServerSession implements Ou
if (!getAuthenticatedDomains().contains(senderDomain) && if (!getAuthenticatedDomains().contains(senderDomain) &&
!authenticateSubdomain(senderDomain, packet.getTo().getDomain())) { !authenticateSubdomain(senderDomain, packet.getTo().getDomain())) {
// Return error since sender domain was not validated by remote server // Return error since sender domain was not validated by remote server
returnErrorToSender(packet); LocalSession.returnErrorToSender(packet);
return false; return false;
} }
} }
...@@ -638,59 +637,6 @@ public class LocalOutgoingServerSession extends LocalServerSession implements Ou ...@@ -638,59 +637,6 @@ public class LocalOutgoingServerSession extends LocalServerSession implements Ou
return false; return false;
} }
private void returnErrorToSender(Packet packet) {
RoutingTable routingTable = XMPPServer.getInstance().getRoutingTable();
if (packet.getError() != null) {
Log.debug("Possible double bounce: " + packet.toXML());
}
try {
if (packet instanceof IQ) {
if (((IQ) packet).isResponse()) {
Log.debug("XMPP specs forbid us to respond with an IQ error to: " + packet.toXML());
return;
}
IQ reply = new IQ();
reply.setID(packet.getID());
reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo());
reply.setChildElement(((IQ) packet).getChildElement().createCopy());
reply.setType(IQ.Type.error);
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply, true);
}
else if (packet instanceof Presence) {
if (((Presence)packet).getType() == Presence.Type.error) {
Log.debug("Double-bounce of presence: " + packet.toXML());
return;
}
Presence reply = new Presence();
reply.setID(packet.getID());
reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo());
reply.setType(Presence.Type.error);
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply, true);
}
else if (packet instanceof Message) {
if (((Message)packet).getType() == Message.Type.error){
Log.debug("Double-bounce of message: " + packet.toXML());
return;
}
Message reply = new Message();
reply.setID(packet.getID());
reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo());
reply.setType(Message.Type.error);
reply.setThread(((Message)packet).getThread());
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply, true);
}
}
catch (Exception e) {
Log.error("Error returning error to sender. Original packet: " + packet, e);
}
}
@Override @Override
public Collection<String> getAuthenticatedDomains() { public Collection<String> getAuthenticatedDomains() {
return Collections.unmodifiableCollection(authenticatedDomains); return Collections.unmodifiableCollection(authenticatedDomains);
......
...@@ -27,6 +27,7 @@ import javax.net.ssl.SSLSession; ...@@ -27,6 +27,7 @@ import javax.net.ssl.SSLSession;
import org.dom4j.Element; import org.dom4j.Element;
import org.jivesoftware.openfire.Connection; import org.jivesoftware.openfire.Connection;
import org.jivesoftware.openfire.RoutingTable;
import org.jivesoftware.openfire.SessionManager; import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.StreamID; import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.XMPPServer; import org.jivesoftware.openfire.XMPPServer;
...@@ -35,6 +36,7 @@ import org.jivesoftware.openfire.interceptor.InterceptorManager; ...@@ -35,6 +36,7 @@ import org.jivesoftware.openfire.interceptor.InterceptorManager;
import org.jivesoftware.openfire.interceptor.PacketRejectedException; import org.jivesoftware.openfire.interceptor.PacketRejectedException;
import org.jivesoftware.openfire.net.SocketConnection; import org.jivesoftware.openfire.net.SocketConnection;
import org.jivesoftware.openfire.net.TLSStreamHandler; import org.jivesoftware.openfire.net.TLSStreamHandler;
import org.jivesoftware.openfire.spi.RoutingTableImpl;
import org.jivesoftware.openfire.streammanagement.StreamManager; import org.jivesoftware.openfire.streammanagement.StreamManager;
import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.LocaleUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -479,4 +481,57 @@ public abstract class LocalSession implements Session { ...@@ -479,4 +481,57 @@ public abstract class LocalSession implements Session {
public final Locale getLanguage() { public final Locale getLanguage() {
return language; return language;
} }
public static void returnErrorToSender(Packet packet) {
RoutingTable routingTable = XMPPServer.getInstance().getRoutingTable();
if (packet.getError() != null) {
Log.debug("Possible double bounce: " + packet.toXML());
}
try {
if (packet instanceof IQ) {
if (((IQ) packet).isResponse()) {
Log.debug("XMPP specs forbid us to respond with an IQ error to: " + packet.toXML());
return;
}
IQ reply = new IQ();
reply.setID(packet.getID());
reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo());
reply.setChildElement(((IQ) packet).getChildElement().createCopy());
reply.setType(IQ.Type.error);
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply, true);
}
else if (packet instanceof Presence) {
if (((Presence)packet).getType() == Presence.Type.error) {
Log.debug("Double-bounce of presence: " + packet.toXML());
return;
}
Presence reply = new Presence();
reply.setID(packet.getID());
reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo());
reply.setType(Presence.Type.error);
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply, true);
}
else if (packet instanceof Message) {
if (((Message)packet).getType() == Message.Type.error){
Log.debug("Double-bounce of message: " + packet.toXML());
return;
}
Message reply = new Message();
reply.setID(packet.getID());
reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo());
reply.setType(Message.Type.error);
reply.setThread(((Message)packet).getThread());
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply, true);
}
}
catch (Exception e) {
Log.error("Error returning error to sender. Original packet: " + packet, e);
}
}
} }
...@@ -32,7 +32,7 @@ import org.jivesoftware.openfire.component.ExternalComponentManager; ...@@ -32,7 +32,7 @@ import org.jivesoftware.openfire.component.ExternalComponentManager;
import org.jivesoftware.openfire.container.BasicModule; import org.jivesoftware.openfire.container.BasicModule;
import org.jivesoftware.openfire.forward.Forwarded; import org.jivesoftware.openfire.forward.Forwarded;
import org.jivesoftware.openfire.handler.PresenceUpdateHandler; import org.jivesoftware.openfire.handler.PresenceUpdateHandler;
import org.jivesoftware.openfire.server.OutgoingSessionPromise; import org.jivesoftware.openfire.server.LocalOutgoingServerProxy;
import org.jivesoftware.openfire.session.*; import org.jivesoftware.openfire.session.*;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.cache.Cache; import org.jivesoftware.util.cache.Cache;
...@@ -118,8 +118,21 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust ...@@ -118,8 +118,21 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
} }
@Override @Override
public void addServerRoute(JID route, LocalOutgoingServerSession destination) { public void addServerRoute(JID route, RoutableChannelHandler destination) {
String address = route.getDomain(); String address = route.getDomain();
try {
ServerSession s = (ServerSession)destination;
ServerSession old = this.getServerRoute(route);
if (s == old) {
return; // Already done.
}
if (old == null) {
return; // This will get added later.
}
destination = new LocalOutgoingServerProxy(route, s);
} catch(Exception e) {
// Just ignore this.
}
localRoutingTable.addRoute(address, destination); localRoutingTable.addRoute(address, destination);
Lock lock = CacheFactory.getLock(address, serversCache); Lock lock = CacheFactory.getLock(address, serversCache);
try { try {
...@@ -475,10 +488,31 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust ...@@ -475,10 +488,31 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
} }
} }
else { else {
// Return a promise of a remote session. This object will queue packets pending boolean retry = false;
// to be sent to remote servers // If we're here, it means we have no functional route. Sort it out.
OutgoingSessionPromise.getInstance().process(packet); final String domain = jid.getDomain();
routed = true; synchronized (domain.intern()) { // Only create one route at a time.
// Retry routing, in case someone else beat us to it before we got the lock.
if (serversCache.get(jid.getDomain()) == null) {
RoutableChannelHandler route = localRoutingTable.getRoute(jid.getDomain());
if (route == null) {
LocalOutgoingServerProxy proxy = new LocalOutgoingServerProxy(jid.getDomain());
try {
proxy.process(packet); // Put ours in first.
addServerRoute(new JID(jid.getDomain()), proxy); // At this point it may receive additional packets.
} catch (UnauthorizedException e) {
Log.error("Unable to route packet through new route: {}", packet.toXML(), e);
}
}
routed = true;
} else {
retry = true;
}
}
if (retry) {
// Curses! Need to recurse.
routed = routeToRemoteDomain(jid, packet, routed);
}
} }
return routed; return routed;
} }
...@@ -741,7 +775,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust ...@@ -741,7 +775,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
@Override @Override
public OutgoingServerSession getServerRoute(JID jid) { public OutgoingServerSession getServerRoute(JID jid) {
// Check if this session is hosted by this cluster node // Check if this session is hosted by this cluster node
OutgoingServerSession session = (OutgoingServerSession) localRoutingTable.getRoute(jid.getDomain()); RoutableChannelHandler session = localRoutingTable.getRoute(jid.getDomain());
if (session == null) { if (session == null) {
// The session is not in this JVM so assume remote // The session is not in this JVM so assume remote
RemoteSessionLocator locator = server.getRemoteSessionLocator(); RemoteSessionLocator locator = server.getRemoteSessionLocator();
...@@ -752,8 +786,12 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust ...@@ -752,8 +786,12 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
session = locator.getOutgoingServerSession(nodeID, jid); session = locator.getOutgoingServerSession(nodeID, jid);
} }
} }
} else {
// Local ones are proxies.
LocalOutgoingServerProxy proxy = (LocalOutgoingServerProxy) session;
session = proxy.getSession();
} }
return session; return (OutgoingServerSession)session;
} }
@Override @Override
...@@ -1019,7 +1057,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust ...@@ -1019,7 +1057,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
Lock clientLock = CacheFactory.getLock(nodeID, usersCache); Lock clientLock = CacheFactory.getLock(nodeID, usersCache);
try { try {
clientLock.lock(); clientLock.lock();
List<String> remoteClientRoutes = new ArrayList<>(); List<String> remoteClientRoutes = new ArrayList<String>();
for (Map.Entry<String, ClientRoute> entry : usersCache.entrySet()) { for (Map.Entry<String, ClientRoute> entry : usersCache.entrySet()) {
if (entry.getValue().getNodeID().equals(nodeID)) { if (entry.getValue().getNodeID().equals(nodeID)) {
remoteClientRoutes.add(entry.getKey()); remoteClientRoutes.add(entry.getKey());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment