Commit bd3c4791 authored by Alex Wenckus's avatar Alex Wenckus Committed by alex

Fixed http-binding deadlock when sending packets. JM-1001

git-svn-id: http://svn.igniterealtime.org/svn/repos/wildfire/trunk@7553 b35dd754-fafc-0310-a699-88a17e54d16e
parent 9d7d7b97
/**
* $RCSfile$
* $Revision: $
* $Date: $
*
......@@ -28,6 +27,8 @@ import org.xmpp.packet.Packet;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* A session represents a serious of interactions with an XMPP client sending packets using the HTTP
......@@ -55,6 +56,10 @@ public class HttpSession extends ClientSession {
private int maxRequests;
private PacketDeliverer backupDeliverer;
private final Queue<Collection<Element>> packetsToSend = new LinkedList<Collection<Element>>();
// Semaphore which protects the packets to send, so, there can only be one consumer at a time.
private final Semaphore packetsToSendSemaphore = new Semaphore(1);
private static final Comparator<HttpConnection> connectionComparator
= new Comparator<HttpConnection>() {
public int compare(HttpConnection o1, HttpConnection o2) {
......@@ -294,15 +299,6 @@ public class HttpSession extends ClientSession {
return inactivityTimeout;
}
/**
* Returns the number of connections currently awaiting a response on this session.
*
* @return the number of connections currently awaiting a response on this session.
*/
public int getConnectionCount() {
return connectionQueue.size();
}
/**
* Returns the time in milliseconds since the epoch that this session was last active. Activity
* is a request was either made or responded to. If the session is currently active, meaning
......@@ -350,12 +346,12 @@ public class HttpSession extends ClientSession {
}
catch (HttpBindTimeoutException e) {
// This connection timed out we need to increment the request count
if(connection.getRequestId() != lastRequestID + 1) {
if (connection.getRequestId() != lastRequestID + 1) {
throw new HttpBindException("Unexpected RID error.", true, 404);
}
lastRequestID = connection.getRequestId();
}
if(response == null) {
if (response == null) {
response = createEmptyBody();
}
return response;
......@@ -370,13 +366,45 @@ public class HttpSession extends ClientSession {
this.isSecure = isSecure;
}
/**
* Returns the next set of packets to be sent. This method is meant to be used in the
* producer-consumer model to prevent deadlocks when processing incoming packets from the
* session. The internal packet queue is protected so that only one collection of packets can be
* sent at a time. This method will return null if packets are currently being consumed or there
* is no collection of packets waiting to be sent. After done consuming the packets {@link
* #releasePacketsToSend()} needs to be called.
*
* @param time the quantity of time to wait for the queue to be free for consumption
* @param timeUnit the unit of time related to the quanity
* @return a collection of packets to be sent.
*/
public Collection<Element> getPacketsToSend(long time, TimeUnit timeUnit) {
try {
if (!packetsToSendSemaphore.tryAcquire(time, timeUnit)
|| packetsToSend.size() <= 0) {
return null;
}
}
catch (InterruptedException e) {
return null;
}
return packetsToSend.remove();
}
/**
* Releases the lock on the send packets queue so that other threads may access it.
*/
public void releasePacketsToSend() {
packetsToSendSemaphore.release();
}
/**
* Creates a new connection on this session. If a response is currently available for this
* session the connection is responded to immediately, otherwise it is queued awaiting a
* response.
*
* @param rid the request id related to the connection.
* @param isPoll true if the request was a poll, no packets were sent along with the request.
* @param packetsToBeSent any packets that this connection should send.
* @param isSecure true if the connection was secured using HTTPS.
* @return the created {@link org.jivesoftware.wildfire.http.HttpConnection} which represents
* the connection.
......@@ -386,8 +414,10 @@ public class HttpSession extends ClientSession {
* @throws HttpBindException if the connection has violated a facet of the HTTP binding
* protocol.
*/
HttpConnection createConnection(long rid, boolean isPoll, boolean isSecure)
throws HttpConnectionClosedException, HttpBindException {
synchronized HttpConnection createConnection(long rid, Collection<Element> packetsToBeSent,
boolean isSecure)
throws HttpConnectionClosedException, HttpBindException
{
HttpConnection connection = new HttpConnection(rid, isSecure);
if (rid <= lastRequestID) {
Delivered deliverable = retrieveDeliverable(rid);
......@@ -404,7 +434,10 @@ public class HttpSession extends ClientSession {
throw new HttpBindException("Unexpected RID Error", true, 404);
}
addConnection(connection, isPoll);
if (packetsToBeSent.size() > 0) {
packetsToSend.add(packetsToBeSent);
}
addConnection(connection, packetsToBeSent.size() <= 0);
return connection;
}
......@@ -470,8 +503,8 @@ public class HttpSession extends ClientSession {
private int getOpenConnectionCount() {
int count = 0;
for(HttpConnection connection : connectionQueue) {
if(!connection.isClosed()) {
for (HttpConnection connection : connectionQueue) {
if (!connection.isClosed()) {
count++;
}
}
......
/**
* $RCSfile$
* $Revision: $
* $Date: $
*
* Copyright (C) 2006 Jive Software. All rights reserved.
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
......@@ -24,6 +23,9 @@ import org.dom4j.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
......@@ -35,6 +37,7 @@ public class HttpSessionManager {
private SessionManager sessionManager;
private Map<String, HttpSession> sessionMap = new ConcurrentHashMap<String, HttpSession>();
private TimerTask inactivityTask;
private Executor sendPacketPool = Executors.newCachedThreadPool();
private SessionListener sessionListener = new SessionListener() {
public void connectionOpened(HttpSession session, HttpConnection connection) {
}
......@@ -69,7 +72,7 @@ public class HttpSessionManager {
*/
public void stop() {
inactivityTask.cancel();
for(HttpSession session : sessionMap.values()) {
for (HttpSession session : sessionMap.values()) {
session.close();
}
sessionMap.clear();
......@@ -90,10 +93,11 @@ public class HttpSessionManager {
*
* @param address the internet address that was used to bind to Wildfie.
* @param rootNode the body element that was sent containing the request for a new session.
* @param connection the HTTP connection object which abstracts the individual connections
* to Wildfire over the HTTP binding protocol. The initial session creation response is
* returned to this connection.
* @param connection the HTTP connection object which abstracts the individual connections to
* Wildfire over the HTTP binding protocol. The initial session creation response is returned to
* this connection.
* @return the created HTTP session.
*
* @throws UnauthorizedException if the Wildfire server is currently in an uninitialized state.
* Either shutting down or starting up.
* @throws HttpBindException when there is an internal server error related to the creation of
......@@ -101,13 +105,12 @@ public class HttpSessionManager {
*/
public HttpSession createSession(InetAddress address, Element rootNode,
HttpConnection connection)
throws UnauthorizedException, HttpBindException
{
throws UnauthorizedException, HttpBindException {
// TODO Check if IP address is allowed to connect to the server
// Default language is English ("en").
String language = rootNode.attributeValue("xml:lang");
if(language == null || "".equals(language)) {
if (language == null || "".equals(language)) {
language = "en";
}
......@@ -138,13 +141,13 @@ public class HttpSessionManager {
/**
* Returns the longest time (in seconds) that Wildfire is allowed to wait before
* responding to any request during the session. This enables the client to prevent its TCP
* connection from expiring due to inactivity, as well as to limit the delay before it
* discovers any network failure.
* Returns the longest time (in seconds) that Wildfire is allowed to wait before responding to
* any request during the session. This enables the client to prevent its TCP connection from
* expiring due to inactivity, as well as to limit the delay before it discovers any network
* failure.
*
* @return the longest time (in seconds) that Wildfire is allowed to wait before
* responding to any request during the session.
* @return the longest time (in seconds) that Wildfire is allowed to wait before responding to
* any request during the session.
*/
public int getMaxWait() {
return JiveGlobals.getIntProperty("xmpp.httpbind.client.requests.wait",
......@@ -152,12 +155,11 @@ public class HttpSessionManager {
}
/**
* Wildfire SHOULD include two additional attributes in the session creation
* response element, specifying the shortest allowable polling interval and the longest
* allowable inactivity period (both in seconds). Communication of these parameters enables
* the client to engage in appropriate behavior (e.g., not sending empty request elements more
* often than desired, and ensuring that the periods with no requests pending are
* never too long).
* Wildfire SHOULD include two additional attributes in the session creation response element,
* specifying the shortest allowable polling interval and the longest allowable inactivity
* period (both in seconds). Communication of these parameters enables the client to engage in
* appropriate behavior (e.g., not sending empty request elements more often than desired, and
* ensuring that the periods with no requests pending are never too long).
*
* @return the maximum allowable period over which a client can send empty requests to the
* server.
......@@ -167,11 +169,11 @@ public class HttpSessionManager {
}
/**
* Wildfire MAY limit the number of simultaneous requests the client makes with
* the 'requests' attribute. The RECOMMENDED value is "2". Servers that only support polling
* behavior MUST prevent clients from making simultaneous requests by setting the 'requests'
* attribute to a value of "1" (however, polling is NOT RECOMMENDED). In any case, clients MUST
* NOT make more simultaneous requests than specified by the Wildfire.
* Wildfire MAY limit the number of simultaneous requests the client makes with the 'requests'
* attribute. The RECOMMENDED value is "2". Servers that only support polling behavior MUST
* prevent clients from making simultaneous requests by setting the 'requests' attribute to a
* value of "1" (however, polling is NOT RECOMMENDED). In any case, clients MUST NOT make more
* simultaneous requests than specified by the Wildfire.
*
* @return the number of simultaneous requests allowable.
*/
......@@ -180,12 +182,12 @@ public class HttpSessionManager {
}
/**
* Seconds a session has to be idle to be closed. Default is 30 minutes. Sending
* stanzas to the client is not considered as activity. We are only considering the connection
* active when the client sends some data or hearbeats (i.e. whitespaces) to the server.
* The reason for this is that sending data will fail if the connection is closed. And if
* the thread is blocked while sending data (because the socket is closed) then the clean up
* thread will close the socket anyway.
* Seconds a session has to be idle to be closed. Default is 30 minutes. Sending stanzas to the
* client is not considered as activity. We are only considering the connection active when the
* client sends some data or hearbeats (i.e. whitespaces) to the server. The reason for this is
* that sending data will fail if the connection is closed. And if the thread is blocked while
* sending data (because the socket is closed) then the clean up thread will close the socket
* anyway.
*
* @return Seconds a session has to be idle to be closed.
*/
......@@ -205,6 +207,7 @@ public class HttpSessionManager {
* was not.
* @param rootNode the XML body of the request.
* @return the created HTTP connection.
*
* @throws HttpBindException for several reasons: if the encoding inside of an auth packet is
* not recognized by the server, or if the packet type is not recognized.
* @throws HttpConnectionClosedException if the session is no longer available.
......@@ -215,22 +218,11 @@ public class HttpSessionManager {
{
//noinspection unchecked
List<Element> elements = rootNode.elements();
boolean isPoll = elements.size() <= 0;
HttpConnection connection = session.createConnection(rid, isPoll, isSecure);
SessionPacketRouter router = new SessionPacketRouter(session);
for (Element packet : elements) {
try {
router.route(packet);
}
catch (UnsupportedEncodingException e) {
throw new HttpBindException("Bad auth request, unknown encoding", true, 400);
}
catch (UnknownStanzaException e) {
throw new HttpBindException("Unknown packet type.", false, 400);
HttpConnection connection = session.createConnection(rid, elements, isSecure);
if (elements.size() > 0) {
// creates the runnable to forward the packets
new HttpPacketSender(session).init();
}
}
return connection;
}
......@@ -246,7 +238,7 @@ public class HttpSessionManager {
}
private static int getIntAttribute(String value, int defaultValue) {
if(value == null || "".equals(value)) {
if (value == null || "".equals(value)) {
return defaultValue;
}
try {
......@@ -270,7 +262,7 @@ public class HttpSessionManager {
response.addAttribute("wait", String.valueOf(session.getWait()));
Element features = response.addElement("stream:features");
for(Element feature : session.getAvailableStreamFeaturesElements()) {
for (Element feature : session.getAvailableStreamFeaturesElements()) {
features.add(feature);
}
......@@ -281,7 +273,7 @@ public class HttpSessionManager {
public void run() {
long currentTime = System.currentTimeMillis();
for(HttpSession session : sessionMap.values()) {
for (HttpSession session : sessionMap.values()) {
long lastActive = (currentTime - session.getLastActivity()) / 1000;
if (lastActive > session.getInactivityTimeout()) {
session.close();
......@@ -289,4 +281,54 @@ public class HttpSessionManager {
}
}
}
/**
* A runner that gurantees that the packets per a session will be sent and processed in the
* order in which they were received.
*/
private class HttpPacketSender implements Runnable {
private HttpSession session;
HttpPacketSender(HttpSession session) {
this.session = session;
}
public void run() {
Collection<Element> elements = null;
try {
elements = session.getPacketsToSend(20, TimeUnit.MILLISECONDS);
}
catch (Throwable t) {
/** Do nothing **/
}
if (elements == null) {
this.init();
return;
}
SessionPacketRouter router = new SessionPacketRouter(session);
try {
for (Element packet : elements) {
try {
router.route(packet);
}
catch (UnsupportedEncodingException e) {
Log.error("Client provided unsupported encoding type in auth request", e);
}
catch (UnknownStanzaException e) {
Log.error("Client provided unknown packet type", e);
}
}
}
finally {
session.releasePacketsToSend();
}
}
private void init() {
sendPacketPool.execute(this);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment