Commit a1edc2c5 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

More connection manager work. JM-666

git-svn-id: http://svn.igniterealtime.org/svn/repos/wildfire/trunk@3993 b35dd754-fafc-0310-a699-88a17e54d16e
parent fb54dbba
...@@ -117,6 +117,24 @@ public interface ConnectionManager { ...@@ -117,6 +117,24 @@ public interface ConnectionManager {
*/ */
public boolean isServerListenerEnabled(); public boolean isServerListenerEnabled();
/**
* Sets if the port listener for connection managers will be available or not. When disabled
* there won't be a port listener active. Therefore, clients will need to connect directly
* to the server.
*
* @param enabled true if new connection managers will be able to connect to the server.
*/
public void enableConnectionManagerListener(boolean enabled);
/**
* Returns true if the port listener for connection managers is available. When disabled
* there won't be a port listener active. Therefore, clients will need to connect directly
* to the server.
*
* @return true if the port listener for connection managers is available.
*/
public boolean isConnectionManagerListenerEnabled();
/** /**
* Sets the port to use for unsecured clients. Default port: 5222. * Sets the port to use for unsecured clients. Default port: 5222.
* *
...@@ -174,4 +192,20 @@ public interface ConnectionManager { ...@@ -174,4 +192,20 @@ public interface ConnectionManager {
* @return the port to use for remote servers. * @return the port to use for remote servers.
*/ */
public int getServerListenerPort(); public int getServerListenerPort();
/**
* Sets the port to use for connection managers. This port is used for connection managers
* to connect to this server. Default port: 5262.
*
* @param port the port to use for connection managers.
*/
public void setConnectionManagerListenerPort(int port);
/**
* Returns the port to use for remote servers. This port is used for connection managers
* to connect to this server. Default port: 5262.
*
* @return the port to use for connection managers.
*/
public int getConnectionManagerListenerPort();
} }
...@@ -203,6 +203,22 @@ public class SessionManager extends BasicModule { ...@@ -203,6 +203,22 @@ public class SessionManager extends BasicModule {
} }
} }
/**
* Returns all sessions originated from connection managers.
*
* @return all sessions originated from connection managers.
*/
public List<ConnectionMultiplexerSession> getConnectionMultiplexerSessions() {
if (connnectionManagerSessions.isEmpty()) {
return Collections.emptyList();
}
List<ConnectionMultiplexerSession> answer = new ArrayList<ConnectionMultiplexerSession>();
for (List<ConnectionMultiplexerSession> sessions : connnectionManagerSessions.values()) {
answer.addAll(sessions);
}
return answer;
}
/** /**
* Returns a collection with all the sessions originated from the connection manager * Returns a collection with all the sessions originated from the connection manager
* whose domain matches the specified domain. If there is no connection manager with * whose domain matches the specified domain. If there is no connection manager with
...@@ -249,6 +265,10 @@ public class SessionManager extends BasicModule { ...@@ -249,6 +265,10 @@ public class SessionManager extends BasicModule {
if (sessions == null) { if (sessions == null) {
sessions = new CopyOnWriteArrayList<ConnectionMultiplexerSession>(); sessions = new CopyOnWriteArrayList<ConnectionMultiplexerSession>();
connnectionManagerSessions.put(address.getDomain(), sessions); connnectionManagerSessions.put(address.getDomain(), sessions);
// Notify ConnectionMultiplexerManager that a new connection manager
// is available
ConnectionMultiplexerManager.getInstance()
.multiplexerAvailable(address.getDomain());
} }
} }
} }
...@@ -1483,7 +1503,7 @@ public class SessionManager extends BasicModule { ...@@ -1483,7 +1503,7 @@ public class SessionManager extends BasicModule {
connnectionManagerSessions.remove(domain); connnectionManagerSessions.remove(domain);
// Terminate ClientSessions originated from this connection manager // Terminate ClientSessions originated from this connection manager
// that are still active since the connection manager has gone down // that are still active since the connection manager has gone down
ConnectionMultiplexerManager.getInstance().closeClientSessions(domain); ConnectionMultiplexerManager.getInstance().multiplexerUnavailable(domain);
} }
} }
} }
...@@ -1603,7 +1623,8 @@ public class SessionManager extends BasicModule { ...@@ -1603,7 +1623,8 @@ public class SessionManager extends BasicModule {
for (Session session : sessions) { for (Session session : sessions) {
try { try {
session.getConnection().close(); // Notify connected client that the server is being shut down
session.getConnection().systemShutdown();
} }
catch (Throwable t) { catch (Throwable t) {
// Ignore. // Ignore.
......
...@@ -11,12 +11,14 @@ ...@@ -11,12 +11,14 @@
package org.jivesoftware.wildfire.multiplex; package org.jivesoftware.wildfire.multiplex;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log; import org.jivesoftware.util.Log;
import org.jivesoftware.wildfire.*; import org.jivesoftware.wildfire.*;
import org.jivesoftware.wildfire.auth.UnauthorizedException; import org.jivesoftware.wildfire.auth.UnauthorizedException;
import org.jivesoftware.wildfire.event.SessionEventDispatcher; import org.jivesoftware.wildfire.event.SessionEventDispatcher;
import org.jivesoftware.wildfire.event.SessionEventListener; import org.jivesoftware.wildfire.event.SessionEventListener;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
...@@ -68,8 +70,54 @@ public class ConnectionMultiplexerManager implements SessionEventListener { ...@@ -68,8 +70,54 @@ public class ConnectionMultiplexerManager implements SessionEventListener {
return instance; return instance;
} }
/**
* Returns the default secret key that connection managers should present while trying to
* establish a new connection.
*
* @return the default secret key that connection managers should present while trying to
* establish a new connection.
*/
public static String getDefaultSecret() {
return JiveGlobals.getProperty("xmpp.multiplex.defaultSecret");
}
/**
* Sets the default secret key that connection managers should present while trying to
* establish a new connection.
*
* @param defaultSecret the default secret key that connection managers should present
* while trying to establish a new connection.
*/
public static void setDefaultSecret(String defaultSecret) {
JiveGlobals.setProperty("xmpp.multiplex.defaultSecret", defaultSecret);
}
private ConnectionMultiplexerManager() { private ConnectionMultiplexerManager() {
sessionManager = XMPPServer.getInstance().getSessionManager(); sessionManager = XMPPServer.getInstance().getSessionManager();
// Start thread that will send heartbeats to Connection Managers every 30 seconds
// to keep connections open.
Thread hearbeatThread = new Thread() {
public void run() {
while (true) {
try {
Thread.sleep(30000);
for (ConnectionMultiplexerSession session : sessionManager
.getConnectionMultiplexerSessions()) {
session.getConnection().deliverRawText(" ");
}
}
catch (InterruptedException e) {
// Do nothing
}
catch(Exception e) {
Log.error(e);
}
}
}
};
hearbeatThread.setDaemon(true);
hearbeatThread.setPriority(Thread.NORM_PRIORITY);
hearbeatThread.start();
} }
/** /**
...@@ -124,15 +172,35 @@ public class ConnectionMultiplexerManager implements SessionEventListener { ...@@ -124,15 +172,35 @@ public class ConnectionMultiplexerManager implements SessionEventListener {
} }
/** /**
* Close client sessions that were established to the specified connection manager. This * A connection manager has become available. Clients can now connect to the server through
* action is usually required when the connection manager was stopped or suddenly went * the connection manager.
* down. *
* @param connectionManagerName the connection manager that has become available.
*/
public void multiplexerAvailable(String connectionManagerName) {
// Add a new entry in the list of available managers. Here is where we are going to store
// which clients were connected through which connection manager
Map<String, ClientSession> sessions = sessionsByManager.get(connectionManagerName);
if (sessions == null) {
synchronized (connectionManagerName.intern()) {
sessions = sessionsByManager.get(connectionManagerName);
if (sessions == null) {
sessions = new ConcurrentHashMap<String, ClientSession>();
sessionsByManager.put(connectionManagerName, sessions);
}
}
}
}
/**
* A connection manager has gone unavailable. Close client sessions that were established
* to the specified connection manager.
* *
* @param connectionManagerDomain the connection manager that is no longer available. * @param connectionManagerName the connection manager that is no longer available.
*/ */
public void closeClientSessions(String connectionManagerDomain) { public void multiplexerUnavailable(String connectionManagerName) {
// Remove the connection manager and the hosted sessions // Remove the connection manager and the hosted sessions
Map<String, ClientSession> sessions = sessionsByManager.remove(connectionManagerDomain); Map<String, ClientSession> sessions = sessionsByManager.remove(connectionManagerName);
if (sessions != null) { if (sessions != null) {
for (String streamID : sessions.keySet()) { for (String streamID : sessions.keySet()) {
// Remove inverse track of connection manager hosting streamIDs // Remove inverse track of connection manager hosting streamIDs
...@@ -183,6 +251,31 @@ public class ConnectionMultiplexerManager implements SessionEventListener { ...@@ -183,6 +251,31 @@ public class ConnectionMultiplexerManager implements SessionEventListener {
} }
} }
/**
* Returns the names of the connected connection managers to this server.
*
* @return the names of the connected connection managers to this server.
*/
public Collection<String> getMultiplexers() {
return sessionsByManager.keySet();
}
/**
* Returns the number of connected clients to a specific connection manager.
*
* @param managerName the name of the connection manager.
* @return the number of connected clients to a specific connection manager.
*/
public int getNumConnectedClients(String managerName) {
Map<String, ClientSession> clients = sessionsByManager.get(managerName);
if (clients == null) {
return 0;
}
else {
return clients.size();
}
}
public void anonymousSessionCreated(Session session) { public void anonymousSessionCreated(Session session) {
// Do nothing. // Do nothing.
} }
......
...@@ -19,7 +19,6 @@ import org.jivesoftware.util.Log; ...@@ -19,7 +19,6 @@ import org.jivesoftware.util.Log;
import org.jivesoftware.wildfire.*; import org.jivesoftware.wildfire.*;
import org.jivesoftware.wildfire.auth.AuthFactory; import org.jivesoftware.wildfire.auth.AuthFactory;
import org.jivesoftware.wildfire.auth.UnauthorizedException; import org.jivesoftware.wildfire.auth.UnauthorizedException;
import org.jivesoftware.wildfire.component.ExternalComponentManager;
import org.jivesoftware.wildfire.net.SASLAuthentication; import org.jivesoftware.wildfire.net.SASLAuthentication;
import org.jivesoftware.wildfire.net.SocketConnection; import org.jivesoftware.wildfire.net.SocketConnection;
import org.xmlpull.v1.XmlPullParser; import org.xmlpull.v1.XmlPullParser;
...@@ -75,8 +74,8 @@ public class ConnectionMultiplexerSession extends Session { ...@@ -75,8 +74,8 @@ public class ConnectionMultiplexerSession extends Session {
Connection.CompressionPolicy.disabled.toString()); Connection.CompressionPolicy.disabled.toString());
compressionPolicy = Connection.CompressionPolicy.valueOf(policyName); compressionPolicy = Connection.CompressionPolicy.valueOf(policyName);
// Set the default read idle timeout. If none was set then assume 30 minutes // Set the default read idle timeout. If none was set then assume 5 minutes
idleTimeout = JiveGlobals.getIntProperty("xmpp.multiplex.idle", 30 * 60 * 1000); idleTimeout = JiveGlobals.getIntProperty("xmpp.multiplex.idle", 5 * 60 * 1000);
} }
public static Session createSession(String serverName, XMPPPacketReader reader, public static Session createSession(String serverName, XMPPPacketReader reader,
...@@ -115,7 +114,7 @@ public class ConnectionMultiplexerSession extends Session { ...@@ -115,7 +114,7 @@ public class ConnectionMultiplexerSession extends Session {
// Get the requested domain // Get the requested domain
JID address = new JID(domain); JID address = new JID(domain);
// Check that a secret key was configured in the server // Check that a secret key was configured in the server
String secretKey = getSecretKey(); String secretKey = ConnectionMultiplexerManager.getDefaultSecret();
if (secretKey == null) { if (secretKey == null) {
Log.debug("[ConMng] A shared secret for connection manager was not found."); Log.debug("[ConMng] A shared secret for connection manager was not found.");
// Include the internal-server-error in the response // Include the internal-server-error in the response
...@@ -212,11 +211,6 @@ public class ConnectionMultiplexerSession extends Session { ...@@ -212,11 +211,6 @@ public class ConnectionMultiplexerSession extends Session {
} }
} }
private static String getSecretKey() {
// TODO Use another shared secret (?)
return ExternalComponentManager.getDefaultSecret();
}
public ConnectionMultiplexerSession(String serverName, Connection connection, StreamID streamID) { public ConnectionMultiplexerSession(String serverName, Connection connection, StreamID streamID) {
super(serverName, connection, streamID); super(serverName, connection, streamID);
} }
...@@ -249,7 +243,8 @@ public class ConnectionMultiplexerSession extends Session { ...@@ -249,7 +243,8 @@ public class ConnectionMultiplexerSession extends Session {
*/ */
public boolean authenticate(String digest) { public boolean authenticate(String digest) {
// Perform authentication. Wait for the handshake (with the secret key) // Perform authentication. Wait for the handshake (with the secret key)
String anticipatedDigest = AuthFactory.createDigest(getStreamID().getID(), getSecretKey()); String anticipatedDigest = AuthFactory.createDigest(getStreamID().getID(),
ConnectionMultiplexerManager.getDefaultSecret());
// Check that the provided handshake (secret key + sessionID) is correct // Check that the provided handshake (secret key + sessionID) is correct
if (!anticipatedDigest.equalsIgnoreCase(digest)) { if (!anticipatedDigest.equalsIgnoreCase(digest)) {
Log.debug("[ConMng] Incorrect handshake for connection manager with domain: " + Log.debug("[ConMng] Incorrect handshake for connection manager with domain: " +
...@@ -309,9 +304,7 @@ public class ConnectionMultiplexerSession extends Session { ...@@ -309,9 +304,7 @@ public class ConnectionMultiplexerSession extends Session {
comp.addElement("method").setText("zlib"); comp.addElement("method").setText("zlib");
} }
// Add info about Non-SASL authentication // Add info about Non-SASL authentication
if (XMPPServer.getInstance().getIQAuthHandler().isAllowAnonymous()) { child.addElement("auth", "http://jabber.org/features/iq-auth");
child.addElement("auth", "http://jabber.org/features/iq-auth");
}
// Add info about In-Band Registration // Add info about In-Band Registration
if (XMPPServer.getInstance().getIQRegisterHandler().isInbandRegEnabled()) { if (XMPPServer.getInstance().getIQRegisterHandler().isInbandRegEnabled()) {
child.addElement("register", "http://jabber.org/features/iq-register"); child.addElement("register", "http://jabber.org/features/iq-register");
......
...@@ -49,6 +49,12 @@ public class MultiplexerPacketHandler { ...@@ -49,6 +49,12 @@ public class MultiplexerPacketHandler {
multiplexerManager = ConnectionMultiplexerManager.getInstance(); multiplexerManager = ConnectionMultiplexerManager.getInstance();
} }
/**
* Process IQ packet sent by a connection manager indicating that a new session has
* been created, should be closed or that a packet was failed to be delivered.
*
* @param packet the IQ packet.
*/
public void handle(Packet packet) { public void handle(Packet packet) {
if (packet instanceof IQ) { if (packet instanceof IQ) {
IQ iq = (IQ) packet; IQ iq = (IQ) packet;
...@@ -87,52 +93,6 @@ public class MultiplexerPacketHandler { ...@@ -87,52 +93,6 @@ public class MultiplexerPacketHandler {
.closeClientSession(connectionManagerDomain, streamID); .closeClientSession(connectionManagerDomain, streamID);
sendResultPacket(iq); sendResultPacket(iq);
} }
else if (child.element("send") != null) {
// Connection Manager wrapped a packet from a Client Session.
List wrappedElements = child.element("send").elements();
if (wrappedElements.size() != 1) {
// Wrapper element is wrapping 0 or many items
Element extraError = DocumentHelper.createElement(QName.get(
"invalid-payload",
"http://jabber.org/protocol/connectionmanager#errors"));
sendErrorPacket(iq, PacketError.Condition.bad_request, extraError);
}
else {
Element wrappedElement = (Element) wrappedElements.get(0);
String tag = wrappedElement.getName();
try {
if ("auth".equals(tag) || "response".equals(tag)) {
SASLAuthentication.handle(session, wrappedElement);
}
else if ("iq".equals(tag)) {
processIQ(session, getIQ(wrappedElement));
}
else if ("message".equals(tag)) {
processMessage(session, new Message(wrappedElement));
}
else if ("presence".equals(tag)) {
processPresence(session, new Presence(wrappedElement));
}
else {
Element extraError = DocumentHelper.createElement(QName.get(
"unknown-stanza",
"http://jabber.org/protocol/connectionmanager#errors"));
sendErrorPacket(iq, PacketError.Condition.bad_request,
extraError);
return;
}
}
catch (UnsupportedEncodingException e) {
Log.error("Error processing wrapped packet: " +
wrappedElement.asXML(), e);
sendErrorPacket(iq, PacketError.Condition.internal_server_error,
null);
}
}
// The wrapped packet does not contain information about the sender
// so the streamID helps identify the Client Session
sendResultPacket(iq);
}
else if (child.element("failed") != null) { else if (child.element("failed") != null) {
// Connection Manager failed to deliver a message // Connection Manager failed to deliver a message
// Connection Manager wrapped a packet from a Client Session. // Connection Manager wrapped a packet from a Client Session.
...@@ -179,7 +139,58 @@ public class MultiplexerPacketHandler { ...@@ -179,7 +139,58 @@ public class MultiplexerPacketHandler {
} }
} }
/**
* Processes a route packet that is wrapping a stanza sent by a client that is connected
* to the connection manager.
*
* @param route the route packet.
*/
public void route(Route route) {
String streamID = route.getStreamID();
if (streamID == null) {
// No stream ID was included so return a bad_request error
Element extraError = DocumentHelper.createElement(QName.get(
"id-required", "http://jabber.org/protocol/connectionmanager#errors"));
sendErrorPacket(route, PacketError.Condition.bad_request, extraError);
}
ClientSession session = multiplexerManager
.getClientSession(connectionManagerDomain, streamID);
if (session == null) {
// Specified Client Session does not exist
sendErrorPacket(route, PacketError.Condition.item_not_found, null);
return;
}
// Connection Manager wrapped a packet from a Client Session.
Element wrappedElement = route.getChildElement();
String tag = wrappedElement.getName();
try {
if ("auth".equals(tag) || "response".equals(tag)) {
SASLAuthentication.handle(session, wrappedElement);
}
else if ("iq".equals(tag)) {
processIQ(session, getIQ(wrappedElement));
}
else if ("message".equals(tag)) {
processMessage(session, new Message(wrappedElement));
}
else if ("presence".equals(tag)) {
processPresence(session, new Presence(wrappedElement));
}
else {
Element extraError = DocumentHelper.createElement(QName.get(
"unknown-stanza",
"http://jabber.org/protocol/connectionmanager#errors"));
sendErrorPacket(route, PacketError.Condition.bad_request, extraError);
}
}
catch (UnsupportedEncodingException e) {
Log.error("Error processing wrapped packet: " + wrappedElement.asXML(), e);
sendErrorPacket(route, PacketError.Condition.internal_server_error, null);
}
}
private void processIQ(ClientSession session, IQ packet) { private void processIQ(ClientSession session, IQ packet) {
packet.setFrom(session.getAddress());
try { try {
// Invoke the interceptors before we process the read packet // Invoke the interceptors before we process the read packet
InterceptorManager.getInstance().invokeInterceptors(packet, session, true, InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
...@@ -212,6 +223,7 @@ public class MultiplexerPacketHandler { ...@@ -212,6 +223,7 @@ public class MultiplexerPacketHandler {
} }
private void processPresence(ClientSession session, Presence packet) { private void processPresence(ClientSession session, Presence packet) {
packet.setFrom(session.getAddress());
try { try {
// Invoke the interceptors before we process the read packet // Invoke the interceptors before we process the read packet
InterceptorManager.getInstance().invokeInterceptors(packet, session, true, InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
...@@ -243,6 +255,7 @@ public class MultiplexerPacketHandler { ...@@ -243,6 +255,7 @@ public class MultiplexerPacketHandler {
} }
private void processMessage(ClientSession session, Message packet) { private void processMessage(ClientSession session, Message packet) {
packet.setFrom(session.getAddress());
try { try {
// Invoke the interceptors before we process the read packet // Invoke the interceptors before we process the read packet
InterceptorManager.getInstance().invokeInterceptors(packet, session, true, InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
...@@ -297,6 +310,26 @@ public class MultiplexerPacketHandler { ...@@ -297,6 +310,26 @@ public class MultiplexerPacketHandler {
deliver(reply); deliver(reply);
} }
/**
* Sends an IQ error with the specified condition to the sender of the original
* IQ packet.
*
* @param packet the packet to be bounced.
* @param extraError application specific error or null if none.
*/
private void sendErrorPacket(Route packet, PacketError.Condition error, Element extraError) {
Route reply = new Route(packet.getStreamID());
reply.setID(packet.getID());
reply.setFrom(packet.getTo());
reply.setTo(packet.getFrom());
reply.setError(error);
if (extraError != null) {
// Add specific application error if available
reply.getError().getElement().add(extraError);
}
deliver(reply);
}
/** /**
* Sends an IQ result packet confirming that the operation was successful. * Sends an IQ result packet confirming that the operation was successful.
* *
...@@ -308,7 +341,7 @@ public class MultiplexerPacketHandler { ...@@ -308,7 +341,7 @@ public class MultiplexerPacketHandler {
deliver(reply); deliver(reply);
} }
private void deliver(IQ reply) { private void deliver(Packet reply) {
// Get any session of the connection manager to deliver the packet // Get any session of the connection manager to deliver the packet
ConnectionMultiplexerSession session = ConnectionMultiplexerSession session =
multiplexerManager.getMultiplexerSession(connectionManagerDomain); multiplexerManager.getMultiplexerSession(connectionManagerDomain);
......
...@@ -39,6 +39,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -39,6 +39,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
private SessionManager sessionManager; private SessionManager sessionManager;
private PacketDeliverer deliverer; private PacketDeliverer deliverer;
private PacketRouter router; private PacketRouter router;
private RoutingTable routingTable;
private String serverName; private String serverName;
private XMPPServer server; private XMPPServer server;
private String localIPAddress = null; private String localIPAddress = null;
...@@ -74,7 +75,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -74,7 +75,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
// Start the port listener for s2s communication // Start the port listener for s2s communication
startServerListener(localIPAddress); startServerListener(localIPAddress);
// Start the port listener for Connections Multiplexers // Start the port listener for Connections Multiplexers
startMultiplexerListener(localIPAddress); startConnectionManagerListener(localIPAddress);
// Start the port listener for external components // Start the port listener for external components
startComponentListener(localIPAddress); startComponentListener(localIPAddress);
// Start the port listener for clients // Start the port listener for clients
...@@ -116,10 +117,10 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -116,10 +117,10 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
} }
} }
private void startMultiplexerListener(String localIPAddress) { private void startConnectionManagerListener(String localIPAddress) {
// Start multiplexers socket unless it's been disabled. // Start multiplexers socket unless it's been disabled.
if (isMultiplexerListenerEnabled()) { if (isConnectionManagerListenerEnabled()) {
int port = getMultiplexerListenerPort(); int port = getConnectionManagerListenerPort();
ServerPort serverPort = new ServerPort(port, serverName, localIPAddress, ServerPort serverPort = new ServerPort(port, serverName, localIPAddress,
false, null, ServerPort.Type.connectionManager); false, null, ServerPort.Type.connectionManager);
try { try {
...@@ -141,7 +142,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -141,7 +142,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
} }
} }
private void stopMultiplexerListener() { private void stopConnectionManagerListener() {
if (multiplexerSocketThread != null) { if (multiplexerSocketThread != null) {
multiplexerSocketThread.shutdown(); multiplexerSocketThread.shutdown();
ports.remove(multiplexerSocketThread.getServerPort()); ports.remove(multiplexerSocketThread.getServerPort());
...@@ -260,15 +261,18 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -260,15 +261,18 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
boolean useBlockingMode) throws IOException { boolean useBlockingMode) throws IOException {
if (serverPort.isClientPort()) { if (serverPort.isClientPort()) {
SocketConnection conn = new SocketConnection(deliverer, sock, isSecure); SocketConnection conn = new SocketConnection(deliverer, sock, isSecure);
return new ClientSocketReader(router, serverName, sock, conn, useBlockingMode); return new ClientSocketReader(router, routingTable, serverName, sock, conn,
useBlockingMode);
} }
else if (serverPort.isComponentPort()) { else if (serverPort.isComponentPort()) {
SocketConnection conn = new SocketConnection(deliverer, sock, isSecure); SocketConnection conn = new SocketConnection(deliverer, sock, isSecure);
return new ComponentSocketReader(router, serverName, sock, conn, useBlockingMode); return new ComponentSocketReader(router, routingTable, serverName, sock, conn,
useBlockingMode);
} }
else if (serverPort.isServerPort()) { else if (serverPort.isServerPort()) {
SocketConnection conn = new SocketConnection(deliverer, sock, isSecure); SocketConnection conn = new SocketConnection(deliverer, sock, isSecure);
return new ServerSocketReader(router, serverName, sock, conn, useBlockingMode); return new ServerSocketReader(router, routingTable, serverName, sock, conn,
useBlockingMode);
} }
else { else {
// Use the appropriate packeet deliverer for connection managers. The packet // Use the appropriate packeet deliverer for connection managers. The packet
...@@ -276,8 +280,8 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -276,8 +280,8 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
// the connection manager has finished the handshake. // the connection manager has finished the handshake.
SocketConnection conn = SocketConnection conn =
new SocketConnection(new MultiplexerPacketDeliverer(), sock, isSecure); new SocketConnection(new MultiplexerPacketDeliverer(), sock, isSecure);
return new ConnectionMultiplexerSocketReader(router, serverName, sock, conn, return new ConnectionMultiplexerSocketReader(router, routingTable, serverName, sock,
useBlockingMode); conn, useBlockingMode);
} }
} }
...@@ -285,6 +289,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -285,6 +289,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
super.initialize(server); super.initialize(server);
this.server = server; this.server = server;
router = server.getPacketRouter(); router = server.getPacketRouter();
routingTable = server.getRoutingTable();
deliverer = server.getPacketDeliverer(); deliverer = server.getPacketDeliverer();
sessionManager = server.getSessionManager(); sessionManager = server.getSessionManager();
} }
...@@ -373,25 +378,25 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -373,25 +378,25 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
return JiveGlobals.getBooleanProperty("xmpp.server.socket.active", true); return JiveGlobals.getBooleanProperty("xmpp.server.socket.active", true);
} }
public void enableMultiplexerListener(boolean enabled) { public void enableConnectionManagerListener(boolean enabled) {
if (enabled == isMultiplexerListenerEnabled()) { if (enabled == isConnectionManagerListenerEnabled()) {
// Ignore new setting // Ignore new setting
return; return;
} }
if (enabled) { if (enabled) {
JiveGlobals.setProperty("xmpp.multiplex.socket.active", "true"); JiveGlobals.setProperty("xmpp.multiplex.socket.active", "true");
// Start the port listener for s2s communication // Start the port listener for s2s communication
startMultiplexerListener(localIPAddress); startConnectionManagerListener(localIPAddress);
} }
else { else {
JiveGlobals.setProperty("xmpp.multiplex.socket.active", "false"); JiveGlobals.setProperty("xmpp.multiplex.socket.active", "false");
// Stop the port listener for s2s communication // Stop the port listener for s2s communication
stopMultiplexerListener(); stopConnectionManagerListener();
} }
} }
public boolean isMultiplexerListenerEnabled() { public boolean isConnectionManagerListenerEnabled() {
return JiveGlobals.getBooleanProperty("xmpp.multiplex.socket.active", true); return JiveGlobals.getBooleanProperty("xmpp.multiplex.socket.active", false);
} }
public void setClientListenerPort(int port) { public void setClientListenerPort(int port) {
...@@ -470,7 +475,21 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -470,7 +475,21 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
SocketAcceptThread.DEFAULT_SERVER_PORT); SocketAcceptThread.DEFAULT_SERVER_PORT);
} }
public int getMultiplexerListenerPort() { public void setConnectionManagerListenerPort(int port) {
if (port == getConnectionManagerListenerPort()) {
// Ignore new setting
return;
}
JiveGlobals.setProperty("xmpp.multiplex.socket.port", String.valueOf(port));
// Stop the port listener for connection managers
stopConnectionManagerListener();
if (isConnectionManagerListenerEnabled()) {
// Start the port listener for connection managers
startConnectionManagerListener(localIPAddress);
}
}
public int getConnectionManagerListenerPort() {
return JiveGlobals.getIntProperty("xmpp.multiplex.socket.port", return JiveGlobals.getIntProperty("xmpp.multiplex.socket.port",
SocketAcceptThread.DEFAULT_MULTIPLEX_PORT); SocketAcceptThread.DEFAULT_MULTIPLEX_PORT);
} }
...@@ -492,7 +511,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -492,7 +511,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
stopClientListeners(); stopClientListeners();
stopClientSSLListeners(); stopClientSSLListeners();
stopComponentListener(); stopComponentListener();
stopMultiplexerListener(); stopConnectionManagerListener();
stopServerListener(); stopServerListener();
SocketSendingTracker.getInstance().shutdown(); SocketSendingTracker.getInstance().shutdown();
serverName = null; serverName = null;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment