Commit 0bc28bde authored by Dave Cridland's avatar Dave Cridland

Merge pull request #573 from guusdk/OF-1125

OF-1125: Should use StreamID class instead of String.
parents 85404465 2cc0a948
...@@ -89,11 +89,11 @@ class LocalSessionManager { ...@@ -89,11 +89,11 @@ class LocalSessionManager {
/** /**
* The sessions contained in this Map are server sessions originated by a remote server. These * The sessions contained in this Map are server sessions originated by a remote server. These
* sessions can only receive packets from the remote server but are not capable of sending * sessions can only receive packets from the remote server but are not capable of sending
* packets to the remote server. Sessions will be added to this collecion only after they were * packets to the remote server. Sessions will be added to this collection only after they were
* authenticated. * authenticated.
* Key: streamID, Value: the IncomingServerSession associated to the streamID. * Key: streamID, Value: the IncomingServerSession associated to the streamID.
*/ */
private final Map<String, LocalIncomingServerSession> incomingServerSessions = private final Map<StreamID, LocalIncomingServerSession> incomingServerSessions =
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
...@@ -109,7 +109,7 @@ class LocalSessionManager { ...@@ -109,7 +109,7 @@ class LocalSessionManager {
return connnectionManagerSessions; return connnectionManagerSessions;
} }
public LocalIncomingServerSession getIncomingServerSession(String streamID) { public LocalIncomingServerSession getIncomingServerSession(StreamID streamID) {
return incomingServerSessions.get(streamID); return incomingServerSessions.get(streamID);
} }
...@@ -117,11 +117,11 @@ class LocalSessionManager { ...@@ -117,11 +117,11 @@ class LocalSessionManager {
return incomingServerSessions.values(); return incomingServerSessions.values();
} }
public void addIncomingServerSessions(String streamID, LocalIncomingServerSession session) { public void addIncomingServerSessions(StreamID streamID, LocalIncomingServerSession session) {
incomingServerSessions.put(streamID, session); incomingServerSessions.put(streamID, session);
} }
public void removeIncomingServerSessions(String streamID) { public void removeIncomingServerSessions(StreamID streamID) {
incomingServerSessions.remove(streamID); incomingServerSessions.remove(streamID);
} }
......
...@@ -22,22 +22,17 @@ package org.jivesoftware.openfire; ...@@ -22,22 +22,17 @@ package org.jivesoftware.openfire;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Date;
import java.util.Deque;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import org.dom4j.Element;
import org.jivesoftware.openfire.audit.AuditStreamIDFactory; import org.jivesoftware.openfire.audit.AuditStreamIDFactory;
import org.jivesoftware.openfire.auth.AuthToken; import org.jivesoftware.openfire.auth.AuthToken;
import org.jivesoftware.openfire.auth.UnauthorizedException; import org.jivesoftware.openfire.auth.UnauthorizedException;
...@@ -45,11 +40,6 @@ import org.jivesoftware.openfire.cluster.ClusterEventListener; ...@@ -45,11 +40,6 @@ import org.jivesoftware.openfire.cluster.ClusterEventListener;
import org.jivesoftware.openfire.cluster.ClusterManager; import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.component.InternalComponentManager; import org.jivesoftware.openfire.component.InternalComponentManager;
import org.jivesoftware.openfire.container.BasicModule; import org.jivesoftware.openfire.container.BasicModule;
import org.jivesoftware.openfire.disco.DiscoInfoProvider;
import org.jivesoftware.openfire.disco.DiscoItem;
import org.jivesoftware.openfire.disco.DiscoItemsProvider;
import org.jivesoftware.openfire.disco.DiscoServerItem;
import org.jivesoftware.openfire.disco.ServerItemsProvider;
import org.jivesoftware.openfire.event.SessionEventDispatcher; import org.jivesoftware.openfire.event.SessionEventDispatcher;
import org.jivesoftware.openfire.http.HttpConnection; import org.jivesoftware.openfire.http.HttpConnection;
import org.jivesoftware.openfire.http.HttpSession; import org.jivesoftware.openfire.http.HttpSession;
...@@ -70,18 +60,13 @@ import org.jivesoftware.openfire.session.OutgoingServerSession; ...@@ -70,18 +60,13 @@ import org.jivesoftware.openfire.session.OutgoingServerSession;
import org.jivesoftware.openfire.session.RemoteSessionLocator; import org.jivesoftware.openfire.session.RemoteSessionLocator;
import org.jivesoftware.openfire.session.Session; import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.openfire.spi.BasicStreamIDFactory; import org.jivesoftware.openfire.spi.BasicStreamIDFactory;
import org.jivesoftware.openfire.streammanagement.StreamManager;
import org.jivesoftware.openfire.user.UserManager; import org.jivesoftware.openfire.user.UserManager;
import org.jivesoftware.openfire.user.UserNotFoundException;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.XMPPDateTimeFormat;
import org.jivesoftware.util.cache.Cache; import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory; import org.jivesoftware.util.cache.CacheFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.xmpp.forms.DataForm;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
import org.xmpp.packet.Message; import org.xmpp.packet.Message;
import org.xmpp.packet.Packet; import org.xmpp.packet.Packet;
...@@ -145,7 +130,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/ ...@@ -145,7 +130,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
* Cache (unlimited, never expire) that holds incoming sessions of remote servers. * Cache (unlimited, never expire) that holds incoming sessions of remote servers.
* Key: stream ID that identifies the socket/session, Value: nodeID * Key: stream ID that identifies the socket/session, Value: nodeID
*/ */
private Cache<String, byte[]> incomingServerSessionsCache; private Cache<StreamID, byte[]> incomingServerSessionsCache;
/** /**
* Cache (unlimited, never expire) that holds list of incoming sessions * Cache (unlimited, never expire) that holds list of incoming sessions
* originated from the same remote server (domain/subdomain). For instance, jabber.org * originated from the same remote server (domain/subdomain). For instance, jabber.org
...@@ -153,7 +138,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/ ...@@ -153,7 +138,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
* jivesoftware.com and the other socket to conference.jivesoftware.com). * jivesoftware.com and the other socket to conference.jivesoftware.com).
* Key: remote hostname (domain/subdomain), Value: list of stream IDs that identify each socket. * Key: remote hostname (domain/subdomain), Value: list of stream IDs that identify each socket.
*/ */
private Cache<String, List<String>> hostnameSessionsCache; private Cache<String, List<StreamID>> hostnameSessionsCache;
/** /**
* Cache (unlimited, never expire) that holds domains, subdomains and virtual * Cache (unlimited, never expire) that holds domains, subdomains and virtual
...@@ -167,7 +152,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/ ...@@ -167,7 +152,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
* will have access to this clustered cache even in the case of this node going * will have access to this clustered cache even in the case of this node going
* down. * down.
*/ */
private Cache<String, Set<String>> validatedDomainsCache; private Cache<StreamID, Set<String>> validatedDomainsCache;
private ClientSessionListener clientSessionListener = new ClientSessionListener(); private ClientSessionListener clientSessionListener = new ClientSessionListener();
private ComponentSessionListener componentSessionListener = new ComponentSessionListener(); private ComponentSessionListener componentSessionListener = new ComponentSessionListener();
...@@ -465,7 +450,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/ ...@@ -465,7 +450,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
*/ */
public void registerIncomingServerSession(String hostname, LocalIncomingServerSession session) { public void registerIncomingServerSession(String hostname, LocalIncomingServerSession session) {
// Keep local track of the incoming server session connected to this JVM // Keep local track of the incoming server session connected to this JVM
String streamID = session.getStreamID().getID(); StreamID streamID = session.getStreamID();
localSessionManager.addIncomingServerSessions(streamID, session); localSessionManager.addIncomingServerSessions(streamID, session);
// Keep track of the nodeID hosting the incoming server session // Keep track of the nodeID hosting the incoming server session
incomingServerSessionsCache.put(streamID, server.getNodeID().toByteArray()); incomingServerSessionsCache.put(streamID, server.getNodeID().toByteArray());
...@@ -473,7 +458,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/ ...@@ -473,7 +458,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
Lock lock = CacheFactory.getLock(hostname, hostnameSessionsCache); Lock lock = CacheFactory.getLock(hostname, hostnameSessionsCache);
try { try {
lock.lock(); lock.lock();
List<String> streamIDs = hostnameSessionsCache.get(hostname); List<StreamID> streamIDs = hostnameSessionsCache.get(hostname);
if (streamIDs == null) { if (streamIDs == null) {
streamIDs = new ArrayList<>(); streamIDs = new ArrayList<>();
} }
...@@ -508,7 +493,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/ ...@@ -508,7 +493,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
*/ */
public void unregisterIncomingServerSession(String hostname, IncomingServerSession session) { public void unregisterIncomingServerSession(String hostname, IncomingServerSession session) {
// Remove local track of the incoming server session connected to this JVM // Remove local track of the incoming server session connected to this JVM
String streamID = session.getStreamID().getID(); StreamID streamID = session.getStreamID();
localSessionManager.removeIncomingServerSessions(streamID); localSessionManager.removeIncomingServerSessions(streamID);
// Remove track of the nodeID hosting the incoming server session // Remove track of the nodeID hosting the incoming server session
incomingServerSessionsCache.remove(streamID); incomingServerSessionsCache.remove(streamID);
...@@ -517,7 +502,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/ ...@@ -517,7 +502,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
Lock lock = CacheFactory.getLock(hostname, hostnameSessionsCache); Lock lock = CacheFactory.getLock(hostname, hostnameSessionsCache);
try { try {
lock.lock(); lock.lock();
List<String> streamIDs = hostnameSessionsCache.get(hostname); List<StreamID> streamIDs = hostnameSessionsCache.get(hostname);
if (streamIDs != null) { if (streamIDs != null) {
streamIDs.remove(streamID); streamIDs.remove(streamID);
if (streamIDs.isEmpty()) { if (streamIDs.isEmpty()) {
...@@ -563,7 +548,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/ ...@@ -563,7 +548,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
* @param streamID id that uniquely identifies the session. * @param streamID id that uniquely identifies the session.
* @return domains, subdomains and virtual hosts that where validated. * @return domains, subdomains and virtual hosts that where validated.
*/ */
public Collection<String> getValidatedDomains(String streamID) { public Collection<String> getValidatedDomains(StreamID streamID) {
Lock lock = CacheFactory.getLock(streamID, validatedDomainsCache); Lock lock = CacheFactory.getLock(streamID, validatedDomainsCache);
try { try {
lock.lock(); lock.lock();
...@@ -850,7 +835,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/ ...@@ -850,7 +835,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
* @param streamID the stream ID that identifies the incoming server session hosted by this JVM. * @param streamID the stream ID that identifies the incoming server session hosted by this JVM.
* @return the incoming server session hosted by this JVM or null if none was found. * @return the incoming server session hosted by this JVM or null if none was found.
*/ */
public LocalIncomingServerSession getIncomingServerSession(String streamID) { public LocalIncomingServerSession getIncomingServerSession(StreamID streamID) {
return localSessionManager.getIncomingServerSession(streamID); return localSessionManager.getIncomingServerSession(streamID);
} }
...@@ -863,7 +848,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/ ...@@ -863,7 +848,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
* @return the sessions that were originated by a remote server. * @return the sessions that were originated by a remote server.
*/ */
public List<IncomingServerSession> getIncomingServerSessions(String hostname) { public List<IncomingServerSession> getIncomingServerSessions(String hostname) {
List<String> streamIDs; List<StreamID> streamIDs;
// Get list of sockets/sessions coming from the remote hostname // Get list of sockets/sessions coming from the remote hostname
Lock lock = CacheFactory.getLock(hostname, hostnameSessionsCache); Lock lock = CacheFactory.getLock(hostname, hostnameSessionsCache);
try { try {
...@@ -880,7 +865,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/ ...@@ -880,7 +865,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
else { else {
// Collect the sessions associated to the found stream IDs // Collect the sessions associated to the found stream IDs
List<IncomingServerSession> sessions = new ArrayList<>(); List<IncomingServerSession> sessions = new ArrayList<>();
for (String streamID : streamIDs) { for (StreamID streamID : streamIDs) {
// Search in local hosted sessions // Search in local hosted sessions
IncomingServerSession session = localSessionManager.getIncomingServerSession(streamID); IncomingServerSession session = localSessionManager.getIncomingServerSession(streamID);
RemoteSessionLocator locator = server.getRemoteSessionLocator(); RemoteSessionLocator locator = server.getRemoteSessionLocator();
...@@ -1608,14 +1593,14 @@ public class SessionManager extends BasicModule implements ClusterEventListener/ ...@@ -1608,14 +1593,14 @@ public class SessionManager extends BasicModule implements ClusterEventListener/
// Add incoming server sessions hosted locally to the cache (using new nodeID) // Add incoming server sessions hosted locally to the cache (using new nodeID)
for (LocalIncomingServerSession session : localSessionManager.getIncomingServerSessions()) { for (LocalIncomingServerSession session : localSessionManager.getIncomingServerSessions()) {
String streamID = session.getStreamID().getID(); StreamID streamID = session.getStreamID();
incomingServerSessionsCache.put(streamID, server.getNodeID().toByteArray()); incomingServerSessionsCache.put(streamID, server.getNodeID().toByteArray());
for (String hostname : session.getValidatedDomains()) { for (String hostname : session.getValidatedDomains()) {
// Update list of sockets/sessions coming from the same remote hostname // Update list of sockets/sessions coming from the same remote hostname
Lock lock = CacheFactory.getLock(hostname, hostnameSessionsCache); Lock lock = CacheFactory.getLock(hostname, hostnameSessionsCache);
try { try {
lock.lock(); lock.lock();
List<String> streamIDs = hostnameSessionsCache.get(hostname); List<StreamID> streamIDs = hostnameSessionsCache.get(hostname);
if (streamIDs == null) { if (streamIDs == null) {
streamIDs = new ArrayList<>(); streamIDs = new ArrayList<>();
} }
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
package org.jivesoftware.openfire.multiplex; package org.jivesoftware.openfire.multiplex;
import org.dom4j.Element; import org.dom4j.Element;
import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.XMPPServer; import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.net.VirtualConnection; import org.jivesoftware.openfire.net.VirtualConnection;
import org.jivesoftware.openfire.session.ConnectionMultiplexerSession; import org.jivesoftware.openfire.session.ConnectionMultiplexerSession;
...@@ -72,7 +73,7 @@ public class ClientSessionConnection extends VirtualConnection { ...@@ -72,7 +73,7 @@ public class ClientSessionConnection extends VirtualConnection {
*/ */
@Override @Override
public void deliver(Packet packet) { public void deliver(Packet packet) {
String streamID = session.getStreamID().getID(); StreamID streamID = session.getStreamID();
ConnectionMultiplexerSession multiplexerSession = ConnectionMultiplexerSession multiplexerSession =
multiplexerManager.getMultiplexerSession(connectionManagerName,streamID); multiplexerManager.getMultiplexerSession(connectionManagerName,streamID);
if (multiplexerSession != null) { if (multiplexerSession != null) {
...@@ -100,7 +101,7 @@ public class ClientSessionConnection extends VirtualConnection { ...@@ -100,7 +101,7 @@ public class ClientSessionConnection extends VirtualConnection {
*/ */
@Override @Override
public void deliverRawText(String text) { public void deliverRawText(String text) {
String streamID = session.getStreamID().getID(); StreamID streamID = session.getStreamID();
ConnectionMultiplexerSession multiplexerSession = ConnectionMultiplexerSession multiplexerSession =
multiplexerManager.getMultiplexerSession(connectionManagerName,streamID); multiplexerManager.getMultiplexerSession(connectionManagerName,streamID);
if (multiplexerSession != null) { if (multiplexerSession != null) {
...@@ -108,7 +109,7 @@ public class ClientSessionConnection extends VirtualConnection { ...@@ -108,7 +109,7 @@ public class ClientSessionConnection extends VirtualConnection {
StringBuilder sb = new StringBuilder(200 + text.length()); StringBuilder sb = new StringBuilder(200 + text.length());
sb.append("<route from=\"").append(serverName); sb.append("<route from=\"").append(serverName);
sb.append("\" to=\"").append(connectionManagerName); sb.append("\" to=\"").append(connectionManagerName);
sb.append("\" streamid=\"").append(streamID).append("\">"); sb.append("\" streamid=\"").append(streamID.getID()).append("\">");
sb.append(text); sb.append(text);
sb.append("</route>"); sb.append("</route>");
// Deliver the wrapped stanza // Deliver the wrapped stanza
...@@ -176,7 +177,7 @@ public class ClientSessionConnection extends VirtualConnection { ...@@ -176,7 +177,7 @@ public class ClientSessionConnection extends VirtualConnection {
@Override @Override
public void closeVirtualConnection() { public void closeVirtualConnection() {
// Figure out who requested the connection to be closed // Figure out who requested the connection to be closed
String streamID = session.getStreamID().getID(); StreamID streamID = session.getStreamID();
if (multiplexerManager.getClientSession(connectionManagerName, streamID) == null) { if (multiplexerManager.getClientSession(connectionManagerName, streamID) == null) {
// Client or Connection manager requested to close the session // Client or Connection manager requested to close the session
// Do nothing since it has already been removed and closed // Do nothing since it has already been removed and closed
...@@ -192,7 +193,7 @@ public class ClientSessionConnection extends VirtualConnection { ...@@ -192,7 +193,7 @@ public class ClientSessionConnection extends VirtualConnection {
closeRequest.setTo(connectionManagerName); closeRequest.setTo(connectionManagerName);
Element child = closeRequest.setChildElement("session", Element child = closeRequest.setChildElement("session",
"http://jabber.org/protocol/connectionmanager"); "http://jabber.org/protocol/connectionmanager");
child.addAttribute("id", streamID); child.addAttribute("id", streamID.getID());
child.addElement("close"); child.addElement("close");
multiplexerSession.process(closeRequest); multiplexerSession.process(closeRequest);
} }
......
...@@ -72,12 +72,12 @@ public class ConnectionMultiplexerManager implements SessionEventListener { ...@@ -72,12 +72,12 @@ public class ConnectionMultiplexerManager implements SessionEventListener {
* Map that keeps track of connection managers and hosted connections. * Map that keeps track of connection managers and hosted connections.
* Key: stream ID; Value: Domain of connection manager hosting connection * Key: stream ID; Value: Domain of connection manager hosting connection
*/ */
private Map<String, String> streamIDs = new ConcurrentHashMap<>(); private Map<StreamID, String> streamIDs = new ConcurrentHashMap<>();
/** /**
* Map that keeps track of connection managers and hosted sessions. * Map that keeps track of connection managers and hosted sessions.
* Key: Domain of connection manager; Value: Map with Key: stream ID; Value: Client session * Key: Domain of connection manager; Value: Map with Key: stream ID; Value: Client session
*/ */
private Map<String, Map<String, LocalClientSession>> sessionsByManager = private Map<String, Map<StreamID, LocalClientSession>> sessionsByManager =
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
private SessionManager sessionManager; private SessionManager sessionManager;
...@@ -144,7 +144,7 @@ public class ConnectionMultiplexerManager implements SessionEventListener { ...@@ -144,7 +144,7 @@ public class ConnectionMultiplexerManager implements SessionEventListener {
* @param hostAddress the textual representation of the address of the client or null if using old CM. * @param hostAddress the textual representation of the address of the client or null if using old CM.
* @return true if a session was created or false if the client should disconnect. * @return true if a session was created or false if the client should disconnect.
*/ */
public boolean createClientSession(String connectionManagerDomain, String streamID, String hostName, String hostAddress) { public boolean createClientSession(String connectionManagerDomain, StreamID streamID, String hostName, String hostAddress) {
Connection connection = new ClientSessionConnection(connectionManagerDomain, hostName, hostAddress); Connection connection = new ClientSessionConnection(connectionManagerDomain, hostName, hostAddress);
// Check if client is allowed to connect from the specified IP address. Ignore the checking if connection // Check if client is allowed to connect from the specified IP address. Ignore the checking if connection
// manager is old version and is not passing client's address // manager is old version and is not passing client's address
...@@ -156,11 +156,11 @@ public class ConnectionMultiplexerManager implements SessionEventListener { ...@@ -156,11 +156,11 @@ public class ConnectionMultiplexerManager implements SessionEventListener {
} }
if (address == null || LocalClientSession.isAllowed(connection)) { if (address == null || LocalClientSession.isAllowed(connection)) {
LocalClientSession session = LocalClientSession session =
SessionManager.getInstance().createClientSession(connection, new BasicStreamID(streamID)); SessionManager.getInstance().createClientSession(connection, streamID);
// Register that this streamID belongs to the specified connection manager // Register that this streamID belongs to the specified connection manager
streamIDs.put(streamID, connectionManagerDomain); streamIDs.put(streamID, connectionManagerDomain);
// Register which sessions are being hosted by the speicifed connection manager // Register which sessions are being hosted by the speicifed connection manager
Map<String, LocalClientSession> sessions = sessionsByManager.get(connectionManagerDomain); Map<StreamID, LocalClientSession> sessions = sessionsByManager.get(connectionManagerDomain);
if (sessions == null) { if (sessions == null) {
synchronized (connectionManagerDomain.intern()) { synchronized (connectionManagerDomain.intern()) {
sessions = sessionsByManager.get(connectionManagerDomain); sessions = sessionsByManager.get(connectionManagerDomain);
...@@ -183,8 +183,8 @@ public class ConnectionMultiplexerManager implements SessionEventListener { ...@@ -183,8 +183,8 @@ public class ConnectionMultiplexerManager implements SessionEventListener {
* of the session. * of the session.
* @param streamID the stream ID created by the connection manager for the session. * @param streamID the stream ID created by the connection manager for the session.
*/ */
public void closeClientSession(String connectionManagerDomain, String streamID) { public void closeClientSession(String connectionManagerDomain, StreamID streamID) {
Map<String, LocalClientSession> sessions = sessionsByManager.get(connectionManagerDomain); Map<StreamID, LocalClientSession> sessions = sessionsByManager.get(connectionManagerDomain);
if (sessions != null) { if (sessions != null) {
Session session = sessions.remove(streamID); Session session = sessions.remove(streamID);
if (session != null) { if (session != null) {
...@@ -203,7 +203,7 @@ public class ConnectionMultiplexerManager implements SessionEventListener { ...@@ -203,7 +203,7 @@ public class ConnectionMultiplexerManager implements SessionEventListener {
public void multiplexerAvailable(String connectionManagerName) { public void multiplexerAvailable(String connectionManagerName) {
// Add a new entry in the list of available managers. Here is where we are going to store // Add a new entry in the list of available managers. Here is where we are going to store
// which clients were connected through which connection manager // which clients were connected through which connection manager
Map<String, LocalClientSession> sessions = sessionsByManager.get(connectionManagerName); Map<StreamID, LocalClientSession> sessions = sessionsByManager.get(connectionManagerName);
if (sessions == null) { if (sessions == null) {
synchronized (connectionManagerName.intern()) { synchronized (connectionManagerName.intern()) {
sessions = sessionsByManager.get(connectionManagerName); sessions = sessionsByManager.get(connectionManagerName);
...@@ -223,9 +223,9 @@ public class ConnectionMultiplexerManager implements SessionEventListener { ...@@ -223,9 +223,9 @@ public class ConnectionMultiplexerManager implements SessionEventListener {
*/ */
public void multiplexerUnavailable(String connectionManagerName) { public void multiplexerUnavailable(String connectionManagerName) {
// Remove the connection manager and the hosted sessions // Remove the connection manager and the hosted sessions
Map<String, LocalClientSession> sessions = sessionsByManager.remove(connectionManagerName); Map<StreamID, LocalClientSession> sessions = sessionsByManager.remove(connectionManagerName);
if (sessions != null) { if (sessions != null) {
for (String streamID : sessions.keySet()) { for (StreamID streamID : sessions.keySet()) {
// Remove inverse track of connection manager hosting streamIDs // Remove inverse track of connection manager hosting streamIDs
streamIDs.remove(streamID); streamIDs.remove(streamID);
// Close the session // Close the session
...@@ -243,8 +243,8 @@ public class ConnectionMultiplexerManager implements SessionEventListener { ...@@ -243,8 +243,8 @@ public class ConnectionMultiplexerManager implements SessionEventListener {
* @param streamID the stream ID created by the connection manager for the session. * @param streamID the stream ID created by the connection manager for the session.
* @return the ClientSession with the specified stream ID. * @return the ClientSession with the specified stream ID.
*/ */
public LocalClientSession getClientSession(String connectionManagerDomain, String streamID) { public LocalClientSession getClientSession(String connectionManagerDomain, StreamID streamID) {
Map<String, LocalClientSession> sessions = sessionsByManager.get(connectionManagerDomain); Map<StreamID, LocalClientSession> sessions = sessionsByManager.get(connectionManagerDomain);
if (sessions != null) { if (sessions != null) {
return sessions.get(streamID); return sessions.get(streamID);
} }
...@@ -261,7 +261,7 @@ public class ConnectionMultiplexerManager implements SessionEventListener { ...@@ -261,7 +261,7 @@ public class ConnectionMultiplexerManager implements SessionEventListener {
* @param streamID if provided, the same connection will always be used for a given streamID * @param streamID if provided, the same connection will always be used for a given streamID
* @return a session to the specified connection manager domain or null if none was found. * @return a session to the specified connection manager domain or null if none was found.
*/ */
public ConnectionMultiplexerSession getMultiplexerSession(String connectionManagerDomain,String streamID) { public ConnectionMultiplexerSession getMultiplexerSession(String connectionManagerDomain,StreamID streamID) {
List<ConnectionMultiplexerSession> sessions = List<ConnectionMultiplexerSession> sessions =
sessionManager.getConnectionMultiplexerSessions(connectionManagerDomain); sessionManager.getConnectionMultiplexerSessions(connectionManagerDomain);
if (sessions.isEmpty()) { if (sessions.isEmpty()) {
...@@ -309,7 +309,7 @@ public class ConnectionMultiplexerManager implements SessionEventListener { ...@@ -309,7 +309,7 @@ public class ConnectionMultiplexerManager implements SessionEventListener {
* @return the number of connected clients to a specific connection manager. * @return the number of connected clients to a specific connection manager.
*/ */
public int getNumConnectedClients(String managerName) { public int getNumConnectedClients(String managerName) {
Map<String, LocalClientSession> clients = sessionsByManager.get(managerName); Map<StreamID, LocalClientSession> clients = sessionsByManager.get(managerName);
if (clients == null) { if (clients == null) {
return 0; return 0;
} }
...@@ -345,41 +345,14 @@ public class ConnectionMultiplexerManager implements SessionEventListener { ...@@ -345,41 +345,14 @@ public class ConnectionMultiplexerManager implements SessionEventListener {
private void removeSession(Session session) { private void removeSession(Session session) {
// Remove trace indicating that a connection manager is hosting a connection // Remove trace indicating that a connection manager is hosting a connection
String streamID = session.getStreamID().getID(); StreamID streamID = session.getStreamID();
String connectionManagerDomain = streamIDs.remove(streamID); String connectionManagerDomain = streamIDs.remove(streamID);
// Remove trace indicating that a connection manager is hosting a session // Remove trace indicating that a connection manager is hosting a session
if (connectionManagerDomain != null) { if (connectionManagerDomain != null) {
Map<String, LocalClientSession> sessions = sessionsByManager.get(connectionManagerDomain); Map<StreamID, LocalClientSession> sessions = sessionsByManager.get(connectionManagerDomain);
if (sessions != null) { if (sessions != null) {
sessions.remove(streamID); sessions.remove(streamID);
} }
} }
} }
/**
* Simple implementation of the StreamID interface to hold the stream ID assigned by
* the Connection Manager to the Session.
*/
private class BasicStreamID implements StreamID {
String id;
public BasicStreamID(String id) {
this.id = id;
}
@Override
public String getID() {
return id;
}
@Override
public String toString() {
return id;
}
@Override
public int hashCode() {
return id.hashCode();
}
}
} }
...@@ -26,10 +26,13 @@ import org.dom4j.DocumentHelper; ...@@ -26,10 +26,13 @@ import org.dom4j.DocumentHelper;
import org.dom4j.Element; import org.dom4j.Element;
import org.dom4j.QName; import org.dom4j.QName;
import org.jivesoftware.openfire.SessionPacketRouter; import org.jivesoftware.openfire.SessionPacketRouter;
import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.StreamIDFactory;
import org.jivesoftware.openfire.XMPPServer; import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.session.ClientSession; import org.jivesoftware.openfire.session.ClientSession;
import org.jivesoftware.openfire.session.ConnectionMultiplexerSession; import org.jivesoftware.openfire.session.ConnectionMultiplexerSession;
import org.jivesoftware.openfire.session.LocalClientSession; import org.jivesoftware.openfire.session.LocalClientSession;
import org.jivesoftware.openfire.spi.BasicStreamIDFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.xmpp.packet.IQ; import org.xmpp.packet.IQ;
...@@ -76,15 +79,16 @@ public class MultiplexerPacketHandler { ...@@ -76,15 +79,16 @@ public class MultiplexerPacketHandler {
} }
else if (iq.getType() == IQ.Type.set) { else if (iq.getType() == IQ.Type.set) {
Element child = iq.getChildElement(); Element child = iq.getChildElement();
String streamID = child.attributeValue("id"); String streamIDValue = child.attributeValue("id");
if (streamID == null) { if (streamIDValue == null) {
// No stream ID was included so return a bad_request error // No stream ID was included so return a bad_request error
Element extraError = DocumentHelper.createElement(QName.get( Element extraError = DocumentHelper.createElement(QName.get(
"id-required", "http://jabber.org/protocol/connectionmanager#errors")); "id-required", "http://jabber.org/protocol/connectionmanager#errors"));
sendErrorPacket(iq, PacketError.Condition.bad_request, extraError); sendErrorPacket(iq, PacketError.Condition.bad_request, extraError);
} }
else if ("session".equals(child.getName())) { else if ("session".equals(child.getName())) {
Element create = child.element("create"); StreamID streamID = BasicStreamIDFactory.createStreamID( streamIDValue );
Element create = child.element( "create" );
if (create != null) { if (create != null) {
// Get the InetAddress of the client // Get the InetAddress of the client
Element hostElement = create.element("host"); Element hostElement = create.element("host");
...@@ -167,7 +171,7 @@ public class MultiplexerPacketHandler { ...@@ -167,7 +171,7 @@ public class MultiplexerPacketHandler {
* @param route the route packet. * @param route the route packet.
*/ */
public void route(Route route) { public void route(Route route) {
String streamID = route.getStreamID(); StreamID streamID = route.getStreamID();
if (streamID == null) { if (streamID == null) {
// No stream ID was included so return a bad_request error // No stream ID was included so return a bad_request error
Element extraError = DocumentHelper.createElement(QName.get( Element extraError = DocumentHelper.createElement(QName.get(
......
...@@ -21,6 +21,8 @@ ...@@ -21,6 +21,8 @@
package org.jivesoftware.openfire.multiplex; package org.jivesoftware.openfire.multiplex;
import org.dom4j.Element; import org.dom4j.Element;
import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.spi.BasicStreamIDFactory;
import org.xmpp.packet.Packet; import org.xmpp.packet.Packet;
import java.util.Iterator; import java.util.Iterator;
...@@ -40,10 +42,10 @@ public class Route extends Packet { ...@@ -40,10 +42,10 @@ public class Route extends Packet {
* @param streamID the stream ID that identifies the connection that is actually sending * @param streamID the stream ID that identifies the connection that is actually sending
* the wrapped stanza. * the wrapped stanza.
*/ */
public Route(String streamID) { public Route(StreamID streamID) {
this.element = docFactory.createDocument().addElement("route"); this.element = docFactory.createDocument().addElement("route");
// Set the stream ID that identifies the target session // Set the stream ID that identifies the target session
element.addAttribute("streamid", streamID); element.addAttribute("streamid", streamID.getID());
} }
/** /**
...@@ -104,8 +106,12 @@ public class Route extends Packet { ...@@ -104,8 +106,12 @@ public class Route extends Packet {
* @return the stream ID that identifies the connection that is actually sending * @return the stream ID that identifies the connection that is actually sending
* the wrapped stanza. * the wrapped stanza.
*/ */
public String getStreamID() { public StreamID getStreamID() {
return element.attributeValue("streamid"); final String value = element.attributeValue( "streamid" );
if (value == null) {
return null;
}
return BasicStreamIDFactory.createStreamID( value );
} }
/** /**
......
...@@ -38,12 +38,7 @@ import org.dom4j.Document; ...@@ -38,12 +38,7 @@ import org.dom4j.Document;
import org.dom4j.DocumentException; import org.dom4j.DocumentException;
import org.dom4j.Element; import org.dom4j.Element;
import org.dom4j.io.XMPPPacketReader; import org.dom4j.io.XMPPPacketReader;
import org.jivesoftware.openfire.Connection; import org.jivesoftware.openfire.*;
import org.jivesoftware.openfire.RemoteConnectionFailedException;
import org.jivesoftware.openfire.RoutingTable;
import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.auth.AuthFactory; import org.jivesoftware.openfire.auth.AuthFactory;
import org.jivesoftware.openfire.net.*; import org.jivesoftware.openfire.net.*;
import org.jivesoftware.openfire.session.ConnectionSettings; import org.jivesoftware.openfire.session.ConnectionSettings;
...@@ -264,7 +259,7 @@ public class ServerDialback { ...@@ -264,7 +259,7 @@ public class ServerDialback {
if (authenticateDomain(socketReader, localDomain, remoteDomain, id)) { if (authenticateDomain(socketReader, localDomain, remoteDomain, id)) {
log.debug( "Successfully authenticated the connection with dialback." ); log.debug( "Successfully authenticated the connection with dialback." );
// Domain was validated so create a new OutgoingServerSession // Domain was validated so create a new OutgoingServerSession
StreamID streamID = new BasicStreamIDFactory().createStreamID(id); StreamID streamID = BasicStreamIDFactory.createStreamID(id);
LocalOutgoingServerSession session = new LocalOutgoingServerSession(localDomain, connection, socketReader, streamID); LocalOutgoingServerSession session = new LocalOutgoingServerSession(localDomain, connection, socketReader, streamID);
connection.init(session); connection.init(session);
// Set the hostname as the address of the session // Set the hostname as the address of the session
...@@ -547,7 +542,7 @@ public class ServerDialback { ...@@ -547,7 +542,7 @@ public class ServerDialback {
log.debug( "Verifying dialback key..." ); log.debug( "Verifying dialback key..." );
try try
{ {
result = verifyKey( key, streamID.toString(), recipient, remoteDomain, socket, false ); result = verifyKey( key, streamID, recipient, remoteDomain, socket, false );
} }
catch (SSLHandshakeException e) catch (SSLHandshakeException e)
{ {
...@@ -564,7 +559,7 @@ public class ServerDialback { ...@@ -564,7 +559,7 @@ public class ServerDialback {
socket.connect( oldAddress, RemoteServerManager.getSocketTimeout() ); socket.connect( oldAddress, RemoteServerManager.getSocketTimeout() );
log.debug( "Successfully re-opened socket! Try to validate dialback key again (without TLS this time)..." ); log.debug( "Successfully re-opened socket! Try to validate dialback key again (without TLS this time)..." );
result = verifyKey( key, streamID.toString(), recipient, remoteDomain, socket, true ); result = verifyKey( key, streamID, recipient, remoteDomain, socket, true );
} }
switch(result) { switch(result) {
...@@ -617,7 +612,7 @@ public class ServerDialback { ...@@ -617,7 +612,7 @@ public class ServerDialback {
return host_unknown; return host_unknown;
} }
private VerifyResult sendVerifyKey(String key, String streamID, String recipient, String remoteDomain, Writer writer, XMPPPacketReader reader, Socket socket, boolean skipTLS) throws IOException, XmlPullParserException, RemoteConnectionFailedException { private VerifyResult sendVerifyKey(String key, StreamID streamID, String recipient, String remoteDomain, Writer writer, XMPPPacketReader reader, Socket socket, boolean skipTLS) throws IOException, XmlPullParserException, RemoteConnectionFailedException {
final Logger log = LoggerFactory.getLogger( Log.getName() + "[Acting as Receiving Server: Verify key with AS: " + remoteDomain + " for OS: " + recipient + " (id " + streamID + ")]" ); final Logger log = LoggerFactory.getLogger( Log.getName() + "[Acting as Receiving Server: Verify key with AS: " + remoteDomain + " for OS: " + recipient + " (id " + streamID + ")]" );
VerifyResult result = VerifyResult.error; VerifyResult result = VerifyResult.error;
...@@ -692,7 +687,7 @@ public class ServerDialback { ...@@ -692,7 +687,7 @@ public class ServerDialback {
sb.append("<db:verify"); sb.append("<db:verify");
sb.append(" from=\"").append(recipient).append("\""); sb.append(" from=\"").append(recipient).append("\"");
sb.append(" to=\"").append(remoteDomain).append("\""); sb.append(" to=\"").append(remoteDomain).append("\"");
sb.append(" id=\"").append(streamID).append("\">"); sb.append(" id=\"").append(streamID.getID()).append("\">");
sb.append(key); sb.append(key);
sb.append("</db:verify>"); sb.append("</db:verify>");
writer.write(sb.toString()); writer.write(sb.toString());
...@@ -701,7 +696,7 @@ public class ServerDialback { ...@@ -701,7 +696,7 @@ public class ServerDialback {
try { try {
Element doc = reader.parseDocument().getRootElement(); Element doc = reader.parseDocument().getRootElement();
if ("db".equals(doc.getNamespacePrefix()) && "verify".equals(doc.getName())) { if ("db".equals(doc.getNamespacePrefix()) && "verify".equals(doc.getName())) {
if (!streamID.equals(doc.attributeValue("id"))) { if (doc.attributeValue("id") == null || !streamID.equals(BasicStreamIDFactory.createStreamID( doc.attributeValue("id") ))) {
// Include the invalid-id stream error condition in the response // Include the invalid-id stream error condition in the response
writer.write(new StreamError(StreamError.Condition.invalid_id).toXML()); writer.write(new StreamError(StreamError.Condition.invalid_id).toXML());
writer.flush(); writer.flush();
...@@ -766,7 +761,7 @@ public class ServerDialback { ...@@ -766,7 +761,7 @@ public class ServerDialback {
/** /**
* Verifies the key with the Authoritative Server. * Verifies the key with the Authoritative Server.
*/ */
private VerifyResult verifyKey(String key, String streamID, String recipient, String remoteDomain, Socket socket, boolean skipTLS ) throws IOException, XmlPullParserException, RemoteConnectionFailedException { private VerifyResult verifyKey(String key, StreamID streamID, String recipient, String remoteDomain, Socket socket, boolean skipTLS ) throws IOException, XmlPullParserException, RemoteConnectionFailedException {
final Logger log = LoggerFactory.getLogger( Log.getName() + "[Acting as Receiving Server: Verify key with AS: " + remoteDomain + " for OS: " + recipient + " (id " + streamID + ")]" ); final Logger log = LoggerFactory.getLogger( Log.getName() + "[Acting as Receiving Server: Verify key with AS: " + remoteDomain + " for OS: " + recipient + " (id " + streamID + ")]" );
...@@ -832,9 +827,9 @@ public class ServerDialback { ...@@ -832,9 +827,9 @@ public class ServerDialback {
String verifyFROM = doc.attributeValue("from"); String verifyFROM = doc.attributeValue("from");
String verifyTO = doc.attributeValue("to"); String verifyTO = doc.attributeValue("to");
String key = doc.getTextTrim(); String key = doc.getTextTrim();
String id = doc.attributeValue("id"); StreamID streamID = BasicStreamIDFactory.createStreamID( doc.attributeValue("id") );
final Logger log = LoggerFactory.getLogger( Log.getName() + "[Acting as Authoritative Server: Verify key sent by RS: " + verifyFROM + " (id " + id+ ")]" ); final Logger log = LoggerFactory.getLogger( Log.getName() + "[Acting as Authoritative Server: Verify key sent by RS: " + verifyFROM + " (id " + streamID+ ")]" );
log.debug( "Verifying key... "); log.debug( "Verifying key... ");
...@@ -846,7 +841,7 @@ public class ServerDialback { ...@@ -846,7 +841,7 @@ public class ServerDialback {
// Verify the received key // Verify the received key
// Created the expected key based on the received ID value and the shared secret // Created the expected key based on the received ID value and the shared secret
String expectedKey = AuthFactory.createDigest(id, getSecretkey()); String expectedKey = AuthFactory.createDigest(streamID.getID(), getSecretkey());
boolean verified = expectedKey.equals(key); boolean verified = expectedKey.equals(key);
// Send the result of the key verification // Send the result of the key verification
...@@ -856,7 +851,7 @@ public class ServerDialback { ...@@ -856,7 +851,7 @@ public class ServerDialback {
sb.append(" to=\"").append(verifyFROM).append("\""); sb.append(" to=\"").append(verifyFROM).append("\"");
sb.append(" type=\""); sb.append(" type=\"");
sb.append(verified ? "valid" : "invalid"); sb.append(verified ? "valid" : "invalid");
sb.append("\" id=\"").append(id).append("\"/>"); sb.append("\" id=\"").append(streamID.getID()).append("\"/>");
connection.deliverRawText(sb.toString()); connection.deliverRawText(sb.toString());
log.debug("Verification successful! Key was: " + (verified ? "VALID" : "INVALID")); log.debug("Verification successful! Key was: " + (verified ? "VALID" : "INVALID"));
return verified; return verified;
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
package org.jivesoftware.openfire.session; package org.jivesoftware.openfire.session;
import org.jivesoftware.openfire.StreamID;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
/** /**
...@@ -78,7 +79,7 @@ public interface RemoteSessionLocator { ...@@ -78,7 +79,7 @@ public interface RemoteSessionLocator {
* @param streamID the stream ID that uniquely identifies the session. * @param streamID the stream ID that uniquely identifies the session.
* @return a session surrogate of an incoming server session hosted by a remote cluster node. * @return a session surrogate of an incoming server session hosted by a remote cluster node.
*/ */
IncomingServerSession getIncomingServerSession(byte[] nodeID, String streamID); IncomingServerSession getIncomingServerSession(byte[] nodeID, StreamID streamID);
/** /**
* Returns a session surrogate of an outgoing server session hosted by a remote cluster node. It is * Returns a session surrogate of an outgoing server session hosted by a remote cluster node. It is
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
package org.jivesoftware.openfire.spi; package org.jivesoftware.openfire.spi;
import org.apache.commons.lang.StringEscapeUtils;
import org.jivesoftware.openfire.StreamID; import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.StreamIDFactory; import org.jivesoftware.openfire.StreamIDFactory;
...@@ -49,15 +50,18 @@ public class BasicStreamIDFactory implements StreamIDFactory { ...@@ -49,15 +50,18 @@ public class BasicStreamIDFactory implements StreamIDFactory {
return new BasicStreamID(new BigInteger( MAX_STRING_SIZE * 5, random ).toString( 36 )); return new BasicStreamID(new BigInteger( MAX_STRING_SIZE * 5, random ).toString( 36 ));
} }
public StreamID createStreamID(String name) { public static StreamID createStreamID(String name) {
return new BasicStreamID(name); return new BasicStreamID(name);
} }
private class BasicStreamID implements StreamID { private static class BasicStreamID implements StreamID {
String id; String id;
public BasicStreamID(String id) { public BasicStreamID(String id) {
this.id = id; if ( id == null || id.isEmpty() ) {
throw new IllegalArgumentException( "Argument 'id' cannot be null." );
}
this.id = StringEscapeUtils.escapeXml( id );
} }
@Override @Override
...@@ -74,5 +78,10 @@ public class BasicStreamIDFactory implements StreamIDFactory { ...@@ -74,5 +78,10 @@ public class BasicStreamIDFactory implements StreamIDFactory {
public int hashCode() { public int hashCode() {
return id.hashCode(); return id.hashCode();
} }
@Override
public boolean equals(Object obj) {
return id.equals( obj );
}
} }
} }
...@@ -20,8 +20,10 @@ ...@@ -20,8 +20,10 @@
package com.jivesoftware.openfire.session; package com.jivesoftware.openfire.session;
import org.jivesoftware.openfire.SessionManager; import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.XMPPServer; import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.session.Session; import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.openfire.spi.BasicStreamIDFactory;
import org.jivesoftware.util.Log; import org.jivesoftware.util.Log;
import org.jivesoftware.util.cache.ClusterTask; import org.jivesoftware.util.cache.ClusterTask;
import org.jivesoftware.util.cache.ExternalizableUtil; import org.jivesoftware.util.cache.ExternalizableUtil;
...@@ -39,7 +41,7 @@ import java.io.ObjectOutput; ...@@ -39,7 +41,7 @@ import java.io.ObjectOutput;
public class DeliverRawTextTask implements ClusterTask<Void> { public class DeliverRawTextTask implements ClusterTask<Void> {
private SessionType sessionType; private SessionType sessionType;
private JID address; private JID address;
private String streamID; private StreamID streamID;
private String text; private String text;
public DeliverRawTextTask() { public DeliverRawTextTask() {
...@@ -66,7 +68,7 @@ public class DeliverRawTextTask implements ClusterTask<Void> { ...@@ -66,7 +68,7 @@ public class DeliverRawTextTask implements ClusterTask<Void> {
this.text = text; this.text = text;
} }
public DeliverRawTextTask(String streamID, String text) { public DeliverRawTextTask(StreamID streamID, String text) {
this.sessionType = SessionType.incomingServer; this.sessionType = SessionType.incomingServer;
this.streamID = streamID; this.streamID = streamID;
this.text = text; this.text = text;
...@@ -89,7 +91,7 @@ public class DeliverRawTextTask implements ClusterTask<Void> { ...@@ -89,7 +91,7 @@ public class DeliverRawTextTask implements ClusterTask<Void> {
} }
ExternalizableUtil.getInstance().writeBoolean(out, streamID != null); ExternalizableUtil.getInstance().writeBoolean(out, streamID != null);
if (streamID != null) { if (streamID != null) {
ExternalizableUtil.getInstance().writeSafeUTF(out, streamID); ExternalizableUtil.getInstance().writeSafeUTF(out, streamID.getID());
} }
} }
...@@ -100,7 +102,7 @@ public class DeliverRawTextTask implements ClusterTask<Void> { ...@@ -100,7 +102,7 @@ public class DeliverRawTextTask implements ClusterTask<Void> {
address = (JID) ExternalizableUtil.getInstance().readSerializable(in); address = (JID) ExternalizableUtil.getInstance().readSerializable(in);
} }
if (ExternalizableUtil.getInstance().readBoolean(in)) { if (ExternalizableUtil.getInstance().readBoolean(in)) {
streamID = ExternalizableUtil.getInstance().readSafeUTF(in); streamID = BasicStreamIDFactory.createStreamID( ExternalizableUtil.getInstance().readSafeUTF(in) );
} }
} }
......
...@@ -20,8 +20,10 @@ ...@@ -20,8 +20,10 @@
package com.jivesoftware.openfire.session; package com.jivesoftware.openfire.session;
import org.jivesoftware.openfire.SessionManager; import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.session.IncomingServerSession; import org.jivesoftware.openfire.session.IncomingServerSession;
import org.jivesoftware.openfire.session.Session; import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.openfire.spi.BasicStreamIDFactory;
import org.jivesoftware.util.cache.ExternalizableUtil; import org.jivesoftware.util.cache.ExternalizableUtil;
import java.io.IOException; import java.io.IOException;
...@@ -35,13 +37,13 @@ import java.io.ObjectOutput; ...@@ -35,13 +37,13 @@ import java.io.ObjectOutput;
* @author Gaston Dombiak * @author Gaston Dombiak
*/ */
public class IncomingServerSessionTask extends RemoteSessionTask { public class IncomingServerSessionTask extends RemoteSessionTask {
private String streamID; private StreamID streamID;
public IncomingServerSessionTask() { public IncomingServerSessionTask() {
super(); super();
} }
protected IncomingServerSessionTask(Operation operation, String streamID) { protected IncomingServerSessionTask(Operation operation, StreamID streamID) {
super(operation); super(operation);
this.streamID = streamID; this.streamID = streamID;
} }
...@@ -63,12 +65,12 @@ public class IncomingServerSessionTask extends RemoteSessionTask { ...@@ -63,12 +65,12 @@ public class IncomingServerSessionTask extends RemoteSessionTask {
public void writeExternal(ObjectOutput out) throws IOException { public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out); super.writeExternal(out);
ExternalizableUtil.getInstance().writeSafeUTF(out, streamID); ExternalizableUtil.getInstance().writeSafeUTF( out, streamID.getID() );
} }
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in); super.readExternal(in);
streamID = ExternalizableUtil.getInstance().readSafeUTF(in); streamID = BasicStreamIDFactory.createStreamID( ExternalizableUtil.getInstance().readSafeUTF(in) );
} }
public String toString() { public String toString() {
......
...@@ -22,8 +22,10 @@ package com.jivesoftware.openfire.session; ...@@ -22,8 +22,10 @@ package com.jivesoftware.openfire.session;
import org.dom4j.Element; import org.dom4j.Element;
import org.dom4j.tree.DefaultElement; import org.dom4j.tree.DefaultElement;
import org.jivesoftware.openfire.SessionManager; import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.XMPPServer; import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.session.Session; import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.openfire.spi.BasicStreamIDFactory;
import org.jivesoftware.util.Log; import org.jivesoftware.util.Log;
import org.jivesoftware.util.cache.ClusterTask; import org.jivesoftware.util.cache.ClusterTask;
import org.jivesoftware.util.cache.ExternalizableUtil; import org.jivesoftware.util.cache.ExternalizableUtil;
...@@ -41,7 +43,7 @@ import java.io.ObjectOutput; ...@@ -41,7 +43,7 @@ import java.io.ObjectOutput;
public class ProcessPacketTask implements ClusterTask<Void> { public class ProcessPacketTask implements ClusterTask<Void> {
private SessionType sessionType; private SessionType sessionType;
private JID address; private JID address;
private String streamID; private StreamID streamID;
private Packet packet; private Packet packet;
public ProcessPacketTask() { public ProcessPacketTask() {
...@@ -68,7 +70,7 @@ public class ProcessPacketTask implements ClusterTask<Void> { ...@@ -68,7 +70,7 @@ public class ProcessPacketTask implements ClusterTask<Void> {
this.packet = packet; this.packet = packet;
} }
protected ProcessPacketTask(String streamID, Packet packet) { protected ProcessPacketTask(StreamID streamID, Packet packet) {
this.sessionType = SessionType.incomingServer; this.sessionType = SessionType.incomingServer;
this.streamID = streamID; this.streamID = streamID;
this.packet = packet; this.packet = packet;
...@@ -89,7 +91,7 @@ public class ProcessPacketTask implements ClusterTask<Void> { ...@@ -89,7 +91,7 @@ public class ProcessPacketTask implements ClusterTask<Void> {
} }
ExternalizableUtil.getInstance().writeBoolean(out, streamID != null); ExternalizableUtil.getInstance().writeBoolean(out, streamID != null);
if (streamID != null) { if (streamID != null) {
ExternalizableUtil.getInstance().writeSafeUTF(out, streamID); ExternalizableUtil.getInstance().writeSafeUTF( out, streamID.getID() );
} }
ExternalizableUtil.getInstance().writeInt(out, sessionType.ordinal()); ExternalizableUtil.getInstance().writeInt(out, sessionType.ordinal());
if (packet instanceof IQ) { if (packet instanceof IQ) {
...@@ -107,7 +109,7 @@ public class ProcessPacketTask implements ClusterTask<Void> { ...@@ -107,7 +109,7 @@ public class ProcessPacketTask implements ClusterTask<Void> {
address = (JID) ExternalizableUtil.getInstance().readSerializable(in); address = (JID) ExternalizableUtil.getInstance().readSerializable(in);
} }
if (ExternalizableUtil.getInstance().readBoolean(in)) { if (ExternalizableUtil.getInstance().readBoolean(in)) {
streamID = ExternalizableUtil.getInstance().readSafeUTF(in); streamID = BasicStreamIDFactory.createStreamID( ExternalizableUtil.getInstance().readSafeUTF(in) );
} }
sessionType = SessionType.values()[ExternalizableUtil.getInstance().readInt(in)]; sessionType = SessionType.values()[ExternalizableUtil.getInstance().readInt(in)];
int packetType = ExternalizableUtil.getInstance().readInt(in); int packetType = ExternalizableUtil.getInstance().readInt(in);
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
package com.jivesoftware.openfire.session; package com.jivesoftware.openfire.session;
import org.jivesoftware.openfire.SessionManager; import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.session.IncomingServerSession; import org.jivesoftware.openfire.session.IncomingServerSession;
import org.jivesoftware.util.cache.ClusterTask; import org.jivesoftware.util.cache.ClusterTask;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
...@@ -36,9 +37,9 @@ public class RemoteIncomingServerSession extends RemoteSession implements Incomi ...@@ -36,9 +37,9 @@ public class RemoteIncomingServerSession extends RemoteSession implements Incomi
private String localDomain; private String localDomain;
public RemoteIncomingServerSession(byte[] nodeID, String streamID) { public RemoteIncomingServerSession(byte[] nodeID, StreamID streamID) {
super(nodeID, null); super(nodeID, null);
this.streamID = new BasicStreamID(streamID); this.streamID = streamID;
} }
public JID getAddress() { public JID getAddress() {
...@@ -53,7 +54,7 @@ public class RemoteIncomingServerSession extends RemoteSession implements Incomi ...@@ -53,7 +54,7 @@ public class RemoteIncomingServerSession extends RemoteSession implements Incomi
// Content is stored in a clustered cache so that even in the case of the node hosting // Content is stored in a clustered cache so that even in the case of the node hosting
// the sessions is lost we can still have access to this info to be able to perform // the sessions is lost we can still have access to this info to be able to perform
// proper clean up logic {@link ClusterListener#cleanupNode(NodeCacheKey) // proper clean up logic {@link ClusterListener#cleanupNode(NodeCacheKey)
return SessionManager.getInstance().getValidatedDomains(streamID.getID()); return SessionManager.getInstance().getValidatedDomains(streamID);
} }
public String getLocalDomain() { public String getLocalDomain() {
......
...@@ -74,8 +74,7 @@ public abstract class RemoteSession implements Session { ...@@ -74,8 +74,7 @@ public abstract class RemoteSession implements Session {
// Get it once and cache it since it never changes // Get it once and cache it since it never changes
if (streamID == null) { if (streamID == null) {
ClusterTask task = getRemoteSessionTask(RemoteSessionTask.Operation.getStreamID); ClusterTask task = getRemoteSessionTask(RemoteSessionTask.Operation.getStreamID);
String id = (String) doSynchronousClusterTask(task); streamID = (StreamID) doSynchronousClusterTask(task);
streamID = new BasicStreamID(id);
} }
return streamID; return streamID;
} }
...@@ -180,28 +179,4 @@ public abstract class RemoteSession implements Session { ...@@ -180,28 +179,4 @@ public abstract class RemoteSession implements Session {
protected void doClusterTask(ClusterTask task) { protected void doClusterTask(ClusterTask task) {
CacheFactory.doClusterTask(task, nodeID); CacheFactory.doClusterTask(task, nodeID);
} }
/**
* Simple implementation of the StreamID interface to hold the stream ID of
* the surrogated session.
*/
protected static class BasicStreamID implements StreamID {
String id;
public BasicStreamID(String id) {
this.id = id;
}
public String getID() {
return id;
}
public String toString() {
return id;
}
public int hashCode() {
return id.hashCode();
}
}
} }
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
package com.jivesoftware.openfire.session; package com.jivesoftware.openfire.session;
import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.session.*; import org.jivesoftware.openfire.session.*;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
...@@ -43,7 +44,7 @@ public class RemoteSessionLocator implements org.jivesoftware.openfire.session.R ...@@ -43,7 +44,7 @@ public class RemoteSessionLocator implements org.jivesoftware.openfire.session.R
return new RemoteConnectionMultiplexerSession(nodeID, address); return new RemoteConnectionMultiplexerSession(nodeID, address);
} }
public IncomingServerSession getIncomingServerSession(byte[] nodeID, String streamID) { public IncomingServerSession getIncomingServerSession(byte[] nodeID, StreamID streamID) {
return new RemoteIncomingServerSession(nodeID, streamID); return new RemoteIncomingServerSession(nodeID, streamID);
} }
......
...@@ -56,7 +56,7 @@ public abstract class RemoteSessionTask implements ClusterTask<Object> { ...@@ -56,7 +56,7 @@ public abstract class RemoteSessionTask implements ClusterTask<Object> {
public void run() { public void run() {
if (operation == Operation.getStreamID) { if (operation == Operation.getStreamID) {
result = getSession().getStreamID().getID(); result = getSession().getStreamID();
} }
else if (operation == Operation.getServerName) { else if (operation == Operation.getServerName) {
result = getSession().getServerName(); result = getSession().getServerName();
......
...@@ -24,16 +24,14 @@ import com.tangosol.util.MapEvent; ...@@ -24,16 +24,14 @@ import com.tangosol.util.MapEvent;
import com.tangosol.util.MapListener; import com.tangosol.util.MapListener;
import com.tangosol.util.UID; import com.tangosol.util.UID;
import com.tangosol.util.filter.MapEventFilter; import com.tangosol.util.filter.MapEventFilter;
import org.jivesoftware.openfire.PacketException; import org.jivesoftware.openfire.*;
import org.jivesoftware.openfire.RoutingTable;
import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.cluster.ClusterManager; import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.cluster.NodeID; import org.jivesoftware.openfire.cluster.NodeID;
import org.jivesoftware.openfire.handler.DirectedPresence; import org.jivesoftware.openfire.handler.DirectedPresence;
import org.jivesoftware.openfire.handler.PresenceUpdateHandler; import org.jivesoftware.openfire.handler.PresenceUpdateHandler;
import org.jivesoftware.openfire.session.IncomingServerSession; import org.jivesoftware.openfire.session.IncomingServerSession;
import org.jivesoftware.openfire.session.RemoteSessionLocator; import org.jivesoftware.openfire.session.RemoteSessionLocator;
import org.jivesoftware.openfire.spi.BasicStreamIDFactory;
import org.jivesoftware.openfire.spi.ClientRoute; import org.jivesoftware.openfire.spi.ClientRoute;
import org.jivesoftware.openfire.spi.RoutingTableImpl; import org.jivesoftware.openfire.spi.RoutingTableImpl;
import org.jivesoftware.util.Log; import org.jivesoftware.util.Log;
...@@ -408,7 +406,8 @@ public class ClusterListener implements MemberListener { ...@@ -408,7 +406,8 @@ public class ClusterListener implements MemberListener {
Set<String> incomingSessions = lookupJIDList(key, incomingServerSessionsCache.getName()); Set<String> incomingSessions = lookupJIDList(key, incomingServerSessionsCache.getName());
if (!incomingSessions.isEmpty()) { if (!incomingSessions.isEmpty()) {
for (String streamID : new ArrayList<String>(incomingSessions)) { for (String streamIDValue : new ArrayList<>(incomingSessions)) {
StreamID streamID = BasicStreamIDFactory.createStreamID( streamIDValue );
IncomingServerSession session = sessionLocator.getIncomingServerSession(key.toByteArray(), streamID); IncomingServerSession session = sessionLocator.getIncomingServerSession(key.toByteArray(), streamID);
// Remove all the hostnames that were registered for this server session // Remove all the hostnames that were registered for this server session
for (String hostname : session.getValidatedDomains()) { for (String hostname : session.getValidatedDomains()) {
......
...@@ -20,8 +20,10 @@ ...@@ -20,8 +20,10 @@
package org.jivesoftware.openfire.plugin.session; package org.jivesoftware.openfire.plugin.session;
import org.jivesoftware.openfire.SessionManager; import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.XMPPServer; import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.session.Session; import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.openfire.spi.BasicStreamIDFactory;
import org.jivesoftware.util.Log; import org.jivesoftware.util.Log;
import org.jivesoftware.util.cache.ClusterTask; import org.jivesoftware.util.cache.ClusterTask;
import org.jivesoftware.util.cache.ExternalizableUtil; import org.jivesoftware.util.cache.ExternalizableUtil;
...@@ -39,7 +41,7 @@ import java.io.ObjectOutput; ...@@ -39,7 +41,7 @@ import java.io.ObjectOutput;
public class DeliverRawTextTask implements ClusterTask<Void> { public class DeliverRawTextTask implements ClusterTask<Void> {
private SessionType sessionType; private SessionType sessionType;
private JID address; private JID address;
private String streamID; private StreamID streamID;
private String text; private String text;
public DeliverRawTextTask() { public DeliverRawTextTask() {
...@@ -66,7 +68,7 @@ public class DeliverRawTextTask implements ClusterTask<Void> { ...@@ -66,7 +68,7 @@ public class DeliverRawTextTask implements ClusterTask<Void> {
this.text = text; this.text = text;
} }
public DeliverRawTextTask(String streamID, String text) { public DeliverRawTextTask(StreamID streamID, String text) {
this.sessionType = SessionType.incomingServer; this.sessionType = SessionType.incomingServer;
this.streamID = streamID; this.streamID = streamID;
this.text = text; this.text = text;
...@@ -89,7 +91,7 @@ public class DeliverRawTextTask implements ClusterTask<Void> { ...@@ -89,7 +91,7 @@ public class DeliverRawTextTask implements ClusterTask<Void> {
} }
ExternalizableUtil.getInstance().writeBoolean(out, streamID != null); ExternalizableUtil.getInstance().writeBoolean(out, streamID != null);
if (streamID != null) { if (streamID != null) {
ExternalizableUtil.getInstance().writeSafeUTF(out, streamID); ExternalizableUtil.getInstance().writeSafeUTF( out, streamID.getID() );
} }
} }
...@@ -100,7 +102,7 @@ public class DeliverRawTextTask implements ClusterTask<Void> { ...@@ -100,7 +102,7 @@ public class DeliverRawTextTask implements ClusterTask<Void> {
address = (JID) ExternalizableUtil.getInstance().readSerializable(in); address = (JID) ExternalizableUtil.getInstance().readSerializable(in);
} }
if (ExternalizableUtil.getInstance().readBoolean(in)) { if (ExternalizableUtil.getInstance().readBoolean(in)) {
streamID = ExternalizableUtil.getInstance().readSafeUTF(in); streamID = BasicStreamIDFactory.createStreamID( ExternalizableUtil.getInstance().readSafeUTF(in) );
} }
} }
......
...@@ -20,8 +20,10 @@ ...@@ -20,8 +20,10 @@
package org.jivesoftware.openfire.plugin.session; package org.jivesoftware.openfire.plugin.session;
import org.jivesoftware.openfire.SessionManager; import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.session.IncomingServerSession; import org.jivesoftware.openfire.session.IncomingServerSession;
import org.jivesoftware.openfire.session.Session; import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.openfire.spi.BasicStreamIDFactory;
import org.jivesoftware.util.cache.ExternalizableUtil; import org.jivesoftware.util.cache.ExternalizableUtil;
import java.io.IOException; import java.io.IOException;
...@@ -35,13 +37,13 @@ import java.io.ObjectOutput; ...@@ -35,13 +37,13 @@ import java.io.ObjectOutput;
* @author Gaston Dombiak * @author Gaston Dombiak
*/ */
public class IncomingServerSessionTask extends RemoteSessionTask { public class IncomingServerSessionTask extends RemoteSessionTask {
private String streamID; private StreamID streamID;
public IncomingServerSessionTask() { public IncomingServerSessionTask() {
super(); super();
} }
protected IncomingServerSessionTask(Operation operation, String streamID) { protected IncomingServerSessionTask(Operation operation, StreamID streamID) {
super(operation); super(operation);
this.streamID = streamID; this.streamID = streamID;
} }
...@@ -66,12 +68,12 @@ public class IncomingServerSessionTask extends RemoteSessionTask { ...@@ -66,12 +68,12 @@ public class IncomingServerSessionTask extends RemoteSessionTask {
public void writeExternal(ObjectOutput out) throws IOException { public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out); super.writeExternal(out);
ExternalizableUtil.getInstance().writeSafeUTF(out, streamID); ExternalizableUtil.getInstance().writeSafeUTF(out, streamID.getID());
} }
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in); super.readExternal(in);
streamID = ExternalizableUtil.getInstance().readSafeUTF(in); streamID = BasicStreamIDFactory.createStreamID( ExternalizableUtil.getInstance().readSafeUTF(in) );
} }
public String toString() { public String toString() {
......
...@@ -22,8 +22,10 @@ package org.jivesoftware.openfire.plugin.session; ...@@ -22,8 +22,10 @@ package org.jivesoftware.openfire.plugin.session;
import org.dom4j.Element; import org.dom4j.Element;
import org.dom4j.tree.DefaultElement; import org.dom4j.tree.DefaultElement;
import org.jivesoftware.openfire.SessionManager; import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.XMPPServer; import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.session.Session; import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.openfire.spi.BasicStreamIDFactory;
import org.jivesoftware.util.Log; import org.jivesoftware.util.Log;
import org.jivesoftware.util.cache.ClusterTask; import org.jivesoftware.util.cache.ClusterTask;
import org.jivesoftware.util.cache.ExternalizableUtil; import org.jivesoftware.util.cache.ExternalizableUtil;
...@@ -41,7 +43,7 @@ import java.io.ObjectOutput; ...@@ -41,7 +43,7 @@ import java.io.ObjectOutput;
public class ProcessPacketTask implements ClusterTask<Void> { public class ProcessPacketTask implements ClusterTask<Void> {
private SessionType sessionType; private SessionType sessionType;
private JID address; private JID address;
private String streamID; private StreamID streamID;
private Packet packet; private Packet packet;
public ProcessPacketTask() { public ProcessPacketTask() {
...@@ -68,7 +70,7 @@ public class ProcessPacketTask implements ClusterTask<Void> { ...@@ -68,7 +70,7 @@ public class ProcessPacketTask implements ClusterTask<Void> {
this.packet = packet; this.packet = packet;
} }
protected ProcessPacketTask(String streamID, Packet packet) { protected ProcessPacketTask(StreamID streamID, Packet packet) {
this.sessionType = SessionType.incomingServer; this.sessionType = SessionType.incomingServer;
this.streamID = streamID; this.streamID = streamID;
this.packet = packet; this.packet = packet;
...@@ -89,7 +91,7 @@ public class ProcessPacketTask implements ClusterTask<Void> { ...@@ -89,7 +91,7 @@ public class ProcessPacketTask implements ClusterTask<Void> {
} }
ExternalizableUtil.getInstance().writeBoolean(out, streamID != null); ExternalizableUtil.getInstance().writeBoolean(out, streamID != null);
if (streamID != null) { if (streamID != null) {
ExternalizableUtil.getInstance().writeSafeUTF(out, streamID); ExternalizableUtil.getInstance().writeSafeUTF( out, streamID.getID() );
} }
ExternalizableUtil.getInstance().writeInt(out, sessionType.ordinal()); ExternalizableUtil.getInstance().writeInt(out, sessionType.ordinal());
if (packet instanceof IQ) { if (packet instanceof IQ) {
...@@ -107,7 +109,7 @@ public class ProcessPacketTask implements ClusterTask<Void> { ...@@ -107,7 +109,7 @@ public class ProcessPacketTask implements ClusterTask<Void> {
address = (JID) ExternalizableUtil.getInstance().readSerializable(in); address = (JID) ExternalizableUtil.getInstance().readSerializable(in);
} }
if (ExternalizableUtil.getInstance().readBoolean(in)) { if (ExternalizableUtil.getInstance().readBoolean(in)) {
streamID = ExternalizableUtil.getInstance().readSafeUTF(in); streamID = BasicStreamIDFactory.createStreamID( ExternalizableUtil.getInstance().readSafeUTF(in) );
} }
sessionType = SessionType.values()[ExternalizableUtil.getInstance().readInt(in)]; sessionType = SessionType.values()[ExternalizableUtil.getInstance().readInt(in)];
int packetType = ExternalizableUtil.getInstance().readInt(in); int packetType = ExternalizableUtil.getInstance().readInt(in);
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
package org.jivesoftware.openfire.plugin.session; package org.jivesoftware.openfire.plugin.session;
import org.jivesoftware.openfire.SessionManager; import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.session.IncomingServerSession; import org.jivesoftware.openfire.session.IncomingServerSession;
import org.jivesoftware.util.cache.ClusterTask; import org.jivesoftware.util.cache.ClusterTask;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
...@@ -37,9 +38,9 @@ public class RemoteIncomingServerSession extends RemoteSession implements Incomi ...@@ -37,9 +38,9 @@ public class RemoteIncomingServerSession extends RemoteSession implements Incomi
private String localDomain; private String localDomain;
private long usingServerDialback = -1; private long usingServerDialback = -1;
public RemoteIncomingServerSession(byte[] nodeID, String streamID) { public RemoteIncomingServerSession(byte[] nodeID, StreamID streamID) {
super(nodeID, null); super(nodeID, null);
this.streamID = new BasicStreamID(streamID); this.streamID = streamID;
} }
public boolean isUsingServerDialback() { public boolean isUsingServerDialback() {
...@@ -62,7 +63,7 @@ public class RemoteIncomingServerSession extends RemoteSession implements Incomi ...@@ -62,7 +63,7 @@ public class RemoteIncomingServerSession extends RemoteSession implements Incomi
// Content is stored in a clustered cache so that even in the case of the node hosting // Content is stored in a clustered cache so that even in the case of the node hosting
// the sessions is lost we can still have access to this info to be able to perform // the sessions is lost we can still have access to this info to be able to perform
// proper clean up logic {@link ClusterListener#cleanupNode(NodeCacheKey) // proper clean up logic {@link ClusterListener#cleanupNode(NodeCacheKey)
return SessionManager.getInstance().getValidatedDomains(streamID.getID()); return SessionManager.getInstance().getValidatedDomains(streamID);
} }
public String getLocalDomain() { public String getLocalDomain() {
...@@ -74,14 +75,14 @@ public class RemoteIncomingServerSession extends RemoteSession implements Incomi ...@@ -74,14 +75,14 @@ public class RemoteIncomingServerSession extends RemoteSession implements Incomi
} }
RemoteSessionTask getRemoteSessionTask(RemoteSessionTask.Operation operation) { RemoteSessionTask getRemoteSessionTask(RemoteSessionTask.Operation operation) {
return new IncomingServerSessionTask(operation, streamID.getID()); return new IncomingServerSessionTask(operation, streamID);
} }
ClusterTask getDeliverRawTextTask(String text) { ClusterTask getDeliverRawTextTask(String text) {
return new DeliverRawTextTask(streamID.getID(), text); return new DeliverRawTextTask(streamID, text);
} }
ClusterTask getProcessPacketTask(Packet packet) { ClusterTask getProcessPacketTask(Packet packet) {
return new ProcessPacketTask(streamID.getID(), packet); return new ProcessPacketTask(streamID, packet);
} }
} }
...@@ -79,8 +79,7 @@ public abstract class RemoteSession implements Session { ...@@ -79,8 +79,7 @@ public abstract class RemoteSession implements Session {
// Get it once and cache it since it never changes // Get it once and cache it since it never changes
if (streamID == null) { if (streamID == null) {
ClusterTask task = getRemoteSessionTask(RemoteSessionTask.Operation.getStreamID); ClusterTask task = getRemoteSessionTask(RemoteSessionTask.Operation.getStreamID);
String id = (String) doSynchronousClusterTask(task); streamID = (StreamID) doSynchronousClusterTask(task);
streamID = new BasicStreamID(id);
} }
return streamID; return streamID;
} }
...@@ -221,28 +220,4 @@ public abstract class RemoteSession implements Session { ...@@ -221,28 +220,4 @@ public abstract class RemoteSession implements Session {
public final Locale getLanguage() { public final Locale getLanguage() {
return Locale.getDefault(); return Locale.getDefault();
} }
/**
* Simple implementation of the StreamID interface to hold the stream ID of
* the surrogated session.
*/
protected static class BasicStreamID implements StreamID {
String id;
public BasicStreamID(String id) {
this.id = id;
}
public String getID() {
return id;
}
public String toString() {
return id;
}
public int hashCode() {
return id.hashCode();
}
}
} }
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
package org.jivesoftware.openfire.plugin.session; package org.jivesoftware.openfire.plugin.session;
import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.session.*; import org.jivesoftware.openfire.session.*;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
...@@ -43,7 +44,7 @@ public class RemoteSessionLocator implements org.jivesoftware.openfire.session.R ...@@ -43,7 +44,7 @@ public class RemoteSessionLocator implements org.jivesoftware.openfire.session.R
return new RemoteConnectionMultiplexerSession(nodeID, address); return new RemoteConnectionMultiplexerSession(nodeID, address);
} }
public IncomingServerSession getIncomingServerSession(byte[] nodeID, String streamID) { public IncomingServerSession getIncomingServerSession(byte[] nodeID, StreamID streamID) {
return new RemoteIncomingServerSession(nodeID, streamID); return new RemoteIncomingServerSession(nodeID, streamID);
} }
......
...@@ -56,7 +56,7 @@ public abstract class RemoteSessionTask implements ClusterTask<Object> { ...@@ -56,7 +56,7 @@ public abstract class RemoteSessionTask implements ClusterTask<Object> {
public void run() { public void run() {
if (operation == Operation.getStreamID) { if (operation == Operation.getStreamID) {
result = getSession().getStreamID().getID(); result = getSession().getStreamID();
} }
else if (operation == Operation.getServerName) { else if (operation == Operation.getServerName) {
result = getSession().getServerName(); result = getSession().getServerName();
......
...@@ -29,10 +29,7 @@ import java.util.Set; ...@@ -29,10 +29,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import org.jivesoftware.openfire.PacketException; import org.jivesoftware.openfire.*;
import org.jivesoftware.openfire.RoutingTable;
import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.cluster.ClusterManager; import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.cluster.ClusterNodeInfo; import org.jivesoftware.openfire.cluster.ClusterNodeInfo;
import org.jivesoftware.openfire.cluster.NodeID; import org.jivesoftware.openfire.cluster.NodeID;
...@@ -42,6 +39,7 @@ import org.jivesoftware.openfire.plugin.util.cluster.HazelcastClusterNodeInfo; ...@@ -42,6 +39,7 @@ import org.jivesoftware.openfire.plugin.util.cluster.HazelcastClusterNodeInfo;
import org.jivesoftware.openfire.session.ClientSessionInfo; import org.jivesoftware.openfire.session.ClientSessionInfo;
import org.jivesoftware.openfire.session.IncomingServerSession; import org.jivesoftware.openfire.session.IncomingServerSession;
import org.jivesoftware.openfire.session.RemoteSessionLocator; import org.jivesoftware.openfire.session.RemoteSessionLocator;
import org.jivesoftware.openfire.spi.BasicStreamIDFactory;
import org.jivesoftware.openfire.spi.ClientRoute; import org.jivesoftware.openfire.spi.ClientRoute;
import org.jivesoftware.openfire.spi.RoutingTableImpl; import org.jivesoftware.openfire.spi.RoutingTableImpl;
import org.jivesoftware.util.StringUtils; import org.jivesoftware.util.StringUtils;
...@@ -347,7 +345,8 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -347,7 +345,8 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
Set<String> incomingSessions = lookupJIDList(key, incomingServerSessionsCache.getName()); Set<String> incomingSessions = lookupJIDList(key, incomingServerSessionsCache.getName());
if (!incomingSessions.isEmpty()) { if (!incomingSessions.isEmpty()) {
for (String streamID : new ArrayList<String>(incomingSessions)) { for (String streamIDValue : new ArrayList<>(incomingSessions)) {
StreamID streamID = BasicStreamIDFactory.createStreamID( streamIDValue );
IncomingServerSession session = sessionLocator.getIncomingServerSession(key.toByteArray(), streamID); IncomingServerSession session = sessionLocator.getIncomingServerSession(key.toByteArray(), streamID);
// Remove all the hostnames that were registered for this server session // Remove all the hostnames that were registered for this server session
for (String hostname : session.getValidatedDomains()) { for (String hostname : session.getValidatedDomains()) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment