Commit 0db73760 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

Service discovery is now cluster ready.

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@8605 b35dd754-fafc-0310-a699-88a17e54d16e
parent ee3aef75
......@@ -14,14 +14,20 @@ package org.jivesoftware.openfire.disco;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.dom4j.QName;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.openfire.IQHandlerInfo;
import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.cluster.ClusterEventListener;
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.cluster.NodeID;
import org.jivesoftware.openfire.forms.spi.XDataFormImpl;
import org.jivesoftware.openfire.handler.IQHandler;
import org.jivesoftware.openfire.user.UserManager;
import org.jivesoftware.openfire.user.UserNotFoundException;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.lock.LockManager;
import org.xmpp.packet.IQ;
import org.xmpp.packet.JID;
import org.xmpp.packet.PacketError;
......@@ -29,6 +35,7 @@ import org.xmpp.packet.PacketError;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.Lock;
/**
* IQDiscoInfoHandler is responsible for handling disco#info requests. This class holds a map with
......@@ -50,12 +57,12 @@ import java.util.concurrent.CopyOnWriteArraySet;
*
* @author Gaston Dombiak
*/
public class IQDiscoInfoHandler extends IQHandler {
public class IQDiscoInfoHandler extends IQHandler implements ClusterEventListener {
private Map<String, DiscoInfoProvider> entities = new HashMap<String, DiscoInfoProvider>();
private Set<String> serverFeatures = new CopyOnWriteArraySet<String>();
private Map<String, DiscoInfoProvider> serverNodeProviders =
new ConcurrentHashMap<String, DiscoInfoProvider>();
private Set<String> localServerFeatures = new CopyOnWriteArraySet<String>();
private Cache<String, Set<NodeID>> serverFeatures;
private Map<String, DiscoInfoProvider> serverNodeProviders = new ConcurrentHashMap<String, DiscoInfoProvider>();
private IQHandlerInfo info;
private List<Element> anonymousUserIdentities = new ArrayList<Element>();
......@@ -65,7 +72,6 @@ public class IQDiscoInfoHandler extends IQHandler {
public IQDiscoInfoHandler() {
super("XMPP Disco Info Handler");
info = new IQHandlerInfo("query", "http://jabber.org/protocol/disco#info");
addServerFeature("http://jabber.org/protocol/disco#info");
// Initialize the user identity and features collections (optimization to avoid creating
// the same objects for each response)
Element userIdentity = DocumentHelper.createElement("identity");
......@@ -223,10 +229,23 @@ public class IQDiscoInfoHandler extends IQHandler {
* made against the server.
*
* @param namespace the namespace identifying the new server feature.
* @return true if the new feature was successfully added.
*/
public boolean addServerFeature(String namespace) {
return serverFeatures.add(namespace);
public void addServerFeature(String namespace) {
if (localServerFeatures.add(namespace)) {
Lock lock = LockManager.getLock(namespace);
try {
lock.lock();
Set<NodeID> nodeIDs = serverFeatures.get(namespace);
if (nodeIDs == null) {
nodeIDs = new HashSet<NodeID>();
}
nodeIDs.add(XMPPServer.getInstance().getNodeID());
serverFeatures.put(namespace, nodeIDs);
}
finally {
lock.unlock();
}
}
}
/**
......@@ -236,17 +255,101 @@ public class IQDiscoInfoHandler extends IQHandler {
* @param namespace the namespace of the feature to be removed.
*/
public void removeServerFeature(String namespace) {
serverFeatures.remove(namespace);
if (localServerFeatures.remove(namespace)) {
Lock lock = LockManager.getLock(namespace);
try {
lock.lock();
Set<NodeID> nodeIDs = serverFeatures.get(namespace);
if (nodeIDs != null) {
nodeIDs.remove(XMPPServer.getInstance().getNodeID());
if (nodeIDs.isEmpty()) {
serverFeatures.remove(namespace);
}
else {
serverFeatures.put(namespace, nodeIDs);
}
}
}
finally {
lock.unlock();
}
}
}
public void initialize(XMPPServer server) {
super.initialize(server);
serverFeatures = CacheFactory.createCache("Disco Server Features");
addServerFeature("http://jabber.org/protocol/disco#info");
// Track the implementors of ServerFeaturesProvider so that we can collect the features
// provided by the server
for (ServerFeaturesProvider provider : server.getServerFeaturesProviders()) {
addServerFeaturesProvider(provider);
}
setProvider(server.getServerInfo().getName(), getServerInfoProvider());
// Listen to cluster events
ClusterManager.addListener(this);
}
public void joinedCluster() {
restoreCacheContent();
}
public void joinedCluster(byte[] nodeID) {
// Do nothing
}
public void leftCluster() {
if (!XMPPServer.getInstance().isShuttingDown()) {
restoreCacheContent();
}
}
public void leftCluster(byte[] nodeID) {
if (ClusterManager.isSeniorClusterMember()) {
NodeID leftNode = new NodeID(nodeID);
// Remove server features added by node that is gone
for (Map.Entry<String, Set<NodeID>> entry : serverFeatures.entrySet()) {
String namespace = entry.getKey();
Lock lock = LockManager.getLock(namespace);
try {
lock.lock();
Set<NodeID> nodeIDs = entry.getValue();
if (nodeIDs.remove(leftNode)) {
if (nodeIDs.isEmpty()) {
serverFeatures.remove(namespace);
}
else {
serverFeatures.put(namespace, nodeIDs);
}
}
}
finally {
lock.unlock();
}
}
}
}
public void markedAsSeniorClusterMember() {
// Do nothing
}
private void restoreCacheContent() {
for (String feature : localServerFeatures) {
Lock lock = LockManager.getLock(feature);
try {
lock.lock();
Set<NodeID> nodeIDs = serverFeatures.get(feature);
if (nodeIDs == null) {
nodeIDs = new HashSet<NodeID>();
}
nodeIDs.add(XMPPServer.getInstance().getNodeID());
serverFeatures.put(feature, nodeIDs);
}
finally {
lock.unlock();
}
}
}
/**
......@@ -257,7 +360,7 @@ public class IQDiscoInfoHandler extends IQHandler {
* @return the DiscoInfoProvider responsible for providing information at the server level.
*/
private DiscoInfoProvider getServerInfoProvider() {
DiscoInfoProvider discoInfoProvider = new DiscoInfoProvider() {
return new DiscoInfoProvider() {
final ArrayList<Element> identities = new ArrayList<Element>();
public Iterator<Element> getIdentities(String name, String node, JID senderJID) {
......@@ -300,7 +403,7 @@ public class IQDiscoInfoHandler extends IQHandler {
}
if (name == null) {
// Answer features of the server
return serverFeatures.iterator();
return new HashSet<String>(serverFeatures.keySet()).iterator();
}
else {
// Answer features of the user
......@@ -336,6 +439,5 @@ public class IQDiscoInfoHandler extends IQHandler {
return null;
}
};
return discoInfoProvider;
}
}
\ No newline at end of file
......@@ -14,21 +14,34 @@ package org.jivesoftware.openfire.disco;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.dom4j.QName;
import org.dom4j.tree.DefaultElement;
import org.jivesoftware.openfire.IQHandlerInfo;
import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.cluster.ClusterEventListener;
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.cluster.NodeID;
import org.jivesoftware.openfire.handler.IQHandler;
import org.jivesoftware.openfire.roster.RosterItem;
import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.openfire.user.User;
import org.jivesoftware.openfire.user.UserManager;
import org.jivesoftware.openfire.user.UserNotFoundException;
import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.cache.ExternalizableUtil;
import org.jivesoftware.util.lock.LockManager;
import org.xmpp.packet.IQ;
import org.xmpp.packet.JID;
import org.xmpp.packet.PacketError;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
/**
* IQDiscoItemsHandler is responsible for handling disco#items requests. This class holds a map with
......@@ -52,10 +65,11 @@ import java.util.concurrent.ConcurrentHashMap;
*
* @author Gaston Dombiak
*/
public class IQDiscoItemsHandler extends IQHandler implements ServerFeaturesProvider {
public class IQDiscoItemsHandler extends IQHandler implements ServerFeaturesProvider, ClusterEventListener {
private Map<String,DiscoItemsProvider> entities = new HashMap<String,DiscoItemsProvider>();
private List<Element> serverItems = new ArrayList<Element>();
private Map<String, Element> localServerItems = new HashMap<String, Element>();
private Cache<String, ClusteredServerItem> serverItems;
private Map<String, DiscoItemsProvider> serverNodeProviders = new ConcurrentHashMap<String, DiscoItemsProvider>();
private IQHandlerInfo info;
private IQDiscoInfoHandler infoHandler;
......@@ -255,18 +269,31 @@ public class IQDiscoItemsHandler extends IQHandler implements ServerFeaturesProv
* @param node the node that complements the jid address.
* @param name the discovered name of the component.
*/
public synchronized void addComponentItem(String jid, String node, String name) {
// A component may send his disco#info many times and we only want to have one item
// for the component so remove any element under the requested jid
removeComponentItem(jid);
// Create a new element based on the provided DiscoItem
Element element = DocumentHelper.createElement("item");
element.addAttribute("jid", jid);
element.addAttribute("node", node);
element.addAttribute("name", name);
// Add the element to the list of items related to the server
serverItems.add(element);
public void addComponentItem(String jid, String node, String name) {
Lock lock = LockManager.getLock(jid + "item");
try {
lock.lock();
ClusteredServerItem item = serverItems.get(jid);
if (item == null) {
// First time a node registers a server item for this component
item = new ClusteredServerItem();
Element element = DocumentHelper.createElement("item");
element.addAttribute("jid", jid);
element.addAttribute("node", node);
element.addAttribute("name", name);
item.element = element;
}
if (item.nodes.add(XMPPServer.getInstance().getNodeID())) {
// Update the cache with latest info
serverItems.put(jid, item);
}
// Keep track of the new server item added by this JVM
localServerItems.put(jid, item.element);
}
finally {
lock.unlock();
}
}
/**
......@@ -274,20 +301,37 @@ public class IQDiscoItemsHandler extends IQHandler implements ServerFeaturesProv
*
* @param jid the jid of the component being removed.
*/
public synchronized void removeComponentItem(String jid) {
for (Iterator<Element> it = serverItems.iterator(); it.hasNext();) {
if (jid.equals(it.next().attributeValue("jid"))) {
it.remove();
public void removeComponentItem(String jid) {
Lock lock = LockManager.getLock(jid + "item");
try {
lock.lock();
ClusteredServerItem item = serverItems.get(jid);
if (item != null && item.nodes.remove(XMPPServer.getInstance().getNodeID())) {
// Update the cache with latest info
if (item.nodes.isEmpty()) {
serverItems.remove(jid);
}
else {
serverItems.put(jid, item);
}
}
}
finally {
lock.unlock();
}
// Remove locally added server item
localServerItems.remove(jid);
}
public void initialize(XMPPServer server) {
super.initialize(server);
serverItems = CacheFactory.createCache("Disco Server Items");
// Track the implementors of ServerItemsProvider so that we can collect the items
// provided by the server
infoHandler = server.getIQDiscoInfoHandler();
setProvider(server.getServerInfo().getName(), getServerItemsProvider());
// Listen to cluster events
ClusterManager.addListener(this);
}
public void start() throws IllegalStateException {
......@@ -305,6 +349,73 @@ public class IQDiscoItemsHandler extends IQHandler implements ServerFeaturesProv
return features.iterator();
}
public void joinedCluster() {
restoreCacheContent();
}
public void joinedCluster(byte[] nodeID) {
// Do nothing
}
public void leftCluster() {
if (!XMPPServer.getInstance().isShuttingDown()) {
restoreCacheContent();
}
}
public void leftCluster(byte[] nodeID) {
if (ClusterManager.isSeniorClusterMember()) {
NodeID leftNode = new NodeID(nodeID);
for (Map.Entry<String, ClusteredServerItem> entry : serverItems.entrySet()) {
String jid = entry.getKey();
Lock lock = LockManager.getLock(jid + "item");
try {
lock.lock();
ClusteredServerItem item = entry.getValue();
if (item.nodes.remove(leftNode)) {
// Update the cache with latest info
if (item.nodes.isEmpty()) {
serverItems.remove(jid);
}
else {
serverItems.put(jid, item);
}
}
}
finally {
lock.unlock();
}
}
}
}
public void markedAsSeniorClusterMember() {
// Do nothing
}
private void restoreCacheContent() {
for (Map.Entry<String, Element> entry : localServerItems.entrySet()) {
String jid = entry.getKey();
Element element = entry.getValue();
Lock lock = LockManager.getLock(jid + "item");
try {
lock.lock();
ClusteredServerItem item = serverItems.get(jid);
if (item == null) {
// First time a node registers a server item for this component
item = new ClusteredServerItem();
item.element = element;
}
if (item.nodes.add(XMPPServer.getInstance().getNodeID())) {
// Update the cache with latest info
serverItems.put(jid, item);
}
}
finally {
lock.unlock();
}
}
}
private DiscoItemsProvider getServerItemsProvider() {
return new DiscoItemsProvider() {
public Iterator<Element> getItems(String name, String node, JID senderJID) {
......@@ -316,7 +427,11 @@ public class IQDiscoItemsHandler extends IQHandler implements ServerFeaturesProv
return null;
}
if (name == null) {
return serverItems.iterator();
List<Element> answer = new ArrayList<Element>();
for (ClusteredServerItem item : serverItems.values()) {
answer.add(item.element);
}
return answer.iterator();
}
else {
List<Element> answer = new ArrayList<Element>();
......@@ -342,4 +457,22 @@ public class IQDiscoItemsHandler extends IQHandler implements ServerFeaturesProv
}
};
}
private static class ClusteredServerItem implements Externalizable {
private Element element;
private Set<NodeID> nodes = new HashSet<NodeID>();
public ClusteredServerItem() {
}
public void writeExternal(ObjectOutput out) throws IOException {
ExternalizableUtil.getInstance().writeSerializable(out, (DefaultElement) element);
ExternalizableUtil.getInstance().writeExternalizableCollection(out, nodes);
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
element = (Element) ExternalizableUtil.getInstance().readSerializable(in);
ExternalizableUtil.getInstance().readExternalizableCollection(in, nodes, getClass().getClassLoader());
}
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment