Commit f4a388f7 authored by Alex Wenckus's avatar Alex Wenckus Committed by alex

Initial work on handling out of order requests

git-svn-id: http://svn.igniterealtime.org/svn/repos/wildfire/trunk@6752 b35dd754-fafc-0310-a699-88a17e54d16e
parent a4ba1587
...@@ -540,13 +540,13 @@ public class SessionManager extends BasicModule { ...@@ -540,13 +540,13 @@ public class SessionManager extends BasicModule {
return session; return session;
} }
public HttpSession createClientHttpSession(long rid, InetAddress address, StreamID id) public HttpSession createClientHttpSession(long rid, InetAddress address, StreamID id, int hold)
throws UnauthorizedException throws UnauthorizedException
{ {
if (serverName == null) { if (serverName == null) {
throw new UnauthorizedException("Server not initialized"); throw new UnauthorizedException("Server not initialized");
} }
HttpSession session = new HttpSession(serverName, address, id, rid); HttpSession session = new HttpSession(serverName, address, id, rid, hold);
Connection conn = session.getConnection(); Connection conn = session.getConnection();
conn.init(session); conn.init(session);
conn.registerCloseListener(clientSessionListener, session); conn.registerCloseListener(clientSessionListener, session);
......
...@@ -118,6 +118,7 @@ public class HttpConnection { ...@@ -118,6 +118,7 @@ public class HttpConnection {
} }
catch (HttpBindTimeoutException e) { catch (HttpBindTimeoutException e) {
this.isClosed = true; this.isClosed = true;
session.closeConnection(this);
throw e; throw e;
} }
} }
......
...@@ -42,7 +42,7 @@ public class HttpSession extends ClientSession { ...@@ -42,7 +42,7 @@ public class HttpSession extends ClientSession {
private int wait; private int wait;
private int hold = 0; private int hold = 0;
private String language; private String language;
private final Queue<HttpConnection> connectionQueue = new LinkedList<HttpConnection>(); private final ConnectionQueue connectionQueue;
private final List<Deliverable> pendingElements = new ArrayList<Deliverable>(); private final List<Deliverable> pendingElements = new ArrayList<Deliverable>();
private final List<Deliverable> sentElements = new ArrayList<Deliverable>(); private final List<Deliverable> sentElements = new ArrayList<Deliverable>();
private boolean isSecure; private boolean isSecure;
...@@ -52,14 +52,15 @@ public class HttpSession extends ClientSession { ...@@ -52,14 +52,15 @@ public class HttpSession extends ClientSession {
private boolean isClosed; private boolean isClosed;
private int inactivityTimeout; private int inactivityTimeout;
private long lastActivity; private long lastActivity;
private long lastRequestID;
private int maxRequests; private int maxRequests;
public HttpSession(String serverName, InetAddress address, StreamID streamID, long rid) { public HttpSession(String serverName, InetAddress address, StreamID streamID, long rid,
int hold) {
super(serverName, null, streamID); super(serverName, null, streamID);
conn = new HttpVirtualConnection(address); conn = new HttpVirtualConnection(address);
this.lastActivity = System.currentTimeMillis(); this.lastActivity = System.currentTimeMillis();
this.lastRequestID = rid; this.hold = hold;
this.connectionQueue = new ConnectionQueue(hold + 1, rid);
} }
/** /**
...@@ -147,18 +148,6 @@ public class HttpSession extends ClientSession { ...@@ -147,18 +148,6 @@ public class HttpSession extends ClientSession {
return wait; return wait;
} }
/**
* Specifies the maximum number of requests the connection manager is allowed to keep waiting at
* any one time during the session. (For example, if a constrained client is unable to keep open
* more than two HTTP connections to the same HTTP server simultaneously, then it SHOULD specify
* a value of "1".)
*
* @param hold the maximum number of simultaneous waiting requests.
*/
public void setHold(int hold) {
this.hold = hold;
}
/** /**
* Specifies the maximum number of requests the connection manager is allowed to keep waiting at * Specifies the maximum number of requests the connection manager is allowed to keep waiting at
* any one time during the session. (For example, if a constrained client is unable to keep open * any one time during the session. (For example, if a constrained client is unable to keep open
...@@ -285,15 +274,6 @@ public class HttpSession extends ClientSession { ...@@ -285,15 +274,6 @@ public class HttpSession extends ClientSession {
return inactivityTimeout; return inactivityTimeout;
} }
/**
* Returns the number of connections currently awaiting a response on this session.
*
* @return the number of connections currently awaiting a response on this session.
*/
public int getConnectionCount() {
return connectionQueue.size();
}
/** /**
* Returns the time in milliseconds since the epoch that this session was last active. Activity * Returns the time in milliseconds since the epoch that this session was last active. Activity
* is a request was either made or responded to. * is a request was either made or responded to.
...@@ -322,16 +302,17 @@ public class HttpSession extends ClientSession { ...@@ -322,16 +302,17 @@ public class HttpSession extends ClientSession {
* @param isPoll true if the request was a poll, no packets were sent along with the request. * @param isPoll true if the request was a poll, no packets were sent along with the request.
* @param isSecure true if the connection was secured using HTTPS. * @param isSecure true if the connection was secured using HTTPS.
* @return the created {@link org.jivesoftware.wildfire.http.HttpConnection} which represents * @return the created {@link org.jivesoftware.wildfire.http.HttpConnection} which represents
* the connection. * the connection.
* @throws HttpConnectionClosedException if the connection was closed before a response could *
* be delivered. * @throws HttpConnectionClosedException if the connection was closed before a response could be
* delivered.
* @throws HttpBindException if the connection has violated a facet of the HTTP binding * @throws HttpBindException if the connection has violated a facet of the HTTP binding
* protocol. * protocol.
*/ */
HttpConnection createConnection(long rid, boolean isPoll, boolean isSecure) HttpConnection createConnection(long rid, boolean isPoll, boolean isSecure)
throws HttpConnectionClosedException, HttpBindException { throws HttpConnectionClosedException, HttpBindException {
HttpConnection connection = new HttpConnection(rid, isSecure); HttpConnection connection = new HttpConnection(rid, isSecure);
if (rid <= lastRequestID) { if (rid <= connectionQueue.getLastRequestId()) {
Deliverable deliverable = retrieveDeliverable(rid); Deliverable deliverable = retrieveDeliverable(rid);
if (deliverable == null) { if (deliverable == null) {
Log.warn("Deliverable unavailable for " + rid); Log.warn("Deliverable unavailable for " + rid);
...@@ -340,9 +321,10 @@ public class HttpSession extends ClientSession { ...@@ -340,9 +321,10 @@ public class HttpSession extends ClientSession {
connection.deliverBody(deliverable.getDeliverable()); connection.deliverBody(deliverable.getDeliverable());
return connection; return connection;
} }
else if (rid > (lastRequestID + hold + 1)) { else if (rid > (connectionQueue.getLastRequestId() + hold + 1)) {
// TODO handle the case of greater RID which basically has it wait // TODO handle the case of greater RID which basically has it wait
Log.warn("Request " + rid + " > " + (lastRequestID + hold + 1) + ", ending session."); Log.warn("Request " + rid + " > " + (connectionQueue.getLastRequestId() + hold + 1)
+ ", ending session.");
throw new HttpBindException("Unexpected RID Error", true, 404); throw new HttpBindException("Unexpected RID Error", true, 404);
} }
...@@ -350,6 +332,10 @@ public class HttpSession extends ClientSession { ...@@ -350,6 +332,10 @@ public class HttpSession extends ClientSession {
return connection; return connection;
} }
void closeConnection(HttpConnection httpConnection) {
this.connectionQueue.updateLastRequestId(httpConnection);
}
private Deliverable retrieveDeliverable(long rid) throws HttpBindException { private Deliverable retrieveDeliverable(long rid) throws HttpBindException {
for (Deliverable delivered : sentElements) { for (Deliverable delivered : sentElements) {
if (delivered.getRequestID() == rid) { if (delivered.getRequestID() == rid) {
...@@ -388,19 +374,17 @@ public class HttpSession extends ClientSession { ...@@ -388,19 +374,17 @@ public class HttpSession extends ClientSession {
catch (HttpConnectionClosedException he) { catch (HttpConnectionClosedException he) {
throw he; throw he;
} }
connectionQueue.updateLastRequestId(connection);
} }
else { else {
// With this connection we need to check if we will have too many connections open, // With this connection we need to check if we will have too many connections open,
// closing any extras. // closing any extras.
while (connectionQueue.size() >= hold) { for (HttpConnection toClose : connectionQueue.queueConnection(connection)) {
HttpConnection toClose = connectionQueue.remove();
toClose.close(); toClose.close();
fireConnectionClosed(toClose); fireConnectionClosed(toClose);
} }
connectionQueue.offer(connection);
fireConnectionOpened(connection); fireConnectionOpened(connection);
} }
lastRequestID = connection.getRequestId();
} }
private void deliver(HttpConnection connection, String deliverable) private void deliver(HttpConnection connection, String deliverable)
...@@ -444,8 +428,8 @@ public class HttpSession extends ClientSession { ...@@ -444,8 +428,8 @@ public class HttpSession extends ClientSession {
private void deliver(Deliverable stanza) { private void deliver(Deliverable stanza) {
String deliverable = createDeliverable(Arrays.asList(stanza)); String deliverable = createDeliverable(Arrays.asList(stanza));
boolean delivered = false; boolean delivered = false;
while (!delivered && connectionQueue.size() > 0) { while (!delivered && connectionQueue.hasConnectionWaiting()) {
HttpConnection connection = connectionQueue.remove(); HttpConnection connection = connectionQueue.getConnection();
try { try {
deliver(connection, deliverable); deliver(connection, deliverable);
delivered = true; delivered = true;
...@@ -488,8 +472,8 @@ public class HttpSession extends ClientSession { ...@@ -488,8 +472,8 @@ public class HttpSession extends ClientSession {
failDelivery(); failDelivery();
} }
while (connectionQueue.size() > 0) { while (connectionQueue.hasConnectionWaiting()) {
HttpConnection toClose = connectionQueue.remove(); HttpConnection toClose = connectionQueue.getConnection();
toClose.close(); toClose.close();
fireConnectionClosed(toClose); fireConnectionClosed(toClose);
} }
...@@ -511,6 +495,8 @@ public class HttpSession extends ClientSession { ...@@ -511,6 +495,8 @@ public class HttpSession extends ClientSession {
pendingElements.clear(); pendingElements.clear();
} }
/** /**
* A virtual server connection relates to a http session which its self can relate to many http * A virtual server connection relates to a http session which its self can relate to many http
* connections. * connections.
...@@ -589,15 +575,78 @@ public class HttpSession extends ClientSession { ...@@ -589,15 +575,78 @@ public class HttpSession extends ClientSession {
private final HttpConnection[] connections; private final HttpConnection[] connections;
private final String[] requestIds; private int mark = 0;
private int pointer = 0; private long lastRequestId = -1;
public ConnectionQueue(int size) { public ConnectionQueue(int size, long firstRequestId) {
this.connections = new HttpConnection[size]; this.connections = new HttpConnection[size];
this.requestIds = new String[size]; this.lastRequestId = firstRequestId;
} }
public Collection<HttpConnection> queueConnection(HttpConnection connection)
throws HttpBindException {
long diff = connection.getRequestId() - lastRequestId;
if (diff <= 0 || diff > connections.length) {
throw new HttpBindException("Unexpected rid error", true, 404);
}
Collection<HttpConnection> closableConnections = new ArrayList<HttpConnection>();
int newPointer;
if (connections[mark] != null && diff == 1 && !connections[mark].isClosed()) {
closableConnections.add(connections[mark]);
}
else if(connections[mark] != null && !connections[mark].isClosed()) {
throw new HttpBindException("Unexpected rid error", true, 404);
}
newPointer = ((int) ((mark + (diff - 1)) % connections.length));
connections[newPointer] = connection;
return Collections.unmodifiableCollection(closableConnections);
}
public HttpConnection getConnection() {
HttpConnection toReturn;
if (connections[mark] != null && !connections[mark].isClosed()) {
int newMark = (mark + 1) % connections.length;
lastRequestId = connections[mark].getRequestId();
toReturn = connections[mark];
connections[mark] = null;
mark = newMark;
}
else {
toReturn = null;
}
return toReturn;
}
/**
* Returns either the last request serviced or, if there are connections currently queued up
* the next RID that will be servered.
*
* @return the last request serviced or the last connection made.
*/
public long getNextRID() {
return connections[mark].getRequestId();
}
public long getLastRequestId() {
return lastRequestId;
}
public boolean hasConnectionWaiting() {
return connections[mark] != null && !connections[mark].isClosed();
}
public void updateLastRequestId(HttpConnection request) {
for(int i = 0; i < connections.length; i ++) {
if(connections[i].getRequestId() == request.getRequestId()) {
connections[i] = null;
lastRequestId = request.getRequestId();
return;
}
}
}
} }
} }
...@@ -114,9 +114,8 @@ public class HttpSessionManager { ...@@ -114,9 +114,8 @@ public class HttpSessionManager {
int wait = getIntAttribute(rootNode.attributeValue("wait"), 60); int wait = getIntAttribute(rootNode.attributeValue("wait"), 60);
int hold = getIntAttribute(rootNode.attributeValue("hold"), 1); int hold = getIntAttribute(rootNode.attributeValue("hold"), 1);
HttpSession session = createSession(connection.getRequestId(), address); HttpSession session = createSession(connection.getRequestId(), address, hold);
session.setWait(Math.min(wait, getMaxWait())); session.setWait(Math.min(wait, getMaxWait()));
session.setHold(hold);
session.setSecure(connection.isSecure()); session.setSecure(connection.isSecure());
session.setMaxPollingInterval(getPollingInterval()); session.setMaxPollingInterval(getPollingInterval());
session.setMaxRequests(getMaxRequests()); session.setMaxRequests(getMaxRequests());
...@@ -234,11 +233,13 @@ public class HttpSessionManager { ...@@ -234,11 +233,13 @@ public class HttpSessionManager {
return connection; return connection;
} }
private HttpSession createSession(long rid, InetAddress address) throws UnauthorizedException { private HttpSession createSession(long rid, InetAddress address, int hold)
throws UnauthorizedException
{
// Create a ClientSession for this user. // Create a ClientSession for this user.
StreamID streamID = SessionManager.getInstance().nextStreamID(); StreamID streamID = SessionManager.getInstance().nextStreamID();
// Send to the server that a new client session has been created // Send to the server that a new client session has been created
HttpSession session = sessionManager.createClientHttpSession(rid, address, streamID); HttpSession session = sessionManager.createClientHttpSession(rid, address, streamID, hold);
// Register that the new session is associated with the specified stream ID // Register that the new session is associated with the specified stream ID
sessionMap.put(streamID.getID(), session); sessionMap.put(streamID.getID(), session);
session.addSessionCloseListener(sessionListener); session.addSessionCloseListener(sessionListener);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment