Commit 505ce1e8 authored by Alex Wenckus's avatar Alex Wenckus Committed by alex

Removed the semaphore used in sending packets through HTTP binding. JM-1022

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@7986 b35dd754-fafc-0310-a699-88a17e54d16e
parent c0ac0ae8
......@@ -17,6 +17,8 @@ import org.dom4j.QName;
import org.jivesoftware.openfire.Connection;
import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.PacketDeliverer;
import org.jivesoftware.openfire.SessionPacketRouter;
import org.jivesoftware.openfire.multiplex.UnknownStanzaException;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.net.SASLAuthentication;
import org.jivesoftware.openfire.net.VirtualConnection;
......@@ -27,8 +29,7 @@ import org.xmpp.packet.Packet;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.io.UnsupportedEncodingException;
/**
* A session represents a serious of interactions with an XMPP client sending packets using the HTTP
......@@ -58,7 +59,7 @@ public class HttpSession extends ClientSession {
private final Queue<Collection<Element>> packetsToSend = new LinkedList<Collection<Element>>();
// Semaphore which protects the packets to send, so, there can only be one consumer at a time.
private Semaphore packetsToSendSemaphore = new Semaphore(1);
private SessionPacketRouter router;
private static final Comparator<HttpConnection> connectionComparator
= new Comparator<HttpConnection>() {
......@@ -367,42 +368,38 @@ public class HttpSession extends ClientSession {
}
/**
* Returns the next set of packets to be sent. This method is meant to be used in the
* producer-consumer model to prevent deadlocks when processing incoming packets from the
* session. The internal packet queue is protected so that only one collection of packets can be
* sent at a time. This method will return null if packets are currently being consumed or there
* is no collection of packets waiting to be sent. After done consuming the packets {@link
* #releasePacketsToSend()} needs to be called.
*
* @param time the quantity of time to wait for the queue to be free for consumption
* @param timeUnit the unit of time related to the quanity
* @return a collection of packets to be sent.
* @throws HttpConnectionClosedException when the session has been closed and is no longer
* available to send packets.
* This methods sends any pending packets in the session. If no packets are
* pending, this method simply returns. The method is internally synchronized
* to avoid simultanious sending operations on this Session. If two
* threads try to run this method simultaniously, the first one will trigger
* the pending packets to be sent, while the second one will simply return
* (as there are no packets left to send).
*/
public Collection<Element> getPacketsToSend(long time, TimeUnit timeUnit)
throws HttpConnectionClosedException
{
if(isClosed) {
throw new HttpConnectionClosedException("the connection has been closed");
protected void sendPendingPackets() {
// access blocked only on send to prevent deadlocks
synchronized (packetsToSend) {
if (packetsToSend.size() <= 0) {
return;
}
if (router == null) {
router = new SessionPacketRouter(this);
}
for (Element packet : packetsToSend.remove()) {
try {
if (!packetsToSendSemaphore.tryAcquire(time, timeUnit)
|| packetsToSend.size() <= 0) {
return null;
router.route(packet);
}
catch (UnsupportedEncodingException e) {
Log.error(
"Client provided unsupported encoding type in auth request",
e);
}
catch (UnknownStanzaException e) {
Log.error("Client provided unknown packet type", e);
}
catch (InterruptedException e) {
return null;
}
return packetsToSend.remove();
}
/**
* Releases the lock on the send packets queue so that other threads may access it.
*/
public void releasePacketsToSend() {
packetsToSendSemaphore.release();
}
/**
......@@ -609,7 +606,6 @@ public class HttpSession extends ClientSession {
listener.sessionClosed(this);
}
this.listeners.clear();
this.packetsToSendSemaphore = null;
}
private void failDelivery() {
......
......@@ -283,8 +283,8 @@ public class HttpSessionManager {
}
/**
* A runner that gurantees that the packets per a session will be sent and processed in the
* order in which they were received.
* A runner that gurantees that the packets per a session will be sent and
* processed in the order in which they were received.
*/
private class HttpPacketSender implements Runnable {
private HttpSession session;
......@@ -294,41 +294,7 @@ public class HttpSessionManager {
}
public void run() {
Collection<Element> elements = null;
try {
elements = session.getPacketsToSend(20, TimeUnit.MILLISECONDS);
}
catch(HttpConnectionClosedException he) {
/** the session has been closed **/
return;
}
catch (Throwable t) {
/** Do nothing **/
}
if (elements == null) {
this.init();
return;
}
SessionPacketRouter router = new SessionPacketRouter(session);
try {
for (Element packet : elements) {
try {
router.route(packet);
}
catch (UnsupportedEncodingException e) {
Log.error("Client provided unsupported encoding type in auth request", e);
}
catch (UnknownStanzaException e) {
Log.error("Client provided unknown packet type", e);
}
}
}
finally {
session.releasePacketsToSend();
}
session.sendPendingPackets();
}
private void init() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment