Commit efe7d498 authored by Tom Evans's avatar Tom Evans Committed by tevans

OF-573: Improve synchronization model for http-bind to reduce thread blocking;...

OF-573: Improve synchronization model for http-bind to reduce thread blocking; include static web context resources for http-bind in RPM build

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@13305 b35dd754-fafc-0310-a699-88a17e54d16e
parent 309ba9a6
......@@ -72,7 +72,6 @@ rm -rf $RPM_BUILD_ROOT%{homedir}/resources/nativeAuth/osx-ppc
rm -rf $RPM_BUILD_ROOT%{homedir}/resources/nativeAuth/solaris-sparc
rm -rf $RPM_BUILD_ROOT%{homedir}/resources/nativeAuth/win32-x86
rm -f $RPM_BUILD_ROOT%{homedir}/lib/*.dll
rm -rf $RPM_BUILD_ROOT%{homedir}/resources/spank
%clean
rm -rf $RPM_BUILD_ROOT
......@@ -129,6 +128,10 @@ exit 0
%dir %{homedir}/resources/nativeAuth/linux-i386
%{homedir}/resources/nativeAuth/linux-i386/*
%dir %{homedir}/resources/security
%dir %{homedir}/resources/spank
%{homedir}/resources/spank/index.html
%dir %{homedir}/resources/spank/WEB-INF
%{homedir}/resources/spank/WEB-INF/web.xml
%config(noreplace) %{homedir}/resources/security/keystore
%config(noreplace) %{homedir}/resources/security/truststore
%config(noreplace) %{homedir}/resources/security/client.truststore
......
......@@ -69,6 +69,10 @@ public final class HttpBindManager {
public static final int HTTP_BIND_SECURE_PORT_DEFAULT = 7443;
public static final String HTTP_BIND_THREADS = "httpbind.client.processing.threads";
public static final int HTTP_BIND_THREADS_DEFAULT = 254;
// http binding CORS default properties
public static final String HTTP_BIND_CORS_ENABLED = "httpbind.CORS.enabled";
......@@ -378,7 +382,8 @@ public final class HttpBindManager {
*/
private synchronized void configureHttpBindServer(int port, int securePort) {
httpBindServer = new Server();
final QueuedThreadPool tp = new QueuedThreadPool(254);
final QueuedThreadPool tp = new QueuedThreadPool(
JiveGlobals.getIntProperty(HTTP_BIND_THREADS, HTTP_BIND_THREADS_DEFAULT));
tp.setName("Jetty-QTP-BOSH");
httpBindServer.setThreadPool(tp);
......
......@@ -29,7 +29,7 @@ import java.security.cert.X509Certificate;
/**
* Represents one HTTP connection with a client using the HTTP Binding service. The client will wait
* on {@link #getResponse()} until the server forwards a message to it or the wait time on the
* session timesout.
* session timeout.
*
* @author Alexander Wenckus
*/
......@@ -79,7 +79,7 @@ public class HttpConnection {
/**
* Returns true if this connection has been closed, either a response was delivered to the
* client or the server closed the connection abrubtly.
* client or the server closed the connection abruptly.
*
* @return true if this connection has been closed.
*/
......@@ -107,22 +107,23 @@ public class HttpConnection {
*
* @param body the XMPP content to be forwarded to the client inside of a body tag.
*
* @throws HttpConnectionClosedException when this connection to the client has already recieved
* @throws HttpConnectionClosedException when this connection to the client has already received
* a deliverable to forward to the client
*/
public void deliverBody(String body) throws HttpConnectionClosedException {
if(body == null) {
throw new IllegalArgumentException("Body cannot be null!");
}
// We only want to use this function once so we will close it when the body is delivered.
if (isClosed) {
throw new HttpConnectionClosedException("The http connection is no longer " +
"available to deliver content");
synchronized (this) {
if (isClosed) {
throw new HttpConnectionClosedException("The http connection is no longer " +
"available to deliver content");
}
else {
isClosed = true;
}
}
if (body == null) {
body = CONNECTION_CLOSED;
}
else {
isClosed = true;
}
if (continuation != null && continuation.isSuspended()) {
continuation.setAttribute("response-body", body);
continuation.resume();
......@@ -212,7 +213,7 @@ public class HttpConnection {
if (continuation.isResumed()) {
String deliverable = (String) continuation.getAttribute("response-body");
// This will occur when the hold attribute of a session has been exceded.
// This will occur when the hold attribute of a session has been exceeded.
this.isDelivered = true;
if (deliverable == null) {
throw new HttpBindTimeoutException();
......
......@@ -99,9 +99,9 @@ public class HttpSession extends LocalClientSession {
private int wait;
private int hold = 0;
private String language;
private final List<HttpConnection> connectionQueue = new LinkedList<HttpConnection>();
private final List<Deliverable> pendingElements = new ArrayList<Deliverable>();
private final List<Delivered> sentElements = new ArrayList<Delivered>();
private final List<HttpConnection> connectionQueue = Collections.synchronizedList(new LinkedList<HttpConnection>());
private final List<Deliverable> pendingElements = Collections.synchronizedList(new ArrayList<Deliverable>());
private final List<Delivered> sentElements = Collections.synchronizedList(new ArrayList<Delivered>());
private boolean isSecure;
private int maxPollingInterval;
private long lastPoll = -1;
......@@ -134,6 +134,7 @@ public class HttpSession extends LocalClientSession {
StreamID streamID, long rid, HttpConnection connection) {
super(serverName, null, streamID);
conn = new HttpVirtualConnection(address);
this.isClosed = false;
this.lastActivity = System.currentTimeMillis();
this.lastRequestID = rid;
this.backupDeliverer = backupDeliverer;
......@@ -197,12 +198,12 @@ public class HttpSession extends LocalClientSession {
}
/**
* Returns true if this session has been closed and no longer activley accepting connections.
* Returns true if this session has been closed and no longer actively accepting connections.
*
* @return true if this session has been closed and no longer activley accepting connections.
* @return true if this session has been closed and no longer actively accepting connections.
*/
@Override
public synchronized boolean isClosed() {
public boolean isClosed() {
return isClosed;
}
......@@ -295,9 +296,9 @@ public class HttpSession extends LocalClientSession {
}
/**
* The max number of requests it is permissable for this session to have open at any one time.
* The max number of requests it is permissible for this session to have open at any one time.
*
* @param maxRequests The max number of requests it is permissable for this session to have open
* @param maxRequests The max number of requests it is permissible for this session to have open
* at any one time.
*/
public void setMaxRequests(int maxRequests) {
......@@ -305,10 +306,10 @@ public class HttpSession extends LocalClientSession {
}
/**
* Returns the max number of requests it is permissable for this session to have open at any one
* Returns the max number of requests it is permissible for this session to have open at any one
* time.
*
* @return the max number of requests it is permissable for this session to have open at any one
* @return the max number of requests it is permissible for this session to have open at any one
* time.
*/
public int getMaxRequests() {
......@@ -438,12 +439,14 @@ public class HttpSession extends LocalClientSession {
*/
public void pause(int duration) {
// Respond immediately to all pending requests
for (HttpConnection toClose : connectionQueue) {
if (!toClose.isClosed()) {
toClose.close();
lastRequestID = toClose.getRequestId();
}
}
synchronized (connectionQueue) {
for (HttpConnection toClose : connectionQueue) {
if (!toClose.isClosed()) {
toClose.close();
lastRequestID = toClose.getRequestId();
}
}
}
setInactivityTimeout(duration);
}
......@@ -454,21 +457,19 @@ public class HttpSession extends LocalClientSession {
*
* @return the time in milliseconds since the epoch that this session was last active.
*/
public synchronized long getLastActivity() {
if (connectionQueue.isEmpty()) {
return lastActivity;
}
else {
for (HttpConnection connection : connectionQueue) {
// The session is currently active, return the current time.
if (!connection.isClosed()) {
return System.currentTimeMillis();
}
}
// We have no currently open connections therefore we can assume that lastActivity is
// the last time the client did anything.
return lastActivity;
}
public long getLastActivity() {
if (!connectionQueue.isEmpty()) {
synchronized (connectionQueue) {
for (HttpConnection connection : connectionQueue) {
// The session is currently active, set the last activity to the current time.
if (!connection.isClosed()) {
lastActivity = System.currentTimeMillis();
break;
}
}
}
}
return lastActivity;
}
/**
......@@ -484,11 +485,13 @@ public class HttpSession extends LocalClientSession {
public long getLastAcknowledged() {
long ack = lastRequestID;
Collections.sort(connectionQueue, connectionComparator);
for (HttpConnection connection : connectionQueue) {
if (connection.getRequestId() == ack + 1) {
ack++;
}
}
synchronized (connectionQueue) {
for (HttpConnection connection : connectionQueue) {
if (connection.getRequestId() == ack + 1) {
ack++;
}
}
}
return ack;
}
......@@ -572,18 +575,20 @@ public class HttpSession extends LocalClientSession {
* Use {@link #consumeResponse(HttpConnection)} instead
*/
public String getResponse(long requestID) throws HttpBindException {
for (HttpConnection connection : connectionQueue) {
if (connection.getRequestId() == requestID) {
String response = getResponse(connection);
// connection needs to be removed after response is returned to maintain idempotence
// otherwise if this method is called again, after 'waiting', the InternalError
// will be thrown because the connection is no longer in the queue.
connectionQueue.remove(connection);
fireConnectionClosed(connection);
return response;
}
}
synchronized (connectionQueue) {
for (HttpConnection connection : connectionQueue) {
if (connection.getRequestId() == requestID) {
String response = getResponse(connection);
// connection needs to be removed after response is returned to maintain idempotence
// otherwise if this method is called again, after 'waiting', the InternalError
// will be thrown because the connection is no longer in the queue.
connectionQueue.remove(connection);
fireConnectionClosed(connection);
return response;
}
}
}
throw new InternalError("Could not locate connection: " + requestID);
}
......@@ -641,8 +646,8 @@ public class HttpSession extends LocalClientSession {
/**
* This methods sends any pending packets in the session. If no packets are
* pending, this method simply returns. The method is internally synchronized
* to avoid simultanious sending operations on this Session. If two
* threads try to run this method simultaniously, the first one will trigger
* to avoid simultaneous sending operations on this Session. If two
* threads try to run this method simultaneously, the first one will trigger
* the pending packets to be sent, while the second one will simply return
* (as there are no packets left to send).
*/
......@@ -728,12 +733,16 @@ public class HttpSession extends LocalClientSession {
}
private Delivered retrieveDeliverable(long rid) {
for (Delivered delivered : sentElements) {
if (delivered.getRequestID() == rid) {
return delivered;
}
}
return null;
Delivered result = null;
synchronized (sentElements) {
for (Delivered delivered : sentElements) {
if (delivered.getRequestID() == rid) {
result = delivered;
break;
}
}
}
return result;
}
private void addConnection(HttpConnection connection, boolean isPoll) throws HttpBindException,
......@@ -755,37 +764,39 @@ public class HttpSession extends LocalClientSession {
* deliverable on the new connection. This is under the assumption that a connection has been dropped,
* and re-requested before jetty has realised.
*/
for (HttpConnection queuedConnection : connectionQueue) {
if (queuedConnection.getRequestId() == rid) {
if(Log.isDebugEnabled()) {
Log.debug("Found previous connection in queue with rid " + rid);
}
if(queuedConnection.isClosed()) {
synchronized (connectionQueue) {
for (HttpConnection queuedConnection : connectionQueue) {
if (queuedConnection.getRequestId() == rid) {
if(Log.isDebugEnabled()) {
Log.debug("It's closed - copying deliverables");
Log.debug("Found previous connection in queue with rid " + rid);
}
Delivered deliverable = retrieveDeliverable(rid);
if (deliverable == null) {
Log.warn("Deliverable unavailable for " + rid);
throw new HttpBindException("Unexpected RID error.",
BoshBindingError.itemNotFound);
}
connection.deliverBody(createDeliverable(deliverable.deliverables));
} else {
if(Log.isDebugEnabled()) {
Log.debug("It's still open - calling close()");
}
deliver(queuedConnection, Collections.singleton(new Deliverable("")));
connection.close();
if(rid == (lastRequestID + 1)) {
lastRequestID = rid;
if(queuedConnection.isClosed()) {
if(Log.isDebugEnabled()) {
Log.debug("It's closed - copying deliverables");
}
Delivered deliverable = retrieveDeliverable(rid);
if (deliverable == null) {
Log.warn("Deliverable unavailable for " + rid);
throw new HttpBindException("Unexpected RID error.",
BoshBindingError.itemNotFound);
}
connection.deliverBody(createDeliverable(deliverable.deliverables));
} else {
if(Log.isDebugEnabled()) {
Log.debug("It's still open - calling close()");
}
deliver(queuedConnection, Collections.singleton(new Deliverable("")));
connection.close();
if(rid == (lastRequestID + 1)) {
lastRequestID = rid;
}
}
break;
}
break;
}
}
}
checkOveractivity(isPoll);
......@@ -808,31 +819,33 @@ public class HttpSession extends LocalClientSession {
connectionQueue.add(connection);
Collections.sort(connectionQueue, connectionComparator);
int connectionsToClose;
if(connectionQueue.get(connectionQueue.size() - 1) != connection) {
// Current connection does not have the greatest rid. That means
// requests were received out of order, respond to all.
connectionsToClose = connectionQueue.size();
}
else {
// Everything's fine, number of current connections open tells us
// how many that we need to close.
connectionsToClose = getOpenConnectionCount() - hold;
}
int closed = 0;
for (int i = 0; i < connectionQueue.size() && closed < connectionsToClose; i++) {
HttpConnection toClose = connectionQueue.get(i);
if (!toClose.isClosed() && toClose.getRequestId() == lastRequestID + 1) {
if(toClose == connection) {
// Current connection has no continuation yet, just deliver.
deliver("");
}
else {
toClose.close();
}
lastRequestID = toClose.getRequestId();
closed++;
}
synchronized (connectionQueue) {
int connectionsToClose;
if(connectionQueue.get(connectionQueue.size() - 1) != connection) {
// Current connection does not have the greatest rid. That means
// requests were received out of order, respond to all.
connectionsToClose = connectionQueue.size();
}
else {
// Everything's fine, number of current connections open tells us
// how many that we need to close.
connectionsToClose = getOpenConnectionCount() - hold;
}
int closed = 0;
for (int i = 0; i < connectionQueue.size() && closed < connectionsToClose; i++) {
HttpConnection toClose = connectionQueue.get(i);
if (!toClose.isClosed() && toClose.getRequestId() == lastRequestID + 1) {
if(toClose == connection) {
// Current connection has no continuation yet, just deliver.
deliver("");
}
else {
toClose.close();
}
lastRequestID = toClose.getRequestId();
closed++;
}
}
}
}
fireConnectionOpened(connection);
......@@ -840,6 +853,7 @@ public class HttpSession extends LocalClientSession {
private int getOpenConnectionCount() {
int count = 0;
// NOTE: synchronized by caller
for (HttpConnection connection : connectionQueue) {
if (!connection.isClosed()) {
count++;
......@@ -883,10 +897,12 @@ public class HttpSession extends LocalClientSession {
boolean overactivity = false;
String errorMessage = "Overactivity detected";
for (HttpConnection conn : connectionQueue) {
if (!conn.isClosed()) {
pendingConnections++;
}
synchronized (connectionQueue) {
for (HttpConnection conn : connectionQueue) {
if (!conn.isClosed()) {
pendingConnections++;
}
}
}
if(pendingConnections >= maxRequests) {
......@@ -917,7 +933,7 @@ public class HttpSession extends LocalClientSession {
}
}
private synchronized void deliver(String text) {
private void deliver(String text) {
if (text == null) {
// Do nothing if someone asked to send nothing :)
return;
......@@ -925,25 +941,27 @@ public class HttpSession extends LocalClientSession {
deliver(new Deliverable(text));
}
private synchronized void deliver(Packet stanza) {
public void deliver(Packet stanza) {
deliver(new Deliverable(Arrays.asList(stanza)));
}
private void deliver(Deliverable stanza) {
Collection<Deliverable> deliverable = Arrays.asList(stanza);
boolean delivered = false;
for (HttpConnection connection : connectionQueue) {
try {
if (connection.getRequestId() == lastRequestID + 1) {
lastRequestID = connection.getRequestId();
deliver(connection, deliverable);
delivered = true;
break;
}
}
catch (HttpConnectionClosedException e) {
/* Connection was closed, try the next one */
}
synchronized (connectionQueue) {
for (HttpConnection connection : connectionQueue) {
try {
if (connection.getRequestId() == lastRequestID + 1) {
lastRequestID = connection.getRequestId();
deliver(connection, deliverable);
delivered = true;
break;
}
}
catch (HttpConnectionClosedException e) {
/* Connection was closed, try the next one */
}
}
}
if (!delivered) {
......@@ -969,18 +987,20 @@ public class HttpSession extends LocalClientSession {
builder.append(">");
setLastResponseEmpty(elements.size() == 0);
for (Deliverable child : elements) {
builder.append(child.getDeliverable());
synchronized (elements) {
for (Deliverable child : elements) {
builder.append(child.getDeliverable());
}
}
builder.append("</body>");
return builder.toString();
}
private synchronized void closeConnection() {
if (isClosed) {
return;
}
isClosed = true;
private void closeSession() {
synchronized (this) {
if (isClosed) { return; }
isClosed = true;
}
if (pendingElements.size() > 0) {
failDelivery();
......@@ -993,28 +1013,32 @@ public class HttpSession extends LocalClientSession {
}
private void failDelivery() {
for (Deliverable deliverable : pendingElements) {
Collection<Packet> packet = deliverable.getPackets();
if (packet != null) {
failDelivery(packet);
}
}
synchronized (pendingElements) {
for (Deliverable deliverable : pendingElements) {
Collection<Packet> packet = deliverable.getPackets();
if (packet != null) {
failDelivery(packet);
}
}
pendingElements.clear();
}
for (HttpConnection toClose : connectionQueue) {
if (!toClose.isDelivered()) {
Delivered delivered = retrieveDeliverable(toClose.getRequestId());
if (delivered != null) {
failDelivery(delivered.getPackets());
}
else {
Log.warn("Packets could not be found for session " + getStreamID() + " cannot " +
"be delivered to client");
}
}
toClose.close();
fireConnectionClosed(toClose);
synchronized (connectionQueue) {
for (HttpConnection toClose : connectionQueue) {
if (!toClose.isDelivered()) {
Delivered delivered = retrieveDeliverable(toClose.getRequestId());
if (delivered != null) {
failDelivery(delivered.getPackets());
}
else {
Log.warn("Packets could not be found for session " + getStreamID() + " cannot " +
"be delivered to client");
}
}
toClose.close();
fireConnectionClosed(toClose);
}
}
pendingElements.clear();
}
private void failDelivery(Collection<Packet> packets) {
......@@ -1056,7 +1080,7 @@ public class HttpSession extends LocalClientSession {
@Override
public void closeVirtualConnection() {
((HttpSession) session).closeConnection();
((HttpSession) session).closeSession();
}
public byte[] getAddress() throws UnknownHostException {
......@@ -1204,10 +1228,12 @@ public class HttpSession extends LocalClientSession {
public Collection<Packet> getPackets() {
List<Packet> packets = new ArrayList<Packet>();
for (Deliverable deliverable : deliverables) {
if (deliverable.packets != null) {
packets.addAll(deliverable.getPackets());
}
synchronized (deliverables) {
for (Deliverable deliverable : deliverables) {
if (deliverable.packets != null) {
packets.addAll(deliverable.getPackets());
}
}
}
return packets;
}
......
......@@ -51,7 +51,8 @@ public class HttpSessionManager {
private static final Logger Log = LoggerFactory.getLogger(HttpSessionManager.class);
private SessionManager sessionManager;
private Map<String, HttpSession> sessionMap = new ConcurrentHashMap<String, HttpSession>();
private Map<String, HttpSession> sessionMap = new ConcurrentHashMap<String, HttpSession>(
JiveGlobals.getIntProperty("xmpp.httpbind.session.initial.count", 16));
private TimerTask inactivityTask;
private Executor sendPacketPool;
private SessionListener sessionListener = new SessionListener() {
......@@ -73,7 +74,8 @@ public class HttpSessionManager {
this.sessionManager = SessionManager.getInstance();
// Set the executor to use for processing http requests
int eventThreads = JiveGlobals.getIntProperty("xmpp.client.processing.threads", 16);
int eventThreads = JiveGlobals.getIntProperty("httpbind.client.processing.threads",
JiveGlobals.getIntProperty("xmpp.client.processing.threads", 16));
sendPacketPool = new ThreadPoolExecutor(
eventThreads + 1, eventThreads + 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>() );
}
......
......@@ -838,7 +838,7 @@ public class LocalClientSession extends LocalSession implements ClientSession {
}
@Override
void deliver(Packet packet) throws UnauthorizedException {
public void deliver(Packet packet) throws UnauthorizedException {
if (conn != null && !conn.isClosed()) {
conn.deliver(packet);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment