Commit 3882ca60 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

Optimized (a lot) logic for starting s2s connections to several remote servers...

Optimized (a lot) logic for starting s2s connections to several remote servers at the same time. JM-657

git-svn-id: http://svn.igniterealtime.org/svn/repos/wildfire/trunk@6360 b35dd754-fafc-0310-a699-88a17e54d16e
parent f7922a53
...@@ -94,7 +94,7 @@ public class OutgoingServerSession extends Session { ...@@ -94,7 +94,7 @@ public class OutgoingServerSession extends Session {
* @param hostname the hostname of the remote server. * @param hostname the hostname of the remote server.
* @return True if the domain was authenticated by the remote server. * @return True if the domain was authenticated by the remote server.
*/ */
public static boolean authenticateDomain(String domain, String hostname) { static boolean authenticateDomain(String domain, String hostname) {
if (hostname == null || hostname.length() == 0 || hostname.trim().indexOf(' ') > -1) { if (hostname == null || hostname.length() == 0 || hostname.trim().indexOf(' ') > -1) {
// Do nothing if the target hostname is empty, null or contains whitespaces // Do nothing if the target hostname is empty, null or contains whitespaces
return false; return false;
...@@ -105,11 +105,12 @@ public class OutgoingServerSession extends Session { ...@@ -105,11 +105,12 @@ public class OutgoingServerSession extends Session {
return false; return false;
} }
OutgoingServerSession session;
// Check if a session, that is using server dialback, already exists to the desired // Check if a session, that is using server dialback, already exists to the desired
// hostname (i.e. remote server). If no one exists then create a new session. The same // hostname (i.e. remote server). If no one exists then create a new session. The same
// session will be used for the same hostname for all the domains to authenticate // session will be used for the same hostname for all the domains to authenticate
SessionManager sessionManager = SessionManager.getInstance(); SessionManager sessionManager = SessionManager.getInstance();
OutgoingServerSession session = sessionManager.getOutgoingServerSession(hostname); session = sessionManager.getOutgoingServerSession(hostname);
if (session == null) { if (session == null) {
// Try locating if the remote server has previously authenticated with this server // Try locating if the remote server has previously authenticated with this server
for (IncomingServerSession incomingSession : sessionManager for (IncomingServerSession incomingSession : sessionManager
...@@ -123,8 +124,7 @@ public class OutgoingServerSession extends Session { ...@@ -123,8 +124,7 @@ public class OutgoingServerSession extends Session {
// session // session
session.addHostname(hostname); session.addHostname(hostname);
break; break;
} } else {
else {
session = null; session = null;
} }
} }
...@@ -134,7 +134,6 @@ public class OutgoingServerSession extends Session { ...@@ -134,7 +134,6 @@ public class OutgoingServerSession extends Session {
if (session == null) { if (session == null) {
int port = RemoteServerManager.getPortForServer(hostname); int port = RemoteServerManager.getPortForServer(hostname);
// No session was found to the remote server so make sure that only one is created // No session was found to the remote server so make sure that only one is created
synchronized (hostname.intern()) {
session = sessionManager.getOutgoingServerSession(hostname); session = sessionManager.getOutgoingServerSession(hostname);
if (session == null) { if (session == null) {
session = createOutgoingSession(domain, hostname, port); session = createOutgoingSession(domain, hostname, port);
...@@ -146,8 +145,7 @@ public class OutgoingServerSession extends Session { ...@@ -146,8 +145,7 @@ public class OutgoingServerSession extends Session {
// Notify the SessionManager that a new session has been created // Notify the SessionManager that a new session has been created
sessionManager.outgoingServerSessionCreated(session); sessionManager.outgoingServerSessionCreated(session);
return true; return true;
} } else {
else {
// Ensure that the hostname is not an IP address (i.e. contains chars) // Ensure that the hostname is not an IP address (i.e. contains chars)
if (!pattern.matcher(hostname).find()) { if (!pattern.matcher(hostname).find()) {
return false; return false;
...@@ -191,8 +189,7 @@ public class OutgoingServerSession extends Session { ...@@ -191,8 +189,7 @@ public class OutgoingServerSession extends Session {
// Add the new hostname to the found session // Add the new hostname to the found session
session.addHostname(newHostname); session.addHostname(newHostname);
return true; return true;
} } else {
else {
index = hostname.indexOf('.', index + 1); index = hostname.indexOf('.', index + 1);
} }
} }
...@@ -200,7 +197,6 @@ public class OutgoingServerSession extends Session { ...@@ -200,7 +197,6 @@ public class OutgoingServerSession extends Session {
} }
} }
} }
}
// A session already exists. The session was established using server dialback so // A session already exists. The session was established using server dialback so
// it is possible to do piggybacking to authenticate more domains // it is possible to do piggybacking to authenticate more domains
if (session.getAuthenticatedDomains().contains(domain)) { if (session.getAuthenticatedDomains().contains(domain)) {
......
...@@ -20,10 +20,10 @@ import org.jivesoftware.wildfire.XMPPServer; ...@@ -20,10 +20,10 @@ import org.jivesoftware.wildfire.XMPPServer;
import org.jivesoftware.wildfire.auth.UnauthorizedException; import org.jivesoftware.wildfire.auth.UnauthorizedException;
import org.xmpp.packet.*; import org.xmpp.packet.*;
import java.util.concurrent.BlockingQueue; import java.util.HashMap;
import java.util.concurrent.LinkedBlockingQueue; import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor; import java.util.Queue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.*;
/** /**
* An OutgoingSessionPromise provides an asynchronic way for sending packets to remote servers. * An OutgoingSessionPromise provides an asynchronic way for sending packets to remote servers.
...@@ -52,6 +52,8 @@ public class OutgoingSessionPromise implements RoutableChannelHandler { ...@@ -52,6 +52,8 @@ public class OutgoingSessionPromise implements RoutableChannelHandler {
*/ */
private ThreadPoolExecutor threadPool; private ThreadPoolExecutor threadPool;
private Map<String, PacketsProcessor> packetsProcessors = new HashMap<String, PacketsProcessor>();
/** /**
* Flag that indicates if the process that consumed the queued packets should stop. * Flag that indicates if the process that consumed the queued packets should stop.
*/ */
...@@ -88,20 +90,25 @@ public class OutgoingSessionPromise implements RoutableChannelHandler { ...@@ -88,20 +90,25 @@ public class OutgoingSessionPromise implements RoutableChannelHandler {
if (threadPool.getActiveCount() < threadPool.getMaximumPoolSize()) { if (threadPool.getActiveCount() < threadPool.getMaximumPoolSize()) {
// Wait until a packet is available // Wait until a packet is available
final Packet packet = packets.take(); final Packet packet = packets.take();
// Process the packet in another thread
threadPool.execute(new Runnable() { boolean newProcessor = false;
public void run() { PacketsProcessor packetsProcessor;
try { String domain = packet.getTo().getDomain();
createSessionAndSendPacket(packet); synchronized (domain.intern()) {
} packetsProcessor = packetsProcessors.get(domain);
catch (Exception e) { if (packetsProcessor == null) {
returnErrorToSender(packet); packetsProcessor =
Log.debug( new PacketsProcessor(OutgoingSessionPromise.this, domain, routingTable);
"Error sending packet to remote server: " + packet, packetsProcessors.put(domain, packetsProcessor);
e); newProcessor = true;
}
packetsProcessor.addPacket(packet);
} }
if (newProcessor) {
// Process the packet in another thread
threadPool.execute(packetsProcessor);
} }
});
} }
else { else {
// No threads are available so take a nap :) // No threads are available so take a nap :)
...@@ -125,7 +132,68 @@ public class OutgoingSessionPromise implements RoutableChannelHandler { ...@@ -125,7 +132,68 @@ public class OutgoingSessionPromise implements RoutableChannelHandler {
return instance; return instance;
} }
private void createSessionAndSendPacket(Packet packet) throws Exception { /**
* Shuts down the thread that consumes the queued packets and also stops the pool
* of threads that actually send the packets to the remote servers.
*/
public void shutdown() {
threadPool.shutdown();
shutdown = true;
}
public JID getAddress() {
// TODO Will somebody send this message to me????
return null;
}
public void process(Packet packet) {
// Queue the packet. Another process will process the queued packets.
packets.add(packet.createCopy());
}
private void processorDone(PacketsProcessor packetsProcessor) {
synchronized(packetsProcessor.getDomain().intern()) {
if (packetsProcessor.isDone()) {
packetsProcessors.remove(packetsProcessor.getDomain());
}
else {
threadPool.execute(packetsProcessor);
}
}
}
private static class PacketsProcessor implements Runnable {
private OutgoingSessionPromise promise;
private String domain;
private RoutingTable routingTable;
private Queue<Packet> packets = new ConcurrentLinkedQueue<Packet>();
public PacketsProcessor(OutgoingSessionPromise promise, String domain, RoutingTable routingTable) {
this.promise = promise;
this.domain = domain;
this.routingTable = routingTable;
}
public void run() {
while (!isDone()) {
Packet packet = packets.poll();
if (packet != null) {
try {
sendPacket(packet);
}
catch (Exception e) {
returnErrorToSender(packet);
Log.debug(
"Error sending packet to remote server: " + packet,
e);
}
}
}
promise.processorDone(this);
}
private void sendPacket(Packet packet) throws Exception {
// Create a connection to the remote server from the domain where the packet has been sent // Create a connection to the remote server from the domain where the packet has been sent
boolean created = OutgoingServerSession boolean created = OutgoingServerSession
.authenticateDomain(packet.getFrom().getDomain(), packet.getTo().getDomain()); .authenticateDomain(packet.getFrom().getDomain(), packet.getTo().getDomain());
...@@ -202,22 +270,16 @@ public class OutgoingSessionPromise implements RoutableChannelHandler { ...@@ -202,22 +270,16 @@ public class OutgoingSessionPromise implements RoutableChannelHandler {
} }
} }
/** public void addPacket(Packet packet) {
* Shuts down the thread that consumes the queued packets and also stops the pool packets.add(packet);
* of threads that actually send the packets to the remote servers.
*/
public void shutdown() {
threadPool.shutdown();
shutdown = true;
} }
public JID getAddress() { public String getDomain() {
// TODO Will somebody send this message to me???? return domain;
return null;
} }
public void process(Packet packet) { public boolean isDone() {
// Queue the packet. Another process will process the queued packets. return packets.isEmpty();
packets.add(packet.createCopy()); }
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment