Commit a5dff28d authored by Dave Cridland's avatar Dave Cridland

Merge pull request #156 from tevans/OF-857

OF-857: Synchronize connection/session cleanup
parents 111fe941 32a1c171
...@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory; ...@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
import org.xmpp.packet.Message; import org.xmpp.packet.Message;
import org.xmpp.packet.PacketError; import org.xmpp.packet.PacketError;
import org.xmpp.packet.PacketExtension;
/** /**
* Controls what is done with offline messages. * Controls what is done with offline messages.
...@@ -80,9 +81,11 @@ public class OfflineMessageStrategy extends BasicModule { ...@@ -80,9 +81,11 @@ public class OfflineMessageStrategy extends BasicModule {
public void storeOffline(Message message) { public void storeOffline(Message message) {
if (message != null) { if (message != null) {
// Do nothing if the message was sent to the server itself, an anonymous user or a non-existent user // Do nothing if the message was sent to the server itself, an anonymous user or a non-existent user
// Also ignore message carbons
JID recipientJID = message.getTo(); JID recipientJID = message.getTo();
if (recipientJID == null || serverAddress.equals(recipientJID) || if (recipientJID == null || serverAddress.equals(recipientJID) ||
recipientJID.getNode() == null || recipientJID.getNode() == null ||
message.getExtension("received", "urn:xmpp:carbons:2") != null ||
!UserManager.getInstance().isRegisteredUser(recipientJID.getNode())) { !UserManager.getInstance().isRegisteredUser(recipientJID.getNode())) {
return; return;
} }
......
...@@ -641,10 +641,12 @@ public class SessionManager extends BasicModule implements ClusterEventListener ...@@ -641,10 +641,12 @@ public class SessionManager extends BasicModule implements ClusterEventListener
JID searchJID = new JID(originatingResource.getNode(), originatingResource.getDomain(), null); JID searchJID = new JID(originatingResource.getNode(), originatingResource.getDomain(), null);
List<JID> addresses = routingTable.getRoutes(searchJID, null); List<JID> addresses = routingTable.getRoutes(searchJID, null);
for (JID address : addresses) { for (JID address : addresses) {
// Send the presence of the session whose presence has changed to if (!originatingResource.equals(address)) {
// this other user's session // Send the presence of the session whose presence has changed to
presence.setTo(address); // this user's other session(s)
routingTable.routePacket(address, presence, false); presence.setTo(address);
routingTable.routePacket(address, presence, false);
}
} }
} }
......
...@@ -50,7 +50,7 @@ public class ClientConnectionHandler extends ConnectionHandler { ...@@ -50,7 +50,7 @@ public class ClientConnectionHandler extends ConnectionHandler {
@Override @Override
NIOConnection createNIOConnection(IoSession session) { NIOConnection createNIOConnection(IoSession session) {
return new NIOConnection(session, XMPPServer.getInstance().getPacketDeliverer()); return new NIOConnection(session, new OfflinePacketDeliverer());
} }
@Override @Override
......
...@@ -203,25 +203,22 @@ public class NIOConnection implements Connection { ...@@ -203,25 +203,22 @@ public class NIOConnection implements Connection {
} }
public void close() { public void close() {
boolean closedSuccessfully = false; synchronized(this) {
synchronized (this) { if (isClosed()) {
if (!isClosed()) { return;
try { }
deliverRawText(flashClient ? "</flash:stream>" : "</stream:stream>", false); try {
} catch (Exception e) { deliverRawText(flashClient ? "</flash:stream>" : "</stream:stream>", false);
// Ignore } catch (Exception e) {
} // Ignore
if (session != null) {
session.setStatus(Session.STATUS_CLOSED);
}
ioSession.close(false);
closed = true;
closedSuccessfully = true;
} }
} if (session != null) {
if (closedSuccessfully) { session.setStatus(Session.STATUS_CLOSED);
notifyCloseListeners(); }
} closed = true;
notifyCloseListeners(); // clean up session, etc.
}
ioSession.close(false); // async via MINA
} }
public void systemShutdown() { public void systemShutdown() {
...@@ -248,11 +245,8 @@ public class NIOConnection implements Connection { ...@@ -248,11 +245,8 @@ public class NIOConnection implements Connection {
session = owner; session = owner;
} }
public boolean isClosed() { public synchronized boolean isClosed() {
if (session == null) { return closed;
return closed;
}
return session.getStatus() == Session.STATUS_CLOSED;
} }
public boolean isSecure() { public boolean isSecure() {
...@@ -261,7 +255,15 @@ public class NIOConnection implements Connection { ...@@ -261,7 +255,15 @@ public class NIOConnection implements Connection {
public void deliver(Packet packet) throws UnauthorizedException { public void deliver(Packet packet) throws UnauthorizedException {
if (isClosed()) { if (isClosed()) {
backupDeliverer.deliver(packet); // OF-857: Do not allow the backup deliverer to recurse
if (backupDeliverer == null) {
Log.error("Failed to deliver packet: " + packet.toXML());
throw new IllegalStateException("Connection closed");
}
// attempt to deliver via backup only once
PacketDeliverer backup = backupDeliverer;
backupDeliverer = null;
backup.deliver(packet);
} }
else { else {
boolean errorDelivering = false; boolean errorDelivering = false;
......
/**
* Copyright (C) 2005-2015 Jive Software. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.openfire.nio;
import org.jivesoftware.openfire.OfflineMessageStrategy;
import org.jivesoftware.openfire.PacketDeliverer;
import org.jivesoftware.openfire.PacketException;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.util.LocaleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.packet.IQ;
import org.xmpp.packet.Message;
import org.xmpp.packet.Packet;
import org.xmpp.packet.Presence;
/**
* Fallback method used by {@link org.jivesoftware.openfire.nio.NIOConnection} when a
* connection fails to send a {@link Packet} (likely because it was closed). Message packets
* will be stored offline for later retrieval. IQ and Presence packets are dropped.<p>
*
* @author Tom Evans
*/
public class OfflinePacketDeliverer implements PacketDeliverer {
private static final Logger Log = LoggerFactory.getLogger(OfflinePacketDeliverer.class);
private OfflineMessageStrategy messageStrategy;
public OfflinePacketDeliverer() {
this.messageStrategy = XMPPServer.getInstance().getOfflineMessageStrategy();
}
@Override
public void deliver(Packet packet) throws UnauthorizedException, PacketException {
if (packet instanceof Message) {
messageStrategy.storeOffline((Message) packet);
}
else if (packet instanceof Presence) {
// presence packets are dropped silently
}
else if (packet instanceof IQ) {
// IQ packets are logged before being dropped
Log.warn(LocaleUtils.getLocalizedString("admin.error.routing") + "\n" + packet.toString());
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment