Commit 64c33cc9 authored by Alex Wenckus's avatar Alex Wenckus Committed by alex

fix for multiple simultaneous connection issues. JM-955

git-svn-id: http://svn.igniterealtime.org/svn/repos/wildfire/trunk@6984 b35dd754-fafc-0310-a699-88a17e54d16e
parent 4cc39134
...@@ -114,8 +114,16 @@ public class HttpBindServlet extends HttpServlet { ...@@ -114,8 +114,16 @@ public class HttpBindServlet extends HttpServlet {
return false; return false;
} }
synchronized (session) { synchronized (session) {
respond(response, session.getResponse((Long) request.getAttribute("request")) try {
.getBytes("utf-8")); respond(response, session.getResponse((Long) request.getAttribute("request"))
.getBytes("utf-8"));
}
catch (HttpBindException e) {
response.sendError(e.getHttpError(), e.getMessage());
if(e.shouldCloseSession()) {
session.close();
}
}
} }
return true; return true;
} }
......
...@@ -55,6 +55,13 @@ public class HttpSession extends ClientSession { ...@@ -55,6 +55,13 @@ public class HttpSession extends ClientSession {
private long lastRequestID; private long lastRequestID;
private int maxRequests; private int maxRequests;
private static final Comparator<HttpConnection> connectionComparator
= new Comparator<HttpConnection>() {
public int compare(HttpConnection o1, HttpConnection o2) {
return (int) (o1.getRequestId() - o2.getRequestId());
}
};
public HttpSession(String serverName, InetAddress address, StreamID streamID, long rid) { public HttpSession(String serverName, InetAddress address, StreamID streamID, long rid) {
super(serverName, null, streamID); super(serverName, null, streamID);
conn = new HttpVirtualConnection(address); conn = new HttpVirtualConnection(address);
...@@ -311,24 +318,34 @@ public class HttpSession extends ClientSession { ...@@ -311,24 +318,34 @@ public class HttpSession extends ClientSession {
} }
} }
public String getResponse(long requestID) { public String getResponse(long requestID) throws HttpBindException {
for (HttpConnection connection : connectionQueue) { for (HttpConnection connection : connectionQueue) {
if (connection.getRequestId() == requestID) { if (connection.getRequestId() == requestID) {
String response; if(requestID > lastRequestID + 1) {
try { throw new HttpBindException("Invalid RID error.", true, 404);
response = connection.getResponse();
}
catch (HttpBindTimeoutException e) {
response = createEmptyBody();
} }
connectionQueue.remove(connection); connectionQueue.remove(connection);
fireConnectionClosed(connection); fireConnectionClosed(connection);
return response; if(requestID > lastRequestID) {
lastRequestID = connection.getRequestId();
}
return getResponse(connection);
} }
} }
throw new InternalError("Could not locate connection: " + requestID); throw new InternalError("Could not locate connection: " + requestID);
} }
private String getResponse(HttpConnection connection) {
String response;
try {
response = connection.getResponse();
}
catch (HttpBindTimeoutException e) {
response = createEmptyBody();
}
return response;
}
/** /**
* Sets whether the initial request on the session was secure. * Sets whether the initial request on the session was secure.
* *
...@@ -347,9 +364,10 @@ public class HttpSession extends ClientSession { ...@@ -347,9 +364,10 @@ public class HttpSession extends ClientSession {
* @param isPoll true if the request was a poll, no packets were sent along with the request. * @param isPoll true if the request was a poll, no packets were sent along with the request.
* @param isSecure true if the connection was secured using HTTPS. * @param isSecure true if the connection was secured using HTTPS.
* @return the created {@link org.jivesoftware.wildfire.http.HttpConnection} which represents * @return the created {@link org.jivesoftware.wildfire.http.HttpConnection} which represents
* the connection. * the connection.
* @throws HttpConnectionClosedException if the connection was closed before a response could *
* be delivered. * @throws HttpConnectionClosedException if the connection was closed before a response could be
* delivered.
* @throws HttpBindException if the connection has violated a facet of the HTTP binding * @throws HttpBindException if the connection has violated a facet of the HTTP binding
* protocol. * protocol.
*/ */
...@@ -402,12 +420,13 @@ public class HttpSession extends ClientSession { ...@@ -402,12 +420,13 @@ public class HttpSession extends ClientSession {
connection.setSession(this); connection.setSession(this);
// We aren't supposed to hold connections open or we already have some packets waiting // We aren't supposed to hold connections open or we already have some packets waiting
// to be sent to the client. // to be sent to the client.
if (hold <= 0 || pendingElements.size() > 0) { if (hold <= 0 || (pendingElements.size() > 0 && connection.getRequestId()
== lastRequestID + 1)) {
String deliverable = createDeliverable(pendingElements); String deliverable = createDeliverable(pendingElements);
try { try {
fireConnectionOpened(connection); fireConnectionOpened(connection);
deliver(connection, deliverable); deliver(connection, deliverable);
fireConnectionClosed(connection); lastRequestID = connection.getRequestId();
pendingElements.clear(); pendingElements.clear();
} }
catch (HttpConnectionClosedException he) { catch (HttpConnectionClosedException he) {
...@@ -417,15 +436,18 @@ public class HttpSession extends ClientSession { ...@@ -417,15 +436,18 @@ public class HttpSession extends ClientSession {
else { else {
// With this connection we need to check if we will have too many connections open, // With this connection we need to check if we will have too many connections open,
// closing any extras. // closing any extras.
while (connectionQueue.size() >= hold) {
HttpConnection toClose = connectionQueue.remove(0); // Number of current connections open plus the current one tells us how many that
// we need to close.
int connectionsToClose = (connectionQueue.size() + 1) - hold;
for (int i = 0; i < connectionsToClose; i++) {
HttpConnection toClose = connectionQueue.get(i);
toClose.close(); toClose.close();
fireConnectionClosed(toClose);
} }
connectionQueue.add(connection); connectionQueue.add(connection);
Collections.sort(connectionQueue, connectionComparator);
fireConnectionOpened(connection); fireConnectionOpened(connection);
} }
lastRequestID = connection.getRequestId();
} }
private void deliver(HttpConnection connection, String deliverable) private void deliver(HttpConnection connection, String deliverable)
...@@ -471,9 +493,12 @@ public class HttpSession extends ClientSession { ...@@ -471,9 +493,12 @@ public class HttpSession extends ClientSession {
boolean delivered = false; boolean delivered = false;
for (HttpConnection connection : connectionQueue) { for (HttpConnection connection : connectionQueue) {
try { try {
deliver(connection, deliverable); if (connection.getRequestId() == lastRequestID + 1) {
delivered = true; deliver(connection, deliverable);
break; delivered = true;
lastRequestID = connection.getRequestId();
break;
}
} }
catch (HttpConnectionClosedException e) { catch (HttpConnectionClosedException e) {
/* Connection was closed, try the next one */ /* Connection was closed, try the next one */
...@@ -609,24 +634,4 @@ public class HttpSession extends ClientSession { ...@@ -609,24 +634,4 @@ public class HttpSession extends ClientSession {
return (int) (o.getRequestID() - requestID); return (int) (o.getRequestID() - requestID);
} }
} }
/**
* A queue of all current connections. Connections are dealt with in order of their request
* IDs.
*/
private class ConnectionQueue {
private final HttpConnection[] connections;
private final String[] requestIds;
private int pointer = 0;
public ConnectionQueue(int size) {
this.connections = new HttpConnection[size];
this.requestIds = new String[size];
}
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment