Commit 092b00b3 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

More clustering work with initial tests running fine.

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@8412 b35dd754-fafc-0310-a699-88a17e54d16e
parent 7750f416
...@@ -18,6 +18,7 @@ import org.jivesoftware.openfire.roster.Roster; ...@@ -18,6 +18,7 @@ import org.jivesoftware.openfire.roster.Roster;
import org.jivesoftware.openfire.roster.RosterItem; import org.jivesoftware.openfire.roster.RosterItem;
import org.jivesoftware.openfire.roster.RosterManager; import org.jivesoftware.openfire.roster.RosterManager;
import org.jivesoftware.openfire.session.LocalClientSession; import org.jivesoftware.openfire.session.LocalClientSession;
import org.jivesoftware.openfire.session.LocalSession;
import org.jivesoftware.openfire.session.Session; import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.openfire.user.UserManager; import org.jivesoftware.openfire.user.UserManager;
import org.jivesoftware.openfire.user.UserNotFoundException; import org.jivesoftware.openfire.user.UserNotFoundException;
...@@ -156,7 +157,7 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler ...@@ -156,7 +157,7 @@ public class PresenceUpdateHandler extends BasicModule implements ChannelHandler
} }
catch (UnauthorizedException e) { catch (UnauthorizedException e) {
try { try {
Session session = sessionManager.getSession(presence.getFrom()); LocalSession session = (LocalSession) sessionManager.getSession(presence.getFrom());
presence = presence.createCopy(); presence = presence.createCopy();
if (session != null) { if (session != null) {
presence.setFrom(new JID(null, session.getServerName(), null, true)); presence.setFrom(new JID(null, session.getServerName(), null, true));
......
...@@ -42,7 +42,7 @@ public class SocketPacketWriteHandler implements ChannelHandler { ...@@ -42,7 +42,7 @@ public class SocketPacketWriteHandler implements ChannelHandler {
routingTable.routePacket(recipient, packet); routingTable.routePacket(recipient, packet);
} }
// The target domain belongs to the local server // The target domain belongs to the local server
if (recipient == null || (recipient.getNode() == null && recipient.getResource() == null)) { else if (recipient == null || (recipient.getNode() == null && recipient.getResource() == null)) {
// no TO was found so send back the packet to the sender // no TO was found so send back the packet to the sender
routingTable.routePacket(packet.getFrom(), packet); routingTable.routePacket(packet.getFrom(), packet);
} }
......
...@@ -13,7 +13,6 @@ package org.jivesoftware.openfire.session; ...@@ -13,7 +13,6 @@ package org.jivesoftware.openfire.session;
import org.jivesoftware.openfire.privacy.PrivacyList; import org.jivesoftware.openfire.privacy.PrivacyList;
import org.jivesoftware.openfire.user.UserNotFoundException; import org.jivesoftware.openfire.user.UserNotFoundException;
import org.xmpp.packet.Packet;
import org.xmpp.packet.Presence; import org.xmpp.packet.Presence;
/** /**
...@@ -100,9 +99,8 @@ public interface ClientSession extends Session { ...@@ -100,9 +99,8 @@ public interface ClientSession extends Session {
* Set the presence of this session * Set the presence of this session
* *
* @param presence The presence for the session * @param presence The presence for the session
* @return The old priority of the session or null if not authenticated
*/ */
public Presence setPresence(Presence presence); public void setPresence(Presence presence);
/** /**
* Returns the number of conflicts detected on this session. * Returns the number of conflicts detected on this session.
...@@ -121,16 +119,4 @@ public interface ClientSession extends Session { ...@@ -121,16 +119,4 @@ public interface ClientSession extends Session {
* Increments the conflict by one. * Increments the conflict by one.
*/ */
public void incrementConflictCount(); public void incrementConflictCount();
/**
* Returns true if the specified packet must not be blocked based on the active or default
* privacy list rules. The active list will be tried first. If none was found then the
* default list is going to be used. If no default list was defined for this user then
* allow the packet to flow.
*
* @param packet the packet to analyze if it must be blocked.
* @return true if the specified packet must be blocked.
*/
public boolean canProcess(Packet packet);
} }
...@@ -624,9 +624,8 @@ public class LocalClientSession extends LocalSession implements ClientSession { ...@@ -624,9 +624,8 @@ public class LocalClientSession extends LocalSession implements ClientSession {
* Set the presence of this session * Set the presence of this session
* *
* @param presence The presence for the session * @param presence The presence for the session
* @return The old priority of the session or null if not authenticated
*/ */
public Presence setPresence(Presence presence) { public void setPresence(Presence presence) {
Presence oldPresence = this.presence; Presence oldPresence = this.presence;
this.presence = presence; this.presence = presence;
if (oldPresence.isAvailable() && !this.presence.isAvailable()) { if (oldPresence.isAvailable() && !this.presence.isAvailable()) {
...@@ -657,7 +656,6 @@ public class LocalClientSession extends LocalSession implements ClientSession { ...@@ -657,7 +656,6 @@ public class LocalClientSession extends LocalSession implements ClientSession {
// Notify listeners that the show or status value of the presence has changed // Notify listeners that the show or status value of the presence has changed
PresenceEventDispatcher.presenceChanged(this, presence); PresenceEventDispatcher.presenceChanged(this, presence);
} }
return oldPresence;
} }
/** /**
......
...@@ -51,16 +51,6 @@ public interface Session extends RoutableChannelHandler { ...@@ -51,16 +51,6 @@ public interface Session extends RoutableChannelHandler {
*/ */
public JID getAddress(); public JID getAddress();
/**
* Sets the new address of this session. The address is used by services like the core
* server packet router to determine if a packet should be sent to the handler.
* Handlers that are working on behalf of the server should use the generic server
* hostname address (e.g. server.com).
*
* @param address the new address of this session.
*/
public void setAddress(JID address);
/** /**
* Obtain the current status of this session. * Obtain the current status of this session.
* *
...@@ -76,13 +66,6 @@ public interface Session extends RoutableChannelHandler { ...@@ -76,13 +66,6 @@ public interface Session extends RoutableChannelHandler {
*/ */
public StreamID getStreamID(); public StreamID getStreamID();
/**
* Obtain the name of the server this session belongs to.
*
* @return the server name.
*/
public String getServerName();
/** /**
* Obtain the date the session was created. * Obtain the date the session was created.
* *
......
...@@ -192,7 +192,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable { ...@@ -192,7 +192,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable {
routed = false; routed = false;
} }
else { else {
if (clientRoute.getNodeID() == server.getNodeID()) { if (Arrays.equals(clientRoute.getNodeID(), server.getNodeID())) {
// This is a route to a local user hosted in this node // This is a route to a local user hosted in this node
try { try {
localRoutingTable.getRoute(jid.toString()).process(packet); localRoutingTable.getRoute(jid.toString()).process(packet);
...@@ -215,7 +215,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable { ...@@ -215,7 +215,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable {
// Packet sent to component hosted in this server // Packet sent to component hosted in this server
byte[] nodeID = componentsCache.get(jid.getDomain()); byte[] nodeID = componentsCache.get(jid.getDomain());
if (nodeID != null) { if (nodeID != null) {
if (nodeID == server.getNodeID()) { if (Arrays.equals(nodeID, server.getNodeID())) {
// This is a route to a local component hosted in this node // This is a route to a local component hosted in this node
try { try {
localRoutingTable.getRoute(jid.getDomain()).process(packet); localRoutingTable.getRoute(jid.getDomain()).process(packet);
...@@ -236,7 +236,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable { ...@@ -236,7 +236,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable {
// Packet sent to remote server // Packet sent to remote server
byte[] nodeID = serversCache.get(jid.getDomain()); byte[] nodeID = serversCache.get(jid.getDomain());
if (nodeID != null) { if (nodeID != null) {
if (nodeID == server.getNodeID()) { if (Arrays.equals(nodeID, server.getNodeID())) {
// This is a route to a remote server connected from this node // This is a route to a remote server connected from this node
try { try {
localRoutingTable.getRoute(jid.getDomain()).process(packet); localRoutingTable.getRoute(jid.getDomain()).process(packet);
...@@ -440,14 +440,14 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable { ...@@ -440,14 +440,14 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable {
// Add sessions of non-anonymous users hosted by other cluster nodes // Add sessions of non-anonymous users hosted by other cluster nodes
for (Map.Entry<String, ClientRoute> entry : usersCache.entrySet()) { for (Map.Entry<String, ClientRoute> entry : usersCache.entrySet()) {
ClientRoute route = entry.getValue(); ClientRoute route = entry.getValue();
if (route.getNodeID() != server.getNodeID()) { if (!Arrays.equals(route.getNodeID(), server.getNodeID())) {
sessions.add(locator.getClientSession(route.getNodeID(), new JID(entry.getKey()))); sessions.add(locator.getClientSession(route.getNodeID(), new JID(entry.getKey())));
} }
} }
// Add sessions of anonymous users hosted by other cluster nodes // Add sessions of anonymous users hosted by other cluster nodes
for (Map.Entry<String, ClientRoute> entry : anonymousUsersCache.entrySet()) { for (Map.Entry<String, ClientRoute> entry : anonymousUsersCache.entrySet()) {
ClientRoute route = entry.getValue(); ClientRoute route = entry.getValue();
if (route.getNodeID() != server.getNodeID()) { if (!Arrays.equals(route.getNodeID(), server.getNodeID())) {
sessions.add(locator.getClientSession(route.getNodeID(), new JID(entry.getKey()))); sessions.add(locator.getClientSession(route.getNodeID(), new JID(entry.getKey())));
} }
} }
......
...@@ -247,6 +247,23 @@ public class CacheFactory { ...@@ -247,6 +247,23 @@ public class CacheFactory {
return cacheFactoryStrategy.doSynchronousClusterTask(task, includeLocalMember); return cacheFactoryStrategy.doSynchronousClusterTask(task, includeLocalMember);
} }
/**
* Invokes a task on a given cluster member synchronously and returns the result of
* the remote operation. If clustering is not enabled, this method will return null.
*
* @param task the ClusterTask object to be invoked on a given cluster member.
* @param nodeID the byte array that identifies the target cluster member.
* @return result of remote operation or null if operation failed or operation returned null.
*/
public static Object doSynchronousClusterTask(ClusterTask task, byte[] nodeID) {
synchronized(CacheFactory.class) {
if (!clusteringEnabled) {
return null;
}
}
return cacheFactoryStrategy.doSynchronousClusterTask(task, nodeID);
}
/** /**
* Shuts down the clustering service. This method should be called when the Jive * Shuts down the clustering service. This method should be called when the Jive
......
...@@ -92,6 +92,16 @@ public interface CacheFactoryStrategy { ...@@ -92,6 +92,16 @@ public interface CacheFactoryStrategy {
*/ */
Collection<Object> doSynchronousClusterTask(ClusterTask task, boolean includeLocalMember); Collection<Object> doSynchronousClusterTask(ClusterTask task, boolean includeLocalMember);
/**
* Invokes a task on a given cluster member synchronously and returns the result of
* the remote operation. If clustering is not enabled, this method will return null.
*
* @param task the ClusterTask object to be invoked on a given cluster member.
* @param nodeID the byte array that identifies the target cluster member.
* @return result of remote operation or null if operation failed or operation returned null.
*/
Object doSynchronousClusterTask(ClusterTask task, byte[] nodeID);
/** /**
* Updates the statistics of the specified caches and publishes them into * Updates the statistics of the specified caches and publishes them into
* a cache for statistics. The statistics cache is already known to the application * a cache for statistics. The statistics cache is already known to the application
......
...@@ -159,6 +159,10 @@ public class DefaultLocalCacheStrategy implements CacheFactoryStrategy { ...@@ -159,6 +159,10 @@ public class DefaultLocalCacheStrategy implements CacheFactoryStrategy {
return Collections.emptyList(); return Collections.emptyList();
} }
public Object doSynchronousClusterTask(ClusterTask task, byte[] nodeID) {
return null;
}
public void updateCacheStats(Map<String, Cache> caches) { public void updateCacheStats(Map<String, Cache> caches) {
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment