Commit 2045424f authored by Guus der Kinderen's avatar Guus der Kinderen

OF-885: Improved connection handling.

parent 4e33aa0d
...@@ -600,9 +600,10 @@ public class HttpSession extends LocalClientSession { ...@@ -600,9 +600,10 @@ public class HttpSession extends LocalClientSession {
isPoll = false; isPoll = false;
else if (rootNode.attributeValue("pause") != null) else if (rootNode.attributeValue("pause") != null)
isPoll = false; isPoll = false;
HttpConnection connection = this.createConnection(rid, elements, isSecure, isPoll, context); HttpConnection connection = this.createConnection(rid, isSecure, isPoll, context);
if (elements.size() > 0) { if (elements.size() > 0) {
// creates the runnable to forward the packets // creates the runnable to forward the packets
packetsToSend.add(elements);
new HttpPacketSender(this).init(); new HttpPacketSender(this).init();
} }
...@@ -680,7 +681,6 @@ public class HttpSession extends LocalClientSession { ...@@ -680,7 +681,6 @@ public class HttpSession extends LocalClientSession {
* response. * response.
* *
* @param rid the request id related to the connection. * @param rid the request id related to the connection.
* @param packetsToBeSent any packets that this connection should send.
* @param isSecure true if the connection was secured using HTTPS. * @param isSecure true if the connection was secured using HTTPS.
* @return the created {@link org.jivesoftware.openfire.http.HttpConnection} which represents * @return the created {@link org.jivesoftware.openfire.http.HttpConnection} which represents
* the connection. * the connection.
...@@ -690,8 +690,7 @@ public class HttpSession extends LocalClientSession { ...@@ -690,8 +690,7 @@ public class HttpSession extends LocalClientSession {
* @throws HttpBindException if the connection has violated a facet of the HTTP binding * @throws HttpBindException if the connection has violated a facet of the HTTP binding
* protocol. * protocol.
*/ */
synchronized HttpConnection createConnection(long rid, Collection<Element> packetsToBeSent, synchronized HttpConnection createConnection(long rid, boolean isSecure, boolean isPoll, AsyncContext context)
boolean isSecure, boolean isPoll, AsyncContext context)
throws HttpConnectionClosedException, HttpBindException, IOException throws HttpConnectionClosedException, HttpBindException, IOException
{ {
final HttpConnection connection = new HttpConnection(rid, isSecure, sslCertificates, context); final HttpConnection connection = new HttpConnection(rid, isSecure, sslCertificates, context);
...@@ -721,16 +720,17 @@ public class HttpSession extends LocalClientSession { ...@@ -721,16 +720,17 @@ public class HttpSession extends LocalClientSession {
lastRequestID = connection.getRequestId(); lastRequestID = connection.getRequestId();
} catch (HttpConnectionClosedException e) { } catch (HttpConnectionClosedException e) {
Log.warn("Unexpected exception while processing connection timeout.", e); Log.warn("Unexpected exception while processing connection timeout.", e);
} finally {
connectionQueue.remove(connection);
fireConnectionClosed(connection);
} }
// Note that 'onComplete' will be invoked.
} }
@Override @Override
public void onError(AsyncEvent asyncEvent) throws IOException { public void onError(AsyncEvent asyncEvent) throws IOException {
Log.debug("error event " + asyncEvent); Log.debug("error event " + asyncEvent);
Log.warn("Unhandled AsyncListener error: " + asyncEvent.getThrowable()); Log.warn("Unhandled AsyncListener error: " + asyncEvent.getThrowable());
connectionQueue.remove(connection);
fireConnectionClosed(connection);
} }
@Override @Override
...@@ -754,9 +754,6 @@ public class HttpSession extends LocalClientSession { ...@@ -754,9 +754,6 @@ public class HttpSession extends LocalClientSession {
BoshBindingError.itemNotFound); BoshBindingError.itemNotFound);
} }
if (packetsToBeSent.size() > 0) {
packetsToSend.add(packetsToBeSent);
}
addConnection(connection, isPoll); addConnection(connection, isPoll);
return connection; return connection;
} }
...@@ -834,13 +831,12 @@ public class HttpSession extends LocalClientSession { ...@@ -834,13 +831,12 @@ public class HttpSession extends LocalClientSession {
// We aren't supposed to hold connections open or we already have some packets waiting // We aren't supposed to hold connections open or we already have some packets waiting
// to be sent to the client. // to be sent to the client.
if (isPollingSession() || (pendingElements.size() > 0 && connection.getRequestId() == lastRequestID + 1)) { if (isPollingSession() || (pendingElements.size() > 0 && connection.getRequestId() == lastRequestID + 1)) {
synchronized(pendingElements) { fireConnectionOpened(connection);
deliver(connection, pendingElements); synchronized(pendingElements) {
lastRequestID = connection.getRequestId(); deliver(connection, pendingElements);
pendingElements.clear(); lastRequestID = connection.getRequestId();
} pendingElements.clear();
connectionQueue.add(connection); }
Collections.sort(connectionQueue, connectionComparator);
} }
else { else {
// With this connection we need to check if we will have too many connections open, // With this connection we need to check if we will have too many connections open,
...@@ -878,7 +874,6 @@ public class HttpSession extends LocalClientSession { ...@@ -878,7 +874,6 @@ public class HttpSession extends LocalClientSession {
} }
} }
} }
fireConnectionOpened(connection);
} }
private int getOpenConnectionCount() { private int getOpenConnectionCount() {
...@@ -978,8 +973,13 @@ public class HttpSession extends LocalClientSession { ...@@ -978,8 +973,13 @@ public class HttpSession extends LocalClientSession {
private void deliver(Deliverable stanza) { private void deliver(Deliverable stanza) {
Collection<Deliverable> deliverable = Arrays.asList(stanza); Collection<Deliverable> deliverable = Arrays.asList(stanza);
boolean delivered = false; boolean delivered = false;
int pendingConnections = 0;
synchronized (connectionQueue) { synchronized (connectionQueue) {
for (HttpConnection connection : connectionQueue) { for (HttpConnection connection : connectionQueue) {
if (connection.isClosed()) {
continue;
}
pendingConnections++;
try { try {
if (connection.getRequestId() == lastRequestID + 1) { if (connection.getRequestId() == lastRequestID + 1) {
lastRequestID = connection.getRequestId(); lastRequestID = connection.getRequestId();
...@@ -998,6 +998,9 @@ public class HttpSession extends LocalClientSession { ...@@ -998,6 +998,9 @@ public class HttpSession extends LocalClientSession {
} }
if (!delivered) { if (!delivered) {
if (pendingConnections > 0) {
Log.warn("Unable to deliver a stanza (it is being queued instead), although there are available connections! RID / Connection processing is out of sync!");
}
pendingElements.add(stanza); pendingElements.add(stanza);
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment