Commit 5adcad6b authored by Dave Cridland's avatar Dave Cridland

OF-115 Process S2S stanzas from a session serially

This patch removes the thread pool from the ServerSocketReader, causing
stanzas from a particular S2S session to be processed serially on the same
thread used by the ServerSocketReader.

Each ServerSocketReader currently has a thread to itself, so this is unlikely
to have a serious impact on performance, but does have a very positive impact
on correctness.
parent 288fa128
......@@ -61,23 +61,9 @@ public class ServerSocketReader extends SocketReader {
private static final Logger Log = LoggerFactory.getLogger(ServerSocketReader.class);
/**
* Pool of threads that are available for processing the requests.
*/
private ThreadPoolExecutor threadPool;
public ServerSocketReader(PacketRouter router, RoutingTable routingTable, String serverName,
Socket socket, SocketConnection connection, boolean useBlockingMode) {
super(router, routingTable, serverName, socket, connection, useBlockingMode);
// Create a pool of threads that will process received packets. If more threads are
// required then the command will be executed on the SocketReader process
int coreThreads = JiveGlobals.getIntProperty("xmpp.server.processing.core.threads", 2);
int maxThreads = JiveGlobals.getIntProperty("xmpp.server.processing.max.threads", 50);
int queueSize = JiveGlobals.getIntProperty("xmpp.server.processing.queue", 50);
threadPool =
new ThreadPoolExecutor(coreThreads, maxThreads, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(queueSize),
new ThreadPoolExecutor.CallerRunsPolicy());
}
/**
......@@ -89,18 +75,13 @@ public class ServerSocketReader extends SocketReader {
protected void processIQ(final IQ packet) throws UnauthorizedException {
try {
packetReceived(packet);
// Process the packet in another thread
threadPool.execute(new Runnable() {
public void run() {
try {
ServerSocketReader.super.processIQ(packet);
super.processIQ(packet);
}
catch (UnauthorizedException e) {
Log.error("Error processing packet", e);
}
}
});
}
catch (PacketRejectedException e) {
Log.debug("IQ rejected: " + packet.toXML(), e);
}
......@@ -115,18 +96,13 @@ public class ServerSocketReader extends SocketReader {
protected void processPresence(final Presence packet) throws UnauthorizedException {
try {
packetReceived(packet);
// Process the packet in another thread
threadPool.execute(new Runnable() {
public void run() {
try {
ServerSocketReader.super.processPresence(packet);
super.processPresence(packet);
}
catch (UnauthorizedException e) {
Log.error("Error processing packet", e);
}
}
});
}
catch (PacketRejectedException e) {
Log.debug("Presence rejected: " + packet.toXML(), e);
}
......@@ -141,9 +117,6 @@ public class ServerSocketReader extends SocketReader {
protected void processMessage(final Message packet) throws UnauthorizedException {
try {
packetReceived(packet);
// Process the packet in another thread
threadPool.execute(new Runnable() {
public void run() {
try {
ServerSocketReader.super.processMessage(packet);
}
......@@ -151,8 +124,6 @@ public class ServerSocketReader extends SocketReader {
Log.error("Error processing packet", e);
}
}
});
}
catch (PacketRejectedException e) {
Log.debug("Message rejected: " + packet.toXML(), e);
}
......@@ -220,9 +191,6 @@ public class ServerSocketReader extends SocketReader {
@Override
protected void shutdown() {
super.shutdown();
// Shutdown the pool of threads that are processing packets sent by
// the remote server
threadPool.shutdown();
}
@Override
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment