Commit 2d62c7e1 authored by Dave Cridland's avatar Dave Cridland

OF-115 Process S2S stanzas from a session serially

This patch removes the thread pool from the ServerSocketReader, causing
stanzas from a particular S2S session to be processed serially on the same
thread used by the ServerSocketReader.

Each ServerSocketReader currently has a thread to itself, so this is unlikely
to have a serious impact on performance, but does have a very positive impact
on correctness.
parent 21f57aa5
...@@ -61,23 +61,9 @@ public class ServerSocketReader extends SocketReader { ...@@ -61,23 +61,9 @@ public class ServerSocketReader extends SocketReader {
private static final Logger Log = LoggerFactory.getLogger(ServerSocketReader.class); private static final Logger Log = LoggerFactory.getLogger(ServerSocketReader.class);
/**
* Pool of threads that are available for processing the requests.
*/
private ThreadPoolExecutor threadPool;
public ServerSocketReader(PacketRouter router, RoutingTable routingTable, String serverName, public ServerSocketReader(PacketRouter router, RoutingTable routingTable, String serverName,
Socket socket, SocketConnection connection, boolean useBlockingMode) { Socket socket, SocketConnection connection, boolean useBlockingMode) {
super(router, routingTable, serverName, socket, connection, useBlockingMode); super(router, routingTable, serverName, socket, connection, useBlockingMode);
// Create a pool of threads that will process received packets. If more threads are
// required then the command will be executed on the SocketReader process
int coreThreads = JiveGlobals.getIntProperty("xmpp.server.processing.core.threads", 2);
int maxThreads = JiveGlobals.getIntProperty("xmpp.server.processing.max.threads", 50);
int queueSize = JiveGlobals.getIntProperty("xmpp.server.processing.queue", 50);
threadPool =
new ThreadPoolExecutor(coreThreads, maxThreads, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(queueSize),
new ThreadPoolExecutor.CallerRunsPolicy());
} }
/** /**
...@@ -89,19 +75,13 @@ public class ServerSocketReader extends SocketReader { ...@@ -89,19 +75,13 @@ public class ServerSocketReader extends SocketReader {
protected void processIQ(final IQ packet) throws UnauthorizedException { protected void processIQ(final IQ packet) throws UnauthorizedException {
try { try {
packetReceived(packet); packetReceived(packet);
// Process the packet in another thread
threadPool.execute(new Runnable() {
@Override
public void run() {
try { try {
ServerSocketReader.super.processIQ(packet); super.processIQ(packet);
} }
catch (UnauthorizedException e) { catch (UnauthorizedException e) {
Log.error("Error processing packet", e); Log.error("Error processing packet", e);
} }
} }
});
}
catch (PacketRejectedException e) { catch (PacketRejectedException e) {
Log.debug("IQ rejected: " + packet.toXML(), e); Log.debug("IQ rejected: " + packet.toXML(), e);
} }
...@@ -116,19 +96,13 @@ public class ServerSocketReader extends SocketReader { ...@@ -116,19 +96,13 @@ public class ServerSocketReader extends SocketReader {
protected void processPresence(final Presence packet) throws UnauthorizedException { protected void processPresence(final Presence packet) throws UnauthorizedException {
try { try {
packetReceived(packet); packetReceived(packet);
// Process the packet in another thread
threadPool.execute(new Runnable() {
@Override
public void run() {
try { try {
ServerSocketReader.super.processPresence(packet); super.processPresence(packet);
} }
catch (UnauthorizedException e) { catch (UnauthorizedException e) {
Log.error("Error processing packet", e); Log.error("Error processing packet", e);
} }
} }
});
}
catch (PacketRejectedException e) { catch (PacketRejectedException e) {
Log.debug("Presence rejected: " + packet.toXML(), e); Log.debug("Presence rejected: " + packet.toXML(), e);
} }
...@@ -143,10 +117,6 @@ public class ServerSocketReader extends SocketReader { ...@@ -143,10 +117,6 @@ public class ServerSocketReader extends SocketReader {
protected void processMessage(final Message packet) throws UnauthorizedException { protected void processMessage(final Message packet) throws UnauthorizedException {
try { try {
packetReceived(packet); packetReceived(packet);
// Process the packet in another thread
threadPool.execute(new Runnable() {
@Override
public void run() {
try { try {
ServerSocketReader.super.processMessage(packet); ServerSocketReader.super.processMessage(packet);
} }
...@@ -154,8 +124,6 @@ public class ServerSocketReader extends SocketReader { ...@@ -154,8 +124,6 @@ public class ServerSocketReader extends SocketReader {
Log.error("Error processing packet", e); Log.error("Error processing packet", e);
} }
} }
});
}
catch (PacketRejectedException e) { catch (PacketRejectedException e) {
Log.debug("Message rejected: " + packet.toXML(), e); Log.debug("Message rejected: " + packet.toXML(), e);
} }
...@@ -223,9 +191,6 @@ public class ServerSocketReader extends SocketReader { ...@@ -223,9 +191,6 @@ public class ServerSocketReader extends SocketReader {
@Override @Override
protected void shutdown() { protected void shutdown() {
super.shutdown(); super.shutdown();
// Shutdown the pool of threads that are processing packets sent by
// the remote server
threadPool.shutdown();
} }
@Override @Override
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment