Commit 62004133 authored by Tom Evans's avatar Tom Evans Committed by tevans

OF-590: Improved exception handling and error recovery in core clustering...

OF-590: Improved exception handling and error recovery in core clustering classes; retooled cache configuration to prevent eviction of core components; updated versions and documentation for clustering and hazelcast plugins

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@13392 b35dd754-fafc-0310-a699-88a17e54d16e
parent 70078b67
...@@ -20,6 +20,8 @@ ...@@ -20,6 +20,8 @@
package org.jivesoftware.openfire; package org.jivesoftware.openfire;
import java.util.StringTokenizer;
import org.jivesoftware.openfire.container.BasicModule; import org.jivesoftware.openfire.container.BasicModule;
import org.jivesoftware.openfire.interceptor.InterceptorManager; import org.jivesoftware.openfire.interceptor.InterceptorManager;
import org.jivesoftware.openfire.interceptor.PacketRejectedException; import org.jivesoftware.openfire.interceptor.PacketRejectedException;
...@@ -27,13 +29,13 @@ import org.jivesoftware.openfire.session.ClientSession; ...@@ -27,13 +29,13 @@ import org.jivesoftware.openfire.session.ClientSession;
import org.jivesoftware.openfire.session.Session; import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.openfire.user.UserManager; import org.jivesoftware.openfire.user.UserManager;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.packet.JID; import org.xmpp.packet.JID;
import org.xmpp.packet.Message; import org.xmpp.packet.Message;
import org.xmpp.packet.Packet; import org.xmpp.packet.Packet;
import org.xmpp.packet.PacketError; import org.xmpp.packet.PacketError;
import java.util.StringTokenizer;
/** /**
* <p>Route message packets throughout the server.</p> * <p>Route message packets throughout the server.</p>
* <p>Routing is based on the recipient and sender addresses. The typical * <p>Routing is based on the recipient and sender addresses. The typical
...@@ -44,6 +46,8 @@ import java.util.StringTokenizer; ...@@ -44,6 +46,8 @@ import java.util.StringTokenizer;
* @author Iain Shigeoka * @author Iain Shigeoka
*/ */
public class MessageRouter extends BasicModule { public class MessageRouter extends BasicModule {
private static Logger log = LoggerFactory.getLogger(MessageRouter.class);
private OfflineMessageStrategy messageStrategy; private OfflineMessageStrategy messageStrategy;
private RoutingTable routingTable; private RoutingTable routingTable;
...@@ -105,6 +109,7 @@ public class MessageRouter extends BasicModule { ...@@ -105,6 +109,7 @@ public class MessageRouter extends BasicModule {
routingTable.routePacket(recipientJID, packet, false); routingTable.routePacket(recipientJID, packet, false);
} }
catch (Exception e) { catch (Exception e) {
log.error("Failed to route packet: " + packet.toXML(), e);
routingFailed(recipientJID, packet); routingFailed(recipientJID, packet);
} }
} }
......
...@@ -47,6 +47,7 @@ import org.jivesoftware.openfire.admin.AdminManager; ...@@ -47,6 +47,7 @@ import org.jivesoftware.openfire.admin.AdminManager;
import org.jivesoftware.openfire.audit.AuditManager; import org.jivesoftware.openfire.audit.AuditManager;
import org.jivesoftware.openfire.audit.spi.AuditManagerImpl; import org.jivesoftware.openfire.audit.spi.AuditManagerImpl;
import org.jivesoftware.openfire.clearspace.ClearspaceManager; import org.jivesoftware.openfire.clearspace.ClearspaceManager;
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.cluster.NodeID; import org.jivesoftware.openfire.cluster.NodeID;
import org.jivesoftware.openfire.commands.AdHocCommandHandler; import org.jivesoftware.openfire.commands.AdHocCommandHandler;
import org.jivesoftware.openfire.component.InternalComponentManager; import org.jivesoftware.openfire.component.InternalComponentManager;
...@@ -930,6 +931,7 @@ public class XMPPServer { ...@@ -930,6 +931,7 @@ public class XMPPServer {
*/ */
private void shutdownServer() { private void shutdownServer() {
shuttingDown = true; shuttingDown = true;
ClusterManager.shutdown();
// Notify server listeners that the server is about to be stopped // Notify server listeners that the server is about to be stopped
for (XMPPServerListener listener : listeners) { for (XMPPServerListener listener : listeners) {
listener.serverStopping(); listener.serverStopping();
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
package org.jivesoftware.openfire.cluster; package org.jivesoftware.openfire.cluster;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.util.cache.ExternalizableUtil; import org.jivesoftware.util.cache.ExternalizableUtil;
import java.io.Externalizable; import java.io.Externalizable;
...@@ -96,7 +97,7 @@ public class NodeID implements Externalizable { ...@@ -96,7 +97,7 @@ public class NodeID implements Externalizable {
@Override @Override
public String toString() { public String toString() {
return new String(nodeID); return StringUtils.getString(nodeID);
} }
public byte[] toByteArray() { public byte[] toByteArray() {
......
...@@ -792,13 +792,13 @@ public class MultiUserChatManager extends BasicModule implements ClusterEventLis ...@@ -792,13 +792,13 @@ public class MultiUserChatManager extends BasicModule implements ClusterEventLis
} }
} }
} }
@SuppressWarnings("unchecked")
public void joinedCluster(byte[] nodeID) { public void joinedCluster(byte[] nodeID) {
@SuppressWarnings("unchecked") Object result = CacheFactory.doSynchronousClusterTask(new GetNewMemberRoomsRequest(), nodeID);
List<RoomInfo> result = if (result instanceof List<?>) {
(List<RoomInfo>) CacheFactory.doSynchronousClusterTask(new GetNewMemberRoomsRequest(), nodeID); List<RoomInfo> rooms = (List<RoomInfo>) result;
if (result != null) { for (RoomInfo roomInfo : rooms) {
for (RoomInfo roomInfo : result) {
LocalMUCRoom remoteRoom = roomInfo.getRoom(); LocalMUCRoom remoteRoom = roomInfo.getRoom();
MultiUserChatServiceImpl service = (MultiUserChatServiceImpl)remoteRoom.getMUCService(); MultiUserChatServiceImpl service = (MultiUserChatServiceImpl)remoteRoom.getMUCService();
LocalMUCRoom localRoom = service.getLocalChatRoom(remoteRoom.getName()); LocalMUCRoom localRoom = service.getLocalChatRoom(remoteRoom.getName());
......
...@@ -300,7 +300,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust ...@@ -300,7 +300,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
routed = false; routed = false;
} }
else { else {
if (server.getNodeID().equals(clientRoute.getNodeID())) { if (localRoutingTable.isLocalRoute(jid)) {
// This is a route to a local user hosted in this node // This is a route to a local user hosted in this node
try { try {
localRoutingTable.getRoute(jid.toString()).process(packet); localRoutingTable.getRoute(jid.toString()).process(packet);
......
...@@ -1113,8 +1113,7 @@ public class StringUtils { ...@@ -1113,8 +1113,7 @@ public class StringUtils {
* &lt; &gt; &quot; ' % ; ) ( &amp; + - * &lt; &gt; &quot; ' % ; ) ( &amp; + -
* </pre> * </pre>
* *
* @param string * @param input the string to be scrubbed
* input
* @return Input without certain characters; * @return Input without certain characters;
*/ */
public static String removeXSSCharacters(String input) { public static String removeXSSCharacters(String input) {
...@@ -1125,4 +1124,37 @@ public class StringUtils { ...@@ -1125,4 +1124,37 @@ public class StringUtils {
} }
return input; return input;
} }
/**
* Returns the UTF-8 bytes for the given String, suppressing
* UnsupportedEncodingException (in lieu of log message)
*
* @param input The source string
* @return The UTF-8 encoding for the given string
*/
public static byte[] getBytes(String input) {
try {
return input.getBytes("UTF-8");
} catch (UnsupportedEncodingException uee) {
Log.warn("Unable to encode string using UTF-8: " + input);
return input.getBytes(); // default encoding
}
}
/**
* Returns the UTF-8 String for the given byte array, suppressing
* UnsupportedEncodingException (in lieu of log message)
*
* @param input The source byte array
* @return The UTF-8 encoded String for the given byte array
*/
public static String getString(byte[] input) {
try {
return new String(input, "UTF-8");
} catch (UnsupportedEncodingException uee) {
String result = new String(input); // default encoding
Log.warn("Unable to decode byte array using UTF-8: " + result);
return result;
}
}
} }
\ No newline at end of file
...@@ -565,6 +565,15 @@ public class CacheFactory { ...@@ -565,6 +565,15 @@ public class CacheFactory {
public static Object doSynchronousClusterTask(ClusterTask task, byte[] nodeID) { public static Object doSynchronousClusterTask(ClusterTask task, byte[] nodeID) {
return cacheFactoryStrategy.doSynchronousClusterTask(task, nodeID); return cacheFactoryStrategy.doSynchronousClusterTask(task, nodeID);
} }
/**
* Returns the node info for the given cluster node
* @param nodeID The target cluster node
* @return The info for the cluster node or null if not found
*/
public static ClusterNodeInfo getClusterNodeInfo(byte[] nodeID) {
return cacheFactoryStrategy.getClusterNodeInfo(nodeID);
}
public static String getPluginName() { public static String getPluginName() {
return cacheFactoryStrategy.getPluginName(); return cacheFactoryStrategy.getPluginName();
......
...@@ -188,4 +188,11 @@ public interface CacheFactoryStrategy { ...@@ -188,4 +188,11 @@ public interface CacheFactoryStrategy {
* @return the plugin name for this clustering implementation * @return the plugin name for this clustering implementation
*/ */
String getPluginName(); String getPluginName();
/**
* Returns the node info for the given cluster node
* @param nodeID The target cluster node
* @return The info for the cluster node, or null if not found
*/
ClusterNodeInfo getClusterNodeInfo(byte[] nodeID);
} }
...@@ -204,4 +204,9 @@ public class DefaultLocalCacheStrategy implements CacheFactoryStrategy { ...@@ -204,4 +204,9 @@ public class DefaultLocalCacheStrategy implements CacheFactoryStrategy {
this.lock = lock; this.lock = lock;
} }
} }
public ClusterNodeInfo getClusterNodeInfo(byte[] nodeID) {
// not clustered
return null;
}
} }
...@@ -44,6 +44,12 @@ ...@@ -44,6 +44,12 @@
Clustering Plugin Changelog Clustering Plugin Changelog
</h1> </h1>
<p><b>1.2.3</b> -- January 8, 2013</p>
<p><b>NOTE</b>: This plugin has been deprecated and is not actively maintained. See the
<a href="readme.html">README</a> document for more information.</p>
<ul>
<li>Updated plugin sources to be compatible with Openfire 3.7.2.</li>
</ul>
<p><b>1.2.2</b> -- Aug 31, 2012</p> <p><b>1.2.2</b> -- Aug 31, 2012</p>
<ul> <ul>
......
...@@ -5,8 +5,8 @@ ...@@ -5,8 +5,8 @@
<name>${plugin.name}</name> <name>${plugin.name}</name>
<description>${plugin.description}</description> <description>${plugin.description}</description>
<author>Jive Software</author> <author>Jive Software</author>
<version>1.2.2</version> <version>1.2.3</version>
<date>08/31/2012</date> <date>01/09/2013</date>
<minServerVersion>3.7.2</minServerVersion> <minServerVersion>3.7.2</minServerVersion>
</plugin> </plugin>
...@@ -54,6 +54,20 @@ ...@@ -54,6 +54,20 @@
Clustering Plugin Readme Clustering Plugin Readme
</h1> </h1>
<div id="datatable">
<p class="name">&nbsp;<br />
<b>PLEASE NOTE</b> -- Clustering Plugin Users:<br /><br />
Starting with Openfire 3.7.2, this Coherence-based clustering plugin
has been deprecated in favor of the new Hazelcast-based plugin (/plugins/hazelcast).
Refer to the Hazelcast plugin documentation or
<a href="http://community.igniterealtime.org/message/224947#224947">this community post</a>
for additional information.<br /><br />
This plugin has been updated and is expected to be compatible with Openfire 3.7.2. However,
please be advised that <b>no functional testing</b> has been performed on the latest version.
<i>This plugin is no longer actively maintained.</i><br />&nbsp;
</p>
</div>
<h2>Overview</h2> <h2>Overview</h2>
<p> <p>
......
...@@ -332,12 +332,31 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -332,12 +332,31 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
} }
} }
@Override
public String getPluginName() { public String getPluginName() {
return "clustering"; return "clustering";
} }
private static Invocable buildInvocable(final ClusterTask task) { public ClusterNodeInfo getClusterNodeInfo(byte[] nodeID) {
// Get members of the service
Set setMembers = taskService.getInfo().getServiceMembers();
Member member = null;
// Find the member matching the requested nodeID
for (Iterator it=setMembers.iterator(); it.hasNext();) {
member = (Member) it.next();
if (Arrays.equals(member.getUid().toByteArray(), nodeID)) {
break;
}
}
// Check that the requested member was found
if (member != null) {
return new CoherenceClusterNodeInfo(member);
} else {
return null;
}
}
private static Invocable buildInvocable(final ClusterTask task) {
return new AbstractInvocable() { return new AbstractInvocable() {
public void run() { public void run() {
task.run(); task.run();
......
...@@ -44,6 +44,19 @@ ...@@ -44,6 +44,19 @@
Hazelcast Clustering Plugin Changelog Hazelcast Clustering Plugin Changelog
</h1> </h1>
<p><b>1.0.2</b> -- January 8, 2013</p>
<p>This release addresses a number of issues and other feedback received via the
<a href="http://community.igniterealtime.org/message/224947#224947">Hazelcast announcement</a>
posted in the Openfire community forum:</p>
<ul>
<li>Fixed cluster serialization issues for null collection objects.</li>
<li>Improved error handling for remote sessions from invalid/offline cluster members.</li>
<li>Removed extraneous IllegalStateException thrown during certain cluster tasks.</li>
<li>Avoid encoding issues across cluster members by forcing UTF-8 for nodeID strings.</li>
<li>Added nodeID to the system clustering overview page in the admin console.</li>
<li>Updated cache configuration to prevent eviction for critical application components.</li>
</ul>
<p><b>1.0.1</b> -- December 14, 2012</p> <p><b>1.0.1</b> -- December 14, 2012</p>
<ul> <ul>
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
<name>${plugin.name}</name> <name>${plugin.name}</name>
<description>${plugin.description}</description> <description>${plugin.description}</description>
<author>Tom Evans</author> <author>Tom Evans</author>
<version>1.0.1</version> <version>1.0.2</version>
<date>12/14/2012</date> <date>01/09/2013</date>
<minServerVersion>3.7.2</minServerVersion> <minServerVersion>3.7.2</minServerVersion>
</plugin> </plugin>
...@@ -103,8 +103,12 @@ between subsequent attempts to start the cluster.</li> ...@@ -103,8 +103,12 @@ between subsequent attempts to start the cluster.</li>
when running a synchronous task across members of the cluster.</li> when running a synchronous task across members of the cluster.</li>
<li><i>hazelcast.config.xml.filename</i> (hazelcast-cache-config.xml): Name <li><i>hazelcast.config.xml.filename</i> (hazelcast-cache-config.xml): Name
of the Hazelcast configuration file. By overriding this value you can easily of the Hazelcast configuration file. By overriding this value you can easily
install a custom cluster configuration file in the Hazelcast plugin /classes/ install a custom cache configuration file in the Hazelcast plugin /classes/
directory, or in the classpath of your own custom plugin.</li> directory, in the directory named via the <i>hazelcast.config.xml.directory</i>
property (described below), or in the classpath of your own custom plugin.</li>
<li><i>hazelcast.config.xml.directory</i> ({OPENFIRE_HOME}/conf): Directory
that will be added to the plugin's classpath. This allows a custom Hazelcast
configuration file to be located outside the Openfire home directory.</li>
</ol> </ol>
</p> </p>
<p> <p>
......
...@@ -131,7 +131,8 @@ public class HazelcastPlugin extends TimerTask implements Plugin, PropertyEventL ...@@ -131,7 +131,8 @@ public class HazelcastPlugin extends TimerTask implements Plugin, PropertyEventL
} }
public void destroyPlugin() { public void destroyPlugin() {
ClusterManager.shutdown(); // Shutdown is initiated by XMPPServer before unloading plugins
// ClusterManager.shutdown();
// Set the old serialization strategy was using before clustering was loaded // Set the old serialization strategy was using before clustering was loaded
ExternalizableUtil.getInstance().setStrategy(serializationStrategy); ExternalizableUtil.getInstance().setStrategy(serializationStrategy);
......
...@@ -24,6 +24,7 @@ import java.util.Date; ...@@ -24,6 +24,7 @@ import java.util.Date;
import org.jivesoftware.openfire.SessionManager; import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.StreamID; import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.cluster.ClusterNodeInfo;
import org.jivesoftware.openfire.session.Session; import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.util.cache.CacheFactory; import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.cache.ClusterTask; import org.jivesoftware.util.cache.ClusterTask;
...@@ -166,38 +167,32 @@ public abstract class RemoteSession implements Session { ...@@ -166,38 +167,32 @@ public abstract class RemoteSession implements Session {
* *
* @param task the ClusterTask object to be invoked on a given cluster member. * @param task the ClusterTask object to be invoked on a given cluster member.
* @return result of remote operation. * @return result of remote operation.
* @throws IllegalStateException if requested node was not found or not running in a cluster.
*/ */
protected Object doSynchronousClusterTask(ClusterTask task) { protected Object doSynchronousClusterTask(ClusterTask task) {
try { ClusterNodeInfo info = CacheFactory.getClusterNodeInfo(nodeID);
return CacheFactory.doSynchronousClusterTask(task, nodeID); Object result = null;
} catch (IllegalStateException ise) { if (info == null && task instanceof RemoteSessionTask) { // clean up invalid session
if (task instanceof RemoteSessionTask) { SessionManager.getInstance().removeSession(null,
// clean up invalid session ((RemoteSessionTask)task).getSession().getAddress(), false, false);
SessionManager.getInstance().removeSession(null, } else {
((RemoteSessionTask)task).getSession().getAddress(), false, false); result = (info == null) ? null : CacheFactory.doSynchronousClusterTask(task, nodeID);
}
throw ise;
} }
return result;
} }
/** /**
* Invokes a task on the remote cluster member in an asynchronous fashion. * Invokes a task on the remote cluster member in an asynchronous fashion.
* *
* @param task the task to be invoked on the specified cluster member. * @param task the task to be invoked on the specified cluster member.
* @throws IllegalStateException if requested node was not found or not running in a cluster.
*/ */
protected void doClusterTask(ClusterTask task) { protected void doClusterTask(ClusterTask task) {
try { ClusterNodeInfo info = CacheFactory.getClusterNodeInfo(nodeID);
CacheFactory.doClusterTask(task, nodeID); if (info == null && task instanceof RemoteSessionTask) { // clean up invalid session
} catch (IllegalStateException ise) { SessionManager.getInstance().removeSession(null,
if (task instanceof RemoteSessionTask) { ((RemoteSessionTask)task).getSession().getAddress(), false, false);
// clean up invalid session } else {
SessionManager.getInstance().removeSession(null, CacheFactory.doClusterTask(task, nodeID);
((RemoteSessionTask)task).getSession().getAddress(), false, false); }
}
throw ise;
}
} }
/** /**
......
...@@ -23,6 +23,7 @@ import java.util.Set; ...@@ -23,6 +23,7 @@ import java.util.Set;
import org.jivesoftware.openfire.XMPPServer; import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.cluster.NodeID; import org.jivesoftware.openfire.cluster.NodeID;
import org.jivesoftware.util.StringUtils;
import com.hazelcast.core.EntryEvent; import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryListener; import com.hazelcast.core.EntryListener;
...@@ -63,7 +64,7 @@ class CacheListener implements EntryListener { ...@@ -63,7 +64,7 @@ class CacheListener implements EntryListener {
} }
void handleMapEvent(EntryEvent event, boolean removal) { void handleMapEvent(EntryEvent event, boolean removal) {
NodeID nodeID = NodeID.getInstance(event.getMember().getUuid().getBytes()); NodeID nodeID = NodeID.getInstance(StringUtils.getBytes(event.getMember().getUuid()));
//ignore items which this node has added //ignore items which this node has added
if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) { if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) {
Set<String> sessionJIDS = clusterListener.lookupJIDList(nodeID, cacheName); Set<String> sessionJIDS = clusterListener.lookupJIDList(nodeID, cacheName);
......
...@@ -88,6 +88,7 @@ public class ClusterExternalizableUtil implements ExternalizableUtilStrategy { ...@@ -88,6 +88,7 @@ public class ClusterExternalizableUtil implements ExternalizableUtilStrategy {
*/ */
public int readStringsMap(DataInput in, Map<String, Set<String>> map) throws IOException { public int readStringsMap(DataInput in, Map<String, Set<String>> map) throws IOException {
Map<String, Set<String>> result = (Map<String, Set<String>>) SerializationHelper.readObject(in); Map<String, Set<String>> result = (Map<String, Set<String>>) SerializationHelper.readObject(in);
if (result == null) return 0;
map.putAll(result); map.putAll(result);
return result.size(); return result.size();
} }
...@@ -224,15 +225,17 @@ public class ClusterExternalizableUtil implements ExternalizableUtilStrategy { ...@@ -224,15 +225,17 @@ public class ClusterExternalizableUtil implements ExternalizableUtilStrategy {
public int readExternalizableCollection(DataInput in, Collection<? extends Externalizable> value, public int readExternalizableCollection(DataInput in, Collection<? extends Externalizable> value,
ClassLoader loader) throws IOException { ClassLoader loader) throws IOException {
Collection<Externalizable> result = (Collection<Externalizable>) SerializationHelper.readObject(in); Collection<Externalizable> result = (Collection<Externalizable>) SerializationHelper.readObject(in);
((Collection<Externalizable>)value).addAll(result); if (result == null) return 0;
return result.size(); ((Collection<Externalizable>)value).addAll(result);
return result.size();
} }
public int readSerializableCollection(DataInput in, Collection<? extends Serializable> value, public int readSerializableCollection(DataInput in, Collection<? extends Serializable> value,
ClassLoader loader) throws IOException { ClassLoader loader) throws IOException {
Collection<Serializable> result = (Collection<Serializable>) SerializationHelper.readObject(in); Collection<Serializable> result = (Collection<Serializable>) SerializationHelper.readObject(in);
((Collection<Serializable>)value).addAll(result); if (result == null) return 0;
return result.size(); ((Collection<Serializable>)value).addAll(result);
return result.size();
} }
public void writeExternalizableMap(DataOutput out, Map<String, ? extends Externalizable> map) throws IOException { public void writeExternalizableMap(DataOutput out, Map<String, ? extends Externalizable> map) throws IOException {
...@@ -245,14 +248,16 @@ public class ClusterExternalizableUtil implements ExternalizableUtilStrategy { ...@@ -245,14 +248,16 @@ public class ClusterExternalizableUtil implements ExternalizableUtilStrategy {
public int readExternalizableMap(DataInput in, Map<String, ? extends Externalizable> map, ClassLoader loader) throws IOException { public int readExternalizableMap(DataInput in, Map<String, ? extends Externalizable> map, ClassLoader loader) throws IOException {
Map<String, Externalizable> result = (Map<String, Externalizable>) SerializationHelper.readObject(in); Map<String, Externalizable> result = (Map<String, Externalizable>) SerializationHelper.readObject(in);
((Map<String, Externalizable>)map).putAll(result); if (result == null) return 0;
return result.size(); ((Map<String, Externalizable>)map).putAll(result);
return result.size();
} }
public int readSerializableMap(DataInput in, Map<? extends Serializable, ? extends Serializable> map, ClassLoader loader) throws IOException { public int readSerializableMap(DataInput in, Map<? extends Serializable, ? extends Serializable> map, ClassLoader loader) throws IOException {
Map<String, Serializable> result = (Map<String, Serializable>) SerializationHelper.readObject(in); Map<String, Serializable> result = (Map<String, Serializable>) SerializationHelper.readObject(in);
((Map<String, Serializable>)map).putAll(result); if (result == null) return 0;
return result.size(); ((Map<String, Serializable>)map).putAll(result);
return result.size();
} }
public void writeStrings(DataOutput out, Collection<String> collection) throws IOException { public void writeStrings(DataOutput out, Collection<String> collection) throws IOException {
...@@ -261,9 +266,8 @@ public class ClusterExternalizableUtil implements ExternalizableUtilStrategy { ...@@ -261,9 +266,8 @@ public class ClusterExternalizableUtil implements ExternalizableUtilStrategy {
public int readStrings(DataInput in, Collection<String> collection) throws IOException { public int readStrings(DataInput in, Collection<String> collection) throws IOException {
Collection<String> result = (Collection<String>) SerializationHelper.readObject(in); Collection<String> result = (Collection<String>) SerializationHelper.readObject(in);
for (String string: result) { if (result == null) return 0;
collection.add(string); collection.addAll(result);
}
return result.size(); return result.size();
} }
} }
...@@ -43,6 +43,7 @@ import org.jivesoftware.openfire.session.IncomingServerSession; ...@@ -43,6 +43,7 @@ import org.jivesoftware.openfire.session.IncomingServerSession;
import org.jivesoftware.openfire.session.RemoteSessionLocator; import org.jivesoftware.openfire.session.RemoteSessionLocator;
import org.jivesoftware.openfire.spi.ClientRoute; import org.jivesoftware.openfire.spi.ClientRoute;
import org.jivesoftware.openfire.spi.RoutingTableImpl; import org.jivesoftware.openfire.spi.RoutingTableImpl;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.util.cache.Cache; import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory; import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.cache.CacheWrapper; import org.jivesoftware.util.cache.CacheWrapper;
...@@ -413,8 +414,8 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -413,8 +414,8 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
private class DirectedPresenceListener implements EntryListener { private class DirectedPresenceListener implements EntryListener {
public void entryAdded(EntryEvent event) { public void entryAdded(EntryEvent event) {
byte[] nodeID = event.getMember().getUuid().getBytes(); byte[] nodeID = StringUtils.getBytes(event.getMember().getUuid());
// Ignore events origintated from this JVM // Ignore events originated from this JVM
if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) { if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) {
// Check if the directed presence was sent to an entity hosted by this JVM // Check if the directed presence was sent to an entity hosted by this JVM
RoutingTable routingTable = XMPPServer.getInstance().getRoutingTable(); RoutingTable routingTable = XMPPServer.getInstance().getRoutingTable();
...@@ -438,7 +439,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -438,7 +439,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
} }
public void entryUpdated(EntryEvent event) { public void entryUpdated(EntryEvent event) {
byte[] nodeID = event.getMember().getUuid().getBytes(); byte[] nodeID = StringUtils.getBytes(event.getMember().getUuid());
// Ignore events originated from this JVM // Ignore events originated from this JVM
if (nodeID != null && !XMPPServer.getInstance().getNodeID().equals(nodeID)) { if (nodeID != null && !XMPPServer.getInstance().getNodeID().equals(nodeID)) {
// Check if the directed presence was sent to an entity hosted by this JVM // Check if the directed presence was sent to an entity hosted by this JVM
...@@ -471,7 +472,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -471,7 +472,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
// Nothing to remove // Nothing to remove
return; return;
} }
byte[] nodeID = event.getMember().getUuid().getBytes(); byte[] nodeID = StringUtils.getBytes(event.getMember().getUuid());
if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) { if (!XMPPServer.getInstance().getNodeID().equals(nodeID)) {
String sender = event.getKey().toString(); String sender = event.getKey().toString();
nodePresences.get(NodeID.getInstance(nodeID)).remove(sender); nodePresences.get(NodeID.getInstance(nodeID)).remove(sender);
...@@ -564,8 +565,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -564,8 +565,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
simulateCacheInserts(multiplexerSessionsCache); simulateCacheInserts(multiplexerSessionsCache);
simulateCacheInserts(incomingServerSessionsCache); simulateCacheInserts(incomingServerSessionsCache);
simulateCacheInserts(directedPresencesCache); simulateCacheInserts(directedPresencesCache);
// Set the new ID of this cluster node
XMPPServer.getInstance().setNodeID(NodeID.getInstance(CacheFactory.getClusterMemberID()));
// Trigger events // Trigger events
ClusterManager.fireJoinedCluster(false); ClusterManager.fireJoinedCluster(false);
if (CacheFactory.isSeniorClusterMember()) { if (CacheFactory.isSeniorClusterMember()) {
...@@ -612,17 +612,17 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -612,17 +612,17 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
if (event.getMember().localMember()) { // We left and re-joined the cluster if (event.getMember().localMember()) { // We left and re-joined the cluster
joinCluster(); joinCluster();
} else { } else {
nodePresences.put(NodeID.getInstance(event.getMember().getUuid().getBytes()), nodePresences.put(NodeID.getInstance(StringUtils.getBytes(event.getMember().getUuid())),
new ConcurrentHashMap<String, Collection<String>>()); new ConcurrentHashMap<String, Collection<String>>());
// Trigger event that a new node has joined the cluster // Trigger event that a new node has joined the cluster
ClusterManager.fireJoinedCluster(event.getMember().getUuid().getBytes(), true); ClusterManager.fireJoinedCluster(StringUtils.getBytes(event.getMember().getUuid()), true);
} }
clusterNodesInfo.put(event.getMember().getUuid(), clusterNodesInfo.put(event.getMember().getUuid(),
new HazelcastClusterNodeInfo(event.getMember(), cluster.getClusterTime())); new HazelcastClusterNodeInfo(event.getMember(), cluster.getClusterTime()));
} }
public void memberRemoved(MembershipEvent event) { public void memberRemoved(MembershipEvent event) {
byte[] nodeID = event.getMember().getUuid().getBytes(); byte[] nodeID = StringUtils.getBytes(event.getMember().getUuid());
if (event.getMember().localMember()) { if (event.getMember().localMember()) {
logger.info("Leaving cluster: " + nodeID); logger.info("Leaving cluster: " + nodeID);
......
...@@ -38,6 +38,7 @@ import org.jivesoftware.openfire.XMPPServer; ...@@ -38,6 +38,7 @@ import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.cluster.ClusterNodeInfo; import org.jivesoftware.openfire.cluster.ClusterNodeInfo;
import org.jivesoftware.openfire.cluster.NodeID; import org.jivesoftware.openfire.cluster.NodeID;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.StringUtils;
import org.jivesoftware.util.cache.Cache; import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactoryStrategy; import org.jivesoftware.util.cache.CacheFactoryStrategy;
import org.jivesoftware.util.cache.CacheWrapper; import org.jivesoftware.util.cache.CacheWrapper;
...@@ -53,6 +54,7 @@ import com.hazelcast.core.Hazelcast; ...@@ -53,6 +54,7 @@ import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.Member; import com.hazelcast.core.Member;
import com.hazelcast.core.MultiTask; import com.hazelcast.core.MultiTask;
import com.jivesoftware.util.cluster.HazelcastClusterNodeInfo;
/** /**
* CacheFactory implementation to use when using Hazelcast in cluster mode. * CacheFactory implementation to use when using Hazelcast in cluster mode.
...@@ -197,7 +199,7 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -197,7 +199,7 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
public byte[] getSeniorClusterMemberID() { public byte[] getSeniorClusterMemberID() {
if (cluster != null && !cluster.getMembers().isEmpty()) { if (cluster != null && !cluster.getMembers().isEmpty()) {
Member oldest = cluster.getMembers().iterator().next(); Member oldest = cluster.getMembers().iterator().next();
return oldest.getUuid().getBytes(); return StringUtils.getBytes(oldest.getUuid());
} }
else { else {
return null; return null;
...@@ -206,7 +208,7 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -206,7 +208,7 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
public byte[] getClusterMemberID() { public byte[] getClusterMemberID() {
if (cluster != null) { if (cluster != null) {
return cluster.getLocalMember().getUuid().getBytes(); return StringUtils.getBytes(cluster.getLocalMember().getUuid());
} }
else { else {
return null; return null;
...@@ -233,7 +235,7 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -233,7 +235,7 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
hazelcast.getExecutorService().execute( hazelcast.getExecutorService().execute(
new MultiTask<Object>(new CallableTask<Object>(task), members)); new MultiTask<Object>(new CallableTask<Object>(task), members));
} else { } else {
logger.debug("No cluster members selected for cluster task " + task.getClass().getName()); logger.warn("No cluster members selected for cluster task " + task.getClass().getName());
} }
} }
...@@ -244,22 +246,18 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -244,22 +246,18 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
*/ */
public boolean doClusterTask(final ClusterTask task, byte[] nodeID) { public boolean doClusterTask(final ClusterTask task, byte[] nodeID) {
if (cluster == null) { return false; } if (cluster == null) { return false; }
Member target = null; Member member = getMember(nodeID);
for(Member member: cluster.getMembers()) {
if (Arrays.equals(member.getUuid().getBytes(), nodeID)) {
target = member;
break;
}
}
// Check that the requested member was found // Check that the requested member was found
if (target != null) { if (member != null) {
// Asynchronously execute the task on the target member // Asynchronously execute the task on the target member
logger.debug("Executing asynchronous DistributedTask: " + task.getClass().getName()); logger.debug("Executing asynchronous DistributedTask: " + task.getClass().getName());
hazelcast.getExecutorService().execute( hazelcast.getExecutorService().execute(
new DistributedTask<Object>(new CallableTask<Object>(task), target)); new DistributedTask<Object>(new CallableTask<Object>(task), member));
return true; return true;
} } else {
throw new IllegalStateException("Requested node " + nodeID + " not found in cluster"); logger.warn("Requested node " + StringUtils.getString(nodeID) + " not found in cluster");
return false;
}
} }
/* /*
...@@ -292,7 +290,7 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -292,7 +290,7 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
logger.error("Failed to execute cluster task", e); logger.error("Failed to execute cluster task", e);
} }
} else { } else {
logger.debug("No cluster members selected for cluster task " + task.getClass().getName()); logger.warn("No cluster members selected for cluster task " + task.getClass().getName());
} }
return result; return result;
} }
...@@ -304,19 +302,13 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -304,19 +302,13 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
*/ */
public Object doSynchronousClusterTask(ClusterTask task, byte[] nodeID) { public Object doSynchronousClusterTask(ClusterTask task, byte[] nodeID) {
if (cluster == null) { return null; } if (cluster == null) { return null; }
Member target = null; Member member = getMember(nodeID);
for(Member member: cluster.getMembers()) {
if (Arrays.equals(member.getUuid().getBytes(), nodeID)) {
target = member;
break;
}
}
Object result = null; Object result = null;
// Check that the requested member was found // Check that the requested member was found
if (target != null) { if (member != null) {
// Asynchronously execute the task on the target member // Asynchronously execute the task on the target member
DistributedTask<Object> distributedTask = new DistributedTask<Object>( DistributedTask<Object> distributedTask = new DistributedTask<Object>(
new CallableTask<Object>(task), target); new CallableTask<Object>(task), member);
logger.debug("Executing DistributedTask: " + task.getClass().getName()); logger.debug("Executing DistributedTask: " + task.getClass().getName());
hazelcast.getExecutorService().execute(distributedTask); hazelcast.getExecutorService().execute(distributedTask);
try { try {
...@@ -328,10 +320,31 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -328,10 +320,31 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
logger.error("Failed to execute cluster task", e); logger.error("Failed to execute cluster task", e);
} }
} else { } else {
throw new IllegalStateException("Requested node " + nodeID + " not found in cluster"); logger.warn("Requested node " + StringUtils.getString(nodeID) + " not found in cluster");
} }
return result; return result;
} }
public ClusterNodeInfo getClusterNodeInfo(byte[] nodeID) {
if (cluster == null) { return null; }
ClusterNodeInfo result = null;
Member member = getMember(nodeID);
if (member != null) {
result = new HazelcastClusterNodeInfo(member, cluster.getClusterTime());
}
return result;
}
private Member getMember(byte[] nodeID) {
Member result = null;
for(Member member: cluster.getMembers()) {
if (Arrays.equals(StringUtils.getBytes(member.getUuid()), nodeID)) {
result = member;
break;
}
}
return result;
}
public void updateCacheStats(Map<String, Cache> caches) { public void updateCacheStats(Map<String, Cache> caches) {
if (caches.size() > 0 && cluster != null) { if (caches.size() > 0 && cluster != null) {
......
...@@ -22,6 +22,7 @@ package com.jivesoftware.util.cluster; ...@@ -22,6 +22,7 @@ package com.jivesoftware.util.cluster;
import org.jivesoftware.openfire.cluster.ClusterManager; import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.cluster.ClusterNodeInfo; import org.jivesoftware.openfire.cluster.ClusterNodeInfo;
import org.jivesoftware.openfire.cluster.NodeID; import org.jivesoftware.openfire.cluster.NodeID;
import org.jivesoftware.util.StringUtils;
import com.hazelcast.core.Member; import com.hazelcast.core.Member;
...@@ -44,9 +45,9 @@ public class HazelcastClusterNodeInfo implements ClusterNodeInfo { ...@@ -44,9 +45,9 @@ public class HazelcastClusterNodeInfo implements ClusterNodeInfo {
public HazelcastClusterNodeInfo(Member member, Long joinedTime) { public HazelcastClusterNodeInfo(Member member, Long joinedTime) {
hostname = member.getInetSocketAddress().getHostName(); hostname = member.getInetSocketAddress().getHostName();
nodeID = NodeID.getInstance(member.getUuid().getBytes()); nodeID = NodeID.getInstance(StringUtils.getBytes(member.getUuid()));
this.joinedTime = joinedTime; this.joinedTime = joinedTime;
seniorMember = ClusterManager.getSeniorClusterMember().equals(member.getUuid().getBytes()); seniorMember = ClusterManager.getSeniorClusterMember().equals(StringUtils.getBytes(member.getUuid()));
} }
public String getHostName() { public String getHostName() {
......
...@@ -38,6 +38,7 @@ import org.jivesoftware.util.cache.ExternalizableUtil; ...@@ -38,6 +38,7 @@ import org.jivesoftware.util.cache.ExternalizableUtil;
*/ */
public class NodeRuntimeStats { public class NodeRuntimeStats {
// This properties file is located in the Hazelcast JAR
private static final ResourceBundle config = ResourceBundle.getBundle("hazelcast-runtime"); private static final ResourceBundle config = ResourceBundle.getBundle("hazelcast-runtime");
public static String getProviderConfig(String key) { public static String getProviderConfig(String key) {
......
...@@ -81,7 +81,6 @@ ...@@ -81,7 +81,6 @@
boolean usingEmbeddedDB = DbConnectionManager.isEmbeddedDB(); boolean usingEmbeddedDB = DbConnectionManager.isEmbeddedDB();
boolean clusteringAvailable = !usingEmbeddedDB && ClusterManager.isClusteringAvailable(); boolean clusteringAvailable = !usingEmbeddedDB && ClusterManager.isClusteringAvailable();
boolean clusteringStarting = ClusterManager.isClusteringStarting();
int maxClusterNodes = ClusterManager.getMaxClusterNodes(); int maxClusterNodes = ClusterManager.getMaxClusterNodes();
clusteringEnabled = ClusterManager.isClusteringStarted() || ClusterManager.isClusteringStarting(); clusteringEnabled = ClusterManager.isClusteringStarted() || ClusterManager.isClusteringStarting();
...@@ -196,7 +195,7 @@ ...@@ -196,7 +195,7 @@
<tr> <tr>
<td width="1%" valign="top" nowrap> <td width="1%" valign="top" nowrap>
<input type="radio" name="clusteringEnabled" value="false" id="rb01" <input type="radio" name="clusteringEnabled" value="false" id="rb01"
<%= (!clusteringEnabled ? "checked" : "") %> <%= (!clusteringAvailable || clusteringStarting ? "disabled" : "") %>> <%= (!clusteringEnabled ? "checked" : "") %> <%= clusteringAvailable ? "" : "disabled" %>>
</td> </td>
<td width="99%"> <td width="99%">
<label for="rb01"> <label for="rb01">
...@@ -207,7 +206,7 @@ ...@@ -207,7 +206,7 @@
<tr> <tr>
<td width="1%" valign="top" nowrap> <td width="1%" valign="top" nowrap>
<input type="radio" name="clusteringEnabled" value="true" id="rb02" <input type="radio" name="clusteringEnabled" value="true" id="rb02"
<%= (clusteringEnabled ? "checked" : "") %> <%= (!clusteringAvailable || clusteringStarting ? "disabled" : "") %>> <%= (clusteringEnabled ? "checked" : "") %> <%= clusteringAvailable ? "" : "disabled" %>>
</td> </td>
<td width="99%"> <td width="99%">
<label for="rb02"> <label for="rb02">
...@@ -218,7 +217,7 @@ ...@@ -218,7 +217,7 @@
</tbody> </tbody>
</table> </table>
<br/> <br/>
<% if (clusteringAvailable && !clusteringStarting) { %> <% if (clusteringAvailable) { %>
<input type="submit" name="update" value="<fmt:message key="global.save_settings" />"> <input type="submit" name="update" value="<fmt:message key="global.save_settings" />">
<% } %> <% } %>
</div> </div>
...@@ -294,6 +293,8 @@ ...@@ -294,6 +293,8 @@
<% } else { %> <% } else { %>
<%= nodeInfo.getHostName() %> <%= nodeInfo.getHostName() %>
<% } %></a> <% } %></a>
<br />
<%= nodeInfo.getNodeID() %>
</td> </td>
<td class="jive-description" nowrap width="1%" valign="middle"> <td class="jive-description" nowrap width="1%" valign="middle">
<%= JiveGlobals.formatDateTime(new Date(nodeInfo.getJoinedTime())) %> <%= JiveGlobals.formatDateTime(new Date(nodeInfo.getJoinedTime())) %>
...@@ -359,7 +360,7 @@ ...@@ -359,7 +360,7 @@
<td width="20%">&nbsp;</td> <td width="20%">&nbsp;</td>
</tr> </tr>
<% } <% }
} else if (clusteringStarting) { %> } else if (ClusterManager.isClusteringStarting()) { %>
<tr valign="middle" align="middle" class="local"> <tr valign="middle" align="middle" class="local">
<td colspan=8> <td colspan=8>
<fmt:message key="system.clustering.starting"> <fmt:message key="system.clustering.starting">
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment