Commit 8c554c16 authored by daryl herzmann's avatar daryl herzmann Committed by GitHub

Merge pull request #787 from guusdk/OF-1324_Limit-outbound-s2s-queue

OF-1324 limit outbound s2s queue
parents 24785a49 1d1c6ad8
...@@ -21,11 +21,7 @@ import java.util.HashMap; ...@@ -21,11 +21,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import org.jivesoftware.openfire.RoutableChannelHandler; import org.jivesoftware.openfire.RoutableChannelHandler;
...@@ -195,11 +191,14 @@ public class OutgoingSessionPromise implements RoutableChannelHandler { ...@@ -195,11 +191,14 @@ public class OutgoingSessionPromise implements RoutableChannelHandler {
} }
} }
private class PacketsProcessor implements Runnable { private class PacketsProcessor implements Runnable
{
private final Logger Log = LoggerFactory.getLogger( PacketsProcessor.class );
private OutgoingSessionPromise promise; private OutgoingSessionPromise promise;
private String domain; private String domain;
private Queue<Packet> packetQueue = new ConcurrentLinkedQueue<>(); private Queue<Packet> packetQueue = new ArrayBlockingQueue<>( JiveGlobals.getIntProperty(ConnectionSettings.Server.QUEUE_SIZE, 50) );
/** /**
* Keep track of the last time s2s failed. Once a packet failed to be sent to a * Keep track of the last time s2s failed. Once a packet failed to be sent to a
* remote server this stamp will be used so that for the next 5 seconds future packets * remote server this stamp will be used so that for the next 5 seconds future packets
...@@ -225,9 +224,7 @@ public class OutgoingSessionPromise implements RoutableChannelHandler { ...@@ -225,9 +224,7 @@ public class OutgoingSessionPromise implements RoutableChannelHandler {
// Check if enough time has passed to attempt a new s2s // Check if enough time has passed to attempt a new s2s
if (System.currentTimeMillis() - failureTimestamp < 5000) { if (System.currentTimeMillis() - failureTimestamp < 5000) {
returnErrorToSender(packet); returnErrorToSender(packet);
Log.debug( Log.debug( "Error sending packet to domain '{}' (fast discard): {}", domain, packet );
"OutgoingSessionPromise: Error sending packet to remote server (fast discard): " +
packet);
continue; continue;
} }
else { else {
...@@ -240,9 +237,7 @@ public class OutgoingSessionPromise implements RoutableChannelHandler { ...@@ -240,9 +237,7 @@ public class OutgoingSessionPromise implements RoutableChannelHandler {
} }
catch (Exception e) { catch (Exception e) {
returnErrorToSender(packet); returnErrorToSender(packet);
Log.debug( Log.debug( "Error sending packet to domain '{}': {}", domain, packet, e );
"OutgoingSessionPromise: Error sending packet to remote server: " + packet,
e);
// Mark the time when s2s failed // Mark the time when s2s failed
failureTimestamp = System.currentTimeMillis(); failureTimestamp = System.currentTimeMillis();
} }
...@@ -330,12 +325,17 @@ public class OutgoingSessionPromise implements RoutableChannelHandler { ...@@ -330,12 +325,17 @@ public class OutgoingSessionPromise implements RoutableChannelHandler {
} }
} }
catch (Exception e) { catch (Exception e) {
Log.warn("Error returning error to sender. Original packet: " + packet, e); Log.warn( "An exception occurred while trying to returning a remote-server-not-found error (for domain '{}') to the original sender. Original packet: {}", domain, packet, e );
} }
} }
public void addPacket(Packet packet) { void addPacket( Packet packet )
packetQueue.add(packet); {
if ( !packetQueue.offer( packet ) )
{
returnErrorToSender(packet);
Log.debug( "Error sending packet to domain '{}' (outbound queue full): {}", domain, packet );
}
} }
public String getDomain() { public String getDomain() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment