Commit d361954f authored by Tom Evans's avatar Tom Evans Committed by tevans

OF-205: Added clustering support for PubSub module; updated existing...

OF-205: Added clustering support for PubSub module; updated existing clustering plugin; introduced new Hazelcast plugin (merged from pubsub_clustering branch)

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@13301 b35dd754-fafc-0310-a699-88a17e54d16e
parents 72248555 a0974239
......@@ -354,7 +354,7 @@ public class XMPPServer {
Log.warn("Unable to determine local hostname.", ex);
}
version = new Version(3, 7, 2, Version.ReleaseStatus.Alpha, -1);
version = new Version(3, 7, 2, Version.ReleaseStatus.Beta, -1);
if ("true".equals(JiveGlobals.getXMLProperty("setup"))) {
setupMode = false;
}
......@@ -934,9 +934,6 @@ public class XMPPServer {
for (XMPPServerListener listener : listeners) {
listener.serverStopping();
}
// Shutdown the task engine.
TaskEngine.getInstance().shutdown();
// If we don't have modules then the server has already been shutdown
if (modules.isEmpty()) {
return;
......@@ -953,6 +950,10 @@ public class XMPPServer {
modules.clear();
// Stop the Db connection manager.
DbConnectionManager.destroyConnectionProvider();
// Shutdown the task engine.
TaskEngine.getInstance().shutdown();
// hack to allow safe stopping
Log.info("Openfire stopped");
}
......
......@@ -93,6 +93,11 @@ public class NodeID implements Externalizable {
public int hashCode() {
return Arrays.hashCode(nodeID);
}
@Override
public String toString() {
return new String(nodeID);
}
public byte[] toByteArray() {
return nodeID;
......
......@@ -157,6 +157,11 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider,
pepServiceManager = new PEPServiceManager();
}
public PEPServiceManager getServiceManager()
{
return pepServiceManager;
}
/*
* (non-Javadoc)
*
......@@ -441,7 +446,7 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider,
*
* @return the knownRemotePresences map
*/
public Map<String, Set<JID>> getKnownRemotePresenes() {
public Map<String, Set<JID>> getKnownRemotePresences() {
return knownRemotePresences;
}
......
......@@ -348,7 +348,8 @@ public class PEPService implements PubSubService, Cacheable {
else {
// Since recipientJID is not local, try to get presence info from cached known remote
// presences.
Map<String, Set<JID>> knownRemotePresences = XMPPServer.getInstance().getIQPEPHandler().getKnownRemotePresenes();
Map<String, Set<JID>> knownRemotePresences = XMPPServer.getInstance().getIQPEPHandler()
.getKnownRemotePresences();
Set<JID> remotePresenceSet = knownRemotePresences.get(getAddress().toBareJID());
if (remotePresenceSet != null) {
......@@ -494,7 +495,7 @@ public class PEPService implements PubSubService, Cacheable {
Message notification = new Message();
Element event = notification.getElement().addElement("event", "http://jabber.org/protocol/pubsub#event");
Element items = event.addElement("items");
items.addAttribute("node", leafLastPublishedItem.getNode().getNodeID());
items.addAttribute("node", leafLastPublishedItem.getNodeID());
Element item = items.addElement("item");
if (((LeafNode) leafLastPublishedItem.getNode()).isItemRequired()) {
item.addAttribute("id", leafLastPublishedItem.getID());
......
......@@ -21,19 +21,15 @@
package org.jivesoftware.openfire.pubsub;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.dom4j.Element;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.forms.DataForm;
......@@ -76,11 +72,9 @@ public class LeafNode extends Node {
*/
private boolean sendItemSubscribe;
/**
* List of items that were published to the node and that are still active. If the node is
* not configured to persist items then the last published item will be kept. The list is
* sorted cronologically.
* The last item published to this node. In a cluster this may have occurred on a different cluster node.
*/
volatile private PublishedItem lastPublished;
private PublishedItem lastPublished;
// TODO Add checking of max payload size. Return <not-acceptable> plus a application specific error condition of <payload-too-big/>.
......@@ -173,8 +167,10 @@ public class LeafNode extends Node {
protected void deletingNode() {
}
void setLastPublishedItem(PublishedItem item) {
lastPublished = item;
public synchronized void setLastPublishedItem(PublishedItem item)
{
if ((lastPublished == null) || (item != null) && item.getCreationDate().after(lastPublished.getCreationDate()))
lastPublished = item;
}
public int getMaxPayloadSize() {
......@@ -360,7 +356,7 @@ public class LeafNode extends Node {
}
@Override
public PublishedItem getLastPublishedItem() {
public synchronized PublishedItem getLastPublishedItem() {
return lastPublished;
}
......
......@@ -20,21 +20,33 @@
package org.jivesoftware.openfire.pubsub;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.dom4j.Element;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.pubsub.cluster.CancelSubscriptionTask;
import org.jivesoftware.openfire.pubsub.cluster.ModifySubscriptionTask;
import org.jivesoftware.openfire.pubsub.cluster.NewSubscriptionTask;
import org.jivesoftware.openfire.pubsub.cluster.RemoveNodeTask;
import org.jivesoftware.openfire.pubsub.models.AccessModel;
import org.jivesoftware.openfire.pubsub.models.PublisherModel;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.util.cache.CacheFactory;
import org.xmpp.forms.DataForm;
import org.xmpp.forms.FormField;
import org.xmpp.packet.IQ;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* A virtual location to which information can be published and from which event
* notifications and/or payloads can be received (in other pubsub systems, this may
......@@ -44,6 +56,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
*/
public abstract class Node {
public static final String PUBSUB_SVC_ID = XMPPServer.getInstance().getPubSubModule().getServiceID();
/**
* Reference to the publish and subscribe service.
*/
......@@ -410,6 +424,21 @@ public abstract class Node {
return subscriptionsByID.values();
}
/**
* Returns all subscriptions to the node. If multiple subscriptions are enabled,
* this method returns the subscriptions by <tt>subId</tt>, otherwise it returns
* the subscriptions by {@link JID}.
*
* @return All subscriptions to the node.
*/
public Collection<NodeSubscription> getAllSubscriptions() {
if (isMultipleSubscriptionsEnabled()) {
return subscriptionsByID.values();
} else {
return subscriptionsByJID.values();
}
}
/**
* Returns the {@link NodeAffiliate} of the specified {@link JID} or <tt>null</tt>
* if none was found. Users that have a subscription with the node will ALWAYS
......@@ -1730,11 +1759,12 @@ public abstract class Node {
}
}
void addAffiliate(NodeAffiliate affiliate) {
public void addAffiliate(NodeAffiliate affiliate) {
affiliates.add(affiliate);
}
void addSubscription(NodeSubscription subscription) {
public void addSubscription(NodeSubscription subscription)
{
subscriptionsByID.put(subscription.getID(), subscription);
subscriptionsByJID.put(subscription.getJID().toString(), subscription);
}
......@@ -1809,6 +1839,7 @@ public abstract class Node {
cancelPresenceSubscriptions();
// Remove the node from memory
service.removeNode(getNodeID());
CacheFactory.doClusterTask(new RemoveNodeTask(this));
// Clear collections in memory (clear them after broadcast was sent)
affiliates.clear();
subscriptionsByID.clear();
......@@ -2017,8 +2048,7 @@ public abstract class Node {
// Generate a subscription ID (override even if one was sent by the client)
String id = StringUtils.randomString(40);
// Create new subscription
NodeSubscription subscription =
new NodeSubscription(service, this, owner, subscriber, subState, id);
NodeSubscription subscription = new NodeSubscription(this, owner, subscriber, subState, id);
// Configure the subscription with the specified configuration (if any)
if (options != null) {
subscription.configure(options);
......@@ -2042,6 +2072,9 @@ public abstract class Node {
subscription.sendAuthorizationRequest();
}
// Update the other members with the new subscription
CacheFactory.doClusterTask(new NewSubscriptionTask(subscription));
// Send last published item (if node is leaf node and subscription status is ok)
if (isSendItemSubscribe() && subscription.isActive()) {
PublishedItem lastItem = getLastPublishedItem();
......@@ -2067,8 +2100,9 @@ public abstract class Node {
* remove the existing affiliation too.
*
* @param subscription the subscription to cancel.
* @param sendToCluster True to forward cancel order to cluster peers
*/
public void cancelSubscription(NodeSubscription subscription) {
public void cancelSubscription(NodeSubscription subscription, boolean sendToCluster) {
// Remove subscription from memory
subscriptionsByID.remove(subscription.getID());
subscriptionsByJID.remove(subscription.getJID().toString());
......@@ -2083,12 +2117,27 @@ public abstract class Node {
// Remove the subscription from the database
PubSubPersistenceManager.removeSubscription(subscription);
}
if (sendToCluster) {
CacheFactory.doClusterTask(new CancelSubscriptionTask(subscription));
}
// Check if we need to unsubscribe from the presence of the owner
if (isPresenceBasedDelivery() && getSubscriptions(subscription.getOwner()).isEmpty()) {
service.presenceSubscriptionNotRequired(this, subscription.getOwner());
}
}
/**
* Cancels an existing subscription to the node. If the subscriber does not have any
* other subscription to the node and his affiliation was of type <tt>none</tt> then
* remove the existing affiliation too.
*
* @param subscription the subscription to cancel.
*/
public void cancelSubscription(NodeSubscription subscription) {
cancelSubscription(subscription, ClusterManager.isClusteringEnabled());
}
/**
* Returns the {@link PublishedItem} whose ID matches the specified item ID or <tt>null</tt>
* if none was found. Item ID may or may not exist and it depends on the node's configuration.
......@@ -2150,7 +2199,7 @@ public abstract class Node {
}
@Override
public String toString() {
public String toString() {
return super.toString() + " - ID: " + getNodeID();
}
......@@ -2183,6 +2232,7 @@ public abstract class Node {
if (approved) {
// Mark that the subscription to the node has been approved
subscription.approved();
CacheFactory.doClusterTask(new ModifySubscriptionTask(subscription));
}
else {
// Cancel the subscription to the node
......
......@@ -40,7 +40,7 @@ public class NodeAffiliate {
private Affiliation affiliation;
NodeAffiliate(Node node, JID jid) {
public NodeAffiliate(Node node, JID jid) {
this.node = node;
this.jid = jid;
}
......@@ -57,7 +57,7 @@ public class NodeAffiliate {
return affiliation;
}
void setAffiliation(Affiliation affiliation) {
public void setAffiliation(Affiliation affiliation) {
this.affiliation = affiliation;
}
......@@ -102,7 +102,7 @@ public class NodeAffiliate {
//
// If the node ID looks like a JID, replace it with the published item's node ID.
if (getNode().getNodeID().indexOf("@") >= 0) {
items.addAttribute("node", publishedItem.getNode().getNodeID());
items.addAttribute("node", publishedItem.getNodeID());
}
// Add item information to the event notification
......
......@@ -69,10 +69,7 @@ public class NodeSubscription {
private static final SimpleDateFormat dateFormat;
private static final FastDateFormat fastDateFormat;
/**
* Reference to the publish and subscribe service.
*/
private PubSubService service;
/**
* The node to which this subscription is interested in.
*/
......@@ -153,15 +150,14 @@ public class NodeSubscription {
/**
* Creates a new subscription of the specified user with the node.
*
* @param service the pubsub service hosting the node where this subscription lives.
* @param node Node to which this subscription is interested in.
* @param owner the JID of the entity that owns this subscription.
* @param jid the JID of the user that owns the subscription.
* @param state the state of the subscription with the node.
* @param id the id the uniquely identifies this subscriptin within the node.
*/
NodeSubscription(PubSubService service, Node node, JID owner, JID jid, State state, String id) {
this.service = service;
public NodeSubscription(Node node, JID owner, JID jid, State state, String id)
{
this.node = node;
this.jid = jid;
this.owner = owner;
......@@ -405,7 +401,7 @@ public class NodeSubscription {
configure(options);
if (originalIQ != null) {
// Return success response
service.send(IQ.createResultIQ(originalIQ));
node.getService().send(IQ.createResultIQ(originalIQ));
}
if (wasUnconfigured) {
......@@ -519,10 +515,10 @@ public class NodeSubscription {
// Check if the service needs to subscribe or unsubscribe from the owner presence
if (!node.isPresenceBasedDelivery() && wasUsingPresence != !presenceStates.isEmpty()) {
if (presenceStates.isEmpty()) {
service.presenceSubscriptionNotRequired(node, owner);
node.getService().presenceSubscriptionNotRequired(node, owner);
}
else {
service.presenceSubscriptionRequired(node, owner);
node.getService().presenceSubscriptionRequired(node, owner);
}
}
}
......@@ -719,7 +715,7 @@ public class NodeSubscription {
}
// Check if delivery is subject to presence-based policy
if (!getPresenceStates().isEmpty()) {
Collection<String> shows = service.getShowPresences(jid);
Collection<String> shows = node.getService().getShowPresences(jid);
if (shows.isEmpty() || Collections.disjoint(getPresenceStates(), shows)) {
return false;
}
......@@ -727,7 +723,8 @@ public class NodeSubscription {
// Check if node is only sending events when user is online
if (node.isPresenceBasedDelivery()) {
// Check that user is online
if (service.getShowPresences(jid).isEmpty()) {
if (node.getService().getShowPresences(jid).isEmpty())
{
return false;
}
}
......@@ -795,7 +792,7 @@ public class NodeSubscription {
subscribeOptions.addElement("required");
}
// Send the result
service.send(result);
node.getService().send(result);
}
/**
......@@ -834,7 +831,7 @@ public class NodeSubscription {
notification.getElement().addElement("delay", "urn:xmpp:delay")
.addAttribute("stamp", fastDateFormat.format(publishedItem.getCreationDate()));
// Send the event notification to the subscriber
service.sendNotification(node, notification, jid);
node.getService().sendNotification(node, notification, jid);
}
/**
......@@ -846,7 +843,7 @@ public class NodeSubscription {
* @return true if the specified user is allowed to modify or cancel the subscription.
*/
boolean canModify(JID user) {
return user.equals(getJID()) || user.equals(getOwner()) || service.isServiceAdmin(user);
return user.equals(getJID()) || user.equals(getOwner()) || node.getService().isServiceAdmin(user);
}
/**
......@@ -899,9 +896,9 @@ public class NodeSubscription {
Message authRequest = new Message();
authRequest.addExtension(node.getAuthRequestForm(this));
authRequest.setTo(owner);
authRequest.setFrom(service.getAddress());
authRequest.setFrom(node.getService().getAddress());
// Send authentication request to node owners
service.send(authRequest);
node.getService().send(authRequest);
}
/**
......@@ -912,7 +909,7 @@ public class NodeSubscription {
Message authRequest = new Message();
authRequest.addExtension(node.getAuthRequestForm(this));
// Send authentication request to node owners
service.broadcast(node, authRequest, node.getOwners());
node.getService().broadcast(node, authRequest, node.getOwners());
}
/**
......
......@@ -39,9 +39,11 @@ import org.jivesoftware.openfire.RoutingTable;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.XMPPServerListener;
import org.jivesoftware.openfire.component.InternalComponentManager;
import org.jivesoftware.openfire.pubsub.cluster.RefreshNodeTask;
import org.jivesoftware.openfire.pubsub.models.AccessModel;
import org.jivesoftware.openfire.user.UserManager;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.util.cache.CacheFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.forms.DataForm;
......@@ -1210,6 +1212,8 @@ public class PubSubEngine {
else {
newNode.saveToDB();
}
CacheFactory.doClusterTask(new RefreshNodeTask(newNode));
}
else {
conflict = true;
......@@ -1302,6 +1306,8 @@ public class PubSubEngine {
// Update node configuration with the provided data form
// (and update the backend store)
node.configure(completedForm);
CacheFactory.doClusterTask(new RefreshNodeTask(node));
// Return that node configuration was successful
router.route(IQ.createResultIQ(iq));
}
......
......@@ -25,11 +25,8 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
......@@ -166,10 +163,6 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
}
public void process(Packet packet) {
// TODO Remove this method when moving PubSub as a component and removing module code
// The MUC service will receive all the packets whose domain matches the domain of the MUC
// service. This means that, for instance, a disco request should be responded by the
// service itself instead of relying on the server to handle the request.
try {
// Check if the packet is a disco request or a packet with namespace iq:register
if (packet instanceof IQ) {
......@@ -510,27 +503,32 @@ public class PubSubModule extends BasicModule implements ServerItemsProvider, Di
return serviceEnabled;
}
@Override
public void joinedCluster() {
// Disable the service until we know that we are the senior cluster member
enableService(false);
// enableService(false);
}
@Override
public void joinedCluster(byte[] nodeID) {
// Do nothing
}
@Override
public void leftCluster() {
// Offer the service when not running in a cluster
enableService(true);
// enableService(true);
}
@Override
public void leftCluster(byte[] nodeID) {
// Do nothing
}
@Override
public void markedAsSeniorClusterMember() {
// Offer the service since we are the senior cluster member
enableService(true);
// enableService(true);
}
public Iterator<DiscoServerItem> getItems() {
......
......@@ -20,10 +20,19 @@
package org.jivesoftware.openfire.pubsub;
import org.xmpp.packet.JID;
import org.dom4j.Element;
import java.io.Serializable;
import java.io.StringReader;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.pep.PEPServiceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.packet.JID;
/**
* A published item to a node. Once an item was published to a node, node subscribers will be
......@@ -37,9 +46,28 @@ import java.util.Date;
*
* @author Matt Tucker
*/
public class PublishedItem {
public class PublishedItem implements Serializable {
private static final Logger log = LoggerFactory.getLogger(PublishedItem.class);
private static final int POOL_SIZE = 50;
/**
* Pool of SAX Readers. SAXReader is not thread safe so we need to have a pool of readers.
*/
private static BlockingQueue<SAXReader> xmlReaders = new LinkedBlockingQueue<SAXReader>(POOL_SIZE);
private static final long serialVersionUID = 7012925993623144574L;
static {
// Initialize the pool of sax readers
for (int i=0; i<POOL_SIZE; i++) {
SAXReader xmlReader = new SAXReader();
xmlReader.setEncoding("UTF-8");
xmlReaders.add(xmlReader);
}
}
/**
* JID of the entity that published the item to the node. This is the full JID
* of the publisher.
*/
......@@ -47,7 +75,15 @@ public class PublishedItem {
/**
* The node where the item was published.
*/
private LeafNode node;
private transient LeafNode node;
/**
* The id for the node where the item was published.
*/
private String nodeId;
/**
* The id for the service hosting the node for this item
*/
private String serviceId;
/**
* ID that uniquely identifies the published item in the node.
*/
......@@ -57,29 +93,49 @@ public class PublishedItem {
*/
private Date creationDate;
/**
* The payload included when publishing the item.
* The optional payload is included when publishing the item. This value
* is created from the payload XML and cached as/when needed.
*/
private Element payload;
private transient Element payload;
/**
* XML representation of the payload. This is actually a cache that avoids
* doing Element#asXML.
* XML representation of the payload (for serialization)
*/
private String payloadXML;
PublishedItem(LeafNode node, JID publisher, String id, Date creationDate) {
this.node = node;
this.nodeId = node.getNodeID();
this.serviceId = node.getService().getServiceID();
this.publisher = publisher;
this.id = id;
this.creationDate = creationDate;
}
/**
* Returns the id for the {@link LeafNode} where this item was published.
*
* @return the ID for the leaf node where this item was published.
*/
public String getNodeID() {
return nodeId;
}
/**
* Returns the {@link LeafNode} where this item was published.
*
* @return the leaf node where this item was published.
*/
public LeafNode getNode() {
return node;
if (node == null) {
if (Node.PUBSUB_SVC_ID.equals(serviceId)) {
node = (LeafNode) XMPPServer.getInstance().getPubSubModule().getNode(nodeId);
} else {
PEPServiceManager serviceMgr = XMPPServer.getInstance().getIQPEPHandler().getServiceManager();
node = serviceMgr.hasCachedService(new JID(serviceId)) ?
(LeafNode) serviceMgr.getPEPService(serviceId).getNode(nodeId) : null;
}
}
return node;
}
/**
......@@ -117,6 +173,20 @@ public class PublishedItem {
* @return the payload included when publishing the item or <tt>null</tt> if none was found.
*/
public Element getPayload() {
if (payload == null && payloadXML != null) {
// payload initialized as XML string from DB
SAXReader xmlReader = null;
try {
xmlReader = xmlReaders.take();
payload = xmlReader.read(new StringReader(payloadXML)).getRootElement();
} catch (Exception ex) {
log.error("Failed to parse payload XML", ex);
} finally {
if (xmlReader != null) {
xmlReaders.add(xmlReader);
}
}
}
return payload;
}
......@@ -131,6 +201,19 @@ public class PublishedItem {
return payloadXML;
}
/**
* Sets the payload included when publishing the item. A published item may or may not
* have a payload. Transient nodes that are configured to not broadcast payloads may allow
* published items to have no payload.
*
* @param payloadXML the payload included when publishing the item or <tt>null</tt>
* if none was found.
*/
void setPayloadXML(String payloadXML) {
this.payloadXML = payloadXML;
this.payload = null; // will be recreated only if needed
}
/**
* Sets the payload included when publishing the item. A published item may or may not
* have a payload. Transient nodes that are configured to not broadcast payloads may allow
......@@ -144,8 +227,7 @@ public class PublishedItem {
// Update XML representation of the payload
if (payload == null) {
payloadXML = null;
}
else {
} else {
payloadXML = payload.asXML();
}
}
......@@ -158,7 +240,7 @@ public class PublishedItem {
* @return true if payload contains the specified keyword.
*/
boolean containsKeyword(String keyword) {
if (payloadXML == null || keyword == null) {
if (getPayloadXML() == null || keyword == null) {
return true;
}
return payloadXML.contains(keyword);
......@@ -173,9 +255,41 @@ public class PublishedItem {
*/
public boolean canDelete(JID user) {
if (publisher.equals(user) || publisher.toBareJID().equals(user.toBareJID()) ||
node.isAdmin(user)) {
getNode().isAdmin(user)) {
return true;
}
return false;
}
/**
* Returns a string that uniquely identifies this published item
* in the following format: <i>nodeId:itemId</i>
* @return Unique identifier for this item
*/
public String getItemKey() {
return getItemKey(nodeId,id);
}
/**
* Returns a string that uniquely identifies this published item
* in the following format: <i>nodeId:itemId</i>
* @param node Node for the published item
* @param itemId Id for the published item (unique within the node)
* @return Unique identifier for this item
*/
public static String getItemKey(LeafNode node, String itemId) {
return getItemKey(node.getNodeID(), itemId);
}
/**
* Returns a string that uniquely identifies this published item
* in the following format: <i>nodeId:itemId</i>
* @param nodeId Node id for the published item
* @param itemId Id for the published item (unique within the node)
* @return Unique identifier for this item
*/
public static String getItemKey(String nodeId, String itemId) {
return new StringBuilder(nodeId)
.append(":").append(itemId).toString();
}
}
package org.jivesoftware.openfire.pubsub.cluster;
import org.jivesoftware.openfire.pubsub.Node;
import org.jivesoftware.openfire.pubsub.NodeSubscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CancelSubscriptionTask extends SubscriptionTask
{
private static final Logger log = LoggerFactory.getLogger(CancelSubscriptionTask.class);
public CancelSubscriptionTask()
{
}
public CancelSubscriptionTask(NodeSubscription subscription)
{
super(subscription);
}
@Override
public void run()
{
log.debug("[TASK] Cancel Subscription : {}", toString());
Node node = getNode();
// This will only occur if a PEP service is not loaded. We can safely do nothing in this
// case since any changes will get loaded from the db when it is loaded.
if (node == null)
return;
// This method will make a db call, but it will simply do nothing since
// the record will already be deleted.
node.cancelSubscription(getSubscription(), false);
}
}
package org.jivesoftware.openfire.pubsub.cluster;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.jivesoftware.openfire.pubsub.PubSubPersistenceManager;
import org.jivesoftware.util.cache.ClusterTask;
public class FlushTask implements ClusterTask
{
public FlushTask()
{
}
@Override
public void run()
{
PubSubPersistenceManager.flushPendingItems(false); // just this member
}
@Override
public Object getResult()
{
return null;
}
@Override
public void writeExternal(ObjectOutput out) throws IOException
{
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
}
}
package org.jivesoftware.openfire.pubsub.cluster;
import org.jivesoftware.openfire.pubsub.NodeSubscription;
import org.jivesoftware.openfire.pubsub.PubSubPersistenceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ModifySubscriptionTask extends SubscriptionTask
{
private static final Logger log = LoggerFactory.getLogger(ModifySubscriptionTask.class);
public ModifySubscriptionTask()
{
}
public ModifySubscriptionTask(NodeSubscription subscription)
{
super(subscription);
}
@Override
public void run()
{
log.debug("[TASK] Modify subscription : {}", toString());
PubSubPersistenceManager.loadSubscription(getService(), getNode(), getSubscriptionId());
}
}
package org.jivesoftware.openfire.pubsub.cluster;
import org.jivesoftware.openfire.pubsub.Node;
import org.jivesoftware.openfire.pubsub.NodeAffiliate;
import org.jivesoftware.openfire.pubsub.NodeSubscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.packet.JID;
public class NewSubscriptionTask extends SubscriptionTask
{
private static final Logger log = LoggerFactory.getLogger(NewSubscriptionTask.class);
public NewSubscriptionTask()
{
}
public NewSubscriptionTask(NodeSubscription subscription)
{
super(subscription);
}
@Override
public void run()
{
log.debug("[TASK] New subscription : {}", toString());
Node node = getNode();
// This will only occur if a PEP service is not loaded. We can safely do nothing in this
// case since any changes will get loaded from the db when it is loaded.
if (node == null)
return;
if (node.getAffiliate(getOwner()) == null)
{
// add the missing 'none' affiliation
NodeAffiliate affiliate = new NodeAffiliate(node, getOwner());
affiliate.setAffiliation(NodeAffiliate.Affiliation.none);
node.addAffiliate(affiliate);
}
node.addSubscription(getSubscription());
if (node.isPresenceBasedDelivery() && node.getSubscriptions(getSubscription().getOwner()).size() == 1)
{
if (getSubscription().getPresenceStates().isEmpty())
{
// Subscribe to the owner's presence since the node is only
// sending events to online subscribers and this is the first
// subscription of the user and the subscription is not
// filtering notifications based on presence show values.
getService().presenceSubscriptionRequired(getNode(), getOwner());
}
}
}
}
package org.jivesoftware.openfire.pubsub.cluster;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.pubsub.Node;
import org.jivesoftware.util.cache.ClusterTask;
import org.jivesoftware.util.cache.ExternalizableUtil;
/**
* Base class of clustering tasks for pubsub. It simply stores/retrieves the
* node.
*
* @author Robin Collier
*
*/
public abstract class NodeChangeTask implements ClusterTask
{
private String nodeId;
transient private Node node;
public NodeChangeTask()
{
}
public NodeChangeTask(String nodeIdent)
{
nodeId = nodeIdent;
}
public NodeChangeTask(Node node)
{
this.node = node;
nodeId = node.getNodeID();
}
@Override
public void writeExternal(ObjectOutput out) throws IOException
{
ExternalizableUtil.getInstance().writeSafeUTF(out, nodeId);
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
nodeId = ExternalizableUtil.getInstance().readSafeUTF(in);
}
public Node getNode()
{
if (node == null)
node = XMPPServer.getInstance().getPubSubModule().getNode(nodeId);
return node;
}
public String getNodeId()
{
return nodeId;
}
}
package org.jivesoftware.openfire.pubsub.cluster;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.pep.PEPServiceManager;
import org.jivesoftware.openfire.pubsub.Node;
import org.jivesoftware.openfire.pubsub.PubSubService;
import org.jivesoftware.util.cache.ClusterTask;
import org.jivesoftware.util.cache.ExternalizableUtil;
import org.xmpp.packet.JID;
public abstract class NodeTask implements ClusterTask
{
protected String nodeId;
protected String serviceId;
protected NodeTask()
{
}
protected NodeTask(Node node)
{
nodeId = node.getNodeID();
serviceId = node.getService().getServiceID();
}
public String getNodeId()
{
return nodeId;
}
public Node getNode()
{
PubSubService svc = getService();
return svc != null ? svc.getNode(nodeId) : null;
}
public PubSubService getService()
{
if (Node.PUBSUB_SVC_ID.equals(serviceId))
return XMPPServer.getInstance().getPubSubModule();
else
{
PEPServiceManager serviceMgr = XMPPServer.getInstance().getIQPEPHandler().getServiceManager();
return serviceMgr.hasCachedService(new JID(serviceId)) ? serviceMgr.getPEPService(serviceId) : null;
}
}
@Override
public Object getResult()
{
return null;
}
@Override
public void writeExternal(ObjectOutput out) throws IOException
{
ExternalizableUtil.getInstance().writeSafeUTF(out, nodeId);
ExternalizableUtil.getInstance().writeSafeUTF(out, serviceId);
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
nodeId = ExternalizableUtil.getInstance().readSafeUTF(in);
serviceId = ExternalizableUtil.getInstance().readSafeUTF(in);
}
}
package org.jivesoftware.openfire.pubsub.cluster;
import org.jivesoftware.openfire.pubsub.Node;
import org.jivesoftware.openfire.pubsub.PubSubPersistenceManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Forces the node to be refreshed from the database. This will load a node from
* the database and then add it to the service. If the node already existed it
* will be replaced, thereby refreshing it from persistence.
*
* @author Robin Collier
*
*/
public class RefreshNodeTask extends NodeTask
{
private static final Logger log = LoggerFactory.getLogger(RefreshNodeTask.class);
public RefreshNodeTask()
{
}
public RefreshNodeTask(Node node)
{
super(node);
}
@Override
public void run()
{
log.debug("[TASK] Refreshing node - nodeID: {}", getNodeId());
PubSubPersistenceManager.loadNode(getService(), getNodeId());
}
}
package org.jivesoftware.openfire.pubsub.cluster;
import org.jivesoftware.openfire.pubsub.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Removes a newly deleted node from memory across the cluster.
*
* @author Tom Evans
*
*/
public class RemoveNodeTask extends NodeTask
{
private static final Logger log = LoggerFactory.getLogger(RemoveNodeTask.class);
public RemoveNodeTask()
{
}
public RemoveNodeTask(Node node)
{
super(node);
}
@Override
public void run()
{
log.debug("[TASK] Removing node - nodeID: {}", getNodeId());
getService().removeNode(getNodeId());
}
}
package org.jivesoftware.openfire.pubsub.cluster;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.jivesoftware.openfire.pubsub.NodeSubscription;
import org.jivesoftware.openfire.pubsub.NodeSubscription.State;
import org.jivesoftware.util.cache.ExternalizableUtil;
import org.xmpp.packet.JID;
public abstract class SubscriptionTask extends NodeTask
{
private String subId;
private JID owner;
private JID subJid;
private NodeSubscription.State state;
transient private NodeSubscription subscription;
public SubscriptionTask()
{
}
public SubscriptionTask(NodeSubscription subscription)
{
super(subscription.getNode());
subId = subscription.getID();
state = subscription.getState();
owner = subscription.getOwner();
subJid = subscription.getJID();
}
public String getSubscriptionId()
{
return subId;
}
public JID getOwner()
{
return owner;
}
public JID getSubscriberJid()
{
return subJid;
}
public NodeSubscription.State getState()
{
return state;
}
public NodeSubscription getSubscription()
{
if (subscription == null)
{
subscription = new NodeSubscription(getNode(), owner, subJid, state, subId);
}
return subscription;
}
@Override
public void writeExternal(ObjectOutput out) throws IOException
{
super.writeExternal(out);
ExternalizableUtil.getInstance().writeSafeUTF(out, subId);
ExternalizableUtil.getInstance().writeSafeUTF(out, owner.toString());
ExternalizableUtil.getInstance().writeSafeUTF(out, subJid.toString());
ExternalizableUtil.getInstance().writeSerializable(out, state);
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
super.readExternal(in);
subId = ExternalizableUtil.getInstance().readSafeUTF(in);
owner = new JID(ExternalizableUtil.getInstance().readSafeUTF(in));
subJid = new JID(ExternalizableUtil.getInstance().readSafeUTF(in));
state = (State) ExternalizableUtil.getInstance().readSerializable(in);
}
@Override
public String toString()
{
return getClass().getSimpleName() + " [(service=" + serviceId + "), (nodeId=" + nodeId + "), (owner=" + owner
+ "),(subscriber=" + subJid + "),(state=" + state + "),(id=" + subId + ")]";
}
}
......@@ -281,7 +281,7 @@ public class TaskEngine {
*/
public void shutdown() {
if (executor != null) {
executor.shutdownNow();
executor.shutdown();
executor = null;
}
......
......@@ -88,7 +88,7 @@ public class CacheFactory {
localCacheFactoryClass = JiveGlobals.getProperty(LOCAL_CACHE_PROPERTY_NAME,
"org.jivesoftware.util.cache.DefaultLocalCacheStrategy");
clusteredCacheFactoryClass = JiveGlobals.getProperty(CLUSTERED_CACHE_PROPERTY_NAME,
"com.jivesoftware.util.cache.CoherenceClusteredCacheFactory");
"com.jivesoftware.util.cache.ClusteredCacheFactory");
cacheNames.put("Favicon Hits", "faviconHits");
cacheNames.put("Favicon Misses", "faviconMisses");
......@@ -128,6 +128,7 @@ public class CacheFactory {
cacheNames.put("Entity Capabilities Users", "entityCapabilitiesUsers");
cacheNames.put("Clearspace SSO Nonce", "clearspaceSSONonce");
cacheNames.put("PEPServiceManager", "pepServiceManager");
cacheNames.put("Published Items", "publishedItems");
cacheProps.put("cache.fileTransfer.size", 128 * 1024l);
cacheProps.put("cache.fileTransfer.maxLifetime", 1000 * 60 * 10l);
......@@ -199,6 +200,8 @@ public class CacheFactory {
cacheProps.put("cache.clearspaceSSONonce.maxLifetime", JiveConstants.MINUTE * 2);
cacheProps.put("cache.pepServiceManager.size", 1024l * 1024 * 10);
cacheProps.put("cache.pepServiceManager.maxLifetime", JiveConstants.MINUTE * 30);
cacheProps.put("cache.publishedItems.size", 1024l * 1024 * 10);
cacheProps.put("cache.publishedItems.maxLifetime", JiveConstants.MINUTE * 15);
}
private CacheFactory() {
......@@ -216,7 +219,7 @@ public class CacheFactory {
}
/**
* Sets a local property which overrides the maximum cache size as configured in coherence-cache-config.xml for the
* Sets a local property which overrides the maximum cache size for the
* supplied cache name.
* @param cacheName the name of the cache to store a value for.
* @param size the maximum cache size.
......@@ -242,7 +245,7 @@ public class CacheFactory {
}
/**
* Sets a local property which overrides the maximum cache entry lifetime as configured in coherence-cache-config.xml
* Sets a local property which overrides the maximum cache entry lifetime
* for the supplied cache name.
* @param cacheName the name of the cache to store a value for.
* @param lifetime the maximum cache entry lifetime.
......@@ -482,7 +485,7 @@ public class CacheFactory {
getClusteredCacheStrategyClassLoader()).newInstance();
return cacheFactory.getMaxClusterNodes();
} catch (ClassNotFoundException e) {
// Do nothing
Log.warn("Clustering implementation class " + clusteredCacheFactoryClass + " not found");
} catch (Exception e) {
Log.error("Error instantiating clustered cache factory", e);
}
......@@ -538,6 +541,10 @@ public class CacheFactory {
return cacheFactoryStrategy.doSynchronousClusterTask(task, nodeID);
}
public static String getPluginName() {
return cacheFactoryStrategy.getPluginName();
}
public static synchronized void initialize() throws InitializationException {
try {
cacheFactoryStrategy = (CacheFactoryStrategy) Class
......@@ -556,9 +563,12 @@ public class CacheFactory {
private static ClassLoader getClusteredCacheStrategyClassLoader() {
PluginManager pluginManager = XMPPServer.getInstance().getPluginManager();
Plugin plugin = pluginManager.getPlugin("clustering");
Plugin plugin = pluginManager.getPlugin("hazelcast");
if (plugin == null) {
plugin = pluginManager.getPlugin("enterprise");
plugin = pluginManager.getPlugin("clustering");
if (plugin == null) {
plugin = pluginManager.getPlugin("enterprise");
}
}
PluginClassLoader pluginLoader = pluginManager.getPluginClassloader(plugin);
if (pluginLoader != null) {
......
......@@ -181,4 +181,11 @@ public interface CacheFactoryStrategy {
* @return an existing lock on the specified key or creates a new one if none was found.
*/
Lock getLock(Object key, Cache cache);
/**
* Get the plugin name corresponding to this clustering implementation
*
* @return the plugin name for this clustering implementation
*/
String getPluginName();
}
......@@ -105,6 +105,11 @@ public class DefaultLocalCacheStrategy implements CacheFactoryStrategy {
public void updateCacheStats(Map<String, Cache> caches) {
}
@Override
public String getPluginName() {
return "local";
}
public Lock getLock(Object key, Cache cache) {
Object lockKey = key;
if (key instanceof String) {
......
......@@ -44,6 +44,12 @@
Clustering Plugin Changelog
</h1>
<p><b>1.2.2</b> -- Aug 31, 2012</p>
<ul>
<li>Updated plugin sources to be compatible with Coherence 3.7.1.</li>
<li>Updated README docs with Coherence requirements (EE) and clustering setup.</li>
</ul>
<p><b>1.2.1</b> -- Dec 16, 2011</p>
<ul>
......
After you have licensed and downloaded Coherence EE from Oracle, place
the following jar files in this folder:
coherence.jar
coherence-work.jar
To build the clustering plugin, issue the following command from
the Openfire (source) /build/ folder:
$OPENFIRE_SRC/build> ant -Dplugin=clustering plugin
Also note that due to classpath loading order, it may be necessary to
either remove the coherence-cache-config.xml file from the Coherence
runtime JAR, or rename the plugin-clustering.jar file to force it to
load before coherence.jar (e.g. "clustering-plugin.jar" or similar).
In order to run Oracle Coherence in production mode, you will need to
secure licensing for the Enterprise Edition (EE) of Coherence. While
clustered caching for Openfire is available in the Standard Edition (SE),
per the Oracle Fusion licensing docs the InvocationService (which is
used by Openfire to distribute tasks among the cluster members) is only
available in EE or Grid Edition (GE).
Note that Coherence is configured to run GE in development mode by default.
You can change this setting by overriding the following Java system properties
via /etc/sysconfig/openfire (RPM) or openfired.vmoptions (Windows):
-Dtangosol.coherence.edition=EE
-Dtangosol.coherence.mode=prod
The current Coherence release is version 3.7.1.
\ No newline at end of file
......@@ -5,8 +5,8 @@
<name>${plugin.name}</name>
<description>${plugin.description}</description>
<author>Jive Software</author>
<version>1.2.1</version>
<date>12/17/2011</date>
<version>1.2.2</version>
<date>08/31/2012</date>
<minServerVersion>3.7.2</minServerVersion>
</plugin>
......@@ -59,28 +59,41 @@ Clustering Plugin Readme
<p>
The clustering plugin adds support for running multiple redundant Openfire
servers together in a cluster. By running Openfire in a cluster, you can
distribute the load amongst a number of servers, as well as having some
form of redundency in the event that one of your servers dies. <font color="red">This
distribute the connection load among several servers, as well as having some
form of failover in the event that one of your servers dies. <font color="red">This
plugin requires a valid <a href="http://www.oracle.com/technology/products/coherence/index.html">Oracle Coherence</a> license.</font>
</p>
</p><p>
In order to run Oracle Coherence in production mode, you will need to secure
licensing for (at least) the Enterprise Edition (EE) of Coherence. Refer to the
<a href="http://docs.oracle.com/cd/E14571_01/doc.1111/e14860/products.htm#BABIEAJF">
Oracle Fusion licensing docs</a> for more information. Openfire uses the Compute Grid
InvocationService to distribute tasks among the cluster members, but this feature is
currently available only in EE or Grid Edition (GE).
</p><p>
Note that Coherence is configured to run as GE in development mode by default.
You can change this setting by overriding the following Java system properties
via /etc/sysconfig/openfire (RPM) or openfired.vmoptions (Windows):
</p><pre>
-Dtangosol.coherence.edition=EE
-Dtangosol.coherence.mode=prod
</pre><p>
The current Coherence release is version 3.7.1.</p>
<h2>Installation</h2>
<p>
Follow steps 1 through 4 for adding Oracle Coherence libraries to Openfire. Step 5
<p>After installing the clustering plugin (by copying clustering.jar into the Openfire plugins directory),
follow steps 1 through 4 for adding Oracle Coherence libraries to Openfire. Step 5
explains how to add this plugin to your Openfire setup.
<ol>
<li>Get <a href="http://www.oracle.com/technology/products/coherence/index.html">Oracle Coherence for Java Version</a>.</li>
<li>Unzip the coherence file and locate <b>coherence.jar</b>, <b>coherence-work.jar</b> and <b>tangosol.jar</b> in folder coherence/lib.</li>
<li>Copy <b>coherence.jar</b>, <b>coherence-work.jar</b> and <b>tangosol.jar</b> to [openfire_home]/lib.</li>
<li>Unzip the coherence file and locate <b>coherence.jar</b> and <b>coherence-work.jar</b> in folder coherence/lib.</li>
<li>Copy <b>coherence.jar</b> and <b>coherence-work.jar</b> to [openfire_home]/plugins/clustering/lib.</li>
<li>Restart Openfire server.</li>
<li>Copy clustering.jar into the plugins directory of your Openfire installation. The plugin will then be automatically deployed.</li>
</ol>
This plugin has been tested with Oracle Coherence Version 3.3.1/389. To upgrade
to a new Oracle Coherence version follow steps 1 through 4 as explained above.
To upgrade to a new version of the plugin just get the latest version from igniterealtime.org
and follow step 5.
This version of the plugin has been updated to be compatible with Oracle Coherence Version 3.7.1. Earlier
versions of the clustering plugin are incompatible with Coherence 3.7.1 due to certain changes in the
Coherence API.
</p>
<h2>Upgrading from Openfire Enterprise</h2>
......@@ -97,6 +110,15 @@ outlined in the <i>Installation</i> section above.
<p>
To enable clustering or monitor the cluster go to: Server --&gt; Server Manager --&gt; Clustering
</p>
<p>
You can change a number of Coherence properties by overriding Java system properties
via /etc/sysconfig/openfire (RPM) or openfired.vmoptions (Windows). For example,
to change the default cluster multicast port number, use the following:
</p><pre>
-Dtangosol.coherence.clusterport=32380
</pre><p>
Other such settings may be found in the tangosol-coherence.xml file located in the
coherence.jar file, identified via the "system-property" attribute.</p>
<h2>Compiling from source code</h2>
......@@ -108,9 +130,10 @@ these steps to have a working environment:
<ol>
<li>Get <a href="http://www.igniterealtime.org/downloads/source.jsp">Openfire's source code</a>.</li>
<li>Get <a href="http://www.oracle.com/technology/products/coherence/index.html">Oracle Coherence for Java Version</a>.</li>
<li>Unzip the coherence file and locate <b>coherence.jar</b>, <b>coherence-work.jar</b> and <b>tangosol.jar</b> in folder coherence/lib.</li>
<li>Copy <b>coherence.jar</b>, <b>coherence-work.jar</b> and <b>tangosol.jar</b> to [openfire]/build/lib.</li>
<li>Add <b>coherence.jar</b>, <b>coherence-work.jar</b> and <b>tangosol.jar</b> to your build path.</li>
<li>Unzip the coherence file and locate <b>coherence.jar</b> and <b>coherence-work.jar</b> in folder coherence/lib.</li>
<li>Copy <b>coherence.jar</b> and <b>coherence-work.jar</b> to [openfire]/plugins/clustering/lib.</li>
<li>Add <b>coherence.jar</b> and <b>coherence-work.jar</b> to your IDE build path, or use the Ant build script provided
with the Openfire source distribution (e.g. "ant -Dplugin=clustering plugin").</li>
</ol>
</p>
......
......@@ -523,6 +523,25 @@ http://www.tangosol.com/UserGuide-Reference-CacheConfig.jsp
</init-params>
</cache-mapping>
<cache-mapping>
<cache-name>Published Items</cache-name>
<scheme-name>replicated</scheme-name>
<init-params>
<init-param>
<param-name>back-size-high</param-name>
<param-value>10485760</param-value>
</init-param>
<init-param>
<param-name>back-expiry</param-name>
<param-value>15m</param-value>
</init-param>
<init-param>
<param-name>back-size-low</param-name>
<param-value>9437180</param-value>
</init-param>
</init-params>
</cache-mapping>
<!-- partitioned caches -->
<cache-mapping>
......@@ -783,7 +802,7 @@ http://www.tangosol.com/UserGuide-Reference-CacheConfig.jsp
</init-param>
<init-param>
<param-name>back-size-low</param-name>
<param-value>943718</param-value>
<param-value>9437180</param-value>
</init-param>
</init-params>
</cache-mapping>
......@@ -802,7 +821,7 @@ http://www.tangosol.com/UserGuide-Reference-CacheConfig.jsp
</init-param>
<init-param>
<param-name>back-size-low</param-name>
<param-value>943718</param-value>
<param-value>9437180</param-value>
</init-param>
</init-params>
</cache-mapping>
......
......@@ -23,6 +23,8 @@ package com.jivesoftware.openfire;
import com.jivesoftware.openfire.session.RemoteSessionLocator;
import com.jivesoftware.util.cache.CoherenceExternalizableUtil;
import com.jivesoftware.util.cluster.CoherencePacketRouter;
import com.tangosol.net.CacheFactory;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.container.Plugin;
......@@ -67,13 +69,13 @@ public class ClusteringPlugin implements Plugin, PropertyEventListener {
File[] jars = pluginDir.listFiles(new FileFilter() {
public boolean accept(File pathname) {
String fileName = pathname.getName().toLowerCase();
return (fileName.equalsIgnoreCase("enterprise.jar"));
return (fileName.equalsIgnoreCase("enterprise.jar") || fileName.equalsIgnoreCase("hazelcast.jar"));
}
});
if (jars.length > 0) {
// Do not load this plugin since Enterprise is still installed
System.out.println("Enterprise plugin found. Stopping Clustering Plugin");
throw new IllegalStateException("This plugin cannot run next to the Enterprise plugin");
System.out.println("Conflicting plugin found. Stopping Clustering Plugin");
throw new IllegalStateException("This plugin cannot run with the Enterprise or Hazelcast plugin");
}
// Make sure that the enteprise folder exists under the home directory
......@@ -143,6 +145,8 @@ public class ClusteringPlugin implements Plugin, PropertyEventListener {
XMPPServer.getInstance().setRemoteSessionLocator(new RemoteSessionLocator());
// Set packet router to use to deliver packets to remote cluster nodes
XMPPServer.getInstance().getRoutingTable().setRemotePacketRouter(new CoherencePacketRouter());
// Initialize the Coherence cluster configuration
CacheFactory.getClusterConfig();
}
/**
......
......@@ -190,7 +190,7 @@ public class ClusteredCache implements Cache, QueryMap, InvocableMap {
public long getCacheHits() {
if (map instanceof NearCache) {
return ((NearCache)map).getCacheHits();
return ((NearCache)map).getCacheStatistics().getCacheHits();
}
else if (backingCache != null) {
return backingCache.getCacheHits();
......@@ -202,7 +202,7 @@ public class ClusteredCache implements Cache, QueryMap, InvocableMap {
public long getCacheMisses() {
if (map instanceof NearCache) {
return ((NearCache)map).getCacheMisses();
return ((NearCache)map).getCacheStatistics().getCacheMisses();
}
else if (backingCache != null) {
return backingCache.getCacheMisses();
......
......@@ -41,7 +41,7 @@ import java.util.concurrent.locks.Lock;
*
* @author Gaston Dombiak
*/
public class CoherenceClusteredCacheFactory implements CacheFactoryStrategy {
public class ClusteredCacheFactory implements CacheFactoryStrategy {
/**
* Storage for cache statistics
......@@ -93,7 +93,7 @@ public class CoherenceClusteredCacheFactory implements CacheFactoryStrategy {
}
else {
com.tangosol.net.CacheFactory.getCache("opt-$cacheStats");
taskService = com.tangosol.net.CacheFactory.getInvocationService("OpenFire Cluster Service");
taskService = (InvocationService) com.tangosol.net.CacheFactory.getService("OpenFire Cluster Service");
// Update the running state of the cluster
state = cluster != null ? State.started : State.stopped;
......@@ -332,6 +332,11 @@ public class CoherenceClusteredCacheFactory implements CacheFactoryStrategy {
}
}
@Override
public String getPluginName() {
return "clustering";
}
private static Invocable buildInvocable(final ClusterTask task) {
return new AbstractInvocable() {
public void run() {
......
......@@ -286,7 +286,7 @@ public class CoherenceExternalizableUtil implements ExternalizableUtilStrategy {
}
public Serializable readSerializable(DataInput in) throws IOException {
return ExternalizableHelper.readSerializable(in);
return (Serializable) ExternalizableHelper.readSerializable(in);
}
public void writeSafeUTF(DataOutput out, String value) throws IOException {
......
......@@ -64,7 +64,7 @@ public class CoherenceInfo {
*/
public static Map getNodeInfo() {
InvocationService service = com.tangosol.net.CacheFactory.getInvocationService("OpenFire Cluster Service");
InvocationService service = (InvocationService) com.tangosol.net.CacheFactory.getService("OpenFire Cluster Service");
// Run cluster-wide stats query
Map results = service.query(new AbstractInvocable() {
......@@ -107,7 +107,7 @@ public class CoherenceInfo {
*/
public static void clearCacheStats() {
InvocationService service = com.tangosol.net.CacheFactory.getInvocationService("OpenFire Cluster Service");
InvocationService service = (InvocationService) com.tangosol.net.CacheFactory.getService("OpenFire Cluster Service");
service.execute(new AbstractInvocable() {
public void run() {
......
......@@ -123,7 +123,7 @@
}
// Get the cache stats object:
Map cacheStats = com.tangosol.net.CacheFactory.getReplicatedCache(
Map cacheStats = com.tangosol.net.CacheFactory.getCache(
"opt-$cacheStats", com.tangosol.net.CacheFactory.class.getClassLoader());
// Decimal formatter for nubmers
......@@ -354,8 +354,8 @@ Cache statistics for this cluster node appear below.
double memUsed = (double) size / (1024 * 1024);
double totalMem = (double) maxSize / (1024 * 1024);
double freeMem = 100 - 100 * memUsed / totalMem;
double usedMem = 100 * memUsed / totalMem;
double freeMem = 100 - 100 * memUsed / Math.max(1, totalMem);
double usedMem = 100 * memUsed / Math.max(1, totalMem);
long hits = theStats[3];
long misses = theStats[4];
double hitPercent = 0.0;
......
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN">
<html>
<head>
<title>Hazelcast Clustering Plugin Changelog</title>
<style type="text/css">
BODY {
font-size : 100%;
}
BODY, TD, TH {
font-family : tahoma, verdana, arial, helvetica, sans-serif;
font-size : 0.8em;
}
H2 {
font-size : 10pt;
font-weight : bold;
padding-left : 1em;
}
A:hover {
text-decoration : none;
}
H1 {
font-family : tahoma, arial, helvetica, sans-serif;
font-size : 1.4em;
font-weight: bold;
border-bottom : 1px #ccc solid;
padding-bottom : 2px;
}
TT {
font-family : courier new;
font-weight : bold;
color : #060;
}
PRE {
font-family : courier new;
font-size : 100%;
}
</style>
</head>
<body>
<h1>
Hazelcast Clustering Plugin Changelog
</h1>
<p><b>1.0.0</b> -- September 22, 2012</p>
<ul>
<li>Initial release. </li>
</ul>
</body>
</html>
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (c) 2008-2012, Hazel Bilisim Ltd. All Rights Reserved.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-2.3.xsd"
xmlns="http://www.hazelcast.com/schema/config"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<group>
<name>openfire</name>
<password>openfire</password>
</group>
<management-center enabled="false"/>
<network>
<port auto-increment="true">5701</port>
<join>
<multicast enabled="true">
<multicast-group>224.2.2.3</multicast-group>
<multicast-port>54327</multicast-port>
</multicast>
<tcp-ip enabled="false"/>
<aws enabled="false"/>
</join>
<interfaces enabled="false"/>
<ssl enabled="false" />
<socket-interceptor enabled="false" />
<symmetric-encryption enabled="false">
<!--
encryption algorithm such as
DES/ECB/PKCS5Padding,
PBEWithMD5AndDES,
AES/CBC/PKCS5Padding,
Blowfish,
DESede
-->
<algorithm>PBEWithMD5AndDES</algorithm>
<!-- salt value to use when generating the secret key -->
<salt>thesalt</salt>
<!-- pass phrase to use when generating the secret key -->
<password>thepass</password>
<!-- iteration count to use when generating the secret key -->
<iteration-count>19</iteration-count>
</symmetric-encryption>
<asymmetric-encryption enabled="false">
<!-- encryption algorithm -->
<algorithm>RSA/NONE/PKCS1PADDING</algorithm>
<!-- private key password -->
<keyPassword>thekeypass</keyPassword>
<!-- private key alias -->
<keyAlias>local</keyAlias>
<!-- key store type -->
<storeType>JKS</storeType>
<!-- key store password -->
<storePassword>thestorepass</storePassword>
<!-- path to the key store -->
<storePath>keystore</storePath>
</asymmetric-encryption>
</network>
<partition-group enabled="false"/>
<executor-service>
<core-pool-size>16</core-pool-size>
<max-pool-size>64</max-pool-size>
<keep-alive-seconds>60</keep-alive-seconds>
</executor-service>
<queue name="default">
<!--
Maximum size of the queue. When a JVM's local queue size reaches the maximum,
all put/offer operations will get blocked until the queue size
of the JVM goes down below the maximum.
Any integer between 0 and Integer.MAX_VALUE. 0 means
Integer.MAX_VALUE. Default is 0.
-->
<max-size-per-jvm>0</max-size-per-jvm>
<!--
Name of the map configuration that will be used for the backing distributed
map for this queue.
-->
<backing-map-ref>default</backing-map-ref>
</queue>
<map name="default">
<!--
Number of backups. If 1 is set as the backup-count for example,
then all entries of the map will be copied to another JVM for
fail-safety. 0 means no backup.
-->
<backup-count>1</backup-count>
<!--
Number of async backups. 0 means no backup.
-->
<async-backup-count>0</async-backup-count>
<!--
Maximum number of seconds for each entry to stay in the map. Entries that are
older than <time-to-live-seconds> and not updated for <time-to-live-seconds>
will get automatically evicted from the map.
Any integer between 0 and Integer.MAX_VALUE. 0 means infinite. Default is 0.
-->
<time-to-live-seconds>0</time-to-live-seconds>
<!--
Maximum number of seconds for each entry to stay idle in the map. Entries that are
idle(not touched) for more than <max-idle-seconds> will get
automatically evicted from the map. Entry is touched if get, put or containsKey is called.
Any integer between 0 and Integer.MAX_VALUE. 0 means infinite. Default is 0.
-->
<max-idle-seconds>0</max-idle-seconds>
<!--
Valid values are:
NONE (no eviction),
LRU (Least Recently Used),
LFU (Least Frequently Used).
NONE is the default.
-->
<eviction-policy>NONE</eviction-policy>
<!--
Maximum size of the map. When max size is reached,
map is evicted based on the policy defined.
Any integer between 0 and Integer.MAX_VALUE. 0 means
Integer.MAX_VALUE. Default is 0.
-->
<max-size policy="cluster_wide_map_size">0</max-size>
<!--
When max. size is reached, specified percentage of
the map will be evicted. Any integer between 0 and 100.
If 25 is set for example, 25% of the entries will
get evicted.
-->
<eviction-percentage>25</eviction-percentage>
<!--
While recovering from split-brain (network partitioning),
map entries in the small cluster will merge into the bigger cluster
based on the policy set here. When an entry merge into the
cluster, there might an existing entry with the same key already.
Values of these entries might be different for that same key.
Which value should be set for the key? Conflict is resolved by
the policy set here. Default policy is hz.ADD_NEW_ENTRY
There are built-in merge policies such as
hz.NO_MERGE ; no entry will merge.
hz.ADD_NEW_ENTRY ; entry will be added if the merging entry's key
doesn't exist in the cluster.
hz.HIGHER_HITS ; entry with the higher hits wins.
hz.LATEST_UPDATE ; entry with the latest update wins.
-->
<merge-policy>hz.ADD_NEW_ENTRY</merge-policy>
</map>
</hazelcast>
\ No newline at end of file
After you have licensed and downloaded Coherence EE from Oracle, place
the following jar files in this folder:
coherence.jar
coherence-work.jar
To build the clustering plugin, issue the following command from
the Openfire (source) /build/ folder:
$OPENFIRE_SRC/build> ant -Dplugin=clustering plugin
Also note that due to classpath loading order, it may be necessary to
either remove the coherence-cache-config.xml file from the Coherence
runtime JAR, or rename the plugin-clustering.jar file to force it to
load before coherence.jar (e.g. "clustering-plugin.jar" or similar).
In order to run Oracle Coherence in production mode, you will need to
secure licensing for the Enterprise Edition (EE) of Coherence. While
clustered caching for Openfire is available in the Standard Edition (SE),
per the Oracle Fusion licensing docs the InvocationService (which is
used by Openfire to distribute tasks among the cluster members) is only
available in EE or Grid Edition (GE).
Note that Coherence is configured to run GE in development mode by default.
You can change this setting by overriding the following Java system properties
via /etc/sysconfig/openfire (RPM) or openfired.vmoptions (Windows):
-Dtangosol.coherence.edition=EE
-Dtangosol.coherence.mode=prod
The current Coherence release is version 3.7.1.
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<plugin>
<class>com.jivesoftware.openfire.HazelcastPlugin</class>
<name>${plugin.name}</name>
<description>${plugin.description}</description>
<author>Tom Evans</author>
<version>1.0.0</version>
<date>09/10/2012</date>
<minServerVersion>3.7.2</minServerVersion>
</plugin>
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN">
<html>
<head>
<title>Hazelcast Clustering Plugin Readme</title>
<style type="text/css">
BODY {
font-size : 100%;
}
BODY, TD, TH {
font-family : tahoma, verdana, arial, helvetica, sans-serif;
font-size : 0.8em;
}
H2 {
font-size : 10pt;
font-weight : bold;
}
A:hover {
text-decoration : none;
}
H1 {
font-family : tahoma, arial, helvetica, sans-serif;
font-size : 1.4em;
font-weight: bold;
border-bottom : 1px #ccc solid;
padding-bottom : 2px;
}
TT {
font-family : courier new;
font-weight : bold;
color : #060;
}
PRE {
font-family : courier new;
font-size : 100%;
}
#datatable TH {
color : #fff;
background-color : #2A448C;
text-align : left;
}
#datatable TD {
background-color : #FAF6EF;
}
#datatable .name {
background-color : #DCE2F5;
}
</style>
</head>
<body>
<h1>Hazelcast Clustering Plugin Readme</h1>
<h2>Overview</h2>
<p>
The Hazelcast plugin adds support for running multiple redundant Openfire
servers together in a cluster. By running Openfire in a cluster, you can
distribute the connection load among several servers, while also providing
failover in the event that one of your servers fails. This plugin is a
drop-in replacement for the original Openfire clustering plugin, using the
open source <a href="http://www.hazelcast.com">Hazelcast</a> data distribution
framework in lieu of an expensive proprietary third-party product.
</p>
<p>
The current Hazelcast release is version 2.3.1.
</p>
<h2>Installation</h2>
<p>
To install Hazelcast, simply drop the hazelcast.jar into $OPENFIRE_HOME/plugins along
with any other plugins you may have installed. Note that Hazelcast and the original
Openfire clustering plugin (clustering.jar) are mutually exclusive. You will need to
remove the clustering plugin before installing Hazelcast into your Openfire instance.
</p>
<p>
To create an Openfire cluster, you will need at least two separate Openfire servers,
and each server must have the Hazelcast plugin installed. By default, the servers
will discover each other by exchanging UDP (multicast) packets via a configurable
IP address and port, but other initialization options are available if your network
does not support multicast communication (see "Configuration" below).
</p>
<p>
In addition, you will need some form of load balancer to distribute the connection
load among the members of your Openfire cluster. There are several commercial and
open source alternatives for this, including the Apache web server (httpd) plus
<a href="http://httpd.apache.org/docs/current/mod/mod_proxy_balancer.html">mod_proxy_balancer</a>
(if you are using the HTTP/BOSH Openfire connector). Some popular solutions include the
<a href="http://www.f5.com/products/big-ip/big-ip-local-traffic-manager/overview/">F5 LTM</a>
(commercial) and <a href="http://haproxy.1wt.eu/">HAProxy</a> (open source), among
<a href="http://en.wikipedia.org/wiki/Load_balancing_%28computing%29">many others</a>.
</p>
<h2>Configuration</h2>
<p>
There are several configuration options built into the Hazelcast plugin
as Openfire system properties:
<ol>
<li><i>hazelcast.startup.delay.seconds</i> (5): Number of seconds
to wait before launching the Hazelcast plugin. This allows Openfire to
deploy any other plugins before initializing the cluster caches, etc.</li>
<li><i>hazelcast.startup.retry.count</i> (1): Number of times to retry
initialization if the cluster fails to start on the first attempt.</li>
<li><i>hazelcast.startup.retry.seconds</i> (10): Number of seconds to wait
between subsequent attempts to start the cluster.</li>
<li><i>hazelcast.max.execution.seconds</i> (30): Maximum time to wait
when running a synchronous task across members of the cluster.</li>
<li><i>hazelcast.config.xml.filename</i> (hazelcast-cache-config.xml): Name
of the Hazelcast configuration file. By overriding this value you can easily
install a custom cluster configuration file in the Hazelcast plugin /classes/
directory, or in the classpath of your own custom plugin.</li>
</ol>
</p>
<p>
The Hazelcast plugin uses the <a href="http://www.hazelcast.com/docs/2.3/manual/single_html/#Config">
XML configuration builder</a> to initialize the cluster from the XML file described above.
By default the cluster members will attempt to discover each other via multicast at the
following location:
<ul>
<li>IP Address: 224.2.2.3</li>
<li>Port: 54327</li>
</ul>
Note that these values can be overridden in the plugin's /classes/hazelcast-cache-config.xml
file (via the multicast-group and multicast-port elements). Many other initialization and
discovery options exist, as documented in the Hazelcast configuration docs noted above. For
example, to set up a two-node cluster using well-known DNS name/port values, try the
following alternative:
<pre>
...
&lt;join&gt;
&lt;multicast enabled="false"/&gt;
&lt;tcp-ip enabled="true"&gt;
&lt;hostname&gt;of-node-a.example.com:5701&lt;/hostname&gt;
&lt;hostname&gt;of-node-b.example.com:5701&lt;/hostname&gt;
&lt;/tcp-ip&gt;
&lt;aws enabled="false"/&gt;
&lt;/join&gt;
...
</pre>
Please refer to the <a href="http://www.hazelcast.com/docs/2.3/manual/single_html/">
Hazelcast reference manual</a> for more information.
</p>
</body>
</html>
##
## Hazelcast Clustering Resource Bundle
##
## REVISION HISTORY
##
## 1.0.0
## Initial Release
plugin.name=Hazelcast Clustering Plugin
plugin.description=Clustering support for Openfire, powered by Hazelcast.
/**
* $RCSfile$
* $Revision: $
* $Date: $
*
* Copyright (C) 2004-2009 Jive Software. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jivesoftware.openfire;
import java.io.File;
import java.io.FileFilter;
import java.text.DateFormat;
import java.text.ParseException;
import java.util.Date;
import java.util.Locale;
import java.util.Map;
import java.util.TimerTask;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.container.Plugin;
import org.jivesoftware.openfire.container.PluginManager;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.PropertyEventDispatcher;
import org.jivesoftware.util.PropertyEventListener;
import org.jivesoftware.util.TaskEngine;
import org.jivesoftware.util.cache.ExternalizableUtil;
import org.jivesoftware.util.cache.ExternalizableUtilStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.jivesoftware.openfire.session.RemoteSessionLocator;
import com.jivesoftware.util.cache.ClusterExternalizableUtil;
import com.jivesoftware.util.cluster.ClusterPacketRouter;
/**
* Hazelcast clustering plugin. This implementation is based upon
* (and borrows heavily from) the original Openfire clustering plugin.
* See this plugin's README file for more information.
*
* @author Tom Evans
* @author Matt Tucker
*/
public class HazelcastPlugin extends TimerTask implements Plugin, PropertyEventListener {
private static Logger logger = LoggerFactory.getLogger(HazelcastPlugin.class);
private static final long CLUSTER_STARTUP_DELAY_TIME =
JiveGlobals.getLongProperty("hazelcast.startup.delay.seconds", 5);
/**
* Keep serialization strategy the server was using before we set our strategy. We will
* restore old strategy when plugin is unloaded.
*/
private ExternalizableUtilStrategy serializationStrategy;
public void initializePlugin(PluginManager manager, File pluginDirectory) {
// start cluster using a separate thread after a short delay
// this will allow other plugins to initialize during startup
TaskEngine.getInstance().schedule(this, CLUSTER_STARTUP_DELAY_TIME*1000);
}
@Override
public void run() {
System.out.println("Starting Hazelcast Clustering Plugin");
// Check if another cluster is installed and stop loading this plugin if found
File pluginDir = new File(JiveGlobals.getHomeDirectory(), "plugins");
File[] jars = pluginDir.listFiles(new FileFilter() {
public boolean accept(File pathname) {
String fileName = pathname.getName().toLowerCase();
return (fileName.equalsIgnoreCase("enterprise.jar") ||
fileName.equalsIgnoreCase("coherence.jar"));
}
});
if (jars.length > 0) {
// Do not load this plugin if a conflicting implementation exists
logger.warn("Conflicting clustering plugins found; remove Coherence and/or Enterprise jar files");
throw new IllegalStateException("Clustering plugin configuration conflict (Coherence)");
}
// List for clustering setting events (e.g. enabled/disabled)
PropertyEventDispatcher.addListener(this);
if (ClusterManager.isClusteringEnabled()) {
initForClustering();
// Start up or join the cluster and initialize caches
ClusterManager.startup();
}
}
private void initForClustering() {
// Set the serialization strategy to use for transmitting objects between node clusters
serializationStrategy = ExternalizableUtil.getInstance().getStrategy();
ExternalizableUtil.getInstance().setStrategy(new ClusterExternalizableUtil());
// Set session locator to use when in a cluster
XMPPServer.getInstance().setRemoteSessionLocator(new RemoteSessionLocator());
// Set packet router to use to deliver packets to remote cluster nodes
XMPPServer.getInstance().getRoutingTable().setRemotePacketRouter(new ClusterPacketRouter());
}
/**
* Returns the date when this release of Openfire clustering plugin was released.
*
* @return the date when this release of Openfire clustering plugin was released.
*/
public static Date getReleaseDate() {
try {
// @DATE@ should be replaced with a date with the following format: Jan 31, 2007
// Automatically set by ANT build tasks
return DateFormat.getDateInstance(DateFormat.MEDIUM, Locale.US).parse("@DATE@");
}
catch (ParseException e) {
logger.error("Error parsing date", e);
return null;
}
}
public void destroyPlugin() {
ClusterManager.shutdown();
// Set the old serialization strategy was using before clustering was loaded
ExternalizableUtil.getInstance().setStrategy(serializationStrategy);
// Stop listing for clustering setting events (e.g. enabled/disabled)
PropertyEventDispatcher.removeListener(this);
}
public void propertySet(String property, Map<String, Object> params) {
// Ignore
}
public void propertyDeleted(String property, Map<String, Object> params) {
// Ignore
}
public void xmlPropertySet(String property, Map<String, Object> params) {
if (ClusterManager.CLUSTER_PROPERTY_NAME.equals(property)) {
if (Boolean.parseBoolean((String) params.get("value"))) {
// Clustering was enabled
initForClustering();
}
else {
// Clustering was disabled
}
}
}
public void xmlPropertyDeleted(String property, Map<String, Object> params) {
// Do nothing
}
}
/**
* $Revision: $
* $Date: $
*
* Copyright (C) 2007-2009 Jive Software. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jivesoftware.openfire.session;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.cluster.NodeID;
import org.jivesoftware.openfire.session.ClientSession;
import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.openfire.spi.ClientRoute;
import org.jivesoftware.openfire.spi.RoutingTableImpl;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.cache.ExternalizableUtil;
import org.xmpp.packet.JID;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
/**
* Class that defines possible remote operations that could be performed
* on remote client sessions.
*
* @author Gaston Dombiak
*/
public class ClientSessionTask extends RemoteSessionTask {
private JID address;
public ClientSessionTask() {
super();
}
protected ClientSessionTask(JID address, Operation operation) {
super(operation);
this.address = address;
}
Session getSession() {
return XMPPServer.getInstance().getRoutingTable().getClientRoute(address);
}
public void run() {
super.run();
ClientSession session = (ClientSession) getSession();
if (session instanceof RemoteClientSession) {
// The session is being hosted by other cluster node so log this unexpected case
Cache<String, ClientRoute> usersCache = CacheFactory.createCache(RoutingTableImpl.C2S_CACHE_NAME);
ClientRoute route = usersCache.get(address.toString());
NodeID nodeID = route.getNodeID();
Log.warn("Found remote session instead of local session. JID: " + address + " found in Node: " +
nodeID.toByteArray() + " and local node is: " + XMPPServer.getInstance().getNodeID().toByteArray());
}
if (operation == Operation.isInitialized) {
if (session instanceof RemoteClientSession) {
// Something is wrong since the session shoud be local instead of remote
// Assume some default value
result = true;
}
else {
result = session.isInitialized();
}
}
else if (operation == Operation.incrementConflictCount) {
if (session instanceof RemoteClientSession) {
// Something is wrong since the session shoud be local instead of remote
// Assume some default value
result = 2;
}
else {
result = session.incrementConflictCount();
}
}
}
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
ExternalizableUtil.getInstance().writeSafeUTF(out, address.toString());
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
address = new JID(ExternalizableUtil.getInstance().readSafeUTF(in));
}
public String toString() {
return super.toString() + " operation: " + operation + " address: " + address;
}
}
/**
* $Revision: $
* $Date: $
*
* Copyright (C) 2007-2009 Jive Software. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jivesoftware.openfire.session;
import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.session.ComponentSession;
import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.util.cache.ExternalizableUtil;
import org.xmpp.packet.JID;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
/**
* Class that defines possible remote operations that could be performed
* on remote component sessions (for external components only).
*
* @author Gaston Dombiak
*/
public class ComponentSessionTask extends RemoteSessionTask {
private JID address;
public ComponentSessionTask() {
}
protected ComponentSessionTask(JID address, Operation operation) {
super(operation);
this.address = address;
}
Session getSession() {
return SessionManager.getInstance().getComponentSession(address.getDomain());
}
public void run() {
super.run();
if (operation == Operation.getType) {
result = ((ComponentSession) getSession()).getExternalComponent().getType();
}
else if (operation == Operation.getCategory) {
result = ((ComponentSession) getSession()).getExternalComponent().getCategory();
}
else if (operation == Operation.getInitialSubdomain) {
result = ((ComponentSession) getSession()).getExternalComponent().getInitialSubdomain();
}
else if (operation == Operation.getSubdomains) {
result = ((ComponentSession) getSession()).getExternalComponent().getSubdomains();
}
else if (operation == Operation.getName) {
result = ((ComponentSession) getSession()).getExternalComponent().getName();
}
else if (operation == Operation.getDescription) {
result = ((ComponentSession) getSession()).getExternalComponent().getDescription();
}
else if (operation == Operation.start) {
((ComponentSession) getSession()).getExternalComponent().start();
}
else if (operation == Operation.shutdown) {
((ComponentSession) getSession()).getExternalComponent().shutdown();
}
}
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
ExternalizableUtil.getInstance().writeSafeUTF(out, address.toString());
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
address = new JID(ExternalizableUtil.getInstance().readSafeUTF(in));
}
public String toString() {
return super.toString() + " operation: " + operation + " address: " + address;
}
}
/**
* $Revision: $
* $Date: $
*
* Copyright (C) 2007-2009 Jive Software. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jivesoftware.openfire.session;
import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.session.Session;
import org.xmpp.packet.JID;
/**
* Class that defines possible remote operations that could be performed
* on remote connection manager sessions.
*
* @author Gaston Dombiak
*/
public class ConnectionMultiplexerSessionTask extends RemoteSessionTask {
private JID address;
public ConnectionMultiplexerSessionTask() {
}
protected ConnectionMultiplexerSessionTask(JID address, Operation operation) {
super(operation);
this.address = address;
}
Session getSession() {
return SessionManager.getInstance().getConnectionMultiplexerSession(address);
}
public String toString() {
return super.toString() + " operation: " + operation + " address: " + address;
}
}
/**
* $Revision: $
* $Date: $
*
* Copyright (C) 2007-2009 Jive Software. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jivesoftware.openfire.session;
import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.cache.ClusterTask;
import org.jivesoftware.util.cache.ExternalizableUtil;
import org.xmpp.packet.JID;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
/**
* Cluster task that will ask a remote cluster node to deliver some raw text to a local session.
*
* @author Gaston Dombiak
*/
public class DeliverRawTextTask implements ClusterTask {
private SessionType sessionType;
private JID address;
private String streamID;
private String text;
public DeliverRawTextTask() {
super();
}
protected DeliverRawTextTask(RemoteSession remoteSession, JID address, String text) {
if (remoteSession instanceof RemoteClientSession) {
this.sessionType = SessionType.client;
}
else if (remoteSession instanceof RemoteOutgoingServerSession) {
this.sessionType = SessionType.outgoingServer;
}
else if (remoteSession instanceof RemoteComponentSession) {
this.sessionType = SessionType.component;
}
else if (remoteSession instanceof RemoteConnectionMultiplexerSession) {
this.sessionType = SessionType.connectionManager;
}
else {
Log.error("Invalid RemoteSession was used for task: " + remoteSession);
}
this.address = address;
this.text = text;
}
public DeliverRawTextTask(String streamID, String text) {
this.sessionType = SessionType.incomingServer;
this.streamID = streamID;
this.text = text;
}
public Object getResult() {
return null;
}
public void run() {
getSession().deliverRawText(text);
}
public void writeExternal(ObjectOutput out) throws IOException {
ExternalizableUtil.getInstance().writeSafeUTF(out, text);
ExternalizableUtil.getInstance().writeInt(out, sessionType.ordinal());
ExternalizableUtil.getInstance().writeBoolean(out, address != null);
if (address != null) {
ExternalizableUtil.getInstance().writeSafeUTF(out, address.toString());
}
ExternalizableUtil.getInstance().writeBoolean(out, streamID != null);
if (streamID != null) {
ExternalizableUtil.getInstance().writeSafeUTF(out, streamID);
}
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
text = ExternalizableUtil.getInstance().readSafeUTF(in);
sessionType = SessionType.values()[ExternalizableUtil.getInstance().readInt(in)];
if (ExternalizableUtil.getInstance().readBoolean(in)) {
address = new JID(ExternalizableUtil.getInstance().readSafeUTF(in));
}
if (ExternalizableUtil.getInstance().readBoolean(in)) {
streamID = ExternalizableUtil.getInstance().readSafeUTF(in);
}
}
Session getSession() {
if (sessionType == SessionType.client) {
return XMPPServer.getInstance().getRoutingTable().getClientRoute(address);
}
else if (sessionType == SessionType.component) {
return SessionManager.getInstance().getComponentSession(address.getDomain());
}
else if (sessionType == SessionType.connectionManager) {
return SessionManager.getInstance().getConnectionMultiplexerSession(address);
}
else if (sessionType == SessionType.outgoingServer) {
return SessionManager.getInstance().getOutgoingServerSession(address.getDomain());
}
else if (sessionType == SessionType.incomingServer) {
return SessionManager.getInstance().getIncomingServerSession(streamID);
}
Log.error("Found unknown session type: " + sessionType);
return null;
}
public String toString() {
return super.toString() + " sessionType: " + sessionType + " address: " + address;
}
private enum SessionType {
client,
outgoingServer,
incomingServer,
component,
connectionManager
}
}
\ No newline at end of file
/**
* $Revision: $
* $Date: $
*
* Copyright (C) 2007-2009 Jive Software. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jivesoftware.openfire.session;
import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.session.IncomingServerSession;
import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.util.cache.ExternalizableUtil;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
/**
* Class that defines possible remote operations that could be performed
* on remote incoming server sessions.
*
* @author Gaston Dombiak
*/
public class IncomingServerSessionTask extends RemoteSessionTask {
private String streamID;
public IncomingServerSessionTask() {
super();
}
protected IncomingServerSessionTask(Operation operation, String streamID) {
super(operation);
this.streamID = streamID;
}
Session getSession() {
return SessionManager.getInstance().getIncomingServerSession(streamID);
}
public void run() {
super.run();
if (operation == Operation.getLocalDomain) {
result = ((IncomingServerSession) getSession()).getLocalDomain();
}
else if (operation == Operation.getAddress) {
result = getSession().getAddress();
}
}
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
ExternalizableUtil.getInstance().writeSafeUTF(out, streamID);
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
streamID = ExternalizableUtil.getInstance().readSafeUTF(in);
}
public String toString() {
return super.toString() + " operation: " + operation + " streamID: " + streamID;
}
}
/**
* $Revision: $
* $Date: $
*
* Copyright (C) 2007-2009 Jive Software. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jivesoftware.openfire.session;
import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.session.OutgoingServerSession;
import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.util.cache.ExternalizableUtil;
import org.xmpp.packet.JID;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
/**
* Class that defines possible remote operations that could be performed
* on remote outgoing server sessions.
*
* @author Gaston Dombiak
*/
public class OutgoingServerSessionTask extends RemoteSessionTask {
private JID address;
public OutgoingServerSessionTask() {
}
protected OutgoingServerSessionTask(JID address, Operation operation) {
super(operation);
this.address = address;
}
Session getSession() {
return SessionManager.getInstance().getOutgoingServerSession(address.getDomain());
}
public void run() {
super.run();
if (operation == Operation.getAuthenticatedDomains) {
result = ((OutgoingServerSession) getSession()).getAuthenticatedDomains();
}
else if (operation == Operation.getHostnames) {
result = ((OutgoingServerSession) getSession()).getHostnames();
}
else if (operation == Operation.isUsingServerDialback) {
result = ((OutgoingServerSession) getSession()).isUsingServerDialback();
}
}
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
ExternalizableUtil.getInstance().writeSafeUTF(out, address.toString());
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
address = new JID(ExternalizableUtil.getInstance().readSafeUTF(in));
}
public String toString() {
return super.toString() + " operation: " + operation + " address: " + address;
}
}
/**
* $Revision: $
* $Date: $
*
* Copyright (C) 2007-2009 Jive Software. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jivesoftware.openfire.session;
import org.dom4j.Element;
import org.dom4j.tree.DefaultElement;
import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.cache.ClusterTask;
import org.jivesoftware.util.cache.ExternalizableUtil;
import org.xmpp.packet.*;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
/**
* Cluster task that will ask a remote cluster node to deliver some packet to a local session.
*
* @author Gaston Dombiak
*/
public class ProcessPacketTask implements ClusterTask {
private SessionType sessionType;
private JID address;
private String streamID;
private Packet packet;
public ProcessPacketTask() {
super();
}
protected ProcessPacketTask(RemoteSession remoteSession, JID address, Packet packet) {
if (remoteSession instanceof RemoteClientSession) {
this.sessionType = SessionType.client;
}
else if (remoteSession instanceof RemoteOutgoingServerSession) {
this.sessionType = SessionType.outgoingServer;
}
else if (remoteSession instanceof RemoteComponentSession) {
this.sessionType = SessionType.component;
}
else if (remoteSession instanceof RemoteConnectionMultiplexerSession) {
this.sessionType = SessionType.connectionManager;
}
else {
Log.error("Invalid RemoteSession was used for task: " + remoteSession);
}
this.address = address;
this.packet = packet;
}
protected ProcessPacketTask(String streamID, Packet packet) {
this.sessionType = SessionType.incomingServer;
this.streamID = streamID;
this.packet = packet;
}
public Object getResult() {
return null;
}
public void run() {
getSession().process(packet);
}
public void writeExternal(ObjectOutput out) throws IOException {
ExternalizableUtil.getInstance().writeBoolean(out, address != null);
if (address != null) {
ExternalizableUtil.getInstance().writeSafeUTF(out, address.toString());
}
ExternalizableUtil.getInstance().writeBoolean(out, streamID != null);
if (streamID != null) {
ExternalizableUtil.getInstance().writeSafeUTF(out, streamID);
}
ExternalizableUtil.getInstance().writeInt(out, sessionType.ordinal());
if (packet instanceof IQ) {
ExternalizableUtil.getInstance().writeInt(out, 1);
} else if (packet instanceof Message) {
ExternalizableUtil.getInstance().writeInt(out, 2);
} else if (packet instanceof Presence) {
ExternalizableUtil.getInstance().writeInt(out, 3);
}
ExternalizableUtil.getInstance().writeSerializable(out, (DefaultElement) packet.getElement());
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
if (ExternalizableUtil.getInstance().readBoolean(in)) {
address = new JID(ExternalizableUtil.getInstance().readSafeUTF(in));
}
if (ExternalizableUtil.getInstance().readBoolean(in)) {
streamID = ExternalizableUtil.getInstance().readSafeUTF(in);
}
sessionType = SessionType.values()[ExternalizableUtil.getInstance().readInt(in)];
int packetType = ExternalizableUtil.getInstance().readInt(in);
Element packetElement = (Element) ExternalizableUtil.getInstance().readSerializable(in);
switch (packetType) {
case 1:
packet = new IQ(packetElement, true);
break;
case 2:
packet = new Message(packetElement, true);
break;
case 3:
packet = new Presence(packetElement, true);
break;
}
}
Session getSession() {
if (sessionType == SessionType.client) {
return XMPPServer.getInstance().getRoutingTable().getClientRoute(address);
}
else if (sessionType == SessionType.component) {
return SessionManager.getInstance().getComponentSession(address.getDomain());
}
else if (sessionType == SessionType.connectionManager) {
return SessionManager.getInstance().getConnectionMultiplexerSession(address);
}
else if (sessionType == SessionType.outgoingServer) {
return SessionManager.getInstance().getOutgoingServerSession(address.getDomain());
}
else if (sessionType == SessionType.incomingServer) {
return SessionManager.getInstance().getIncomingServerSession(streamID);
}
Log.error("Found unknown session type: " + sessionType);
return null;
}
public String toString() {
return super.toString() + " sessionType: " + sessionType + " address: " + address;
}
private enum SessionType {
client,
outgoingServer,
incomingServer,
component,
connectionManager
}
}
/**
* $Revision: $
* $Date: $
*
* Copyright (C) 2007-2009 Jive Software. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jivesoftware.openfire.session;
import org.dom4j.Element;
import org.dom4j.tree.DefaultElement;
import org.jivesoftware.openfire.component.InternalComponentManager;
import org.jivesoftware.openfire.session.ComponentSession;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.cache.ClusterTask;
import org.jivesoftware.util.cache.ExternalizableUtil;
import org.xmpp.component.ComponentException;
import org.xmpp.component.ComponentManager;
import org.xmpp.packet.*;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Collection;
/**
* Surrogate for sessions of external components hosted in some remote cluster node.
*
* @author Gaston Dombiak
*/
public class RemoteComponentSession extends RemoteSession implements ComponentSession {
private ExternalComponent component;
public RemoteComponentSession(byte[] nodeID, JID address) {
super(nodeID, address);
component = new RemoteExternalComponent(address);
}
public ExternalComponent getExternalComponent() {
return component;
}
RemoteSessionTask getRemoteSessionTask(RemoteSessionTask.Operation operation) {
return new ComponentSessionTask(address, operation);
}
ClusterTask getDeliverRawTextTask(String text) {
return new DeliverRawTextTask(this, address, text);
}
ClusterTask getProcessPacketTask(Packet packet) {
return new ProcessPacketTask(this, address, packet);
}
private class RemoteExternalComponent implements ExternalComponent {
private JID address;
public RemoteExternalComponent(JID address) {
this.address = address;
}
public void setName(String name) {
RemoteSessionTask task = new SetterTask(address, SetterTask.Type.name, name);
doClusterTask(task);
}
public String getType() {
ClusterTask task = new ComponentSessionTask(address, RemoteSessionTask.Operation.getType);
return (String) doSynchronousClusterTask(task);
}
public void setType(String type) {
RemoteSessionTask task = new SetterTask(address, SetterTask.Type.type, type);
doClusterTask(task);
}
public String getCategory() {
ClusterTask task = new ComponentSessionTask(address, RemoteSessionTask.Operation.getCategory);
return (String) doSynchronousClusterTask(task);
}
public void setCategory(String category) {
RemoteSessionTask task = new SetterTask(address, SetterTask.Type.catergory, category);
doClusterTask(task);
}
public String getInitialSubdomain() {
ClusterTask task =
new ComponentSessionTask(address, RemoteSessionTask.Operation.getInitialSubdomain);
return (String) doSynchronousClusterTask(task);
}
public Collection<String> getSubdomains() {
ClusterTask task = new ComponentSessionTask(address, RemoteSessionTask.Operation.getSubdomains);
return (Collection<String>) doSynchronousClusterTask(task);
}
public String getName() {
ClusterTask task = new ComponentSessionTask(address, RemoteSessionTask.Operation.getName);
return (String) doSynchronousClusterTask(task);
}
public String getDescription() {
ClusterTask task = new ComponentSessionTask(address, RemoteSessionTask.Operation.getDescription);
return (String) doSynchronousClusterTask(task);
}
public void processPacket(Packet packet) {
RemoteSessionTask task = new ProcessComponentPacketTask(address, packet);
doClusterTask(task);
}
public void initialize(JID jid, ComponentManager componentManager) throws ComponentException {
RemoteSessionTask task = new InitializeTask(address, jid);
doClusterTask(task);
}
public void start() {
RemoteSessionTask task = new ComponentSessionTask(address, RemoteSessionTask.Operation.start);
doClusterTask(task);
}
public void shutdown() {
RemoteSessionTask task = new ComponentSessionTask(address, RemoteSessionTask.Operation.shutdown);
doClusterTask(task);
}
}
private static class SetterTask extends ComponentSessionTask {
private Type type;
private String value;
public SetterTask() {
super();
}
protected SetterTask(JID address, Type type, String value) {
super(address, null);
this.type = type;
this.value = value;
}
public void run() {
if (type == Type.name) {
((ComponentSession) getSession()).getExternalComponent().setName(value);
} else if (type == Type.type) {
((ComponentSession) getSession()).getExternalComponent().setType(value);
} else if (type == Type.catergory) {
((ComponentSession) getSession()).getExternalComponent().setCategory(value);
}
}
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
ExternalizableUtil.getInstance().writeInt(out, type.ordinal());
ExternalizableUtil.getInstance().writeBoolean(out, value != null);
if (value != null) {
ExternalizableUtil.getInstance().writeSafeUTF(out, value);
}
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
type = Type.values()[ExternalizableUtil.getInstance().readInt(in)];
if (ExternalizableUtil.getInstance().readBoolean(in)) {
value = ExternalizableUtil.getInstance().readSafeUTF(in);
}
}
private static enum Type {
name,
type,
catergory
}
}
private static class ProcessComponentPacketTask extends ComponentSessionTask {
private Packet packet;
public ProcessComponentPacketTask() {
super();
}
protected ProcessComponentPacketTask(JID address, Packet packet) {
super(address, null);
this.packet = packet;
}
public void run() {
((ComponentSession) getSession()).getExternalComponent().processPacket(packet);
}
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
if (packet instanceof IQ) {
ExternalizableUtil.getInstance().writeInt(out, 1);
} else if (packet instanceof Message) {
ExternalizableUtil.getInstance().writeInt(out, 2);
} else if (packet instanceof Presence) {
ExternalizableUtil.getInstance().writeInt(out, 3);
}
ExternalizableUtil.getInstance().writeSerializable(out, (DefaultElement) packet.getElement());
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
int packetType = ExternalizableUtil.getInstance().readInt(in);
Element packetElement = (Element) ExternalizableUtil.getInstance().readSerializable(in);
switch (packetType) {
case 1:
packet = new IQ(packetElement, true);
break;
case 2:
packet = new Message(packetElement, true);
break;
case 3:
packet = new Presence(packetElement, true);
break;
}
}
}
private static class InitializeTask extends ComponentSessionTask {
private JID componentJID;
public InitializeTask() {
super();
}
protected InitializeTask(JID address, JID componentJID) {
super(address, null);
this.componentJID = componentJID;
}
public void run() {
try {
((ComponentSession) getSession()).getExternalComponent()
.initialize(componentJID, InternalComponentManager.getInstance());
} catch (ComponentException e) {
Log.error("Error initializing component", e);
}
}
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
ExternalizableUtil.getInstance().writeSafeUTF(out, componentJID.toString());
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
componentJID = new JID(ExternalizableUtil.getInstance().readSafeUTF(in));
}
}
}
/**
* $Revision: $
* $Date: $
*
* Copyright (C) 2007-2009 Jive Software. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jivesoftware.openfire.session;
import org.jivesoftware.openfire.session.ConnectionMultiplexerSession;
import org.jivesoftware.util.cache.ClusterTask;
import org.xmpp.packet.JID;
import org.xmpp.packet.Packet;
/**
* Surrogate for connection manager sessions hosted in some remote cluster node.
*
* @author Gaston Dombiak
*/
public class RemoteConnectionMultiplexerSession extends RemoteSession implements ConnectionMultiplexerSession {
public RemoteConnectionMultiplexerSession(byte[] nodeID, JID address) {
super(nodeID, address);
}
RemoteSessionTask getRemoteSessionTask(RemoteSessionTask.Operation operation) {
return new ConnectionMultiplexerSessionTask(address, operation);
}
ClusterTask getDeliverRawTextTask(String text) {
return new DeliverRawTextTask(this, address, text);
}
ClusterTask getProcessPacketTask(Packet packet) {
return new ProcessPacketTask(this, address, packet);
}
}
/**
* $Revision: $
* $Date: $
*
* Copyright (C) 2007-2009 Jive Software. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jivesoftware.openfire.session;
import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.session.IncomingServerSession;
import org.jivesoftware.util.cache.ClusterTask;
import org.xmpp.packet.JID;
import org.xmpp.packet.Packet;
import java.util.Collection;
/**
* Surrogate for incoming server sessions hosted in some remote cluster node.
*
* @author Gaston Dombiak
*/
public class RemoteIncomingServerSession extends RemoteSession implements IncomingServerSession {
private String localDomain;
public RemoteIncomingServerSession(byte[] nodeID, String streamID) {
super(nodeID, null);
this.streamID = new BasicStreamID(streamID);
}
public JID getAddress() {
if (address == null) {
RemoteSessionTask task = getRemoteSessionTask(RemoteSessionTask.Operation.getAddress);
address = (JID) doSynchronousClusterTask(task);
}
return address;
}
public Collection<String> getValidatedDomains() {
// Content is stored in a clustered cache so that even in the case of the node hosting
// the sessions is lost we can still have access to this info to be able to perform
// proper clean up logic {@link ClusterListener#cleanupNode(NodeCacheKey)
return SessionManager.getInstance().getValidatedDomains(streamID.getID());
}
public String getLocalDomain() {
if (localDomain == null) {
RemoteSessionTask task = getRemoteSessionTask(RemoteSessionTask.Operation.getLocalDomain);
localDomain = (String) doSynchronousClusterTask(task);
}
return localDomain;
}
RemoteSessionTask getRemoteSessionTask(RemoteSessionTask.Operation operation) {
return new IncomingServerSessionTask(operation, streamID.getID());
}
ClusterTask getDeliverRawTextTask(String text) {
return new DeliverRawTextTask(streamID.getID(), text);
}
ClusterTask getProcessPacketTask(Packet packet) {
return new ProcessPacketTask(streamID.getID(), packet);
}
}
/**
* $Revision: $
* $Date: $
*
* Copyright (C) 2007-2009 Jive Software. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jivesoftware.openfire.session;
import org.jivesoftware.openfire.session.OutgoingServerSession;
import org.jivesoftware.util.cache.ClusterTask;
import org.jivesoftware.util.cache.ExternalizableUtil;
import org.xmpp.packet.JID;
import org.xmpp.packet.Packet;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Collection;
/**
* Surrogate for outgoing server sessions hosted in some remote cluster node.
*
* @author Gaston Dombiak
*/
public class RemoteOutgoingServerSession extends RemoteSession implements OutgoingServerSession {
private long usingServerDialback = -1;
public RemoteOutgoingServerSession(byte[] nodeID, JID address) {
super(nodeID, address);
}
public Collection<String> getAuthenticatedDomains() {
ClusterTask task = getRemoteSessionTask(RemoteSessionTask.Operation.getAuthenticatedDomains);
return (Collection<String>) doSynchronousClusterTask(task);
}
public void addAuthenticatedDomain(String domain) {
doClusterTask(new AddAuthenticatedDomainTask(address, domain));
}
public Collection<String> getHostnames() {
ClusterTask task = getRemoteSessionTask(RemoteSessionTask.Operation.getHostnames);
return (Collection<String>) doSynchronousClusterTask(task);
}
public void addHostname(String hostname) {
doClusterTask(new AddHostnameTask(address, hostname));
}
public boolean authenticateSubdomain(String domain, String hostname) {
ClusterTask task = new AuthenticateSubdomainTask(address, domain, hostname);
return (Boolean) doSynchronousClusterTask(task);
}
public boolean isUsingServerDialback() {
if (usingServerDialback == -1) {
ClusterTask task = getRemoteSessionTask(RemoteSessionTask.Operation.isUsingServerDialback);
usingServerDialback = (Boolean) doSynchronousClusterTask(task) ? 1 : 0;
}
return usingServerDialback == 1;
}
RemoteSessionTask getRemoteSessionTask(RemoteSessionTask.Operation operation) {
return new OutgoingServerSessionTask(address, operation);
}
ClusterTask getDeliverRawTextTask(String text) {
return new DeliverRawTextTask(this, address, text);
}
ClusterTask getProcessPacketTask(Packet packet) {
return new ProcessPacketTask(this, address, packet);
}
private static class AddAuthenticatedDomainTask extends OutgoingServerSessionTask {
private String domain;
public AddAuthenticatedDomainTask() {
super();
}
protected AddAuthenticatedDomainTask(JID address, String domain) {
super(address, null);
this.domain = domain;
}
public void run() {
((OutgoingServerSession) getSession()).addAuthenticatedDomain(domain);
}
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
ExternalizableUtil.getInstance().writeSafeUTF(out, domain);
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
domain = ExternalizableUtil.getInstance().readSafeUTF(in);
}
}
private static class AddHostnameTask extends OutgoingServerSessionTask {
private String hostname;
public AddHostnameTask() {
super();
}
protected AddHostnameTask(JID address, String hostname) {
super(address, null);
this.hostname = hostname;
}
public void run() {
((OutgoingServerSession) getSession()).addHostname(hostname);
}
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
ExternalizableUtil.getInstance().writeSafeUTF(out, hostname);
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
hostname = ExternalizableUtil.getInstance().readSafeUTF(in);
}
}
private static class AuthenticateSubdomainTask extends OutgoingServerSessionTask {
private String domain;
private String hostname;
public AuthenticateSubdomainTask() {
super();
}
protected AuthenticateSubdomainTask(JID address, String domain, String hostname) {
super(address, null);
this.domain = domain;
this.hostname = hostname;
}
public void run() {
result = ((OutgoingServerSession) getSession()).authenticateSubdomain(domain, hostname);
}
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
ExternalizableUtil.getInstance().writeSafeUTF(out, domain);
ExternalizableUtil.getInstance().writeSafeUTF(out, hostname);
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
domain = ExternalizableUtil.getInstance().readSafeUTF(in);
hostname = ExternalizableUtil.getInstance().readSafeUTF(in);
}
}
}
/**
* $Revision: $
* $Date: $
*
* Copyright (C) 2007-2009 Jive Software. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jivesoftware.openfire.session;
import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.cache.ClusterTask;
import org.xmpp.packet.JID;
import org.xmpp.packet.Packet;
import java.net.UnknownHostException;
import java.util.Date;
/**
* Base class for sessions being hosted in other cluster nodes. Almost all
* messages will be forwarded to the actual session in some remote cluster node.
* Only some few messages will be local operations like getting the session's address
* or the session status. And only some operations will be cached locally for a brief
* period for content that is highly used and not frequently modified.
*
* @author Gaston Dombiak
*/
public abstract class RemoteSession implements Session {
protected byte[] nodeID;
protected JID address;
// Cache content that never changes
protected StreamID streamID;
private Date creationDate;
private String serverName;
private String hostAddress;
private String hostName;
public RemoteSession(byte[] nodeID, JID address) {
this.nodeID = nodeID;
this.address = address;
}
public JID getAddress() {
return address;
}
/**
* Remote sessions are always authenticated. Otherwise, they won't be visibile to other
* cluster nodes. When the session is closed it will no longer be visible to other nodes
* so {@link #STATUS_CLOSED} is never returned.
*
* @return the authenticated status.
*/
public int getStatus() {
return STATUS_AUTHENTICATED;
}
public StreamID getStreamID() {
// Get it once and cache it since it never changes
if (streamID == null) {
ClusterTask task = getRemoteSessionTask(RemoteSessionTask.Operation.getStreamID);
String id = (String) doSynchronousClusterTask(task);
streamID = new BasicStreamID(id);
}
return streamID;
}
public String getServerName() {
if (serverName == null) {
ClusterTask task = getRemoteSessionTask(RemoteSessionTask.Operation.getServerName);
serverName = (String) doSynchronousClusterTask(task);
}
return serverName;
}
public Date getCreationDate() {
// Get it once and cache it since it never changes
if (creationDate == null) {
ClusterTask task = getRemoteSessionTask(RemoteSessionTask.Operation.getCreationDate);
creationDate = (Date) doSynchronousClusterTask(task);
}
return creationDate;
}
public Date getLastActiveDate() {
ClusterTask task = getRemoteSessionTask(RemoteSessionTask.Operation.getLastActiveDate);
return (Date) doSynchronousClusterTask(task);
}
public long getNumClientPackets() {
ClusterTask task = getRemoteSessionTask(RemoteSessionTask.Operation.getNumClientPackets);
return (Long) doSynchronousClusterTask(task);
}
public long getNumServerPackets() {
ClusterTask task = getRemoteSessionTask(RemoteSessionTask.Operation.getNumServerPackets);
return (Long) doSynchronousClusterTask(task);
}
public void process(Packet packet) {
doClusterTask(getProcessPacketTask(packet));
}
public void close() {
doSynchronousClusterTask(getRemoteSessionTask(RemoteSessionTask.Operation.close));
}
public boolean isClosed() {
ClusterTask task = getRemoteSessionTask(RemoteSessionTask.Operation.isClosed);
return (Boolean) doSynchronousClusterTask(task);
}
public boolean isSecure() {
ClusterTask task = getRemoteSessionTask(RemoteSessionTask.Operation.isSecure);
return (Boolean) doSynchronousClusterTask(task);
}
public String getHostAddress() throws UnknownHostException {
if (hostAddress == null) {
ClusterTask task = getRemoteSessionTask(RemoteSessionTask.Operation.getHostAddress);
hostAddress = (String) doSynchronousClusterTask(task);
}
return hostAddress;
}
public String getHostName() throws UnknownHostException {
if (hostName == null) {
ClusterTask task = getRemoteSessionTask(RemoteSessionTask.Operation.getHostName);
hostName = (String) doSynchronousClusterTask(task);
}
return hostName;
}
public void deliverRawText(String text) {
doClusterTask(getDeliverRawTextTask(text));
}
public boolean validate() {
ClusterTask task = getRemoteSessionTask(RemoteSessionTask.Operation.validate);
return (Boolean) doSynchronousClusterTask(task);
}
abstract RemoteSessionTask getRemoteSessionTask(RemoteSessionTask.Operation operation);
abstract ClusterTask getDeliverRawTextTask(String text);
abstract ClusterTask getProcessPacketTask(Packet packet);
/**
* Invokes a task on the remote cluster member synchronously and returns the result of
* the remote operation.
*
* @param task the ClusterTask object to be invoked on a given cluster member.
* @return result of remote operation.
* @throws IllegalStateException if requested node was not found or not running in a cluster.
*/
protected Object doSynchronousClusterTask(ClusterTask task) {
return CacheFactory.doSynchronousClusterTask(task, nodeID);
}
/**
* Invokes a task on the remote cluster member in an asynchronous fashion.
*
* @param task the task to be invoked on the specified cluster member.
* @throws IllegalStateException if requested node was not found or not running in a cluster.
*/
protected void doClusterTask(ClusterTask task) {
CacheFactory.doClusterTask(task, nodeID);
}
/**
* Simple implementation of the StreamID interface to hold the stream ID of
* the surrogated session.
*/
protected static class BasicStreamID implements StreamID {
String id;
public BasicStreamID(String id) {
this.id = id;
}
public String getID() {
return id;
}
public String toString() {
return id;
}
public int hashCode() {
return id.hashCode();
}
}
}
/**
* $Revision: $
* $Date: $
*
* Copyright (C) 2007-2009 Jive Software. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jivesoftware.openfire.session;
import org.jivesoftware.openfire.session.*;
import org.xmpp.packet.JID;
/**
* Locator of sessions that know how to talk to Hazelcast cluster nodes.
*
* @author Gaston Dombiak
*/
public class RemoteSessionLocator implements org.jivesoftware.openfire.session.RemoteSessionLocator {
// TODO Keep a cache for a brief moment so we can reuse same instances (that use their own cache)
public ClientSession getClientSession(byte[] nodeID, JID address) {
return new RemoteClientSession(nodeID, address);
}
public ComponentSession getComponentSession(byte[] nodeID, JID address) {
return new RemoteComponentSession(nodeID, address);
}
public ConnectionMultiplexerSession getConnectionMultiplexerSession(byte[] nodeID, JID address) {
return new RemoteConnectionMultiplexerSession(nodeID, address);
}
public IncomingServerSession getIncomingServerSession(byte[] nodeID, String streamID) {
return new RemoteIncomingServerSession(nodeID, streamID);
}
public OutgoingServerSession getOutgoingServerSession(byte[] nodeID, JID address) {
return new RemoteOutgoingServerSession(nodeID, address);
}
}
/**
* $Revision: $
* $Date: $
*
* Copyright (C) 2007-2009 Jive Software. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jivesoftware.util.cache;
import java.util.Set;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.cluster.NodeID;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryListener;
/**
* Base listener for cache events in the cluster. This class helps keep track
* of nodes and their elements. The actual tracking information is kept in
* {@link ClusterListener}. This information is then used when a node goes
* down to proper clean up can be done.
*
* @author Tom Evans
* @author Pete Matern
* @author Gaston Dombiak
*/
class CacheListener implements EntryListener {
protected final String cacheName;
private ClusterListener clusterListener;
public CacheListener(ClusterListener clusterListener, String cacheName) {
this.clusterListener = clusterListener;
this.cacheName = cacheName;
}
@Override
public void entryAdded(EntryEvent event) {
handleMapEvent(event, false);
}
@Override
public void entryUpdated(EntryEvent event) {
handleMapEvent(event, false);
}
@Override
public void entryRemoved(EntryEvent event) {
handleMapEvent(event, true);
}
@Override
public void entryEvicted(EntryEvent event) {
handleMapEvent(event, true);
}
void handleMapEvent(EntryEvent event, boolean removal) {
NodeID nodeID = NodeID.getInstance(event.getMember().getUuid().getBytes());
//ignore items which this node has added
if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) {
Set<String> sessionJIDS = clusterListener.lookupJIDList(nodeID, cacheName);
if (removal) {
sessionJIDS.remove(event.getKey().toString());
}
else {
sessionJIDS.add(event.getKey().toString());
}
}
}
}
This diff is collapsed.
......@@ -283,12 +283,12 @@
%>
<tr class="<%= (isLocalMember ? "local" : "") %>" valign="middle">
<td align="center" width="1%">
<a href="plugins/clustering/system-clustering-node.jsp?UID=<%= nodeID %>"
<a href="plugins/<%= CacheFactory.getPluginName() %>/system-clustering-node.jsp?UID=<%= nodeID %>"
title="Click for more details"
><img src="images/server-network-24x24.gif" width="24" height="24" border="0" alt=""></a>
</td>
<td class="jive-description" nowrap width="1%" valign="middle">
<a href="plugins/clustering/system-clustering-node.jsp?UID=<%= nodeID %>">
<a href="plugins/<%= CacheFactory.getPluginName() %>/system-clustering-node.jsp?UID=<%= nodeID %>">
<% if (isLocalMember) { %>
<b><%= nodeInfo.getHostName() %></b>
<% } else { %>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment