Commit b8d2410f authored by Alex Wenckus's avatar Alex Wenckus Committed by alex

Recovering undelivered packets from connections when the session is closed. JM-971

git-svn-id: http://svn.igniterealtime.org/svn/repos/wildfire/trunk@7061 b35dd754-fafc-0310-a699-88a17e54d16e
parent 52fa91b1
...@@ -546,7 +546,8 @@ public class SessionManager extends BasicModule { ...@@ -546,7 +546,8 @@ public class SessionManager extends BasicModule {
if (serverName == null) { if (serverName == null) {
throw new UnauthorizedException("Server not initialized"); throw new UnauthorizedException("Server not initialized");
} }
HttpSession session = new HttpSession(serverName, address, id, rid); PacketDeliverer backupDeliverer = XMPPServer.getInstance().getPacketDeliverer();
HttpSession session = new HttpSession(backupDeliverer, serverName, address, id, rid);
Connection conn = session.getConnection(); Connection conn = session.getConnection();
conn.init(session); conn.init(session);
conn.registerCloseListener(clientSessionListener, session); conn.registerCloseListener(clientSessionListener, session);
......
...@@ -27,6 +27,7 @@ public class HttpConnection { ...@@ -27,6 +27,7 @@ public class HttpConnection {
private Continuation continuation; private Continuation continuation;
private boolean isClosed; private boolean isClosed;
private boolean isSecure = false; private boolean isSecure = false;
private boolean isDelivered;
/** /**
* Constructs an HTTP Connection. * Constructs an HTTP Connection.
...@@ -37,6 +38,7 @@ public class HttpConnection { ...@@ -37,6 +38,7 @@ public class HttpConnection {
public HttpConnection(long requestId, boolean isSecure) { public HttpConnection(long requestId, boolean isSecure) {
this.requestId = requestId; this.requestId = requestId;
this.isSecure = isSecure; this.isSecure = isSecure;
this.isDelivered = false;
} }
/** /**
...@@ -74,6 +76,10 @@ public class HttpConnection { ...@@ -74,6 +76,10 @@ public class HttpConnection {
return isSecure; return isSecure;
} }
public boolean isDelivered() {
return isDelivered;
}
/** /**
* Delivers content to the client. The content should be valid XMPP wrapped inside of a body. * Delivers content to the client. The content should be valid XMPP wrapped inside of a body.
* A <i>null</i> value for body indicates that the connection should be closed and the client * A <i>null</i> value for body indicates that the connection should be closed and the client
...@@ -162,11 +168,13 @@ public class HttpConnection { ...@@ -162,11 +168,13 @@ public class HttpConnection {
if (continuation.suspend(session.getWait() * 1000)) { if (continuation.suspend(session.getWait() * 1000)) {
String deliverable = (String) continuation.getObject(); String deliverable = (String) continuation.getObject();
// This will occur when the hold attribute of a session has been exceded. // This will occur when the hold attribute of a session has been exceded.
this.isDelivered = true;
if (deliverable == null) { if (deliverable == null) {
throw new HttpBindTimeoutException(); throw new HttpBindTimeoutException();
} }
return deliverable; return deliverable;
} }
this.isDelivered = true;
throw new HttpBindTimeoutException("Request " + requestId + " exceded response time from " + throw new HttpBindTimeoutException("Request " + requestId + " exceded response time from " +
"server of " + session.getWait() + " seconds."); "server of " + session.getWait() + " seconds.");
} }
......
...@@ -17,13 +17,12 @@ import org.dom4j.Namespace; ...@@ -17,13 +17,12 @@ import org.dom4j.Namespace;
import org.dom4j.QName; import org.dom4j.QName;
import org.jivesoftware.wildfire.Connection; import org.jivesoftware.wildfire.Connection;
import org.jivesoftware.wildfire.StreamID; import org.jivesoftware.wildfire.StreamID;
import org.jivesoftware.wildfire.XMPPServer; import org.jivesoftware.wildfire.PacketDeliverer;
import org.jivesoftware.wildfire.auth.UnauthorizedException; import org.jivesoftware.wildfire.auth.UnauthorizedException;
import org.jivesoftware.wildfire.net.SASLAuthentication; import org.jivesoftware.wildfire.net.SASLAuthentication;
import org.jivesoftware.wildfire.net.VirtualConnection; import org.jivesoftware.wildfire.net.VirtualConnection;
import org.jivesoftware.wildfire.session.ClientSession; import org.jivesoftware.wildfire.session.ClientSession;
import org.jivesoftware.util.Log; import org.jivesoftware.util.Log;
import org.xmpp.packet.Message;
import org.xmpp.packet.Packet; import org.xmpp.packet.Packet;
import java.net.InetAddress; import java.net.InetAddress;
...@@ -44,7 +43,7 @@ public class HttpSession extends ClientSession { ...@@ -44,7 +43,7 @@ public class HttpSession extends ClientSession {
private String language; private String language;
private final List<HttpConnection> connectionQueue = new LinkedList<HttpConnection>(); private final List<HttpConnection> connectionQueue = new LinkedList<HttpConnection>();
private final List<Deliverable> pendingElements = new ArrayList<Deliverable>(); private final List<Deliverable> pendingElements = new ArrayList<Deliverable>();
private final List<Deliverable> sentElements = new ArrayList<Deliverable>(); private final List<Delivered> sentElements = new ArrayList<Delivered>();
private boolean isSecure; private boolean isSecure;
private int maxPollingInterval; private int maxPollingInterval;
private long lastPoll = -1; private long lastPoll = -1;
...@@ -54,6 +53,7 @@ public class HttpSession extends ClientSession { ...@@ -54,6 +53,7 @@ public class HttpSession extends ClientSession {
private long lastActivity; private long lastActivity;
private long lastRequestID; private long lastRequestID;
private int maxRequests; private int maxRequests;
private PacketDeliverer backupDeliverer;
private static final Comparator<HttpConnection> connectionComparator private static final Comparator<HttpConnection> connectionComparator
= new Comparator<HttpConnection>() { = new Comparator<HttpConnection>() {
...@@ -62,11 +62,13 @@ public class HttpSession extends ClientSession { ...@@ -62,11 +62,13 @@ public class HttpSession extends ClientSession {
} }
}; };
public HttpSession(String serverName, InetAddress address, StreamID streamID, long rid) { public HttpSession(PacketDeliverer backupDeliverer, String serverName, InetAddress address,
StreamID streamID, long rid) {
super(serverName, null, streamID); super(serverName, null, streamID);
conn = new HttpVirtualConnection(address); conn = new HttpVirtualConnection(address);
this.lastActivity = System.currentTimeMillis(); this.lastActivity = System.currentTimeMillis();
this.lastRequestID = rid; this.lastRequestID = rid;
this.backupDeliverer = backupDeliverer;
} }
/** /**
...@@ -313,9 +315,9 @@ public class HttpSession extends ClientSession { ...@@ -313,9 +315,9 @@ public class HttpSession extends ClientSession {
return lastActivity; return lastActivity;
} }
else { else {
for(HttpConnection connection : connectionQueue) { for (HttpConnection connection : connectionQueue) {
// The session is currently active, return the current time. // The session is currently active, return the current time.
if(!connection.isClosed()) { if (!connection.isClosed()) {
return System.currentTimeMillis(); return System.currentTimeMillis();
} }
} }
...@@ -328,10 +330,10 @@ public class HttpSession extends ClientSession { ...@@ -328,10 +330,10 @@ public class HttpSession extends ClientSession {
public String getResponse(long requestID) throws HttpBindException { public String getResponse(long requestID) throws HttpBindException {
for (HttpConnection connection : connectionQueue) { for (HttpConnection connection : connectionQueue) {
if (connection.getRequestId() == requestID) { if (connection.getRequestId() == requestID) {
if(requestID > lastRequestID + 1) { if (requestID > lastRequestID + 1) {
throw new HttpBindException("Invalid RID error.", true, 404); throw new HttpBindException("Invalid RID error.", true, 404);
} }
if(requestID > lastRequestID) { if (requestID > lastRequestID) {
lastRequestID = connection.getRequestId(); lastRequestID = connection.getRequestId();
} }
String response = getResponse(connection); String response = getResponse(connection);
...@@ -387,12 +389,12 @@ public class HttpSession extends ClientSession { ...@@ -387,12 +389,12 @@ public class HttpSession extends ClientSession {
throws HttpConnectionClosedException, HttpBindException { throws HttpConnectionClosedException, HttpBindException {
HttpConnection connection = new HttpConnection(rid, isSecure); HttpConnection connection = new HttpConnection(rid, isSecure);
if (rid <= lastRequestID) { if (rid <= lastRequestID) {
Deliverable deliverable = retrieveDeliverable(rid); Delivered deliverable = retrieveDeliverable(rid);
if (deliverable == null) { if (deliverable == null) {
Log.warn("Deliverable unavailable for " + rid); Log.warn("Deliverable unavailable for " + rid);
throw new HttpBindException("Unexpected RID Error", true, 404); throw new HttpBindException("Unexpected RID Error", true, 404);
} }
connection.deliverBody(deliverable.getDeliverable()); connection.deliverBody(createDeliverable(deliverable.deliverables));
return connection; return connection;
} }
else if (rid > (lastRequestID + hold + 1)) { else if (rid > (lastRequestID + hold + 1)) {
...@@ -405,8 +407,8 @@ public class HttpSession extends ClientSession { ...@@ -405,8 +407,8 @@ public class HttpSession extends ClientSession {
return connection; return connection;
} }
private Deliverable retrieveDeliverable(long rid) throws HttpBindException { private Delivered retrieveDeliverable(long rid) {
for (Deliverable delivered : sentElements) { for (Delivered delivered : sentElements) {
if (delivered.getRequestID() == rid) { if (delivered.getRequestID() == rid) {
return delivered; return delivered;
} }
...@@ -434,9 +436,8 @@ public class HttpSession extends ClientSession { ...@@ -434,9 +436,8 @@ public class HttpSession extends ClientSession {
// to be sent to the client. // to be sent to the client.
if (hold <= 0 || (pendingElements.size() > 0 && connection.getRequestId() if (hold <= 0 || (pendingElements.size() > 0 && connection.getRequestId()
== lastRequestID + 1)) { == lastRequestID + 1)) {
String deliverable = createDeliverable(pendingElements);
try { try {
deliver(connection, deliverable); deliver(connection, pendingElements);
lastRequestID = connection.getRequestId(); lastRequestID = connection.getRequestId();
pendingElements.clear(); pendingElements.clear();
} }
...@@ -461,11 +462,11 @@ public class HttpSession extends ClientSession { ...@@ -461,11 +462,11 @@ public class HttpSession extends ClientSession {
fireConnectionOpened(connection); fireConnectionOpened(connection);
} }
private void deliver(HttpConnection connection, String deliverable) private void deliver(HttpConnection connection, Collection<Deliverable> deliverable)
throws HttpConnectionClosedException { throws HttpConnectionClosedException {
connection.deliverBody(deliverable); connection.deliverBody(createDeliverable(deliverable));
Deliverable delivered = new Deliverable(deliverable); Delivered delivered = new Delivered(deliverable);
delivered.setRequestID(connection.getRequestId()); delivered.setRequestID(connection.getRequestId());
while (sentElements.size() > hold) { while (sentElements.size() > hold) {
sentElements.remove(0); sentElements.remove(0);
...@@ -496,11 +497,11 @@ public class HttpSession extends ClientSession { ...@@ -496,11 +497,11 @@ public class HttpSession extends ClientSession {
} }
private synchronized void deliver(Packet stanza) { private synchronized void deliver(Packet stanza) {
deliver(new Deliverable(stanza)); deliver(new Deliverable(Arrays.asList(stanza)));
} }
private void deliver(Deliverable stanza) { private void deliver(Deliverable stanza) {
String deliverable = createDeliverable(Arrays.asList(stanza)); Collection<Deliverable> deliverable = Arrays.asList(stanza);
boolean delivered = false; boolean delivered = false;
for (HttpConnection connection : connectionQueue) { for (HttpConnection connection : connectionQueue) {
try { try {
...@@ -548,11 +549,6 @@ public class HttpSession extends ClientSession { ...@@ -548,11 +549,6 @@ public class HttpSession extends ClientSession {
failDelivery(); failDelivery();
} }
for (HttpConnection toClose : connectionQueue) {
toClose.close();
fireConnectionClosed(toClose);
}
for (SessionListener listener : listeners) { for (SessionListener listener : listeners) {
listener.sessionClosed(this); listener.sessionClosed(this);
} }
...@@ -561,15 +557,41 @@ public class HttpSession extends ClientSession { ...@@ -561,15 +557,41 @@ public class HttpSession extends ClientSession {
private void failDelivery() { private void failDelivery() {
for (Deliverable deliverable : pendingElements) { for (Deliverable deliverable : pendingElements) {
Packet packet = deliverable.packet; Collection<Packet> packet = deliverable.packets;
if (packet != null && packet instanceof Message) { if (packet != null) {
XMPPServer.getInstance().getOfflineMessageStrategy() failDelivery(packet);
.storeOffline((Message) packet); }
}
for (HttpConnection toClose : connectionQueue) {
if (!toClose.isDelivered()) {
Delivered delivered = retrieveDeliverable(toClose.getRequestId());
if (delivered != null) {
failDelivery(delivered.getPackets());
} }
else {
Log.warn("Packets could not be found for session " + getStreamID() + " cannot" +
"be delivered to client");
}
}
toClose.close();
fireConnectionClosed(toClose);
} }
pendingElements.clear(); pendingElements.clear();
} }
private void failDelivery(Collection<Packet> packets) {
for (Packet packet : packets) {
try {
backupDeliverer.deliver(packet);
}
catch (UnauthorizedException e) {
Log.error("Unable to deliver message to backup deliverer", e);
}
}
}
private static String createEmptyBody() { private static String createEmptyBody() {
Element body = DocumentHelper.createElement("body"); Element body = DocumentHelper.createElement("body");
body.addNamespace("", "http://jabber.org/protocol/httpbind"); body.addNamespace("", "http://jabber.org/protocol/httpbind");
...@@ -611,22 +633,29 @@ public class HttpSession extends ClientSession { ...@@ -611,22 +633,29 @@ public class HttpSession extends ClientSession {
private class Deliverable implements Comparable<Deliverable> { private class Deliverable implements Comparable<Deliverable> {
private final String text; private final String text;
private final Packet packet; private final Collection<Packet> packets;
private long requestID; private long requestID;
public Deliverable(String text) { public Deliverable(String text) {
this.text = text; this.text = text;
this.packet = null; this.packets = null;
} }
public Deliverable(Packet element) { public Deliverable(Collection<Packet> elements) {
this.text = null; this.text = null;
this.packet = element.createCopy(); this.packets = new ArrayList<Packet>();
for (Packet element : elements) {
this.packets.add(element.createCopy());
}
} }
public String getDeliverable() { public String getDeliverable() {
if (text == null) { if (text == null) {
return packet.toXML(); StringBuilder builder = new StringBuilder();
for (Packet packet : packets) {
builder.append(packet.toXML());
}
return builder.toString();
} }
else { else {
return text; return text;
...@@ -645,4 +674,31 @@ public class HttpSession extends ClientSession { ...@@ -645,4 +674,31 @@ public class HttpSession extends ClientSession {
return (int) (o.getRequestID() - requestID); return (int) (o.getRequestID() - requestID);
} }
} }
private class Delivered {
private long requestID;
private Collection<Deliverable> deliverables;
public Delivered(Collection<Deliverable> deliverables) {
this.deliverables = deliverables;
}
public void setRequestID(long requestID) {
this.requestID = requestID;
}
public long getRequestID() {
return requestID;
}
public Collection<Packet> getPackets() {
List<Packet> packets = new ArrayList<Packet>();
for (Deliverable deliverable : deliverables) {
if (deliverable.packets != null) {
packets.addAll(deliverable.packets);
}
}
return packets;
}
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment