Commit 47a6bda6 authored by Alex Wenckus's avatar Alex Wenckus Committed by alex

Fix for JM-1001 a deadlock in wildfire

git-svn-id: http://svn.igniterealtime.org/svn/repos/wildfire/branches/3_2_branch@7559 b35dd754-fafc-0310-a699-88a17e54d16e
parent 49accc7d
/** /**
* $RCSfile$
* $Revision: $ * $Revision: $
* $Date: $ * $Date: $
* *
...@@ -28,6 +27,8 @@ import org.xmpp.packet.Packet; ...@@ -28,6 +27,8 @@ import org.xmpp.packet.Packet;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.*; import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/** /**
* A session represents a serious of interactions with an XMPP client sending packets using the HTTP * A session represents a serious of interactions with an XMPP client sending packets using the HTTP
...@@ -55,6 +56,10 @@ public class HttpSession extends ClientSession { ...@@ -55,6 +56,10 @@ public class HttpSession extends ClientSession {
private int maxRequests; private int maxRequests;
private PacketDeliverer backupDeliverer; private PacketDeliverer backupDeliverer;
private final Queue<Collection<Element>> packetsToSend = new LinkedList<Collection<Element>>();
// Semaphore which protects the packets to send, so, there can only be one consumer at a time.
private final Semaphore packetsToSendSemaphore = new Semaphore(1);
private static final Comparator<HttpConnection> connectionComparator private static final Comparator<HttpConnection> connectionComparator
= new Comparator<HttpConnection>() { = new Comparator<HttpConnection>() {
public int compare(HttpConnection o1, HttpConnection o2) { public int compare(HttpConnection o1, HttpConnection o2) {
...@@ -294,15 +299,6 @@ public class HttpSession extends ClientSession { ...@@ -294,15 +299,6 @@ public class HttpSession extends ClientSession {
return inactivityTimeout; return inactivityTimeout;
} }
/**
* Returns the number of connections currently awaiting a response on this session.
*
* @return the number of connections currently awaiting a response on this session.
*/
public int getConnectionCount() {
return connectionQueue.size();
}
/** /**
* Returns the time in milliseconds since the epoch that this session was last active. Activity * Returns the time in milliseconds since the epoch that this session was last active. Activity
* is a request was either made or responded to. If the session is currently active, meaning * is a request was either made or responded to. If the session is currently active, meaning
...@@ -350,12 +346,12 @@ public class HttpSession extends ClientSession { ...@@ -350,12 +346,12 @@ public class HttpSession extends ClientSession {
} }
catch (HttpBindTimeoutException e) { catch (HttpBindTimeoutException e) {
// This connection timed out we need to increment the request count // This connection timed out we need to increment the request count
if(connection.getRequestId() != lastRequestID + 1) { if (connection.getRequestId() != lastRequestID + 1) {
throw new HttpBindException("Unexpected RID error.", true, 404); throw new HttpBindException("Unexpected RID error.", true, 404);
} }
lastRequestID = connection.getRequestId(); lastRequestID = connection.getRequestId();
} }
if(response == null) { if (response == null) {
response = createEmptyBody(); response = createEmptyBody();
} }
return response; return response;
...@@ -370,13 +366,45 @@ public class HttpSession extends ClientSession { ...@@ -370,13 +366,45 @@ public class HttpSession extends ClientSession {
this.isSecure = isSecure; this.isSecure = isSecure;
} }
/**
* Returns the next set of packets to be sent. This method is meant to be used in the
* producer-consumer model to prevent deadlocks when processing incoming packets from the
* session. The internal packet queue is protected so that only one collection of packets can be
* sent at a time. This method will return null if packets are currently being consumed or there
* is no collection of packets waiting to be sent. After done consuming the packets {@link
* #releasePacketsToSend()} needs to be called.
*
* @param time the quantity of time to wait for the queue to be free for consumption
* @param timeUnit the unit of time related to the quanity
* @return a collection of packets to be sent.
*/
public Collection<Element> getPacketsToSend(long time, TimeUnit timeUnit) {
try {
if (!packetsToSendSemaphore.tryAcquire(time, timeUnit)
|| packetsToSend.size() <= 0) {
return null;
}
}
catch (InterruptedException e) {
return null;
}
return packetsToSend.remove();
}
/**
* Releases the lock on the send packets queue so that other threads may access it.
*/
public void releasePacketsToSend() {
packetsToSendSemaphore.release();
}
/** /**
* Creates a new connection on this session. If a response is currently available for this * Creates a new connection on this session. If a response is currently available for this
* session the connection is responded to immediately, otherwise it is queued awaiting a * session the connection is responded to immediately, otherwise it is queued awaiting a
* response. * response.
* *
* @param rid the request id related to the connection. * @param rid the request id related to the connection.
* @param isPoll true if the request was a poll, no packets were sent along with the request. * @param packetsToBeSent any packets that this connection should send.
* @param isSecure true if the connection was secured using HTTPS. * @param isSecure true if the connection was secured using HTTPS.
* @return the created {@link org.jivesoftware.wildfire.http.HttpConnection} which represents * @return the created {@link org.jivesoftware.wildfire.http.HttpConnection} which represents
* the connection. * the connection.
...@@ -386,8 +414,10 @@ public class HttpSession extends ClientSession { ...@@ -386,8 +414,10 @@ public class HttpSession extends ClientSession {
* @throws HttpBindException if the connection has violated a facet of the HTTP binding * @throws HttpBindException if the connection has violated a facet of the HTTP binding
* protocol. * protocol.
*/ */
HttpConnection createConnection(long rid, boolean isPoll, boolean isSecure) synchronized HttpConnection createConnection(long rid, Collection<Element> packetsToBeSent,
throws HttpConnectionClosedException, HttpBindException { boolean isSecure)
throws HttpConnectionClosedException, HttpBindException
{
HttpConnection connection = new HttpConnection(rid, isSecure); HttpConnection connection = new HttpConnection(rid, isSecure);
if (rid <= lastRequestID) { if (rid <= lastRequestID) {
Delivered deliverable = retrieveDeliverable(rid); Delivered deliverable = retrieveDeliverable(rid);
...@@ -404,7 +434,10 @@ public class HttpSession extends ClientSession { ...@@ -404,7 +434,10 @@ public class HttpSession extends ClientSession {
throw new HttpBindException("Unexpected RID Error", true, 404); throw new HttpBindException("Unexpected RID Error", true, 404);
} }
addConnection(connection, isPoll); if (packetsToBeSent.size() > 0) {
packetsToSend.add(packetsToBeSent);
}
addConnection(connection, packetsToBeSent.size() <= 0);
return connection; return connection;
} }
...@@ -470,8 +503,8 @@ public class HttpSession extends ClientSession { ...@@ -470,8 +503,8 @@ public class HttpSession extends ClientSession {
private int getOpenConnectionCount() { private int getOpenConnectionCount() {
int count = 0; int count = 0;
for(HttpConnection connection : connectionQueue) { for (HttpConnection connection : connectionQueue) {
if(!connection.isClosed()) { if (!connection.isClosed()) {
count++; count++;
} }
} }
......
/** /**
* $RCSfile$
* $Revision: $ * $Revision: $
* $Date: $ * $Date: $
* *
* Copyright (C) 2006 Jive Software. All rights reserved. * Copyright (C) 2007 Jive Software. All rights reserved.
* *
* This software is published under the terms of the GNU Public License (GPL), * This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution. * a copy of which is included in this distribution.
...@@ -24,6 +23,9 @@ import org.dom4j.*; ...@@ -24,6 +23,9 @@ import org.dom4j.*;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.net.InetAddress; import java.net.InetAddress;
...@@ -35,6 +37,7 @@ public class HttpSessionManager { ...@@ -35,6 +37,7 @@ public class HttpSessionManager {
private SessionManager sessionManager; private SessionManager sessionManager;
private Map<String, HttpSession> sessionMap = new ConcurrentHashMap<String, HttpSession>(); private Map<String, HttpSession> sessionMap = new ConcurrentHashMap<String, HttpSession>();
private TimerTask inactivityTask; private TimerTask inactivityTask;
private Executor sendPacketPool = Executors.newCachedThreadPool();
private SessionListener sessionListener = new SessionListener() { private SessionListener sessionListener = new SessionListener() {
public void connectionOpened(HttpSession session, HttpConnection connection) { public void connectionOpened(HttpSession session, HttpConnection connection) {
} }
...@@ -69,7 +72,7 @@ public class HttpSessionManager { ...@@ -69,7 +72,7 @@ public class HttpSessionManager {
*/ */
public void stop() { public void stop() {
inactivityTask.cancel(); inactivityTask.cancel();
for(HttpSession session : sessionMap.values()) { for (HttpSession session : sessionMap.values()) {
session.close(); session.close();
} }
sessionMap.clear(); sessionMap.clear();
...@@ -90,24 +93,24 @@ public class HttpSessionManager { ...@@ -90,24 +93,24 @@ public class HttpSessionManager {
* *
* @param address the internet address that was used to bind to Wildfie. * @param address the internet address that was used to bind to Wildfie.
* @param rootNode the body element that was sent containing the request for a new session. * @param rootNode the body element that was sent containing the request for a new session.
* @param connection the HTTP connection object which abstracts the individual connections * @param connection the HTTP connection object which abstracts the individual connections to
* to Wildfire over the HTTP binding protocol. The initial session creation response is * Wildfire over the HTTP binding protocol. The initial session creation response is returned to
* returned to this connection. * this connection.
* @return the created HTTP session. * @return the created HTTP session.
*
* @throws UnauthorizedException if the Wildfire server is currently in an uninitialized state. * @throws UnauthorizedException if the Wildfire server is currently in an uninitialized state.
* Either shutting down or starting up. * Either shutting down or starting up.
* @throws HttpBindException when there is an internal server error related to the creation of * @throws HttpBindException when there is an internal server error related to the creation of
* the initial session creation response. * the initial session creation response.
*/ */
public HttpSession createSession(InetAddress address, Element rootNode, public HttpSession createSession(InetAddress address, Element rootNode,
HttpConnection connection) HttpConnection connection)
throws UnauthorizedException, HttpBindException throws UnauthorizedException, HttpBindException {
{
// TODO Check if IP address is allowed to connect to the server // TODO Check if IP address is allowed to connect to the server
// Default language is English ("en"). // Default language is English ("en").
String language = rootNode.attributeValue("xml:lang"); String language = rootNode.attributeValue("xml:lang");
if(language == null || "".equals(language)) { if (language == null || "".equals(language)) {
language = "en"; language = "en";
} }
...@@ -138,13 +141,13 @@ public class HttpSessionManager { ...@@ -138,13 +141,13 @@ public class HttpSessionManager {
/** /**
* Returns the longest time (in seconds) that Wildfire is allowed to wait before * Returns the longest time (in seconds) that Wildfire is allowed to wait before responding to
* responding to any request during the session. This enables the client to prevent its TCP * any request during the session. This enables the client to prevent its TCP connection from
* connection from expiring due to inactivity, as well as to limit the delay before it * expiring due to inactivity, as well as to limit the delay before it discovers any network
* discovers any network failure. * failure.
* *
* @return the longest time (in seconds) that Wildfire is allowed to wait before * @return the longest time (in seconds) that Wildfire is allowed to wait before responding to
* responding to any request during the session. * any request during the session.
*/ */
public int getMaxWait() { public int getMaxWait() {
return JiveGlobals.getIntProperty("xmpp.httpbind.client.requests.wait", return JiveGlobals.getIntProperty("xmpp.httpbind.client.requests.wait",
...@@ -152,26 +155,25 @@ public class HttpSessionManager { ...@@ -152,26 +155,25 @@ public class HttpSessionManager {
} }
/** /**
* Wildfire SHOULD include two additional attributes in the session creation * Wildfire SHOULD include two additional attributes in the session creation response element,
* response element, specifying the shortest allowable polling interval and the longest * specifying the shortest allowable polling interval and the longest allowable inactivity
* allowable inactivity period (both in seconds). Communication of these parameters enables * period (both in seconds). Communication of these parameters enables the client to engage in
* the client to engage in appropriate behavior (e.g., not sending empty request elements more * appropriate behavior (e.g., not sending empty request elements more often than desired, and
* often than desired, and ensuring that the periods with no requests pending are * ensuring that the periods with no requests pending are never too long).
* never too long).
* *
* @return the maximum allowable period over which a client can send empty requests to the * @return the maximum allowable period over which a client can send empty requests to the
* server. * server.
*/ */
public int getPollingInterval() { public int getPollingInterval() {
return JiveGlobals.getIntProperty("xmpp.httpbind.client.requests.polling", 5); return JiveGlobals.getIntProperty("xmpp.httpbind.client.requests.polling", 5);
} }
/** /**
* Wildfire MAY limit the number of simultaneous requests the client makes with * Wildfire MAY limit the number of simultaneous requests the client makes with the 'requests'
* the 'requests' attribute. The RECOMMENDED value is "2". Servers that only support polling * attribute. The RECOMMENDED value is "2". Servers that only support polling behavior MUST
* behavior MUST prevent clients from making simultaneous requests by setting the 'requests' * prevent clients from making simultaneous requests by setting the 'requests' attribute to a
* attribute to a value of "1" (however, polling is NOT RECOMMENDED). In any case, clients MUST * value of "1" (however, polling is NOT RECOMMENDED). In any case, clients MUST NOT make more
* NOT make more simultaneous requests than specified by the Wildfire. * simultaneous requests than specified by the Wildfire.
* *
* @return the number of simultaneous requests allowable. * @return the number of simultaneous requests allowable.
*/ */
...@@ -180,12 +182,12 @@ public class HttpSessionManager { ...@@ -180,12 +182,12 @@ public class HttpSessionManager {
} }
/** /**
* Seconds a session has to be idle to be closed. Default is 30 minutes. Sending * Seconds a session has to be idle to be closed. Default is 30 minutes. Sending stanzas to the
* stanzas to the client is not considered as activity. We are only considering the connection * client is not considered as activity. We are only considering the connection active when the
* active when the client sends some data or hearbeats (i.e. whitespaces) to the server. * client sends some data or hearbeats (i.e. whitespaces) to the server. The reason for this is
* The reason for this is that sending data will fail if the connection is closed. And if * that sending data will fail if the connection is closed. And if the thread is blocked while
* the thread is blocked while sending data (because the socket is closed) then the clean up * sending data (because the socket is closed) then the clean up thread will close the socket
* thread will close the socket anyway. * anyway.
* *
* @return Seconds a session has to be idle to be closed. * @return Seconds a session has to be idle to be closed.
*/ */
...@@ -205,6 +207,7 @@ public class HttpSessionManager { ...@@ -205,6 +207,7 @@ public class HttpSessionManager {
* was not. * was not.
* @param rootNode the XML body of the request. * @param rootNode the XML body of the request.
* @return the created HTTP connection. * @return the created HTTP connection.
*
* @throws HttpBindException for several reasons: if the encoding inside of an auth packet is * @throws HttpBindException for several reasons: if the encoding inside of an auth packet is
* not recognized by the server, or if the packet type is not recognized. * not recognized by the server, or if the packet type is not recognized.
* @throws HttpConnectionClosedException if the session is no longer available. * @throws HttpConnectionClosedException if the session is no longer available.
...@@ -215,22 +218,11 @@ public class HttpSessionManager { ...@@ -215,22 +218,11 @@ public class HttpSessionManager {
{ {
//noinspection unchecked //noinspection unchecked
List<Element> elements = rootNode.elements(); List<Element> elements = rootNode.elements();
boolean isPoll = elements.size() <= 0; HttpConnection connection = session.createConnection(rid, elements, isSecure);
HttpConnection connection = session.createConnection(rid, isPoll, isSecure); if (elements.size() > 0) {
SessionPacketRouter router = new SessionPacketRouter(session); // creates the runnable to forward the packets
new HttpPacketSender(session).init();
for (Element packet : elements) {
try {
router.route(packet);
}
catch (UnsupportedEncodingException e) {
throw new HttpBindException("Bad auth request, unknown encoding", true, 400);
}
catch (UnknownStanzaException e) {
throw new HttpBindException("Unknown packet type.", false, 400);
}
} }
return connection; return connection;
} }
...@@ -246,7 +238,7 @@ public class HttpSessionManager { ...@@ -246,7 +238,7 @@ public class HttpSessionManager {
} }
private static int getIntAttribute(String value, int defaultValue) { private static int getIntAttribute(String value, int defaultValue) {
if(value == null || "".equals(value)) { if (value == null || "".equals(value)) {
return defaultValue; return defaultValue;
} }
try { try {
...@@ -270,7 +262,7 @@ public class HttpSessionManager { ...@@ -270,7 +262,7 @@ public class HttpSessionManager {
response.addAttribute("wait", String.valueOf(session.getWait())); response.addAttribute("wait", String.valueOf(session.getWait()));
Element features = response.addElement("stream:features"); Element features = response.addElement("stream:features");
for(Element feature : session.getAvailableStreamFeaturesElements()) { for (Element feature : session.getAvailableStreamFeaturesElements()) {
features.add(feature); features.add(feature);
} }
...@@ -280,8 +272,8 @@ public class HttpSessionManager { ...@@ -280,8 +272,8 @@ public class HttpSessionManager {
private class HttpSessionReaper extends TimerTask { private class HttpSessionReaper extends TimerTask {
public void run() { public void run() {
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
for(HttpSession session : sessionMap.values()) { for (HttpSession session : sessionMap.values()) {
long lastActive = (currentTime - session.getLastActivity()) / 1000; long lastActive = (currentTime - session.getLastActivity()) / 1000;
if (lastActive > session.getInactivityTimeout()) { if (lastActive > session.getInactivityTimeout()) {
session.close(); session.close();
...@@ -289,4 +281,54 @@ public class HttpSessionManager { ...@@ -289,4 +281,54 @@ public class HttpSessionManager {
} }
} }
} }
/**
* A runner that gurantees that the packets per a session will be sent and processed in the
* order in which they were received.
*/
private class HttpPacketSender implements Runnable {
private HttpSession session;
HttpPacketSender(HttpSession session) {
this.session = session;
}
public void run() {
Collection<Element> elements = null;
try {
elements = session.getPacketsToSend(20, TimeUnit.MILLISECONDS);
}
catch (Throwable t) {
/** Do nothing **/
}
if (elements == null) {
this.init();
return;
}
SessionPacketRouter router = new SessionPacketRouter(session);
try {
for (Element packet : elements) {
try {
router.route(packet);
}
catch (UnsupportedEncodingException e) {
Log.error("Client provided unsupported encoding type in auth request", e);
}
catch (UnknownStanzaException e) {
Log.error("Client provided unknown packet type", e);
}
}
}
finally {
session.releasePacketsToSend();
}
}
private void init() {
sendPacketPool.execute(this);
}
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment