Commit 649b1efc authored by Tom Evans's avatar Tom Evans Committed by tevans

OF-699/OF-729: Upgrade Hazelcast to latest release (3.1.5); fix initialization timing issue

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@13928 b35dd754-fafc-0310-a699-88a17e54d16e
parent 32a70212
...@@ -44,6 +44,13 @@ ...@@ -44,6 +44,13 @@
Hazelcast Clustering Plugin Changelog Hazelcast Clustering Plugin Changelog
</h1> </h1>
<p><b>1.2.0</b> -- February 10, 2014</p>
<p>Miscellaneous enhancements:</p>
<ul>
<li>Fix cluster initialization logic (<a href="http://issues.igniterealtime.org/browse/OF-699">OF-699</a>)</li>
<li>Updated Hazelcast to release 3.1.5 (<a href="http://www.hazelcast.org/docs/3.1/manual/html-single/#WhatsNew31">what's new</a>).</li>
</ul>
<p><b>1.1.0</b> -- Sep 13, 2013</p> <p><b>1.1.0</b> -- Sep 13, 2013</p>
<ul> <ul>
<li>Requires Openfire 3.9.0.</li> <li>Requires Openfire 3.9.0.</li>
...@@ -54,7 +61,7 @@ Hazelcast Clustering Plugin Changelog ...@@ -54,7 +61,7 @@ Hazelcast Clustering Plugin Changelog
<ul> <ul>
<li>Added support for cluster time (<a href="http://issues.igniterealtime.org/browse/OF-666">OF-666</a>)</li> <li>Added support for cluster time (<a href="http://issues.igniterealtime.org/browse/OF-666">OF-666</a>)</li>
<li>Added <code>hazelcast-cloud.jar</code> to support AWS deployments (<a href="http://community.igniterealtime.org/blogs/ignite/2012/09/23/introducing-hazelcast-a-new-way-to-cluster-openfire#comment-8027">more info</a>).</li> <li>Added <code>hazelcast-cloud.jar</code> to support AWS deployments (<a href="http://community.igniterealtime.org/blogs/ignite/2012/09/23/introducing-hazelcast-a-new-way-to-cluster-openfire#comment-8027">more info</a>).</li>
<li>Updated Hazelcast to release 2.5.1 (<a href="http://www.hazelcast.com/docs/2.5/manual/single_html/#ReleaseNotes">bug fixes</a>).</li> <li>Updated Hazelcast to release 2.5.1 (<a href="http://www.hazelcast.org/docs/2.5/manual/single_html/#ReleaseNotes">bug fixes</a>).</li>
</ul> </ul>
<p><b>1.0.5</b> -- March 26, 2013</p> <p><b>1.0.5</b> -- March 26, 2013</p>
......
...@@ -137,7 +137,7 @@ ...@@ -137,7 +137,7 @@
Any integer between 0 and Integer.MAX_VALUE. 0 means Any integer between 0 and Integer.MAX_VALUE. 0 means
Integer.MAX_VALUE. Default is 0. Integer.MAX_VALUE. Default is 0.
--> -->
<max-size policy="cluster_wide_map_size">100000</max-size> <max-size policy="per_partition">100000</max-size>
<!-- <!--
When max. size is reached, specified percentage of When max. size is reached, specified percentage of
the map will be evicted. Any integer between 0 and 100. the map will be evicted. Any integer between 0 and 100.
...@@ -250,13 +250,13 @@ ...@@ -250,13 +250,13 @@
<map name="POP3 Authentication"> <map name="POP3 Authentication">
<backup-count>1</backup-count> <backup-count>1</backup-count>
<max-size policy="cluster_wide_map_size">10000</max-size> <max-size policy="per_partition">10000</max-size>
<time-to-live-seconds>3600</time-to-live-seconds> <time-to-live-seconds>3600</time-to-live-seconds>
<eviction-policy>LRU</eviction-policy> <eviction-policy>LRU</eviction-policy>
</map> </map>
<map name="LDAP Authentication"> <map name="LDAP Authentication">
<backup-count>1</backup-count> <backup-count>1</backup-count>
<max-size policy="cluster_wide_map_size">10000</max-size> <max-size policy="per_partition">10000</max-size>
<time-to-live-seconds>7200</time-to-live-seconds> <time-to-live-seconds>7200</time-to-live-seconds>
<eviction-policy>LRU</eviction-policy> <eviction-policy>LRU</eviction-policy>
</map> </map>
...@@ -267,7 +267,7 @@ ...@@ -267,7 +267,7 @@
</map> </map>
<map name="File Transfer Cache"> <map name="File Transfer Cache">
<backup-count>1</backup-count> <backup-count>1</backup-count>
<max-size policy="cluster_wide_map_size">10000</max-size> <max-size policy="per_partition">10000</max-size>
<time-to-live-seconds>600</time-to-live-seconds> <time-to-live-seconds>600</time-to-live-seconds>
<eviction-policy>LRU</eviction-policy> <eviction-policy>LRU</eviction-policy>
</map> </map>
...@@ -278,7 +278,7 @@ ...@@ -278,7 +278,7 @@
</map> </map>
<map name="Javascript Cache"> <map name="Javascript Cache">
<backup-count>1</backup-count> <backup-count>1</backup-count>
<max-size policy="cluster_wide_map_size">10000</max-size> <max-size policy="per_partition">10000</max-size>
<time-to-live-seconds>864000</time-to-live-seconds> <time-to-live-seconds>864000</time-to-live-seconds>
<eviction-policy>LRU</eviction-policy> <eviction-policy>LRU</eviction-policy>
</map> </map>
...@@ -304,7 +304,7 @@ ...@@ -304,7 +304,7 @@
</map> </map>
<map name="Last Activity Cache"> <map name="Last Activity Cache">
<backup-count>1</backup-count> <backup-count>1</backup-count>
<max-size policy="cluster_wide_map_size">10000</max-size> <max-size policy="per_partition">10000</max-size>
<time-to-live-seconds>21600</time-to-live-seconds> <time-to-live-seconds>21600</time-to-live-seconds>
<eviction-policy>LRU</eviction-policy> <eviction-policy>LRU</eviction-policy>
</map> </map>
...@@ -315,37 +315,37 @@ ...@@ -315,37 +315,37 @@
</map> </map>
<map name="Multicast Service"> <map name="Multicast Service">
<backup-count>1</backup-count> <backup-count>1</backup-count>
<max-size policy="cluster_wide_map_size">10000</max-size> <max-size policy="per_partition">10000</max-size>
<time-to-live-seconds>86400</time-to-live-seconds> <time-to-live-seconds>86400</time-to-live-seconds>
<eviction-policy>LRU</eviction-policy> <eviction-policy>LRU</eviction-policy>
</map> </map>
<map name="Offline Message Size"> <map name="Offline Message Size">
<backup-count>1</backup-count> <backup-count>1</backup-count>
<max-size policy="cluster_wide_map_size">100000</max-size> <max-size policy="per_partition">100000</max-size>
<time-to-live-seconds>43200</time-to-live-seconds> <time-to-live-seconds>43200</time-to-live-seconds>
<eviction-policy>LRU</eviction-policy> <eviction-policy>LRU</eviction-policy>
</map> </map>
<map name="Offline Presence Cache"> <map name="Offline Presence Cache">
<backup-count>1</backup-count> <backup-count>1</backup-count>
<max-size policy="cluster_wide_map_size">100000</max-size> <max-size policy="per_partition">100000</max-size>
<time-to-live-seconds>21600</time-to-live-seconds> <time-to-live-seconds>21600</time-to-live-seconds>
<eviction-policy>LRU</eviction-policy> <eviction-policy>LRU</eviction-policy>
</map> </map>
<map name="Privacy Lists"> <map name="Privacy Lists">
<backup-count>1</backup-count> <backup-count>1</backup-count>
<max-size policy="cluster_wide_map_size">100000</max-size> <max-size policy="per_partition">100000</max-size>
<time-to-live-seconds>21600</time-to-live-seconds> <time-to-live-seconds>21600</time-to-live-seconds>
<eviction-policy>LRU</eviction-policy> <eviction-policy>LRU</eviction-policy>
</map> </map>
<map name="Remote Users Existence"> <map name="Remote Users Existence">
<backup-count>1</backup-count> <backup-count>1</backup-count>
<max-size policy="cluster_wide_map_size">100000</max-size> <max-size policy="per_partition">100000</max-size>
<time-to-live-seconds>600</time-to-live-seconds> <time-to-live-seconds>600</time-to-live-seconds>
<eviction-policy>LRU</eviction-policy> <eviction-policy>LRU</eviction-policy>
</map> </map>
<map name="Remote Server Configurations"> <map name="Remote Server Configurations">
<backup-count>1</backup-count> <backup-count>1</backup-count>
<max-size policy="cluster_wide_map_size">100000</max-size> <max-size policy="per_partition">100000</max-size>
<time-to-live-seconds>1800</time-to-live-seconds> <time-to-live-seconds>1800</time-to-live-seconds>
<eviction-policy>LRU</eviction-policy> <eviction-policy>LRU</eviction-policy>
</map> </map>
...@@ -355,28 +355,28 @@ ...@@ -355,28 +355,28 @@
<map name="Group Metadata Cache"> <map name="Group Metadata Cache">
<backup-count>1</backup-count> <backup-count>1</backup-count>
<read-backup-data>true</read-backup-data> <read-backup-data>true</read-backup-data>
<max-size policy="cluster_wide_map_size">100000</max-size> <max-size policy="per_partition">100000</max-size>
<max-idle-seconds>3600</max-idle-seconds> <max-idle-seconds>3600</max-idle-seconds>
<eviction-policy>LRU</eviction-policy> <eviction-policy>LRU</eviction-policy>
</map> </map>
<map name="Group"> <map name="Group">
<backup-count>1</backup-count> <backup-count>1</backup-count>
<read-backup-data>true</read-backup-data> <read-backup-data>true</read-backup-data>
<max-size policy="cluster_wide_map_size">100000</max-size> <max-size policy="per_partition">100000</max-size>
<max-idle-seconds>3600</max-idle-seconds> <max-idle-seconds>3600</max-idle-seconds>
<eviction-policy>LRU</eviction-policy> <eviction-policy>LRU</eviction-policy>
</map> </map>
<map name="Roster"> <map name="Roster">
<backup-count>1</backup-count> <backup-count>1</backup-count>
<read-backup-data>true</read-backup-data> <read-backup-data>true</read-backup-data>
<max-size policy="cluster_wide_map_size">100000</max-size> <max-size policy="per_partition">100000</max-size>
<max-idle-seconds>3600</max-idle-seconds> <max-idle-seconds>3600</max-idle-seconds>
<eviction-policy>LRU</eviction-policy> <eviction-policy>LRU</eviction-policy>
</map> </map>
<map name="User"> <map name="User">
<backup-count>1</backup-count> <backup-count>1</backup-count>
<read-backup-data>true</read-backup-data> <read-backup-data>true</read-backup-data>
<max-size policy="cluster_wide_map_size">100000</max-size> <max-size policy="per_partition">100000</max-size>
<max-idle-seconds>3600</max-idle-seconds> <max-idle-seconds>3600</max-idle-seconds>
<eviction-policy>LRU</eviction-policy> <eviction-policy>LRU</eviction-policy>
</map> </map>
...@@ -386,7 +386,7 @@ ...@@ -386,7 +386,7 @@
<map name="VCard"> <map name="VCard">
<backup-count>1</backup-count> <backup-count>1</backup-count>
<read-backup-data>true</read-backup-data> <read-backup-data>true</read-backup-data>
<max-size policy="cluster_wide_map_size">100000</max-size> <max-size policy="per_partition">100000</max-size>
<time-to-live-seconds>21600</time-to-live-seconds> <time-to-live-seconds>21600</time-to-live-seconds>
<eviction-policy>LRU</eviction-policy> <eviction-policy>LRU</eviction-policy>
<near-cache> <near-cache>
...@@ -399,7 +399,7 @@ ...@@ -399,7 +399,7 @@
<map name="Published Items"> <map name="Published Items">
<backup-count>1</backup-count> <backup-count>1</backup-count>
<read-backup-data>true</read-backup-data> <read-backup-data>true</read-backup-data>
<max-size policy="cluster_wide_map_size">100000</max-size> <max-size policy="per_partition">100000</max-size>
<time-to-live-seconds>900</time-to-live-seconds> <time-to-live-seconds>900</time-to-live-seconds>
<eviction-policy>LRU</eviction-policy> <eviction-policy>LRU</eviction-policy>
<near-cache> <near-cache>
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
<name>${plugin.name}</name> <name>${plugin.name}</name>
<description>${plugin.description}</description> <description>${plugin.description}</description>
<author>Tom Evans</author> <author>Tom Evans</author>
<version>1.1.0</version> <version>1.2.0</version>
<date>09/13/2013</date> <date>02/10/2014</date>
<minServerVersion>3.9.0</minServerVersion> <minServerVersion>3.9.0</minServerVersion>
</plugin> </plugin>
...@@ -61,11 +61,11 @@ servers together in a cluster. By running Openfire as a cluster, you can ...@@ -61,11 +61,11 @@ servers together in a cluster. By running Openfire as a cluster, you can
distribute the connection load among several servers, while also providing distribute the connection load among several servers, while also providing
failover in the event that one of your servers fails. This plugin is a failover in the event that one of your servers fails. This plugin is a
drop-in replacement for the original Openfire clustering plugin, using the drop-in replacement for the original Openfire clustering plugin, using the
open source <a href="http://www.hazelcast.com">Hazelcast</a> data distribution open source <a href="http://www.hazelcast.org">Hazelcast</a> data distribution
framework in lieu of an expensive proprietary third-party product. framework in lieu of an expensive proprietary third-party product.
</p> </p>
<p> <p>
The current Hazelcast release is version 2.5.1. The current Hazelcast release is version 3.1.5.
</p> </p>
<h2>Installation</h2> <h2>Installation</h2>
<p> <p>
...@@ -186,7 +186,7 @@ Hazelcast JMX docs</a> for additional information.</li> ...@@ -186,7 +186,7 @@ Hazelcast JMX docs</a> for additional information.</li>
</ol> </ol>
</p> </p>
<p> <p>
The Hazelcast plugin uses the <a href="http://www.hazelcast.com/docs/2.5/manual/single_html/#Config"> The Hazelcast plugin uses the <a href="http://www.hazelcast.org/docs/3.1/manual/single_html/#Config">
XML configuration builder</a> to initialize the cluster from the XML file described above. XML configuration builder</a> to initialize the cluster from the XML file described above.
By default the cluster members will attempt to discover each other via multicast at the By default the cluster members will attempt to discover each other via multicast at the
following location: following location:
...@@ -211,7 +211,7 @@ following alternative: ...@@ -211,7 +211,7 @@ following alternative:
&lt;/join&gt; &lt;/join&gt;
... ...
</pre> </pre>
Please refer to the <a href="http://www.hazelcast.com/docs/2.5/manual/single_html/"> Please refer to the <a href="http://www.hazelcast.org/docs/3.1/manual/single_html/">
Hazelcast reference manual</a> for more information. Hazelcast reference manual</a> for more information.
</p> </p>
</body> </body>
......
...@@ -54,6 +54,7 @@ import org.xmpp.packet.Presence; ...@@ -54,6 +54,7 @@ import org.xmpp.packet.Presence;
import com.hazelcast.core.Cluster; import com.hazelcast.core.Cluster;
import com.hazelcast.core.EntryEvent; import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryEventType;
import com.hazelcast.core.EntryListener; import com.hazelcast.core.EntryListener;
import com.hazelcast.core.LifecycleEvent; import com.hazelcast.core.LifecycleEvent;
import com.hazelcast.core.LifecycleEvent.LifecycleState; import com.hazelcast.core.LifecycleEvent.LifecycleState;
...@@ -138,18 +139,6 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -138,18 +139,6 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
directedPresencesCache = CacheFactory.createCache(PresenceUpdateHandler.PRESENCE_CACHE_NAME); directedPresencesCache = CacheFactory.createCache(PresenceUpdateHandler.PRESENCE_CACHE_NAME);
addEntryListener(C2SCache, new CacheListener(this, C2SCache.getName()));
addEntryListener(anonymousC2SCache, new CacheListener(this, anonymousC2SCache.getName()));
addEntryListener(S2SCache, new CacheListener(this, S2SCache.getName()));
addEntryListener(componentsCache, new ComponentCacheListener());
addEntryListener(sessionInfoCache, new CacheListener(this, sessionInfoCache.getName()));
addEntryListener(componentSessionsCache, new CacheListener(this, componentSessionsCache.getName()));
addEntryListener(multiplexerSessionsCache, new CacheListener(this, multiplexerSessionsCache.getName()));
addEntryListener(incomingServerSessionsCache, new CacheListener(this, incomingServerSessionsCache.getName()));
addEntryListener(directedPresencesCache, new DirectedPresenceListener());
joinCluster(); joinCluster();
} }
...@@ -173,7 +162,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -173,7 +162,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
ClusteredCache clusteredCache = (ClusteredCache) wrapped; ClusteredCache clusteredCache = (ClusteredCache) wrapped;
for (Map.Entry entry : (Set<Map.Entry>) cache.entrySet()) { for (Map.Entry entry : (Set<Map.Entry>) cache.entrySet()) {
EntryEvent event = new EntryEvent(clusteredCache.map.getName(), cluster.getLocalMember(), EntryEvent event = new EntryEvent(clusteredCache.map.getName(), cluster.getLocalMember(),
EntryEvent.TYPE_ADDED, entry.getKey(), null, entry.getValue()); EntryEventType.ADDED.getType(), entry.getKey(), null, entry.getValue());
EntryListener.entryAdded(event); EntryListener.entryAdded(event);
} }
} }
...@@ -555,6 +544,20 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -555,6 +544,20 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
if (!isDone()) { // already joined if (!isDone()) { // already joined
return; return;
} }
// Trigger events
ClusterManager.fireJoinedCluster(false);
addEntryListener(C2SCache, new CacheListener(this, C2SCache.getName()));
addEntryListener(anonymousC2SCache, new CacheListener(this, anonymousC2SCache.getName()));
addEntryListener(S2SCache, new CacheListener(this, S2SCache.getName()));
addEntryListener(componentsCache, new ComponentCacheListener());
addEntryListener(sessionInfoCache, new CacheListener(this, sessionInfoCache.getName()));
addEntryListener(componentSessionsCache, new CacheListener(this, componentSessionsCache.getName()));
addEntryListener(multiplexerSessionsCache, new CacheListener(this, multiplexerSessionsCache.getName()));
addEntryListener(incomingServerSessionsCache, new CacheListener(this, incomingServerSessionsCache.getName()));
addEntryListener(directedPresencesCache, new DirectedPresenceListener());
// Simulate insert events of existing cache content // Simulate insert events of existing cache content
simulateCacheInserts(C2SCache); simulateCacheInserts(C2SCache);
simulateCacheInserts(anonymousC2SCache); simulateCacheInserts(anonymousC2SCache);
...@@ -566,8 +569,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener { ...@@ -566,8 +569,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
simulateCacheInserts(incomingServerSessionsCache); simulateCacheInserts(incomingServerSessionsCache);
simulateCacheInserts(directedPresencesCache); simulateCacheInserts(directedPresencesCache);
// Trigger events
ClusterManager.fireJoinedCluster(false);
if (CacheFactory.isSeniorClusterMember()) { if (CacheFactory.isSeniorClusterMember()) {
seniorClusterMember = true; seniorClusterMember = true;
ClusterManager.fireMarkedAsSeniorClusterMember(); ClusterManager.fireMarkedAsSeniorClusterMember();
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
package com.jivesoftware.util.cache; package com.jivesoftware.util.cache;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -40,6 +41,8 @@ public class ClusteredCache implements Cache { ...@@ -40,6 +41,8 @@ public class ClusteredCache implements Cache {
private static Logger logger = LoggerFactory.getLogger(ClusteredCache.class); private static Logger logger = LoggerFactory.getLogger(ClusteredCache.class);
private final Map<EntryListener, String> registrations = new HashMap<EntryListener, String>();
/** /**
* The map is used for distributed operations such as get, put, etc. * The map is used for distributed operations such as get, put, etc.
*/ */
...@@ -59,11 +62,14 @@ public class ClusteredCache implements Cache { ...@@ -59,11 +62,14 @@ public class ClusteredCache implements Cache {
} }
public void addEntryListener(EntryListener listener, boolean includeValue) { public void addEntryListener(EntryListener listener, boolean includeValue) {
map.addEntryListener(listener, includeValue); registrations.put(listener, map.addEntryListener(listener, includeValue));
} }
public void removeEntryListener(EntryListener listener) { public void removeEntryListener(EntryListener listener) {
map.removeEntryListener(listener); String registrationId = registrations.get(listener);
if (registrationId != null) {
map.removeEntryListener(registrationId);
}
} }
// Cache Interface // Cache Interface
...@@ -168,7 +174,12 @@ public class ClusteredCache implements Cache { ...@@ -168,7 +174,12 @@ public class ClusteredCache implements Cache {
} else if (timeout == 0) { } else if (timeout == 0) {
result = map.tryLock(key); result = map.tryLock(key);
} else { } else {
try {
result = map.tryLock(key, timeout, TimeUnit.MILLISECONDS); result = map.tryLock(key, timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
logger.error("Failed to get cluster lock", e);
result = false;
}
} }
return result; return result;
} }
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
package com.jivesoftware.util.cache; package com.jivesoftware.util.cache;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
...@@ -29,6 +30,7 @@ import java.util.Iterator; ...@@ -29,6 +30,7 @@ import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
...@@ -52,11 +54,9 @@ import org.slf4j.LoggerFactory; ...@@ -52,11 +54,9 @@ import org.slf4j.LoggerFactory;
import com.hazelcast.config.ClasspathXmlConfig; import com.hazelcast.config.ClasspathXmlConfig;
import com.hazelcast.config.Config; import com.hazelcast.config.Config;
import com.hazelcast.core.Cluster; import com.hazelcast.core.Cluster;
import com.hazelcast.core.DistributedTask;
import com.hazelcast.core.Hazelcast; import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.Member; import com.hazelcast.core.Member;
import com.hazelcast.core.MultiTask;
import com.jivesoftware.openfire.session.RemoteSessionLocator; import com.jivesoftware.openfire.session.RemoteSessionLocator;
import com.jivesoftware.util.cluster.ClusterPacketRouter; import com.jivesoftware.util.cluster.ClusterPacketRouter;
import com.jivesoftware.util.cluster.HazelcastClusterNodeInfo; import com.jivesoftware.util.cluster.HazelcastClusterNodeInfo;
...@@ -69,6 +69,8 @@ import com.jivesoftware.util.cluster.HazelcastClusterNodeInfo; ...@@ -69,6 +69,8 @@ import com.jivesoftware.util.cluster.HazelcastClusterNodeInfo;
*/ */
public class ClusteredCacheFactory implements CacheFactoryStrategy { public class ClusteredCacheFactory implements CacheFactoryStrategy {
public static final String HAZELCAST_EXECUTOR_SERVICE_NAME =
JiveGlobals.getProperty("hazelcast.executor.service.name", "openfire::cluster::executor");
private static final long MAX_CLUSTER_EXECUTION_TIME = private static final long MAX_CLUSTER_EXECUTION_TIME =
JiveGlobals.getLongProperty("hazelcast.max.execution.seconds", 30); JiveGlobals.getLongProperty("hazelcast.max.execution.seconds", 30);
private static final long CLUSTER_STARTUP_RETRY_TIME = private static final long CLUSTER_STARTUP_RETRY_TIME =
...@@ -276,8 +278,8 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -276,8 +278,8 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
if (members.size() > 0) { if (members.size() > 0) {
// Asynchronously execute the task on the other cluster members // Asynchronously execute the task on the other cluster members
logger.debug("Executing asynchronous MultiTask: " + task.getClass().getName()); logger.debug("Executing asynchronous MultiTask: " + task.getClass().getName());
hazelcast.getExecutorService().execute( hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMembers(
new MultiTask<Object>(new CallableTask<Object>(task), members)); new CallableTask<Object>(task), members);
} else { } else {
logger.warn("No cluster members selected for cluster task " + task.getClass().getName()); logger.warn("No cluster members selected for cluster task " + task.getClass().getName());
} }
...@@ -295,8 +297,8 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -295,8 +297,8 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
if (member != null) { if (member != null) {
// Asynchronously execute the task on the target member // Asynchronously execute the task on the target member
logger.debug("Executing asynchronous DistributedTask: " + task.getClass().getName()); logger.debug("Executing asynchronous DistributedTask: " + task.getClass().getName());
hazelcast.getExecutorService().execute( hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMember(
new DistributedTask<Object>(new CallableTask<Object>(task), member)); new CallableTask<Object>(task), member);
return true; return true;
} else { } else {
logger.warn("Requested node " + StringUtils.getString(nodeID) + " not found in cluster"); logger.warn("Requested node " + StringUtils.getString(nodeID) + " not found in cluster");
...@@ -310,8 +312,7 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -310,8 +312,7 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
* (seconds) until the task is run on all members. * (seconds) until the task is run on all members.
*/ */
public Collection<Object> doSynchronousClusterTask(ClusterTask task, boolean includeLocalMember) { public Collection<Object> doSynchronousClusterTask(ClusterTask task, boolean includeLocalMember) {
Collection<Object> result = Collections.emptyList(); if (cluster == null) { return Collections.emptyList(); }
if (cluster == null) { return result; }
Set<Member> members = new HashSet<Member>(); Set<Member> members = new HashSet<Member>();
Member current = cluster.getLocalMember(); Member current = cluster.getLocalMember();
for(Member member : cluster.getMembers()) { for(Member member : cluster.getMembers()) {
...@@ -319,15 +320,19 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -319,15 +320,19 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
members.add(member); members.add(member);
} }
} }
Collection<Object> result = new ArrayList<Object>();
if (members.size() > 0) { if (members.size() > 0) {
// Asynchronously execute the task on the other cluster members // Asynchronously execute the task on the other cluster members
MultiTask<Object> multiTask = new MultiTask<Object>(
new CallableTask<Object>(task), members);
try { try {
logger.debug("Executing MultiTask: " + task.getClass().getName()); logger.debug("Executing MultiTask: " + task.getClass().getName());
hazelcast.getExecutorService().execute(multiTask); Map<Member, Future<Object>> futures = hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME)
result = multiTask.get(MAX_CLUSTER_EXECUTION_TIME,TimeUnit.SECONDS); .submitToMembers(new CallableTask<Object>(task), members);
logger.debug("MultiTask result: " + (result == null ? "null" : result.size())); long nanosLeft = TimeUnit.SECONDS.toNanos(MAX_CLUSTER_EXECUTION_TIME);
for (Future<Object> future : futures.values()) {
long start = System.nanoTime();
result.add(future.get(nanosLeft, TimeUnit.NANOSECONDS));
nanosLeft = (System.nanoTime() - start);
}
} catch (TimeoutException te) { } catch (TimeoutException te) {
logger.error("Failed to execute cluster task within " + MAX_CLUSTER_EXECUTION_TIME + " seconds", te); logger.error("Failed to execute cluster task within " + MAX_CLUSTER_EXECUTION_TIME + " seconds", te);
} catch (Exception e) { } catch (Exception e) {
...@@ -351,12 +356,11 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy { ...@@ -351,12 +356,11 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
// Check that the requested member was found // Check that the requested member was found
if (member != null) { if (member != null) {
// Asynchronously execute the task on the target member // Asynchronously execute the task on the target member
DistributedTask<Object> distributedTask = new DistributedTask<Object>(
new CallableTask<Object>(task), member);
logger.debug("Executing DistributedTask: " + task.getClass().getName()); logger.debug("Executing DistributedTask: " + task.getClass().getName());
hazelcast.getExecutorService().execute(distributedTask);
try { try {
result = distributedTask.get(MAX_CLUSTER_EXECUTION_TIME, TimeUnit.SECONDS); Future<Object> future = hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME)
.submitToMember(new CallableTask<Object>(task), member);
result = future.get(MAX_CLUSTER_EXECUTION_TIME, TimeUnit.SECONDS);
logger.debug("DistributedTask result: " + (result == null ? "null" : result)); logger.debug("DistributedTask result: " + (result == null ? "null" : result));
} catch (TimeoutException te) { } catch (TimeoutException te) {
logger.error("Failed to execute cluster task within " + MAX_CLUSTER_EXECUTION_TIME + " seconds", te); logger.error("Failed to execute cluster task within " + MAX_CLUSTER_EXECUTION_TIME + " seconds", te);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment