Commit 20b26fe2 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

More clustering work on 1) start/stop components and 2) cluster events handling.

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@8500 b35dd754-fafc-0310-a699-88a17e54d16e
parent 24eec17b
...@@ -87,7 +87,7 @@ class LocalSessionManager { ...@@ -87,7 +87,7 @@ class LocalSessionManager {
return connnectionManagerSessions; return connnectionManagerSessions;
} }
public LocalIncomingServerSession getIncomingServerSessions(String streamID) { public LocalIncomingServerSession getIncomingServerSession(String streamID) {
return incomingServerSessions.get(streamID); return incomingServerSessions.get(streamID);
} }
......
...@@ -300,6 +300,15 @@ public interface RoutingTable { ...@@ -300,6 +300,15 @@ public interface RoutingTable {
*/ */
void setRemotePacketRouter(RemotePacketRouter remotePacketRouter); void setRemotePacketRouter(RemotePacketRouter remotePacketRouter);
/**
* Returns the {@link RemotePacketRouter} to use for deliverying packets to entities hosted
* in remote nodes of the cluster or <tt>null</tt> if none was set.
*
* @return the RemotePacketRouter to use for deliverying packets to entities hosted
* in remote nodes of the cluster.
*/
RemotePacketRouter getRemotePacketRouter();
/** /**
* Broadcasts the specified message to connected client sessions to the local node or * Broadcasts the specified message to connected client sessions to the local node or
* across the cluster. Both available and unavailable client sessions will receive the message. * across the cluster. Both available and unavailable client sessions will receive the message.
......
...@@ -421,11 +421,12 @@ public class SessionManager extends BasicModule { ...@@ -421,11 +421,12 @@ public class SessionManager extends BasicModule {
* @param hostname the hostname that is being served by the remote server. * @param hostname the hostname that is being served by the remote server.
* @param session the session to unregiser. * @param session the session to unregiser.
*/ */
private void unregisterIncomingServerSession(String hostname, IncomingServerSession session) { public void unregisterIncomingServerSession(String hostname, IncomingServerSession session) {
// Remove local track of the incoming server session connected to this JVM // Remove local track of the incoming server session connected to this JVM
localSessionManager.removeIncomingServerSessions(session.getStreamID().getID()); localSessionManager.removeIncomingServerSessions(session.getStreamID().getID());
// Remove track of the nodeID hosting the incoming server session // Remove track of the nodeID hosting the incoming server session
incomingServerSessionsCache.remove(session.getStreamID().getID()); incomingServerSessionsCache.remove(session.getStreamID().getID());
// Remove from list of sockets/sessions coming from the remote hostname // Remove from list of sockets/sessions coming from the remote hostname
Lock lock = LockManager.getLock(hostname); Lock lock = LockManager.getLock(hostname);
try { try {
...@@ -689,6 +690,16 @@ public class SessionManager extends BasicModule { ...@@ -689,6 +690,16 @@ public class SessionManager extends BasicModule {
return results; return results;
} }
/**
* Returns the incoming server session hosted by this JVM that matches the specified stream ID.
*
* @param streamID the stream ID that identifies the incoming server session hosted by this JVM.
* @return the incoming server session hosted by this JVM or null if none was found.
*/
public LocalIncomingServerSession getIncomingServerSession(String streamID) {
return localSessionManager.getIncomingServerSession(streamID);
}
/** /**
* Returns the list of sessions that were originated by a remote server. The list will be * Returns the list of sessions that were originated by a remote server. The list will be
* ordered chronologically. IncomingServerSession can only receive packets from the remote * ordered chronologically. IncomingServerSession can only receive packets from the remote
...@@ -717,7 +728,7 @@ public class SessionManager extends BasicModule { ...@@ -717,7 +728,7 @@ public class SessionManager extends BasicModule {
List<IncomingServerSession> sessions = new ArrayList<IncomingServerSession>(); List<IncomingServerSession> sessions = new ArrayList<IncomingServerSession>();
for (String streamID : streamIDs) { for (String streamID : streamIDs) {
// Search in local hosted sessions // Search in local hosted sessions
IncomingServerSession session = localSessionManager.getIncomingServerSessions(streamID); IncomingServerSession session = localSessionManager.getIncomingServerSession(streamID);
RemoteSessionLocator locator = server.getRemoteSessionLocator(); RemoteSessionLocator locator = server.getRemoteSessionLocator();
if (session == null && locator != null) { if (session == null && locator != null) {
// Get the node hosting this session // Get the node hosting this session
...@@ -900,9 +911,32 @@ public class SessionManager extends BasicModule { ...@@ -900,9 +911,32 @@ public class SessionManager extends BasicModule {
return false; return false;
} }
// Remove route to the removed session (anonymous or not)
boolean anonymous = session.getAuthToken().isAnonymous(); boolean anonymous = session.getAuthToken().isAnonymous();
boolean removed = routingTable.removeClientRoute(session.getAddress()); return removeSession(session, session.getAddress(), anonymous, false);
}
/**
* Removes a session.
*
* @param session the session or null when session is derived from fullJID.
* @param fullJID the address of the session.
* @param anonymous true if the authenticated user is anonymous.
* @param forceUnavailable true if an unavailable presence must be created and routed.
* @return true if the requested session was successfully removed.
*/
public boolean removeSession(ClientSession session, JID fullJID, boolean anonymous, boolean forceUnavailable) {
// Do nothing if server is shutting down. Note: When the server
// is shutting down the serverName will be null.
if (serverName == null) {
return false;
}
if (session == null) {
session = getSession(fullJID);
}
// Remove route to the removed session (anonymous or not)
boolean removed = routingTable.removeClientRoute(fullJID);
if (removed) { if (removed) {
// Fire session event. // Fire session event.
...@@ -918,12 +952,11 @@ public class SessionManager extends BasicModule { ...@@ -918,12 +952,11 @@ public class SessionManager extends BasicModule {
// Remove the session from the pre-Authenticated sessions list (if present) // Remove the session from the pre-Authenticated sessions list (if present)
boolean preauth_removed = boolean preauth_removed =
localSessionManager.getPreAuthenticatedSessions().remove(session.getAddress().getResource()) != null; localSessionManager.getPreAuthenticatedSessions().remove(fullJID.getResource()) != null;
// If the user is still available then send an unavailable presence // If the user is still available then send an unavailable presence
Presence presence = session.getPresence(); if (forceUnavailable || session.getPresence().isAvailable()) {
if (presence.isAvailable()) {
Presence offline = new Presence(); Presence offline = new Presence();
offline.setFrom(session.getAddress()); offline.setFrom(fullJID);
offline.setTo(new JID(null, serverName, null, true)); offline.setTo(new JID(null, serverName, null, true));
offline.setType(Presence.Type.unavailable); offline.setType(Presence.Type.unavailable);
router.route(offline); router.route(offline);
...@@ -1042,6 +1075,7 @@ public class SessionManager extends BasicModule { ...@@ -1042,6 +1075,7 @@ public class SessionManager extends BasicModule {
for (String hostname : session.getValidatedDomains()) { for (String hostname : session.getValidatedDomains()) {
unregisterIncomingServerSession(hostname, session); unregisterIncomingServerSession(hostname, session);
} }
LocalIncomingServerSession.releaseValidatedDomains(session.getStreamID().getID());
} }
} }
......
...@@ -17,7 +17,7 @@ import org.jivesoftware.openfire.container.BasicModule; ...@@ -17,7 +17,7 @@ import org.jivesoftware.openfire.container.BasicModule;
import org.jivesoftware.openfire.roster.Roster; import org.jivesoftware.openfire.roster.Roster;
import org.jivesoftware.openfire.roster.RosterItem; import org.jivesoftware.openfire.roster.RosterItem;
import org.jivesoftware.openfire.roster.RosterManager; import org.jivesoftware.openfire.roster.RosterManager;
import org.jivesoftware.openfire.session.LocalClientSession; import org.jivesoftware.openfire.session.ClientSession;
import org.jivesoftware.openfire.session.LocalSession; import org.jivesoftware.openfire.session.LocalSession;
import org.jivesoftware.openfire.session.Session; import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.openfire.user.UserManager; import org.jivesoftware.openfire.user.UserManager;
...@@ -90,10 +90,10 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler ...@@ -90,10 +90,10 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
} }
public void process(Packet packet) throws UnauthorizedException, PacketException { public void process(Packet packet) throws UnauthorizedException, PacketException {
process((Presence) packet, (LocalClientSession) sessionManager.getSession(packet.getFrom())); process((Presence) packet, sessionManager.getSession(packet.getFrom()));
} }
private void process(Presence presence, LocalClientSession session) throws UnauthorizedException, PacketException { private void process(Presence presence, ClientSession session) throws UnauthorizedException, PacketException {
try { try {
Presence.Type type = presence.getType(); Presence.Type type = presence.getType();
// Available // Available
...@@ -150,6 +150,7 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler ...@@ -150,6 +150,7 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
* Handle presence updates that affect roster subscriptions. * Handle presence updates that affect roster subscriptions.
* *
* @param presence The presence presence to handle * @param presence The presence presence to handle
* @throws PacketException if the packet is null or the packet could not be routed.
*/ */
public void process(Presence presence) throws PacketException { public void process(Presence presence) throws PacketException {
try { try {
...@@ -188,7 +189,7 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler ...@@ -188,7 +189,7 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
* @param session The session being updated * @param session The session being updated
* @throws UserNotFoundException If the user being updated does not exist * @throws UserNotFoundException If the user being updated does not exist
*/ */
private void initSession(LocalClientSession session) throws UserNotFoundException { private void initSession(ClientSession session) throws UserNotFoundException {
// Only user sessions need to be authenticated // Only user sessions need to be authenticated
if (userManager.isRegisteredUser(session.getAddress().getNode())) { if (userManager.isRegisteredUser(session.getAddress().getNode())) {
...@@ -276,33 +277,6 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler ...@@ -276,33 +277,6 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
Log.warn("Presence requested from server " Log.warn("Presence requested from server "
+ localServer.getServerInfo().getName() + localServer.getServerInfo().getName()
+ " by unknown user: " + update.getFrom()); + " by unknown user: " + update.getFrom());
/*
Connection con = null;
PreparedStatement pstmt = null;
try {
pstmt = con.prepareStatement(GET_ROSTER_SUBS);
pstmt.setString(1, update.getSender().toBareString().toLowerCase());
ResultSet rs = pstmt.executeQuery();
while (rs.next()){
long userID = rs.getLong(1);
try {
User user = server.getUserManager().getUser(userID);
update.setRecipient(user.getAddress());
server.getSessionManager().userBroadcast(user.getUsername(),
update.getPacket());
} catch (UserNotFoundException e) {
Log.error(LocaleUtils.getLocalizedString("admin.error"),e);
} catch (UnauthorizedException e) {
Log.error(LocaleUtils.getLocalizedString("admin.error"),e);
}
}
}
catch (SQLException e) {
Log.error(LocaleUtils.getLocalizedString("admin.error"),e);
}
*/
} }
} }
......
...@@ -11,7 +11,6 @@ ...@@ -11,7 +11,6 @@
package org.jivesoftware.openfire.muc; package org.jivesoftware.openfire.muc;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.user.UserNotFoundException; import org.jivesoftware.openfire.user.UserNotFoundException;
import org.xmpp.component.Component; import org.xmpp.component.Component;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
...@@ -258,7 +257,7 @@ public interface MultiUserChatServer extends Component { ...@@ -258,7 +257,7 @@ public interface MultiUserChatServer extends Component {
* *
* @param msg The message to broadcast. * @param msg The message to broadcast.
*/ */
void serverBroadcast(String msg) throws UnauthorizedException; void serverBroadcast(String msg);
/** /**
* Returns the total chat time of all rooms combined. * Returns the total chat time of all rooms combined.
...@@ -297,11 +296,12 @@ public interface MultiUserChatServer extends Component { ...@@ -297,11 +296,12 @@ public interface MultiUserChatServer extends Component {
* the disco#items list. Moreover, service discovery features will be disabled. * the disco#items list. Moreover, service discovery features will be disabled.
* *
* @param enabled true if the service is enabled. * @param enabled true if the service is enabled.
* @param persistent true if the new setting will persist accorss restarts.
*/ */
void enableService(boolean enabled); void enableService(boolean enabled, boolean persistent);
/** /**
* Returns true if the MUC service is available. Use {@link #enableService(boolean)} to * Returns true if the MUC service is available. Use {@link #enableService(boolean, boolean)} to
* enable or disable the service. * enable or disable the service.
* *
* @return true if the MUC service is available. * @return true if the MUC service is available.
......
...@@ -15,6 +15,8 @@ import org.dom4j.DocumentHelper; ...@@ -15,6 +15,8 @@ import org.dom4j.DocumentHelper;
import org.dom4j.Element; import org.dom4j.Element;
import org.jivesoftware.openfire.*; import org.jivesoftware.openfire.*;
import org.jivesoftware.openfire.auth.UnauthorizedException; import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.cluster.ClusterEventListener;
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.container.BasicModule; import org.jivesoftware.openfire.container.BasicModule;
import org.jivesoftware.openfire.disco.DiscoInfoProvider; import org.jivesoftware.openfire.disco.DiscoInfoProvider;
import org.jivesoftware.openfire.disco.DiscoItemsProvider; import org.jivesoftware.openfire.disco.DiscoItemsProvider;
...@@ -57,7 +59,7 @@ import java.util.concurrent.atomic.AtomicLong; ...@@ -57,7 +59,7 @@ import java.util.concurrent.atomic.AtomicLong;
* @author Gaston Dombiak * @author Gaston Dombiak
*/ */
public class MultiUserChatServerImpl extends BasicModule implements MultiUserChatServer, public class MultiUserChatServerImpl extends BasicModule implements MultiUserChatServer,
ServerItemsProvider, DiscoInfoProvider, DiscoItemsProvider, RoutableChannelHandler { ServerItemsProvider, DiscoInfoProvider, DiscoItemsProvider, RoutableChannelHandler, ClusterEventListener {
private static final FastDateFormat dateFormatter = FastDateFormat private static final FastDateFormat dateFormatter = FastDateFormat
.getInstance(JiveConstants.XMPP_DELAY_DATETIME_FORMAT, TimeZone.getTimeZone("UTC")); .getInstance(JiveConstants.XMPP_DELAY_DATETIME_FORMAT, TimeZone.getTimeZone("UTC"));
...@@ -533,7 +535,7 @@ public class MultiUserChatServerImpl extends BasicModule implements MultiUserCha ...@@ -533,7 +535,7 @@ public class MultiUserChatServerImpl extends BasicModule implements MultiUserCha
return user; return user;
} }
public void serverBroadcast(String msg) throws UnauthorizedException { public void serverBroadcast(String msg) {
for (MUCRoom room : rooms.values()) { for (MUCRoom room : rooms.values()) {
room.serverBroadcast(msg); room.serverBroadcast(msg);
} }
...@@ -796,6 +798,8 @@ public class MultiUserChatServerImpl extends BasicModule implements MultiUserCha ...@@ -796,6 +798,8 @@ public class MultiUserChatServerImpl extends BasicModule implements MultiUserCha
router = server.getPacketRouter(); router = server.getPacketRouter();
// Configure the handler of iq:register packets // Configure the handler of iq:register packets
registerHandler = new IQMUCRegisterHandler(this); registerHandler = new IQMUCRegisterHandler(this);
// Listen to cluster events
ClusterManager.addListener(this);
} }
public void start() { public void start() {
...@@ -834,7 +838,7 @@ public class MultiUserChatServerImpl extends BasicModule implements MultiUserCha ...@@ -834,7 +838,7 @@ public class MultiUserChatServerImpl extends BasicModule implements MultiUserCha
} }
public void enableService(boolean enabled) { public void enableService(boolean enabled, boolean persistent) {
if (isServiceEnabled() == enabled) { if (isServiceEnabled() == enabled) {
// Do nothing if the service status has not changed // Do nothing if the service status has not changed
return; return;
...@@ -846,7 +850,9 @@ public class MultiUserChatServerImpl extends BasicModule implements MultiUserCha ...@@ -846,7 +850,9 @@ public class MultiUserChatServerImpl extends BasicModule implements MultiUserCha
// Stop the service/module // Stop the service/module
stop(); stop();
} }
JiveGlobals.setProperty("xmpp.muc.enabled", Boolean.toString(enabled)); if (persistent) {
JiveGlobals.setProperty("xmpp.muc.enabled", Boolean.toString(enabled));
}
serviceEnabled = enabled; serviceEnabled = enabled;
if (enabled) { if (enabled) {
// Start the service/module // Start the service/module
...@@ -917,6 +923,21 @@ public class MultiUserChatServerImpl extends BasicModule implements MultiUserCha ...@@ -917,6 +923,21 @@ public class MultiUserChatServerImpl extends BasicModule implements MultiUserCha
outMessages.addAndGet(numOccupants); outMessages.addAndGet(numOccupants);
} }
public void joinedCluster() {
// Disable the service until we know that we are the senior cluster member
enableService(false, false);
}
public void leftCluster() {
// Offer the service when not running in a cluster
enableService(true, false);
}
public void markedAsSeniorClusterMember() {
// Offer the service since we are the senior cluster member
enableService(true, false);
}
public Iterator<DiscoServerItem> getItems() { public Iterator<DiscoServerItem> getItems() {
// Check if the service is disabled. Info is not available when disabled. // Check if the service is disabled. Info is not available when disabled.
if (!isServiceEnabled()) { if (!isServiceEnabled()) {
......
...@@ -17,6 +17,8 @@ import org.jivesoftware.openfire.PacketRouter; ...@@ -17,6 +17,8 @@ import org.jivesoftware.openfire.PacketRouter;
import org.jivesoftware.openfire.RoutableChannelHandler; import org.jivesoftware.openfire.RoutableChannelHandler;
import org.jivesoftware.openfire.RoutingTable; import org.jivesoftware.openfire.RoutingTable;
import org.jivesoftware.openfire.XMPPServer; import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.cluster.ClusterEventListener;
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.component.InternalComponentManager; import org.jivesoftware.openfire.component.InternalComponentManager;
import org.jivesoftware.openfire.container.BasicModule; import org.jivesoftware.openfire.container.BasicModule;
import org.jivesoftware.openfire.disco.DiscoInfoProvider; import org.jivesoftware.openfire.disco.DiscoInfoProvider;
...@@ -45,7 +47,7 @@ import java.util.concurrent.CopyOnWriteArrayList; ...@@ -45,7 +47,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
* @author Matt Tucker * @author Matt Tucker
*/ */
public class PubSubModule extends BasicModule implements ServerItemsProvider, DiscoInfoProvider, public class PubSubModule extends BasicModule implements ServerItemsProvider, DiscoInfoProvider,
DiscoItemsProvider, RoutableChannelHandler, PubSubService { DiscoItemsProvider, RoutableChannelHandler, PubSubService, ClusterEventListener {
/** /**
* the chat service's hostname * the chat service's hostname
...@@ -108,6 +110,11 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di ...@@ -108,6 +110,11 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
*/ */
private PubSubEngine engine = null; private PubSubEngine engine = null;
/**
* Flag that indicates if the service is enabled.
*/
private boolean serviceEnabled = true;
public PubSubModule() { public PubSubModule() {
super("Publish Subscribe Service"); super("Publish Subscribe Service");
} }
...@@ -386,6 +393,8 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di ...@@ -386,6 +393,8 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
else { else {
rootCollectionNode = (CollectionNode) getNode(rootNodeID); rootCollectionNode = (CollectionNode) getNode(rootNodeID);
} }
// Listen to cluster events
ClusterManager.addListener(this);
} }
public void start() { public void start() {
...@@ -409,6 +418,42 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di ...@@ -409,6 +418,42 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
engine.shutdown(); engine.shutdown();
} }
private void enableService(boolean enabled) {
if (serviceEnabled == enabled) {
// Do nothing if the service status has not changed
return;
}
XMPPServer server = XMPPServer.getInstance();
if (!enabled) {
// Disable disco information
server.getIQDiscoItemsHandler().removeServerItemsProvider(this);
// Stop the service/module
stop();
}
serviceEnabled = enabled;
if (enabled) {
// Start the service/module
start();
// Enable disco information
server.getIQDiscoItemsHandler().addServerItemsProvider(this);
}
}
public void joinedCluster() {
// Disable the service until we know that we are the senior cluster member
enableService(false);
}
public void leftCluster() {
// Offer the service when not running in a cluster
enableService(true);
}
public void markedAsSeniorClusterMember() {
// Offer the service since we are the senior cluster member
enableService(true);
}
public Iterator<DiscoServerItem> getItems() { public Iterator<DiscoServerItem> getItems() {
ArrayList<DiscoServerItem> items = new ArrayList<DiscoServerItem>(); ArrayList<DiscoServerItem> items = new ArrayList<DiscoServerItem>();
......
...@@ -64,6 +64,26 @@ public interface ClientSession extends Session { ...@@ -64,6 +64,26 @@ public interface ClientSession extends Session {
*/ */
public String getUsername() throws UserNotFoundException; public String getUsername() throws UserNotFoundException;
/**
* Flag indicating if this session has been initialized once coming
* online. Session initialization occurs after the session receives
* the first "available" presence update from the client. Initialization
* actions include pushing offline messages, presence subscription requests,
* and presence statuses to the client. Initialization occurs only once
* following the first available presence transition.
*
* @return True if the session has already been initializsed
*/
public boolean isInitialized();
/**
* Sets the initialization state of the session.
*
* @param isInit True if the session has been initialized
* @see #isInitialized
*/
public void setInitialized(boolean isInit);
/** /**
* Returns true if the offline messages of the user should be sent to the user when * Returns true if the offline messages of the user should be sent to the user when
* the user becomes online. If the user sent a disco request with node * the user becomes online. If the user sent a disco request with node
......
...@@ -11,7 +11,11 @@ ...@@ -11,7 +11,11 @@
package org.jivesoftware.openfire.session; package org.jivesoftware.openfire.session;
import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import java.util.Collection; import java.util.Collection;
import java.util.Set;
/** /**
* Server-to-server communication is done using two TCP connections between the servers. One * Server-to-server communication is done using two TCP connections between the servers. One
...@@ -36,6 +40,14 @@ import java.util.Collection; ...@@ -36,6 +40,14 @@ import java.util.Collection;
*/ */
public interface IncomingServerSession extends Session { public interface IncomingServerSession extends Session {
/**
* Cache (unlimited, never expire) that holds domains, subdomains and virtual
* hostnames of the remote server that were validated with this server for each
* incoming server session.
* Key: stream ID, Value: Domains and subdomains of the remote server that were
* validated with this server.
*/
static Cache<String, Set<String>> validatedDomainsCache = CacheFactory.createCache("Validated Domains");
/** /**
* Returns a collection with all the domains, subdomains and virtual hosts that where * Returns a collection with all the domains, subdomains and virtual hosts that where
* validated. The remote server is allowed to send packets from any of these domains, * validated. The remote server is allowed to send packets from any of these domains,
...@@ -55,5 +67,4 @@ public interface IncomingServerSession extends Session { ...@@ -55,5 +67,4 @@ public interface IncomingServerSession extends Session {
* when validating the session. * when validating the session.
*/ */
public String getLocalDomain(); public String getLocalDomain();
} }
...@@ -22,15 +22,18 @@ import org.jivesoftware.openfire.net.SocketConnection; ...@@ -22,15 +22,18 @@ import org.jivesoftware.openfire.net.SocketConnection;
import org.jivesoftware.openfire.server.ServerDialback; import org.jivesoftware.openfire.server.ServerDialback;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log; import org.jivesoftware.util.Log;
import org.jivesoftware.util.lock.LockManager;
import org.xmlpull.v1.XmlPullParser; import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException; import org.xmlpull.v1.XmlPullParserException;
import org.xmpp.packet.Packet; import org.xmpp.packet.Packet;
import org.xmpp.packet.StreamError; import org.xmpp.packet.StreamError;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.Lock;
/** /**
* Server-to-server communication is done using two TCP connections between the servers. One * Server-to-server communication is done using two TCP connections between the servers. One
...@@ -54,12 +57,6 @@ import java.util.Collections; ...@@ -54,12 +57,6 @@ import java.util.Collections;
* @author Gaston Dombiak * @author Gaston Dombiak
*/ */
public class LocalIncomingServerSession extends LocalSession implements IncomingServerSession { public class LocalIncomingServerSession extends LocalSession implements IncomingServerSession {
/**
* List of domains, subdomains and virtual hostnames of the remote server that were
* validated with this server. The remote server is allowed to send packets to this
* server from any of the validated domains.
*/
private Collection<String> validatedDomains = new ArrayList<String>();
/** /**
* Domains or subdomain of this server that was used by the remote server * Domains or subdomain of this server that was used by the remote server
...@@ -196,6 +193,22 @@ public class LocalIncomingServerSession extends LocalSession implements Incoming ...@@ -196,6 +193,22 @@ public class LocalIncomingServerSession extends LocalSession implements Incoming
return session; return session;
} }
/**
* Removes the list of validate domains for the specified session. This is usually required
* when an incoming server session is closed.
*
* @param streamID the streamID that identifies the incoming server session.
*/
public static void releaseValidatedDomains(String streamID) {
Lock lock = LockManager.getLock(streamID);
try {
lock.lock();
validatedDomainsCache.remove(streamID);
} finally {
lock.unlock();
}
}
public LocalIncomingServerSession(String serverName, Connection connection, StreamID streamID) { public LocalIncomingServerSession(String serverName, Connection connection, StreamID streamID) {
super(serverName, connection, streamID); super(serverName, connection, streamID);
} }
...@@ -260,7 +273,18 @@ public class LocalIncomingServerSession extends LocalSession implements Incoming ...@@ -260,7 +273,18 @@ public class LocalIncomingServerSession extends LocalSession implements Incoming
* @return domains, subdomains and virtual hosts that where validated. * @return domains, subdomains and virtual hosts that where validated.
*/ */
public Collection<String> getValidatedDomains() { public Collection<String> getValidatedDomains() {
return Collections.unmodifiableCollection(validatedDomains); String streamID = getStreamID().getID();
Lock lock = LockManager.getLock(streamID);
try {
lock.lock();
Set<String> validatedDomains = validatedDomainsCache.get(streamID);
if (validatedDomains == null) {
return Collections.emptyList();
}
return Collections.unmodifiableCollection(validatedDomains);
} finally {
lock.unlock();
}
} }
/** /**
...@@ -270,7 +294,24 @@ public class LocalIncomingServerSession extends LocalSession implements Incoming ...@@ -270,7 +294,24 @@ public class LocalIncomingServerSession extends LocalSession implements Incoming
* @param domain the new validated domain, subdomain or virtual host to add. * @param domain the new validated domain, subdomain or virtual host to add.
*/ */
public void addValidatedDomain(String domain) { public void addValidatedDomain(String domain) {
if (validatedDomains.add(domain)) { boolean added;
String streamID = getStreamID().getID();
Lock lock = LockManager.getLock(streamID);
try {
lock.lock();
Set<String> validatedDomains = validatedDomainsCache.get(streamID);
if (validatedDomains == null) {
validatedDomains = new HashSet<String>();
}
added = validatedDomains.add(domain);
if (added) {
validatedDomainsCache.put(streamID, validatedDomains);
}
} finally {
lock.unlock();
}
if (added) {
// Register the new validated domain for this server session in SessionManager // Register the new validated domain for this server session in SessionManager
SessionManager.getInstance().registerIncomingServerSession(domain, this); SessionManager.getInstance().registerIncomingServerSession(domain, this);
} }
...@@ -285,7 +326,25 @@ public class LocalIncomingServerSession extends LocalSession implements Incoming ...@@ -285,7 +326,25 @@ public class LocalIncomingServerSession extends LocalSession implements Incoming
* validated domains. * validated domains.
*/ */
public void removeValidatedDomain(String domain) { public void removeValidatedDomain(String domain) {
validatedDomains.remove(domain); String streamID = getStreamID().getID();
Lock lock = LockManager.getLock(streamID);
try {
lock.lock();
Set<String> validatedDomains = validatedDomainsCache.get(streamID);
if (validatedDomains == null) {
validatedDomains = new HashSet<String>();
}
validatedDomains.remove(domain);
if (!validatedDomains.isEmpty()) {
validatedDomainsCache.put(streamID, validatedDomains);
}
else {
validatedDomainsCache.remove(streamID);
}
} finally {
lock.unlock();
}
// Unregister the validated domain for this server session in SessionManager // Unregister the validated domain for this server session in SessionManager
SessionManager.getInstance().unregisterIncomingServerSessions(domain); SessionManager.getInstance().unregisterIncomingServerSessions(domain);
} }
......
...@@ -66,6 +66,13 @@ public interface Session extends RoutableChannelHandler { ...@@ -66,6 +66,13 @@ public interface Session extends RoutableChannelHandler {
*/ */
public StreamID getStreamID(); public StreamID getStreamID();
/**
* Obtain the name of the server this session belongs to.
*
* @return the server name.
*/
public String getServerName();
/** /**
* Obtain the date the session was created. * Obtain the date the session was created.
* *
......
...@@ -50,8 +50,8 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable { ...@@ -50,8 +50,8 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable {
*/ */
private Cache<String, byte[]> serversCache; private Cache<String, byte[]> serversCache;
/** /**
* Cache (unlimited, never expire) that holds sessions of external components connected to the server. * Cache (unlimited, never expire) that holds components connected to the server.
* Key: component domain, Value: nodeID * Key: component domain, Value: list of nodeIDs hosting the component
*/ */
private Cache<String, Set<byte[]>> componentsCache; private Cache<String, Set<byte[]>> componentsCache;
/** /**
...@@ -638,6 +638,10 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable { ...@@ -638,6 +638,10 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable {
this.remotePacketRouter = remotePacketRouter; this.remotePacketRouter = remotePacketRouter;
} }
public RemotePacketRouter getRemotePacketRouter() {
return remotePacketRouter;
}
public void initialize(XMPPServer server) { public void initialize(XMPPServer server) {
super.initialize(server); super.initialize(server);
this.server = server; this.server = server;
......
...@@ -9,6 +9,7 @@ package org.jivesoftware.util.cache; ...@@ -9,6 +9,7 @@ package org.jivesoftware.util.cache;
import org.jivesoftware.openfire.XMPPServer; import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.XMPPServerListener; import org.jivesoftware.openfire.XMPPServerListener;
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.container.Plugin; import org.jivesoftware.openfire.container.Plugin;
import org.jivesoftware.openfire.container.PluginClassLoader; import org.jivesoftware.openfire.container.PluginClassLoader;
import org.jivesoftware.openfire.container.PluginManager; import org.jivesoftware.openfire.container.PluginManager;
...@@ -33,7 +34,6 @@ import java.util.concurrent.CopyOnWriteArrayList; ...@@ -33,7 +34,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
*/ */
public class CacheFactory { public class CacheFactory {
public static String CLUSTER_PROPERTY_NAME = "cache.clustering.enabled";
public static String LOCAL_CACHE_PROPERTY_NAME = "cache.clustering.local.class"; public static String LOCAL_CACHE_PROPERTY_NAME = "cache.clustering.local.class";
public static String CLUSTERED_CACHE_PROPERTY_NAME = "cache.clustering.clustered.class"; public static String CLUSTERED_CACHE_PROPERTY_NAME = "cache.clustering.clustered.class";
...@@ -121,15 +121,6 @@ public class CacheFactory { ...@@ -121,15 +121,6 @@ public class CacheFactory {
return clusteringStarted; return clusteringStarted;
} }
/**
* Returns true if this instance is configured to run in a cluster.
* @return true if this instance is configured to run in a cluster.
*/
public static boolean isClusteringConfigured() {
return JiveGlobals.getXMLProperty(CLUSTER_PROPERTY_NAME, false);
}
/** /**
* Returns a byte[] that uniquely identifies this member within the cluster or <tt>null</tt> * Returns a byte[] that uniquely identifies this member within the cluster or <tt>null</tt>
* when not in a cluster. * when not in a cluster.
...@@ -151,7 +142,7 @@ public class CacheFactory { ...@@ -151,7 +142,7 @@ public class CacheFactory {
if (enabled == clusteringStarted) { if (enabled == clusteringStarted) {
return; return;
} }
JiveGlobals.setXMLProperty(CLUSTER_PROPERTY_NAME, String.valueOf(enabled)); ClusterManager.setClusteringEnabled(enabled);
if (!enabled) { if (!enabled) {
stopClustering(); stopClustering();
} }
...@@ -212,19 +203,19 @@ public class CacheFactory { ...@@ -212,19 +203,19 @@ public class CacheFactory {
* *
* @param task the task to be invoked on the specified cluster member. * @param task the task to be invoked on the specified cluster member.
* @param nodeID the byte array that identifies the target cluster member. * @param nodeID the byte array that identifies the target cluster member.
* @return false if not in a cluster or specified cluster node was not found. * @throws IllegalStateException if requested node was not found or not running in a cluster.
*/ */
public static boolean doClusterTask(final ClusterTask task, byte[] nodeID) { public static void doClusterTask(final ClusterTask task, byte[] nodeID) {
if (!clusteringStarted) { if (!clusteringStarted) {
return false; throw new IllegalStateException("Cluster service is not available");
} }
synchronized(CacheFactory.class) { synchronized(CacheFactory.class) {
if (!clusteringStarted) { if (!clusteringStarted) {
return false; throw new IllegalStateException("Cluster service is not available");
} }
} }
return cacheFactoryStrategy.doClusterTask(task, nodeID); cacheFactoryStrategy.doClusterTask(task, nodeID);
} }
/** /**
...@@ -254,11 +245,12 @@ public class CacheFactory { ...@@ -254,11 +245,12 @@ public class CacheFactory {
* @param task the ClusterTask object to be invoked on a given cluster member. * @param task the ClusterTask object to be invoked on a given cluster member.
* @param nodeID the byte array that identifies the target cluster member. * @param nodeID the byte array that identifies the target cluster member.
* @return result of remote operation or null if operation failed or operation returned null. * @return result of remote operation or null if operation failed or operation returned null.
* @throws IllegalStateException if requested node was not found or not running in a cluster.
*/ */
public static Object doSynchronousClusterTask(ClusterTask task, byte[] nodeID) { public static Object doSynchronousClusterTask(ClusterTask task, byte[] nodeID) {
synchronized(CacheFactory.class) { synchronized(CacheFactory.class) {
if (!clusteringStarted) { if (!clusteringStarted) {
return null; throw new IllegalStateException("Cluster service is not available");
} }
} }
...@@ -280,7 +272,7 @@ public class CacheFactory { ...@@ -280,7 +272,7 @@ public class CacheFactory {
return; return;
} }
// See if clustering should be enabled. // See if clustering should be enabled.
boolean enabled = JiveGlobals.getXMLProperty(CLUSTER_PROPERTY_NAME, false); boolean enabled = ClusterManager.isClusteringEnabled();
if (enabled) { if (enabled) {
Log.debug("Shutting down clustered cache service."); Log.debug("Shutting down clustered cache service.");
...@@ -337,7 +329,7 @@ public class CacheFactory { ...@@ -337,7 +329,7 @@ public class CacheFactory {
return; return;
} }
// See if clustering should be enabled. // See if clustering should be enabled.
boolean enabled = JiveGlobals.getXMLProperty(CLUSTER_PROPERTY_NAME, false); boolean enabled = ClusterManager.isClusteringEnabled();
// If the user tried to turn on clustering, make sure they're actually allowed to. // If the user tried to turn on clustering, make sure they're actually allowed to.
if (enabled) { if (enabled) {
......
...@@ -99,6 +99,7 @@ public interface CacheFactoryStrategy { ...@@ -99,6 +99,7 @@ public interface CacheFactoryStrategy {
* @param task the ClusterTask object to be invoked on a given cluster member. * @param task the ClusterTask object to be invoked on a given cluster member.
* @param nodeID the byte array that identifies the target cluster member. * @param nodeID the byte array that identifies the target cluster member.
* @return result of remote operation or null if operation failed or operation returned null. * @return result of remote operation or null if operation failed or operation returned null.
* @throws IllegalStateException if requested node was not found.
*/ */
Object doSynchronousClusterTask(ClusterTask task, byte[] nodeID); Object doSynchronousClusterTask(ClusterTask task, byte[] nodeID);
......
...@@ -74,6 +74,7 @@ public class DefaultLocalCacheStrategy implements CacheFactoryStrategy { ...@@ -74,6 +74,7 @@ public class DefaultLocalCacheStrategy implements CacheFactoryStrategy {
cacheNames.put("Incoming Server Sessions", "incServerSessions"); cacheNames.put("Incoming Server Sessions", "incServerSessions");
cacheNames.put("Sessions by Hostname", "sessionsHostname"); cacheNames.put("Sessions by Hostname", "sessionsHostname");
cacheNames.put("Secret Keys Cache", "secretKeys"); cacheNames.put("Secret Keys Cache", "secretKeys");
cacheNames.put("Validated Domains", "validatedDomains");
cacheProps.put("cache.fileTransfer.size", 128 * 1024l); cacheProps.put("cache.fileTransfer.size", 128 * 1024l);
cacheProps.put("cache.fileTransfer.expirationTime", 1000 * 60 * 10l); cacheProps.put("cache.fileTransfer.expirationTime", 1000 * 60 * 10l);
...@@ -125,6 +126,8 @@ public class DefaultLocalCacheStrategy implements CacheFactoryStrategy { ...@@ -125,6 +126,8 @@ public class DefaultLocalCacheStrategy implements CacheFactoryStrategy {
cacheProps.put("cache.sessionsHostname.expirationTime", -1l); cacheProps.put("cache.sessionsHostname.expirationTime", -1l);
cacheProps.put("cache.secretKeys.size", -1l); cacheProps.put("cache.secretKeys.size", -1l);
cacheProps.put("cache.secretKeys.expirationTime", -1l); cacheProps.put("cache.secretKeys.expirationTime", -1l);
cacheProps.put("cache.validatedDomains.size", -1l);
cacheProps.put("cache.validatedDomains.expirationTime", -1l);
} }
...@@ -155,7 +158,7 @@ public class DefaultLocalCacheStrategy implements CacheFactoryStrategy { ...@@ -155,7 +158,7 @@ public class DefaultLocalCacheStrategy implements CacheFactoryStrategy {
} }
public boolean doClusterTask(ClusterTask task, byte[] nodeID) { public boolean doClusterTask(ClusterTask task, byte[] nodeID) {
return false; throw new IllegalStateException("Cluster service is not available");
} }
public Collection<Object> doSynchronousClusterTask(ClusterTask task, boolean includeLocalMember) { public Collection<Object> doSynchronousClusterTask(ClusterTask task, boolean includeLocalMember) {
...@@ -163,7 +166,7 @@ public class DefaultLocalCacheStrategy implements CacheFactoryStrategy { ...@@ -163,7 +166,7 @@ public class DefaultLocalCacheStrategy implements CacheFactoryStrategy {
} }
public Object doSynchronousClusterTask(ClusterTask task, byte[] nodeID) { public Object doSynchronousClusterTask(ClusterTask task, byte[] nodeID) {
return null; throw new IllegalStateException("Cluster service is not available");
} }
public void updateCacheStats(Map<String, Cache> caches) { public void updateCacheStats(Map<String, Cache> caches) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment