Commit b1599c97 authored by Tom Evans, committed by Dave Cridland

OF-863: Fix multiple NPEs in cluster mode

This patch fixes several NullPointerExceptions that were observed in a
cluster when running under high load and latency, including related
issues in the BOSH connector and hazelcast plugin. Some related logging
improvements are also included.

Some of the NPEs were generated during the startup phase before the
server was fully initialized. These have been corrected by checking
appropriate "ready state" indicators.

In general, sessions hosted in remote cluster nodes that cannot be
addressed due to timeouts during remote task execution should be
considered temporarily "unavailable" to the cluster member that
initiated the call.
parent b2c6447d
......@@ -97,6 +97,7 @@ public class IQRouter extends BasicModule {
}
JID sender = packet.getFrom();
ClientSession session = sessionManager.getSession(sender);
Element childElement = packet.getChildElement(); // may be null
try {
// Invoke the interceptors before we process the read packet
InterceptorManager.getInstance().invokeInterceptors(packet, session, true, false);
......@@ -106,23 +107,25 @@ public class IQRouter extends BasicModule {
// User is requesting this server to authenticate for another server. Return
// a bad-request error
IQ reply = IQ.createResultIQ(packet);
reply.setChildElement(packet.getChildElement().createCopy());
if (childElement != null) {
reply.setChildElement(childElement.createCopy());
}
reply.setError(PacketError.Condition.bad_request);
session.process(reply);
Log.warn("User tried to authenticate with this server using an unknown receipient: " +
packet.toXML());
}
else if (session == null || session.getStatus() == Session.STATUS_AUTHENTICATED || (
isLocalServer(to) && (
"jabber:iq:auth".equals(packet.getChildElement().getNamespaceURI()) ||
"jabber:iq:register"
.equals(packet.getChildElement().getNamespaceURI()) ||
"urn:ietf:params:xml:ns:xmpp-bind"
.equals(packet.getChildElement().getNamespaceURI())))) {
childElement != null && isLocalServer(to) && (
"jabber:iq:auth".equals(childElement.getNamespaceURI()) ||
"jabber:iq:register".equals(childElement.getNamespaceURI()) ||
"urn:ietf:params:xml:ns:xmpp-bind".equals(childElement.getNamespaceURI())))) {
handle(packet);
} else if (packet.getType() == IQ.Type.get || packet.getType() == IQ.Type.set) {
IQ reply = IQ.createResultIQ(packet);
reply.setChildElement(packet.getChildElement().createCopy());
if (childElement != null) {
reply.setChildElement(childElement.createCopy());
}
reply.setError(PacketError.Condition.not_authorized);
session.process(reply);
}
......@@ -133,7 +136,9 @@ public class IQRouter extends BasicModule {
if (session != null) {
// An interceptor rejected this packet so answer a not_allowed error
IQ reply = new IQ();
reply.setChildElement(packet.getChildElement().createCopy());
if (childElement != null) {
reply.setChildElement(childElement.createCopy());
}
reply.setID(packet.getID());
reply.setTo(session.getAddress());
reply.setFrom(packet.getTo());
......
......@@ -872,7 +872,7 @@ public class SessionManager extends BasicModule implements ClusterEventListener
public Collection<ClientSession> getSessions(String username) {
List<ClientSession> sessionList = new ArrayList<ClientSession>();
if (username != null) {
if (username != null && serverName != null) {
List<JID> addresses = routingTable.getRoutes(new JID(username, serverName, null, true), null);
for (JID address : addresses) {
sessionList.add(routingTable.getClientRoute(address));
......
......@@ -243,8 +243,10 @@ public class HttpBindServlet extends HttpServlet {
HttpSession session = sessionManager.getSession(sid);
if (session == null) {
Log.warn("Client provided invalid session: " + sid + ". [" +
request.getRemoteAddr() + "]");
if (Log.isDebugEnabled()) {
Log.debug("Client provided invalid session: " + sid + ". [" +
request.getRemoteAddr() + "]");
}
response.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid SID.");
return;
}
......
......@@ -39,6 +39,7 @@ import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.util.JiveConstants;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.TaskEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -400,14 +401,18 @@ public class HttpSessionManager {
public void run() {
long currentTime = System.currentTimeMillis();
for (HttpSession session : sessionMap.values()) {
long lastActive = currentTime - session.getLastActivity();
if (Log.isDebugEnabled()) {
Log.debug("Session was last active " + lastActive + " ms ago: " + session.getAddress());
}
if (lastActive > session.getInactivityTimeout() * JiveConstants.SECOND) {
Log.info("Closing idle session: " + session.getAddress());
session.close();
}
try {
long lastActive = currentTime - session.getLastActivity();
if (Log.isDebugEnabled()) {
Log.debug("Session was last active " + lastActive + " ms ago: " + session.getAddress());
}
if (lastActive > session.getInactivityTimeout() * JiveConstants.SECOND) {
Log.info("Closing idle session: " + session.getAddress());
session.close();
}
} catch (Exception e) {
Log.error("Failed to determine idle state for session: " + session, e);
}
}
}
}
......
......@@ -520,7 +520,7 @@ public class RoutingTableImpl extends BasicModule implements RoutingTable, Clust
// Get existing AVAILABLE sessions of this user or AVAILABLE to the sender of the packet
for (JID address : getRoutes(recipientJID, packet.getFrom())) {
ClientSession session = getClientRoute(address);
if (session != null) {
if (session != null && session.isInitialized()) {
sessions.add(session);
}
}
......
......@@ -132,9 +132,9 @@ public interface CacheFactoryStrategy {
*
* @param task the task to be invoked on the specified cluster member.
* @param nodeID the byte array that identifies the target cluster member.
* @return false if not in a cluster or specified cluster node was not found.
* @throws IllegalStateException if requested node was not found.
*/
boolean doClusterTask(ClusterTask task, byte[] nodeID);
void doClusterTask(ClusterTask task, byte[] nodeID);
/**
* Invokes a task on other cluster members synchronously and returns the result as a Collection
......
......@@ -94,7 +94,7 @@ public class DefaultLocalCacheStrategy implements CacheFactoryStrategy {
/**
 * Accepts a cluster task and does nothing with it: the body is empty, so the
 * task is never executed by this strategy.
 *
 * @param task the cluster task passed by the caller; unused here.
 */
public void doClusterTask(final ClusterTask task) {
}
public boolean doClusterTask(ClusterTask task, byte[] nodeID) {
public void doClusterTask(ClusterTask task, byte[] nodeID) {
throw new IllegalStateException("Cluster service is not available");
}
......
......@@ -44,6 +44,13 @@
Hazelcast Clustering Plugin Changelog
</h1>
<p><b>1.3.4</b> -- January 10, 2015</p>
<p>Bug fixes:</p>
<ul>
<li>Updated clustering SPI to better match design and documentation.</li>
<li>Fixed multiple session-related NPEs that manifest under high load/latency in the cluster.</li>
</ul>
<p><b>1.3.3</b> -- December 16, 2014</p>
<p>Hazelcast update:</p>
<ul>
......
......@@ -24,6 +24,9 @@
</group>
<properties>
<property name="hazelcast.logging.type">slf4j</property>
<property name="hazelcast.operation.call.timeout.millis">30000</property>
<property name="hazelcast.memcache.enabled">false</property>
<property name="hazelcast.rest.enabled">false</property>
</properties>
<management-center enabled="false"/>
<network>
......
......@@ -5,7 +5,7 @@
<name>${plugin.name}</name>
<description>${plugin.description}</description>
<author>Tom Evans</author>
<version>1.3.3</version>
<date>12/16/2014</date>
<version>1.3.4</version>
<date>01/10/2015</date>
<minServerVersion>3.9.4</minServerVersion>
</plugin>
......@@ -98,7 +98,8 @@ public class RemoteClientSession extends RemoteSession implements ClientSession
}
else {
ClusterTask task = getRemoteSessionTask(RemoteSessionTask.Operation.isInitialized);
initialized = (Boolean) doSynchronousClusterTask(task) ? 1 : 0;
Object result = doSynchronousClusterTask(task);
initialized = result != null && (Boolean) result ? 1 : 0;
}
}
return initialized == 1;
......@@ -133,8 +134,9 @@ public class RemoteClientSession extends RemoteSession implements ClientSession
ClientSessionInfo sessionInfo = cache.get(getAddress().toString());
if (sessionInfo != null) {
return sessionInfo.getPresence();
}
return null;
}
// this can happen if a cluster node becomes unreachable
return new Presence(Presence.Type.unavailable);
}
public void setPresence(Presence presence) {
......
......@@ -25,6 +25,7 @@ import java.util.Date;
import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.cluster.ClusterNodeInfo;
import org.jivesoftware.openfire.session.ClientSession;
import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.util.cache.CacheFactory;
import org.jivesoftware.util.cache.ClusterTask;
......@@ -177,8 +178,10 @@ public abstract class RemoteSession implements Session {
ClusterNodeInfo info = CacheFactory.getClusterNodeInfo(nodeID);
Object result = null;
if (info == null && task instanceof RemoteSessionTask) { // clean up invalid session
SessionManager.getInstance().removeSession(null,
((RemoteSessionTask)task).getSession().getAddress(), false, false);
Session remoteSession = ((RemoteSessionTask)task).getSession();
if (remoteSession instanceof ClientSession) {
SessionManager.getInstance().removeSession(null, remoteSession.getAddress(), false, false);
}
} else {
result = (info == null) ? null : CacheFactory.doSynchronousClusterTask(task, nodeID);
}
......@@ -193,8 +196,10 @@ public abstract class RemoteSession implements Session {
protected void doClusterTask(ClusterTask task) {
ClusterNodeInfo info = CacheFactory.getClusterNodeInfo(nodeID);
if (info == null && task instanceof RemoteSessionTask) { // clean up invalid session
SessionManager.getInstance().removeSession(null,
((RemoteSessionTask)task).getSession().getAddress(), false, false);
Session remoteSession = ((RemoteSessionTask)task).getSession();
if (remoteSession instanceof ClientSession) {
SessionManager.getInstance().removeSession(null, remoteSession.getAddress(), false, false);
}
} else {
CacheFactory.doClusterTask(task, nodeID);
}
......
......@@ -20,6 +20,7 @@
package com.jivesoftware.util.cache;
import java.io.Serializable;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
......@@ -291,8 +292,8 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
* Note that this method does not provide the result set for the given
* task, as the task is run asynchronously across the cluster.
*/
public boolean doClusterTask(final ClusterTask task, byte[] nodeID) {
if (cluster == null) { return false; }
public void doClusterTask(final ClusterTask task, byte[] nodeID) {
if (cluster == null) { return; }
Member member = getMember(nodeID);
// Check that the requested member was found
if (member != null) {
......@@ -300,10 +301,10 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
logger.debug("Executing asynchronous DistributedTask: " + task.getClass().getName());
hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMember(
new CallableTask<Object>(task), member);
return true;
} else {
logger.warn("Requested node " + StringUtils.getString(nodeID) + " not found in cluster");
return false;
String msg = MessageFormat.format("Requested node {0} not found in cluster", StringUtils.getString(nodeID));
logger.warn(msg);
throw new IllegalArgumentException(msg);
}
}
......@@ -369,7 +370,9 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
logger.error("Failed to execute cluster task", e);
}
} else {
logger.warn("Requested node " + StringUtils.getString(nodeID) + " not found in cluster");
String msg = MessageFormat.format("Requested node {0} not found in cluster", StringUtils.getString(nodeID));
logger.warn(msg);
throw new IllegalArgumentException(msg);
}
return result;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment