Commit 5958fbbf authored by Dave Cridland's avatar Dave Cridland

Merge pull request #404 from igniterealtime/of-115-inbound-3.10

OF-115 Process S2S stanzas from a session serially
parents 288fa128 5adcad6b
...@@ -61,23 +61,9 @@ public class ServerSocketReader extends SocketReader { ...@@ -61,23 +61,9 @@ public class ServerSocketReader extends SocketReader {
private static final Logger Log = LoggerFactory.getLogger(ServerSocketReader.class); private static final Logger Log = LoggerFactory.getLogger(ServerSocketReader.class);
/**
* Pool of threads that are available for processing the requests.
*/
private ThreadPoolExecutor threadPool;
public ServerSocketReader(PacketRouter router, RoutingTable routingTable, String serverName, public ServerSocketReader(PacketRouter router, RoutingTable routingTable, String serverName,
Socket socket, SocketConnection connection, boolean useBlockingMode) { Socket socket, SocketConnection connection, boolean useBlockingMode) {
super(router, routingTable, serverName, socket, connection, useBlockingMode); super(router, routingTable, serverName, socket, connection, useBlockingMode);
// Create a pool of threads that will process received packets. If more threads are
// required then the command will be executed on the SocketReader process
int coreThreads = JiveGlobals.getIntProperty("xmpp.server.processing.core.threads", 2);
int maxThreads = JiveGlobals.getIntProperty("xmpp.server.processing.max.threads", 50);
int queueSize = JiveGlobals.getIntProperty("xmpp.server.processing.queue", 50);
threadPool =
new ThreadPoolExecutor(coreThreads, maxThreads, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(queueSize),
new ThreadPoolExecutor.CallerRunsPolicy());
} }
/** /**
...@@ -89,17 +75,12 @@ public class ServerSocketReader extends SocketReader { ...@@ -89,17 +75,12 @@ public class ServerSocketReader extends SocketReader {
protected void processIQ(final IQ packet) throws UnauthorizedException { protected void processIQ(final IQ packet) throws UnauthorizedException {
try { try {
packetReceived(packet); packetReceived(packet);
// Process the packet in another thread try {
threadPool.execute(new Runnable() { super.processIQ(packet);
public void run() { }
try { catch (UnauthorizedException e) {
ServerSocketReader.super.processIQ(packet); Log.error("Error processing packet", e);
} }
catch (UnauthorizedException e) {
Log.error("Error processing packet", e);
}
}
});
} }
catch (PacketRejectedException e) { catch (PacketRejectedException e) {
Log.debug("IQ rejected: " + packet.toXML(), e); Log.debug("IQ rejected: " + packet.toXML(), e);
...@@ -115,17 +96,12 @@ public class ServerSocketReader extends SocketReader { ...@@ -115,17 +96,12 @@ public class ServerSocketReader extends SocketReader {
protected void processPresence(final Presence packet) throws UnauthorizedException { protected void processPresence(final Presence packet) throws UnauthorizedException {
try { try {
packetReceived(packet); packetReceived(packet);
// Process the packet in another thread try {
threadPool.execute(new Runnable() { super.processPresence(packet);
public void run() { }
try { catch (UnauthorizedException e) {
ServerSocketReader.super.processPresence(packet); Log.error("Error processing packet", e);
} }
catch (UnauthorizedException e) {
Log.error("Error processing packet", e);
}
}
});
} }
catch (PacketRejectedException e) { catch (PacketRejectedException e) {
Log.debug("Presence rejected: " + packet.toXML(), e); Log.debug("Presence rejected: " + packet.toXML(), e);
...@@ -141,17 +117,12 @@ public class ServerSocketReader extends SocketReader { ...@@ -141,17 +117,12 @@ public class ServerSocketReader extends SocketReader {
protected void processMessage(final Message packet) throws UnauthorizedException { protected void processMessage(final Message packet) throws UnauthorizedException {
try { try {
packetReceived(packet); packetReceived(packet);
// Process the packet in another thread try {
threadPool.execute(new Runnable() { ServerSocketReader.super.processMessage(packet);
public void run() { }
try { catch (UnauthorizedException e) {
ServerSocketReader.super.processMessage(packet); Log.error("Error processing packet", e);
} }
catch (UnauthorizedException e) {
Log.error("Error processing packet", e);
}
}
});
} }
catch (PacketRejectedException e) { catch (PacketRejectedException e) {
Log.debug("Message rejected: " + packet.toXML(), e); Log.debug("Message rejected: " + packet.toXML(), e);
...@@ -220,9 +191,6 @@ public class ServerSocketReader extends SocketReader { ...@@ -220,9 +191,6 @@ public class ServerSocketReader extends SocketReader {
@Override @Override
protected void shutdown() { protected void shutdown() {
super.shutdown(); super.shutdown();
// Shutdown the pool of threads that are processing packets sent by
// the remote server
threadPool.shutdown();
} }
@Override @Override
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment