Commit a1b493b2 authored by Dave Cridland's avatar Dave Cridland

Revert "OF-115 Remove race during outgoing session creation"

This reverts commit d0667e21.

While I think the general code is right, it's clearly got serious shortcomings
so I think it's best to back this out for now.
parent efa38c3d
...@@ -94,7 +94,7 @@ public interface RoutingTable { ...@@ -94,7 +94,7 @@ public interface RoutingTable {
* @param route the address associated to the route. * @param route the address associated to the route.
* @param destination the outgoing server session. * @param destination the outgoing server session.
*/ */
void addServerRoute(JID route, RoutableChannelHandler destination); void addServerRoute(JID route, LocalOutgoingServerSession destination);
/** /**
* Adds a route to the routing table for the specified internal or external component. <p> * Adds a route to the routing table for the specified internal or external component. <p>
......
...@@ -54,6 +54,7 @@ import org.jivesoftware.openfire.event.SessionEventDispatcher; ...@@ -54,6 +54,7 @@ import org.jivesoftware.openfire.event.SessionEventDispatcher;
import org.jivesoftware.openfire.http.HttpConnection; import org.jivesoftware.openfire.http.HttpConnection;
import org.jivesoftware.openfire.http.HttpSession; import org.jivesoftware.openfire.http.HttpSession;
import org.jivesoftware.openfire.multiplex.ConnectionMultiplexerManager; import org.jivesoftware.openfire.multiplex.ConnectionMultiplexerManager;
import org.jivesoftware.openfire.server.OutgoingSessionPromise;
import org.jivesoftware.openfire.session.ClientSession; import org.jivesoftware.openfire.session.ClientSession;
import org.jivesoftware.openfire.session.ClientSessionInfo; import org.jivesoftware.openfire.session.ClientSessionInfo;
import org.jivesoftware.openfire.session.ComponentSession; import org.jivesoftware.openfire.session.ComponentSession;
...@@ -1484,6 +1485,8 @@ public class SessionManager extends BasicModule implements ClusterEventListener/ ...@@ -1484,6 +1485,8 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
@Override @Override
public void stop() { public void stop() {
Log.debug("SessionManager: Stopping server"); Log.debug("SessionManager: Stopping server");
// Stop threads that are sending packets to remote servers
OutgoingSessionPromise.getInstance().shutdown();
if (JiveGlobals.getBooleanProperty("shutdownMessage.enabled")) { if (JiveGlobals.getBooleanProperty("shutdownMessage.enabled")) {
sendServerMessage(null, LocaleUtils.getLocalizedString("admin.shutdown.now")); sendServerMessage(null, LocaleUtils.getLocalizedString("admin.shutdown.now"));
} }
......
/**
*
*/
package org.jivesoftware.openfire.server;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.jivesoftware.openfire.PacketException;
import org.jivesoftware.openfire.RoutableChannelHandler;
import org.jivesoftware.openfire.RoutingTable;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.session.ConnectionSettings;
import org.jivesoftware.openfire.session.LocalOutgoingServerSession;
import org.jivesoftware.openfire.session.LocalSession;
import org.jivesoftware.openfire.session.ServerSession;
import org.jivesoftware.openfire.spi.RoutingTableImpl;
import org.jivesoftware.util.JiveGlobals;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.packet.IQ;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import org.xmpp.packet.Packet;
import org.xmpp.packet.PacketError;
import org.xmpp.packet.Presence;
/**
* @author dwd
*
*/
public class LocalOutgoingServerProxy implements RoutableChannelHandler {
private static final Logger log = LoggerFactory.getLogger(LocalOutgoingServerProxy.class);
private JID domain;
private ServerSession session;
private Queue<Packet> packets;
private static ExecutorService pool = createPool();
private long failureTimestamp = -1;
private boolean isTrying;
private static ExecutorService createPool() {
// Create a pool of threads that will process queued session requests.
int maxThreads = JiveGlobals.getIntProperty(ConnectionSettings.Server.QUEUE_MAX_THREADS, 20);
if (maxThreads < 10) {
// Ensure that the max number of threads in the pool is at least 10
maxThreads = 10;
}
ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
return pool;
}
public LocalOutgoingServerProxy(final JID domain) {
this.domain = domain;
this.session = null;
this.packets = null;
}
public LocalOutgoingServerProxy(final String domain) {
this.domain = new JID(domain);
this.session = null;
this.packets = null;
}
public LocalOutgoingServerProxy(final JID domain, ServerSession session) {
this.domain = domain;
this.session = null;
this.packets = null;
}
public LocalOutgoingServerProxy(final String domain, ServerSession session) {
this.domain = new JID(domain);
this.session = null;
this.packets = null;
}
/* (non-Javadoc)
* @see org.jivesoftware.openfire.ChannelHandler#process(org.xmpp.packet.Packet)
*/
@Override
public synchronized void process(final Packet packet) throws UnauthorizedException,
PacketException {
if (this.session != null) {
this.session.process(packet);
return;
}
if (packets == null) {
packets = new LinkedBlockingQueue<Packet>();
log.info("Queued packet for {}.", domain.toString());
}
packets.add(packet.createCopy());
if (isTrying == false) {
final String fromDomain = packet.getFrom().getDomain().toString();
final String toDomain = packet.getTo().getDomain().toString();
if ((failureTimestamp == -1) || ((System.currentTimeMillis() - failureTimestamp) >= 5000)) {
isTrying = true;
log.debug("Spinning up new session to {}", domain.toString());
pool.execute(new Runnable() {
public void run() {
log.debug("Initiating connection thread for {} -> {} ({})", fromDomain, toDomain, domain.toString());
try {
ServerSession s = LocalOutgoingServerSession.authenticateDomain(fromDomain, toDomain); // Long-running.
if (s != null) {
sessionReady(s);
} else {
sessionFailed();
}
} catch(Exception e) {
log.debug("Session for {} failed with:", domain.toString(), e);
sessionFailed();
}
log.debug("Finished connection thread for {}", domain.toString());
return;
}
});
} else {
sessionFailed();
}
} else {
// Session creation in progress.
packets.add(packet);
}
}
protected synchronized void sessionReady(ServerSession session) {
isTrying = false;
log.debug("Spun up new session to {}", domain.toString());
int sent = 0;
this.session = session;
while (this.packets != null && !this.packets.isEmpty()) {
Packet packet = this.packets.remove();
this.session.process(packet);
sent = sent + 1;
}
this.packets = null;
log.debug("Done, sent {} pending stanzas.", sent);
}
protected synchronized void sessionFailed() {
isTrying = false;
log.debug("Failed to spin up new session to {}", domain.toString());
while (this.packets != null && !this.packets.isEmpty()) {
Packet packet = this.packets.remove();
LocalSession.returnErrorToSender(packet);
}
this.packets = null;
}
/* (non-Javadoc)
* @see org.jivesoftware.openfire.RoutableChannelHandler#getAddress()
*/
@Override
public JID getAddress() {
return this.domain;
}
public ServerSession getSession() {
return this.session;
}
}
...@@ -64,6 +64,7 @@ import org.xmpp.packet.JID; ...@@ -64,6 +64,7 @@ import org.xmpp.packet.JID;
import org.xmpp.packet.Message; import org.xmpp.packet.Message;
import org.xmpp.packet.Packet; import org.xmpp.packet.Packet;
import org.xmpp.packet.PacketError; import org.xmpp.packet.PacketError;
import org.xmpp.packet.PacketExtension;
import org.xmpp.packet.Presence; import org.xmpp.packet.Presence;
import com.jcraft.jzlib.JZlib; import com.jcraft.jzlib.JZlib;
...@@ -120,15 +121,15 @@ public class LocalOutgoingServerSession extends LocalServerSession implements Ou ...@@ -120,15 +121,15 @@ public class LocalOutgoingServerSession extends LocalServerSession implements Ou
* @param hostname the hostname of the remote server. * @param hostname the hostname of the remote server.
* @return True if the domain was authenticated by the remote server. * @return True if the domain was authenticated by the remote server.
*/ */
public static OutgoingServerSession authenticateDomain(String domain, String hostname) { public static boolean authenticateDomain(final String domain, final String hostname) {
if (hostname == null || hostname.length() == 0 || hostname.trim().indexOf(' ') > -1) { if (hostname == null || hostname.length() == 0 || hostname.trim().indexOf(' ') > -1) {
// Do nothing if the target hostname is empty, null or contains whitespaces // Do nothing if the target hostname is empty, null or contains whitespaces
return null; return false;
} }
try { try {
// Check if the remote hostname is in the blacklist // Check if the remote hostname is in the blacklist
if (!RemoteServerManager.canAccess(hostname)) { if (!RemoteServerManager.canAccess(hostname)) {
return null; return false;
} }
OutgoingServerSession session; OutgoingServerSession session;
...@@ -138,7 +139,7 @@ public class LocalOutgoingServerSession extends LocalServerSession implements Ou ...@@ -138,7 +139,7 @@ public class LocalOutgoingServerSession extends LocalServerSession implements Ou
SessionManager sessionManager = SessionManager.getInstance(); SessionManager sessionManager = SessionManager.getInstance();
if (sessionManager == null) { if (sessionManager == null) {
// Server is shutting down while we are trying to create a new s2s connection // Server is shutting down while we are trying to create a new s2s connection
return null; return false;
} }
session = sessionManager.getOutgoingServerSession(hostname); session = sessionManager.getOutgoingServerSession(hostname);
if (session == null) { if (session == null) {
...@@ -168,25 +169,25 @@ public class LocalOutgoingServerSession extends LocalServerSession implements Ou ...@@ -168,25 +169,25 @@ public class LocalOutgoingServerSession extends LocalServerSession implements Ou
session.addHostname(hostname); session.addHostname(hostname);
// Notify the SessionManager that a new session has been created // Notify the SessionManager that a new session has been created
sessionManager.outgoingServerSessionCreated((LocalOutgoingServerSession) session); sessionManager.outgoingServerSessionCreated((LocalOutgoingServerSession) session);
return session; return true;
} else { } else {
Log.warn("Fail to connect to {} for {}", hostname, domain); Log.warn("Fail to connect to {} for {}", hostname, domain);
return null; return false;
} }
} }
// A session already exists. The session was established using server dialback so // A session already exists. The session was established using server dialback so
// it is possible to do piggybacking to authenticate more domains // it is possible to do piggybacking to authenticate more domains
if (session.getAuthenticatedDomains().contains(domain) && session.getHostnames().contains(hostname)) { if (session.getAuthenticatedDomains().contains(domain) && session.getHostnames().contains(hostname)) {
// Do nothing since the domain has already been authenticated // Do nothing since the domain has already been authenticated
return session; return true;
} }
// A session already exists so authenticate the domain using that session // A session already exists so authenticate the domain using that session
if (session.authenticateSubdomain(domain, hostname)) return session; return session.authenticateSubdomain(domain, hostname);
} }
catch (Exception e) { catch (Exception e) {
Log.error("Error authenticating domain with remote server: " + hostname, e); Log.error("Error authenticating domain with remote server: " + hostname, e);
} }
return null; return false;
} }
/** /**
...@@ -594,7 +595,7 @@ public class LocalOutgoingServerSession extends LocalServerSession implements Ou ...@@ -594,7 +595,7 @@ public class LocalOutgoingServerSession extends LocalServerSession implements Ou
if (!getAuthenticatedDomains().contains(senderDomain) && if (!getAuthenticatedDomains().contains(senderDomain) &&
!authenticateSubdomain(senderDomain, packet.getTo().getDomain())) { !authenticateSubdomain(senderDomain, packet.getTo().getDomain())) {
// Return error since sender domain was not validated by remote server // Return error since sender domain was not validated by remote server
LocalSession.returnErrorToSender(packet); returnErrorToSender(packet);
return false; return false;
} }
} }
...@@ -628,6 +629,59 @@ public class LocalOutgoingServerSession extends LocalServerSession implements Ou ...@@ -628,6 +629,59 @@ public class LocalOutgoingServerSession extends LocalServerSession implements Ou
return false; return false;
} }
private void returnErrorToSender(Packet packet) {
RoutingTable routingTable = XMPPServer.getInstance().getRoutingTable();
if (packet.getError() != null) {
Log.debug("Possible double bounce: " + packet.toXML());
}
try {
if (packet instanceof IQ) {
if (((IQ) packet).isResponse()) {
Log.debug("XMPP specs forbid us to respond with an IQ error to: " + packet.toXML());
return;
}
IQ reply = new IQ();
reply.setID(packet.getID());
reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo());
reply.setChildElement(((IQ) packet).getChildElement().createCopy());
reply.setType(IQ.Type.error);
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply, true);
}
else if (packet instanceof Presence) {
if (((Presence)packet).getType() == Presence.Type.error) {
Log.debug("Double-bounce of presence: " + packet.toXML());
return;
}
Presence reply = new Presence();
reply.setID(packet.getID());
reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo());
reply.setType(Presence.Type.error);
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply, true);
}
else if (packet instanceof Message) {
if (((Message)packet).getType() == Message.Type.error){
Log.debug("Double-bounce of message: " + packet.toXML());
return;
}
Message reply = new Message();
reply.setID(packet.getID());
reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo());
reply.setType(Message.Type.error);
reply.setThread(((Message)packet).getThread());
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply, true);
}
}
catch (Exception e) {
Log.error("Error returning error to sender. Original packet: " + packet, e);
}
}
@Override @Override
public Collection<String> getAuthenticatedDomains() { public Collection<String> getAuthenticatedDomains() {
return Collections.unmodifiableCollection(authenticatedDomains); return Collections.unmodifiableCollection(authenticatedDomains);
......
...@@ -27,7 +27,6 @@ import javax.net.ssl.SSLSession; ...@@ -27,7 +27,6 @@ import javax.net.ssl.SSLSession;
import org.dom4j.Element; import org.dom4j.Element;
import org.jivesoftware.openfire.Connection; import org.jivesoftware.openfire.Connection;
import org.jivesoftware.openfire.RoutingTable;
import org.jivesoftware.openfire.SessionManager; import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.StreamID; import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.XMPPServer; import org.jivesoftware.openfire.XMPPServer;
...@@ -36,7 +35,6 @@ import org.jivesoftware.openfire.interceptor.InterceptorManager; ...@@ -36,7 +35,6 @@ import org.jivesoftware.openfire.interceptor.InterceptorManager;
import org.jivesoftware.openfire.interceptor.PacketRejectedException; import org.jivesoftware.openfire.interceptor.PacketRejectedException;
import org.jivesoftware.openfire.net.SocketConnection; import org.jivesoftware.openfire.net.SocketConnection;
import org.jivesoftware.openfire.net.TLSStreamHandler; import org.jivesoftware.openfire.net.TLSStreamHandler;
import org.jivesoftware.openfire.spi.RoutingTableImpl;
import org.jivesoftware.openfire.streammanagement.StreamManager; import org.jivesoftware.openfire.streammanagement.StreamManager;
import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.LocaleUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -481,57 +479,4 @@ public abstract class LocalSession implements Session { ...@@ -481,57 +479,4 @@ public abstract class LocalSession implements Session {
public final Locale getLanguage() { public final Locale getLanguage() {
return language; return language;
} }
public static void returnErrorToSender(Packet packet) {
RoutingTable routingTable = XMPPServer.getInstance().getRoutingTable();
if (packet.getError() != null) {
Log.debug("Possible double bounce: " + packet.toXML());
}
try {
if (packet instanceof IQ) {
if (((IQ) packet).isResponse()) {
Log.debug("XMPP specs forbid us to respond with an IQ error to: " + packet.toXML());
return;
}
IQ reply = new IQ();
reply.setID(packet.getID());
reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo());
reply.setChildElement(((IQ) packet).getChildElement().createCopy());
reply.setType(IQ.Type.error);
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply, true);
}
else if (packet instanceof Presence) {
if (((Presence)packet).getType() == Presence.Type.error) {
Log.debug("Double-bounce of presence: " + packet.toXML());
return;
}
Presence reply = new Presence();
reply.setID(packet.getID());
reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo());
reply.setType(Presence.Type.error);
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply, true);
}
else if (packet instanceof Message) {
if (((Message)packet).getType() == Message.Type.error){
Log.debug("Double-bounce of message: " + packet.toXML());
return;
}
Message reply = new Message();
reply.setID(packet.getID());
reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo());
reply.setType(Message.Type.error);
reply.setThread(((Message)packet).getThread());
reply.setError(PacketError.Condition.remote_server_not_found);
routingTable.routePacket(reply.getTo(), reply, true);
}
}
catch (Exception e) {
Log.error("Error returning error to sender. Original packet: " + packet, e);
}
}
} }
...@@ -32,7 +32,7 @@ import org.jivesoftware.openfire.component.ExternalComponentManager; ...@@ -32,7 +32,7 @@ import org.jivesoftware.openfire.component.ExternalComponentManager;
import org.jivesoftware.openfire.container.BasicModule; import org.jivesoftware.openfire.container.BasicModule;
import org.jivesoftware.openfire.forward.Forwarded; import org.jivesoftware.openfire.forward.Forwarded;
import org.jivesoftware.openfire.handler.PresenceUpdateHandler; import org.jivesoftware.openfire.handler.PresenceUpdateHandler;
import org.jivesoftware.openfire.server.LocalOutgoingServerProxy; import org.jivesoftware.openfire.server.OutgoingSessionPromise;
import org.jivesoftware.openfire.session.*; import org.jivesoftware.openfire.session.*;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.cache.Cache; import org.jivesoftware.util.cache.Cache;
...@@ -118,21 +118,8 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust ...@@ -118,21 +118,8 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
} }
@Override @Override
public void addServerRoute(JID route, RoutableChannelHandler destination) { public void addServerRoute(JID route, LocalOutgoingServerSession destination) {
String address = route.getDomain(); String address = route.getDomain();
try {
ServerSession s = (ServerSession)destination;
ServerSession old = this.getServerRoute(route);
if (s == old) {
return; // Already done.
}
if (old == null) {
return; // This will get added later.
}
destination = new LocalOutgoingServerProxy(route, s);
} catch(Exception e) {
// Just ignore this.
}
localRoutingTable.addRoute(address, destination); localRoutingTable.addRoute(address, destination);
Lock lock = CacheFactory.getLock(address, serversCache); Lock lock = CacheFactory.getLock(address, serversCache);
try { try {
...@@ -488,31 +475,10 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust ...@@ -488,31 +475,10 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
} }
} }
else { else {
boolean retry = false; // Return a promise of a remote session. This object will queue packets pending
// If we're here, it means we have no functional route. Sort it out. // to be sent to remote servers
final String domain = jid.getDomain(); OutgoingSessionPromise.getInstance().process(packet);
synchronized (domain.intern()) { // Only create one route at a time. routed = true;
// Retry routing, in case someone else beat us to it before we got the lock.
if (serversCache.get(jid.getDomain()) == null) {
RoutableChannelHandler route = localRoutingTable.getRoute(jid.getDomain());
if (route == null) {
LocalOutgoingServerProxy proxy = new LocalOutgoingServerProxy(jid.getDomain());
try {
proxy.process(packet); // Put ours in first.
addServerRoute(new JID(jid.getDomain()), proxy); // At this point it may receive additional packets.
} catch (UnauthorizedException e) {
Log.error("Unable to route packet through new route: {}", packet.toXML(), e);
}
}
routed = true;
} else {
retry = true;
}
}
if (retry) {
// Curses! Need to recurse.
routed = routeToRemoteDomain(jid, packet, routed);
}
} }
return routed; return routed;
} }
...@@ -775,7 +741,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust ...@@ -775,7 +741,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
@Override @Override
public OutgoingServerSession getServerRoute(JID jid) { public OutgoingServerSession getServerRoute(JID jid) {
// Check if this session is hosted by this cluster node // Check if this session is hosted by this cluster node
RoutableChannelHandler session = localRoutingTable.getRoute(jid.getDomain()); OutgoingServerSession session = (OutgoingServerSession) localRoutingTable.getRoute(jid.getDomain());
if (session == null) { if (session == null) {
// The session is not in this JVM so assume remote // The session is not in this JVM so assume remote
RemoteSessionLocator locator = server.getRemoteSessionLocator(); RemoteSessionLocator locator = server.getRemoteSessionLocator();
...@@ -786,12 +752,8 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust ...@@ -786,12 +752,8 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
session = locator.getOutgoingServerSession(nodeID, jid); session = locator.getOutgoingServerSession(nodeID, jid);
} }
} }
} else {
// Local ones are proxies.
LocalOutgoingServerProxy proxy = (LocalOutgoingServerProxy) session;
session = proxy.getSession();
} }
return (OutgoingServerSession)session; return session;
} }
@Override @Override
...@@ -1057,7 +1019,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust ...@@ -1057,7 +1019,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
Lock clientLock = CacheFactory.getLock(nodeID, usersCache); Lock clientLock = CacheFactory.getLock(nodeID, usersCache);
try { try {
clientLock.lock(); clientLock.lock();
List<String> remoteClientRoutes = new ArrayList<String>(); List<String> remoteClientRoutes = new ArrayList<>();
for (Map.Entry<String, ClientRoute> entry : usersCache.entrySet()) { for (Map.Entry<String, ClientRoute> entry : usersCache.entrySet()) {
if (entry.getValue().getNodeID().equals(nodeID)) { if (entry.getValue().getNodeID().equals(nodeID)) {
remoteClientRoutes.add(entry.getKey()); remoteClientRoutes.add(entry.getKey());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment