Commit 3ffa39da authored by Ben Vinson's avatar Ben Vinson Committed by benv

Ensure use of the same connection per streamID when communicating with a...

Ensure use of the same connection per streamID when communicating with a connection manager as discussed here: http://community.igniterealtime.org/message/204254

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@11928 b35dd754-fafc-0310-a699-88a17e54d16e
parent 784e68d9
...@@ -68,11 +68,12 @@ public class ClientSessionConnection extends VirtualConnection { ...@@ -68,11 +68,12 @@ public class ClientSessionConnection extends VirtualConnection {
* @param packet the packet to send to the user. * @param packet the packet to send to the user.
*/ */
public void deliver(Packet packet) { public void deliver(Packet packet) {
String streamID = session.getStreamID().getID();
ConnectionMultiplexerSession multiplexerSession = ConnectionMultiplexerSession multiplexerSession =
multiplexerManager.getMultiplexerSession(connectionManagerName); multiplexerManager.getMultiplexerSession(connectionManagerName,streamID);
if (multiplexerSession != null) { if (multiplexerSession != null) {
// Wrap packet so that the connection manager can figure out the target session // Wrap packet so that the connection manager can figure out the target session
Route wrapper = new Route(session.getStreamID().getID()); Route wrapper = new Route(streamID);
wrapper.setFrom(serverName); wrapper.setFrom(serverName);
wrapper.setTo(connectionManagerName); wrapper.setTo(connectionManagerName);
wrapper.setChildElement(packet.getElement().createCopy()); wrapper.setChildElement(packet.getElement().createCopy());
...@@ -94,14 +95,15 @@ public class ClientSessionConnection extends VirtualConnection { ...@@ -94,14 +95,15 @@ public class ClientSessionConnection extends VirtualConnection {
* @param text the stanza to send to the user. * @param text the stanza to send to the user.
*/ */
public void deliverRawText(String text) { public void deliverRawText(String text) {
String streamID = session.getStreamID().getID();
ConnectionMultiplexerSession multiplexerSession = ConnectionMultiplexerSession multiplexerSession =
multiplexerManager.getMultiplexerSession(connectionManagerName); multiplexerManager.getMultiplexerSession(connectionManagerName,streamID);
if (multiplexerSession != null) { if (multiplexerSession != null) {
// Wrap packet so that the connection manager can figure out the target session // Wrap packet so that the connection manager can figure out the target session
StringBuilder sb = new StringBuilder(200 + text.length()); StringBuilder sb = new StringBuilder(200 + text.length());
sb.append("<route from=\"").append(serverName); sb.append("<route from=\"").append(serverName);
sb.append("\" to=\"").append(connectionManagerName); sb.append("\" to=\"").append(connectionManagerName);
sb.append("\" streamid=\"").append(session.getStreamID().getID()).append("\">"); sb.append("\" streamid=\"").append(streamID).append("\">");
sb.append(text); sb.append(text);
sb.append("</route>"); sb.append("</route>");
// Deliver the wrapped stanza // Deliver the wrapped stanza
...@@ -164,7 +166,7 @@ public class ClientSessionConnection extends VirtualConnection { ...@@ -164,7 +166,7 @@ public class ClientSessionConnection extends VirtualConnection {
} }
else { else {
ConnectionMultiplexerSession multiplexerSession = ConnectionMultiplexerSession multiplexerSession =
multiplexerManager.getMultiplexerSession(connectionManagerName); multiplexerManager.getMultiplexerSession(connectionManagerName,streamID);
if (multiplexerSession != null) { if (multiplexerSession != null) {
// Server requested to close the client session so let the connection manager // Server requested to close the client session so let the connection manager
// know that he has to finish the client session // know that he has to finish the client session
......
...@@ -253,13 +253,15 @@ public class ConnectionMultiplexerManager implements SessionEventListener { ...@@ -253,13 +253,15 @@ public class ConnectionMultiplexerManager implements SessionEventListener {
/** /**
* Returns a {@link ConnectionMultiplexerSession} for the specified connection manager * Returns a {@link ConnectionMultiplexerSession} for the specified connection manager
* domain or <tt>null</tt> if none was found. In case the connection manager has many * domain or <tt>null</tt> if none was found. If a StreamID is passed in, the same connection
* will always be used for that StreamID. Otherwise, if the connection manager has many
* connections established with the server then one of them will be selected randomly. * connections established with the server then one of them will be selected randomly.
* *
* @param connectionManagerDomain the domain of the connection manager to get a session. * @param connectionManagerDomain the domain of the connection manager to get a session.
* @param streamID if provided, the same connection will always be used for a given streamID
* @return a session to the specified connection manager domain or null if none was found. * @return a session to the specified connection manager domain or null if none was found.
*/ */
public ConnectionMultiplexerSession getMultiplexerSession(String connectionManagerDomain) { public ConnectionMultiplexerSession getMultiplexerSession(String connectionManagerDomain,String streamID) {
List<ConnectionMultiplexerSession> sessions = List<ConnectionMultiplexerSession> sessions =
sessionManager.getConnectionMultiplexerSessions(connectionManagerDomain); sessionManager.getConnectionMultiplexerSessions(connectionManagerDomain);
if (sessions.isEmpty()) { if (sessions.isEmpty()) {
...@@ -268,12 +270,29 @@ public class ConnectionMultiplexerManager implements SessionEventListener { ...@@ -268,12 +270,29 @@ public class ConnectionMultiplexerManager implements SessionEventListener {
else if (sessions.size() == 1) { else if (sessions.size() == 1) {
return sessions.get(0); return sessions.get(0);
} }
else { else if (streamID != null) {
// Always use the same connection for a given streamID
int connectionIndex = Math.abs(streamID.hashCode()) % sessions.size();
return sessions.get(connectionIndex);
} else {
// Pick a random session so we can distribute traffic evenly // Pick a random session so we can distribute traffic evenly
return sessions.get(randGen.nextInt(sessions.size())); return sessions.get(randGen.nextInt(sessions.size()));
} }
} }
/**
* Returns a {@link ConnectionMultiplexerSession} for the specified connection manager
* domain or <tt>null</tt> if none was found. In case the connection manager has many
* connections established with the server then one of them will be selected randomly.
*
* @param connectionManagerDomain the domain of the connection manager to get a session.
* @return a session to the specified connection manager domain or null if none was found.
*/
public ConnectionMultiplexerSession getMultiplexerSession(String connectionManagerDomain) {
return getMultiplexerSession(connectionManagerDomain,null);
}
/** /**
* Returns the names of the connected connection managers to this server. * Returns the names of the connected connection managers to this server.
* *
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment