/**
 * $RCSfile$
 * $Revision: 3170 $
 * $Date: 2005-12-07 14:00:58 -0300 (Wed, 07 Dec 2005) $
 *
 * Copyright (C) 2005-2008 Jive Software. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.jivesoftware.openfire;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;

import org.dom4j.Element;
import org.jivesoftware.openfire.audit.AuditStreamIDFactory;
import org.jivesoftware.openfire.auth.AuthToken;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.cluster.ClusterEventListener;
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.component.InternalComponentManager;
import org.jivesoftware.openfire.container.BasicModule;
import org.jivesoftware.openfire.disco.DiscoInfoProvider;
import org.jivesoftware.openfire.disco.DiscoItem;
import org.jivesoftware.openfire.disco.DiscoItemsProvider;
import org.jivesoftware.openfire.disco.DiscoServerItem;
import org.jivesoftware.openfire.disco.ServerItemsProvider;
import org.jivesoftware.openfire.event.SessionEventDispatcher;
import org.jivesoftware.openfire.http.HttpConnection;
import org.jivesoftware.openfire.http.HttpSession;
import org.jivesoftware.openfire.multiplex.ConnectionMultiplexerManager;
import org.jivesoftware.openfire.server.OutgoingSessionPromise;
import org.jivesoftware.openfire.session.ClientSession;
import org.jivesoftware.openfire.session.ClientSessionInfo;
import org.jivesoftware.openfire.session.ComponentSession;
import org.jivesoftware.openfire.session.ConnectionMultiplexerSession;
import org.jivesoftware.openfire.session.GetSessionsCountTask;
import org.jivesoftware.openfire.session.IncomingServerSession;
import org.jivesoftware.openfire.session.LocalClientSession;
import org.jivesoftware.openfire.session.LocalComponentSession;
import org.jivesoftware.openfire.session.LocalConnectionMultiplexerSession;
import org.jivesoftware.openfire.session.LocalIncomingServerSession;
import org.jivesoftware.openfire.session.LocalOutgoingServerSession;
import org.jivesoftware.openfire.session.OutgoingServerSession;
import org.jivesoftware.openfire.session.RemoteSessionLocator;
import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.openfire.spi.BasicStreamIDFactory;
import org.jivesoftware.openfire.streammanagement.StreamManager;
import org.jivesoftware.openfire.user.UserManager;
import org.jivesoftware.openfire.user.UserNotFoundException;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.XMPPDateTimeFormat;
import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.forms.DataForm;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import org.xmpp.packet.Packet;
import org.xmpp.packet.Presence;

/**
 * Manages the sessions associated with an account. The information
 * maintained by the Session manager is entirely transient and does
 * not need to be preserved between server restarts.
 *
 * @author Derek DeMoro
 */
public class SessionManager extends BasicModule implements ClusterEventListener/*, ServerItemsProvider, DiscoInfoProvider, DiscoItemsProvider */{

	private static final Logger Log = LoggerFactory.getLogger(SessionManager.class);

    public static final String COMPONENT_SESSION_CACHE_NAME = "Components Sessions";
    public static final String CM_CACHE_NAME = "Connection Managers Sessions";
    public static final String ISS_CACHE_NAME = "Incoming Server Sessions";
    public static final String C2S_INFO_CACHE_NAME = "Client Session Info Cache";

    public static final int NEVER_KICK = -1;

    private XMPPServer server;
    private PacketRouter router;
    private String serverName;
    private JID serverAddress;
    private UserManager userManager;
    private int conflictLimit;

    /**
     * Counter of user connections. A connection is counted just after it was created and not
     * after the user became available. This counter only considers sessions local to this JVM.
     * That means that when running inside of a cluster you will need to add up this counter
     * for each cluster node.
     */
    private final AtomicInteger connectionsCounter = new AtomicInteger(0);

    /**
     * Cache (unlimited, never expire) that holds information about client sessions (as soon as
     * a resource has been bound). The cache is used by Remote sessions to avoid generating big
     * number of remote calls.
     * Key: full JID, Value: ClientSessionInfo
     */
    private Cache<String, ClientSessionInfo> sessionInfoCache;

    /**
     * Cache (unlimited, never expire) that holds external component sessions.
     * Key: component address, Value: nodeID
     */
    private Cache<String, byte[]> componentSessionsCache;

    /**
     * Cache (unlimited, never expire) that holds sessions of connection managers. For each
     * socket connection of the CM to the server there is going to be an entry in the cache.
     * Key: full address of the CM that identifies the socket, Value: nodeID
     */
    private Cache<String, byte[]> multiplexerSessionsCache;

    /**
     * Cache (unlimited, never expire) that holds incoming sessions of remote servers.
     * Key: stream ID that identifies the socket/session, Value: nodeID
     */
    private Cache<String, byte[]> incomingServerSessionsCache;
    /**
     * Cache (unlimited, never expire) that holds list of incoming sessions
     * originated from the same remote server (domain/subdomain). For instance, jabber.org
     * may have 2 connections to the server running in jivesoftware.com (one socket to
     * jivesoftware.com and the other socket to conference.jivesoftware.com).
     * Key: remote hostname (domain/subdomain), Value: list of stream IDs that identify each socket.
     */
    private Cache<String, List<String>> hostnameSessionsCache;

    /**
     * Cache (unlimited, never expire) that holds domains, subdomains and virtual
     * hostnames of the remote server that were validated with this server for each
     * incoming server session.
     * Key: stream ID, Value: Domains and subdomains of the remote server that were
     * validated with this server.<p>
     *
     * This same information is stored in {@link LocalIncomingServerSession} but the
     * reason for this duplication is that when running in a cluster other nodes
     * will have access to this clustered cache even in the case of this node going
     * down.
     */
    private Cache<String, Set<String>> validatedDomainsCache;

    private ClientSessionListener clientSessionListener = new ClientSessionListener();
    private ComponentSessionListener componentSessionListener = new ComponentSessionListener();
    private IncomingServerSessionListener incomingServerListener = new IncomingServerSessionListener();
    private OutgoingServerSessionListener outgoingServerListener = new OutgoingServerSessionListener();
    private ConnectionMultiplexerSessionListener multiplexerSessionListener = new ConnectionMultiplexerSessionListener();

    /**
     * Local session manager responsible for keeping sessions connected to this JVM that are not
     * present in the routing table.
     */
    private LocalSessionManager localSessionManager;
    /**
     * <p>Session manager must maintain the routing table as sessions are added and
     * removed.</p>
     */
    private RoutingTable routingTable;

    private StreamIDFactory streamIDFactory;

    /**
     * Returns the instance of <CODE>SessionManagerImpl</CODE> being used by the XMPPServer.
     *
     * @return the instance of <CODE>SessionManagerImpl</CODE> being used by the XMPPServer.
     */
    public static SessionManager getInstance() {
        return XMPPServer.getInstance().getSessionManager();
    }

    public SessionManager() {
        super("Session Manager");
        if (JiveGlobals.getBooleanProperty("xmpp.audit.active")) {
            streamIDFactory = new AuditStreamIDFactory();
        }
        else {
            streamIDFactory = new BasicStreamIDFactory();
        }
        localSessionManager = new LocalSessionManager();
        conflictLimit = JiveGlobals.getIntProperty("xmpp.session.conflict-limit", 0);
    }

    /**
     * Returns the session originated from the specified address or <tt>null</tt> if none was
     * found. The specified address MUST contain a resource that uniquely identifies the session.
     *
     * A single connection manager should connect to the same node.
     *
     * @param address the address of the connection manager (including resource that identifies specific socket)
     * @return the session originated from the specified address.
     */
    public ConnectionMultiplexerSession getConnectionMultiplexerSession(JID address) {
        // Search in the list of CMs connected to this JVM
        LocalConnectionMultiplexerSession session =
                localSessionManager.getConnnectionManagerSessions().get(address.toString());
        if (session == null && server.getRemoteSessionLocator() != null) {
            // Search in the list of CMs connected to other cluster members
            byte[] nodeID = multiplexerSessionsCache.get(address.toString());
            if (nodeID != null) {
                return server.getRemoteSessionLocator().getConnectionMultiplexerSession(nodeID, address);
            }
        }
        return null;
    }

    /**
     * Returns all sessions originated from connection managers.
     *
     * @return all sessions originated from connection managers.
     */
    public List<ConnectionMultiplexerSession> getConnectionMultiplexerSessions() {
        List<ConnectionMultiplexerSession> sessions = new ArrayList<>();
        // Add sessions of CMs connected to this JVM
        sessions.addAll(localSessionManager.getConnnectionManagerSessions().values());
        // Add sessions of CMs connected to other cluster nodes
        RemoteSessionLocator locator = server.getRemoteSessionLocator();
        if (locator != null) {
            for (Map.Entry<String, byte[]> entry : multiplexerSessionsCache.entrySet()) {
                if (!server.getNodeID().equals(entry.getValue())) {
                    sessions.add(locator.getConnectionMultiplexerSession(entry.getValue(), new JID(entry.getKey())));
                }
            }
        }
        return sessions;
    }

    /**
     * Returns a collection with all the sessions originated from the connection manager
     * whose domain matches the specified domain. If there is no connection manager with
     * the specified domain then an empty list is going to be returned.
     *
     * @param domain the domain of the connection manager.
     * @return a collection with all the sessions originated from the connection manager
     *         whose domain matches the specified domain.
     */
    public List<ConnectionMultiplexerSession> getConnectionMultiplexerSessions(String domain) {
        List<ConnectionMultiplexerSession> sessions = new ArrayList<>();
        // Add sessions of CMs connected to this JVM
        for (String address : localSessionManager.getConnnectionManagerSessions().keySet()) {
            JID jid = new JID(address);
            if (domain.equals(jid.getDomain())) {
                sessions.add(localSessionManager.getConnnectionManagerSessions().get(address));
            }
        }
        // Add sessions of CMs connected to other cluster nodes
        RemoteSessionLocator locator = server.getRemoteSessionLocator();
        if (locator != null) {
            for (Map.Entry<String, byte[]> entry : multiplexerSessionsCache.entrySet()) {
                if (!server.getNodeID().equals(entry.getValue())) {
                    JID jid = new JID(entry.getKey());
                    if (domain.equals(jid.getDomain())) {
                        sessions.add(
                                locator.getConnectionMultiplexerSession(entry.getValue(), new JID(entry.getKey())));
                    }
                }
            }
        }
        return sessions;
    }

    /**
     * Creates a new <tt>ConnectionMultiplexerSession</tt>.
     *
     * @param conn the connection to create the session from.
     * @param address the JID (may include a resource) of the connection manager's session.
     * @return a newly created session.
     */
    public LocalConnectionMultiplexerSession createMultiplexerSession(Connection conn, JID address) {
        if (serverName == null) {
            throw new IllegalStateException("Server not initialized");
        }
        StreamID id = nextStreamID();
        LocalConnectionMultiplexerSession session = new LocalConnectionMultiplexerSession(serverName, conn, id);
        conn.init(session);
        // Register to receive close notification on this session so we can
        // figure out when users that were using this connection manager may become unavailable
        conn.registerCloseListener(multiplexerSessionListener, session);

        // Add to connection multiplexer session.
        boolean firstConnection = getConnectionMultiplexerSessions(address.getDomain()).isEmpty();
        localSessionManager.getConnnectionManagerSessions().put(address.toString(), session);
        // Keep track of the cluster node hosting the new CM connection
        multiplexerSessionsCache.put(address.toString(), server.getNodeID().toByteArray());
        if (firstConnection) {
            // Notify ConnectionMultiplexerManager that a new connection manager
            // is available
            ConnectionMultiplexerManager.getInstance().multiplexerAvailable(address.getDomain());
        }
        return session;
    }

    /**
     * Returns a randomly created ID to be used in a stream element.
     *
     * @return a randomly created ID to be used in a stream element.
     */
    public StreamID nextStreamID() {
        return streamIDFactory.createStreamID();
    }

    /**
     * Creates a new <tt>ClientSession</tt>. The new Client session will have a newly created
     * stream ID.
     *
     * @param conn the connection to create the session from.
     * @param language The language to use for the new session.
     * @return a newly created session.
     */
    public LocalClientSession createClientSession(Connection conn, Locale language) {
        return createClientSession(conn, nextStreamID(), language);
    }

    /**
     * Creates a new <tt>ClientSession</tt> with the specified streamID.
     *
     * @param conn the connection to create the session from.
     * @param id the streamID to use for the new session.
     * @return a newly created session.
     */
    public LocalClientSession createClientSession(Connection conn, StreamID id) {
        return createClientSession( conn, id, null);
    }

    /**
     * Creates a new <tt>ClientSession</tt> with the specified streamID.
     *
     * @param conn the connection to create the session from.
     * @param id the streamID to use for the new session.
     * @param language The language to use for the new session.
     * @return a newly created session.
     */
    public LocalClientSession createClientSession(Connection conn, StreamID id, Locale language) {
        if (serverName == null) {
            throw new IllegalStateException("Server not initialized");
        }
        LocalClientSession session = new LocalClientSession(serverName, conn, id, language);
        conn.init(session);
        // Register to receive close notification on this session so we can
        // remove  and also send an unavailable presence if it wasn't
        // sent before
        conn.registerCloseListener(clientSessionListener, session);

        // Add to pre-authenticated sessions.
        localSessionManager.getPreAuthenticatedSessions().put(session.getAddress().getResource(), session);
        // Increment the counter of user sessions
        connectionsCounter.incrementAndGet();
        return session;
    }

    /**
     * Creates a new <tt>ClientSession</tt> with the specified streamID.
     *
     * @param conn the connection to create the session from.
     * @param id the streamID to use for the new session.
     * @return a newly created session.
     */
    public HttpSession createClientHttpSession(long rid, InetAddress address, StreamID id, HttpConnection connection, Locale language)
            throws UnauthorizedException {
        if (serverName == null) {
            throw new UnauthorizedException("Server not initialized");
        }
        PacketDeliverer backupDeliverer = server.getPacketDeliverer();
        HttpSession session = new HttpSession(backupDeliverer, serverName, address, id, rid, connection, language);
        Connection conn = session.getConnection();
        conn.init(session);
        conn.registerCloseListener(clientSessionListener, session);
        localSessionManager.getPreAuthenticatedSessions().put(session.getAddress().getResource(), session);
        connectionsCounter.incrementAndGet();
        return session;
    }

    public LocalComponentSession createComponentSession(JID address, Connection conn) {
        if (serverName == null) {
            throw new IllegalStateException("Server not initialized");
        }
        StreamID id = nextStreamID();
        LocalComponentSession session = new LocalComponentSession(serverName, conn, id);
        conn.init(session);
        // Register to receive close notification on this session so we can
        // remove the external component from the list of components
        conn.registerCloseListener(componentSessionListener, session);
        // Set the bind address as the address of the session
        session.setAddress(address);

        // Add to component session.
        localSessionManager.getComponentsSessions().add(session);
        // Keep track of the cluster node hosting the new external component
        componentSessionsCache.put(address.toString(), server.getNodeID().toByteArray());
        return session;
    }

    /**
     * Creates a session for a remote server. The session should be created only after the
     * remote server has been authenticated either using "server dialback" or SASL.
     *
     * @param conn the connection to the remote server.
     * @param id the stream ID used in the stream element when authenticating the server.
     * @return the newly created {@link IncomingServerSession}.
     * @throws UnauthorizedException if the local server has not been initialized yet.
     */
    public LocalIncomingServerSession createIncomingServerSession(Connection conn, StreamID id, String fromDomain)
            throws UnauthorizedException {
        if (serverName == null) {
            throw new UnauthorizedException("Server not initialized");
        }
        LocalIncomingServerSession session = new LocalIncomingServerSession(serverName, conn, id, fromDomain);
        conn.init(session);
        // Register to receive close notification on this session so we can
        // remove its route from the sessions set
        conn.registerCloseListener(incomingServerListener, session);

        return session;
    }

    /**
     * Notification message that a new OutgoingServerSession has been created. Register a listener
     * that will react when the connection gets closed.
     *
     * @param session the newly created OutgoingServerSession.
     */
    public void outgoingServerSessionCreated(LocalOutgoingServerSession session) {
        // Register to receive close notification on this session so we can
        // remove its route from the sessions set
        session.getConnection().registerCloseListener(outgoingServerListener, session);
    }

    /**
     * Registers that a server session originated by a remote server is hosting a given hostname.
     * Notice that the remote server may be hosting several subdomains as well as virtual hosts so
     * the same IncomingServerSession may be associated with many keys. If the remote server
     * creates many sessions to this server (eg. one for each subdomain) then associate all
     * the sessions with the originating server that created all the sessions.
     *
     * @param hostname the hostname that is being served by the remote server.
     * @param session the incoming server session to the remote server.
     */
    public void registerIncomingServerSession(String hostname, LocalIncomingServerSession session) {
        // Keep local track of the incoming server session connected to this JVM
        String streamID = session.getStreamID().getID();
        localSessionManager.addIncomingServerSessions(streamID, session);
        // Keep track of the nodeID hosting the incoming server session
        incomingServerSessionsCache.put(streamID, server.getNodeID().toByteArray());
        // Update list of sockets/sessions coming from the same remote hostname
        Lock lock = CacheFactory.getLock(hostname, hostnameSessionsCache);
        try {
            lock.lock();
            List<String> streamIDs = hostnameSessionsCache.get(hostname);
            if (streamIDs == null) {
                streamIDs = new ArrayList<>();
            }
            streamIDs.add(streamID);
            hostnameSessionsCache.put(hostname, streamIDs);
        }
        finally {
            lock.unlock();
        }
        // Add to clustered cache
        lock = CacheFactory.getLock(streamID, validatedDomainsCache);
        try {
            lock.lock();
            Set<String> validatedDomains = validatedDomainsCache.get(streamID);
            if (validatedDomains == null) {
                validatedDomains = new HashSet<>();
            }
            boolean added = validatedDomains.add(hostname);
            if (added) {
                validatedDomainsCache.put(streamID, validatedDomains);
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Unregisters the specified remote server session originiated by the specified remote server.
     *
     * @param hostname the hostname that is being served by the remote server.
     * @param session the session to unregiser.
     */
    public void unregisterIncomingServerSession(String hostname, IncomingServerSession session) {
        // Remove local track of the incoming server session connected to this JVM
        String streamID = session.getStreamID().getID();
        localSessionManager.removeIncomingServerSessions(streamID);
        // Remove track of the nodeID hosting the incoming server session
        incomingServerSessionsCache.remove(streamID);

        // Remove from list of sockets/sessions coming from the remote hostname
        Lock lock = CacheFactory.getLock(hostname, hostnameSessionsCache);
        try {
            lock.lock();
            List<String> streamIDs = hostnameSessionsCache.get(hostname);
            if (streamIDs != null) {
                streamIDs.remove(streamID);
                if (streamIDs.isEmpty()) {
                    hostnameSessionsCache.remove(hostname);
                }
                else {
                    hostnameSessionsCache.put(hostname, streamIDs);
                }
            }
        }
        finally {
            lock.unlock();
        }
        // Remove from clustered cache
        lock = CacheFactory.getLock(streamID, validatedDomainsCache);
        try {
            lock.lock();
            Set<String> validatedDomains = validatedDomainsCache.get(streamID);
            if (validatedDomains == null) {
                validatedDomains = new HashSet<>();
            }
            validatedDomains.remove(hostname);
            if (!validatedDomains.isEmpty()) {
                validatedDomainsCache.put(streamID, validatedDomains);
            }
            else {
                validatedDomainsCache.remove(streamID);
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns a collection with all the domains, subdomains and virtual hosts that where
     * validated. The remote server is allowed to send packets from any of these domains,
     * subdomains and virtual hosts.<p>
     *
     * Content is stored in a clustered cache so that even in the case of the node hosting
     * the sessions is lost we can still have access to this info to be able to perform
     * proper clean up logic.
     *
     * @param streamID id that uniquely identifies the session.
     * @return domains, subdomains and virtual hosts that where validated.
     */
    public Collection<String> getValidatedDomains(String streamID) {
        Lock lock = CacheFactory.getLock(streamID, validatedDomainsCache);
        try {
            lock.lock();
            Set<String> validatedDomains = validatedDomainsCache.get(streamID);
            if (validatedDomains == null) {
                return Collections.emptyList();
            }
            return Collections.unmodifiableCollection(validatedDomains);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Add a new session to be managed. The session has been authenticated and resource
     * binding has been done.
     *
     * @param session the session that was authenticated.
     */
    public void addSession(LocalClientSession session) {
        // Add session to the routing table (routing table will know session is not available yet)
        routingTable.addClientRoute(session.getAddress(), session);
        // Remove the pre-Authenticated session but remember to use the temporary ID as the key
        localSessionManager.getPreAuthenticatedSessions().remove(session.getStreamID().toString());
        SessionEventDispatcher.EventType event = session.getAuthToken().isAnonymous() ?
                SessionEventDispatcher.EventType.anonymous_session_created :
                SessionEventDispatcher.EventType.session_created;
        // Fire session created event.
        SessionEventDispatcher.dispatchEvent(session, event);
        if (ClusterManager.isClusteringStarted()) {
            // Track information about the session and share it with other cluster nodes
            sessionInfoCache.put(session.getAddress().toString(), new ClientSessionInfo(session));
        }
    }

    /**
     * Notification message sent when a client sent an available presence for the session. Making
     * the session available means that the session is now eligible for receiving messages from
     * other clients. Sessions whose presence is not available may only receive packets (IQ packets)
     * from the server. Therefore, an unavailable session remains invisible to other clients.
     *
     * @param session the session that receieved an available presence.
     * @param presence the presence for the session.
     */
    public void sessionAvailable(LocalClientSession session, Presence presence) {
        if (session.getAuthToken().isAnonymous()) {
            // Anonymous session always have resources so we only need to add one route. That is
            // the route to the anonymous session
            routingTable.addClientRoute(session.getAddress(), session);
        }
        else {
            // A non-anonymous session is now available
            // Add route to the new session
            routingTable.addClientRoute(session.getAddress(), session);
            // Broadcast presence between the user's resources
            broadcastPresenceOfOtherResource(session);

            // RFC 6121 ยง 4.4.2.
            // The user's server MUST also send the presence stanza to all of the user's available resources (including the resource that generated the presence notification in the first place).
            Presence selfPresence = presence.createCopy();
            selfPresence.setTo(session.getAddress());
            routingTable.routePacket(session.getAddress(), selfPresence, false);
        }
    }

    /**
     * Returns true if the session should broadcast presences to all other resources for the
     * current client. When disabled it is not possible to broadcast presence packets to another
     * resource of the connected user. This is desirable if you have a use case where you have
     * many resources attached to the same user account.
     *
     * @return true if presence should be broadcast to other resources of the same account
     */
    public static boolean isOtherResourcePresenceEnabled() {
        return JiveGlobals.getBooleanProperty("xmpp.client.other-resource.presence", true);
    }

    /**
     * Sends the presences of other connected resources to the resource that just connected.
     *
     * @param session the newly created session.
     */
    private void broadcastPresenceOfOtherResource(LocalClientSession session) {
        if (!SessionManager.isOtherResourcePresenceEnabled()) {
            return;
        }
        Presence presence;
        // Get list of sessions of the same user
        JID searchJID = new JID(session.getAddress().getNode(), session.getAddress().getDomain(), null);
        List<JID> addresses = routingTable.getRoutes(searchJID, null);
        for (JID address : addresses) {
            if (address.equals(session.getAddress())) {
                continue;
            }
            // Send the presence of an existing session to the session that has just changed
            // the presence
            ClientSession userSession = routingTable.getClientRoute(address);
            presence = userSession.getPresence().createCopy();
            presence.setTo(session.getAddress());
            session.process(presence);
        }
    }

    /**
     * Broadcasts presence updates from the originating user's resource to any of the user's
     * existing available resources (if any).
     *
     * @param originatingResource the full JID of the session that sent the presence update.
     * @param presence the presence.
     */
    public void broadcastPresenceToOtherResources(JID originatingResource, Presence presence) {
        if (!SessionManager.isOtherResourcePresenceEnabled()) {
            return;
        }
        // Get list of sessions of the same user
        JID searchJID = new JID(originatingResource.getNode(), originatingResource.getDomain(), null);
        List<JID> addresses = routingTable.getRoutes(searchJID, null);
        for (JID address : addresses) {
        	if (!originatingResource.equals(address)) {
	            // Send the presence of the session whose presence has changed to
	            // this user's other session(s)
	            presence.setTo(address);
	            routingTable.routePacket(address, presence, false);
        	}
        }
    }

    /**
     * Notification message sent when a client sent an unavailable presence for the session. Making
     * the session unavailable means that the session is not eligible for receiving messages from
     * other clients.
     *
     * @param session the session that received an unavailable presence.
     */
    public void sessionUnavailable(LocalClientSession session) {
        if (session.getAddress() != null && routingTable != null &&
                session.getAddress().toBareJID().trim().length() != 0) {
            // Update route to unavailable session (anonymous or not)
            routingTable.addClientRoute(session.getAddress(), session);
        }
    }

    /**
     * Change the priority of a session, that was already available, associated with the sender.
     *
     * @param session   The session whose presence priority has been modified
     * @param oldPriority The old priority for the session
     */
    public void changePriority(LocalClientSession session, int oldPriority) {
        if (session.getAuthToken().isAnonymous()) {
            // Do nothing if the session belongs to an anonymous user
            return;
        }
        int newPriority = session.getPresence().getPriority();
        if (newPriority < 0 || oldPriority >= 0) {
            // Do nothing if new presence priority is not positive and old presence negative
            return;
        }

        // Check presence's priority of other available resources
        JID searchJID = session.getAddress().asBareJID();
        for (JID address : routingTable.getRoutes(searchJID, null)) {
            if (address.equals(session.getAddress())) {
                continue;
            }
            ClientSession otherSession = routingTable.getClientRoute(address);
            if (otherSession.getPresence().getPriority() >= 0) {
                return;
            }
        }

        // User sessions had negative presence before this change so deliver messages
        if (session.canFloodOfflineMessages()) {
            OfflineMessageStore messageStore = server.getOfflineMessageStore();
            Collection<OfflineMessage> messages = messageStore.getMessages(session.getAuthToken().getUsername(), true);
            for (Message message : messages) {
                session.process(message);
            }
        }
    }

    public boolean isAnonymousRoute(String username) {
        // JID's node and resource are the same for anonymous sessions
        return isAnonymousRoute(new JID(username, serverName, username, true));
    }

    public boolean isAnonymousRoute(JID address) {
        // JID's node and resource are the same for anonymous sessions
        return routingTable.isAnonymousRoute(address);
    }

    public boolean isActiveRoute(String username, String resource) {
        boolean hasRoute = false;
        Session session = routingTable.getClientRoute(new JID(username, serverName, resource));
        // Makes sure the session is still active
        if (session != null && !session.isClosed()) {
            hasRoute = session.validate();
        }

        return hasRoute;
    }

    /**
     * Returns the session responsible for this JID data. The returned Session may have never sent
     * an available presence (thus not have a route) or could be a Session that hasn't
     * authenticated yet (i.e. preAuthenticatedSessions).
     *
     * @param from the sender of the packet.
     * @return the <code>Session</code> associated with the JID.
     */
    public ClientSession getSession(JID from) {
        // Return null if the JID is null or belongs to a foreign server. If the server is
        // shutting down then serverName will be null so answer null too in this case.
        if (from == null || serverName == null || !serverName.equals(from.getDomain())) {
            return null;
        }

        // Initially Check preAuthenticated Sessions
        if (from.getResource() != null) {
            ClientSession session = localSessionManager.getPreAuthenticatedSessions().get(from.getResource());
            if (session != null) {
                return session;
            }
        }

        if (from.getResource() == null || from.getNode() == null) {
            return null;
        }

        return routingTable.getClientRoute(from);
    }

    /**
     * Returns a list that contains all authenticated client sessions connected to the server.
     * The list contains sessions of anonymous and non-anonymous users.
     *
     * @return a list that contains all client sessions connected to the server.
     */
    public Collection<ClientSession> getSessions() {
        return routingTable.getClientsRoutes(false);
    }


    public Collection<ClientSession> getSessions(SessionResultFilter filter) {
        List<ClientSession> results = new ArrayList<>();
        if (filter != null) {
            // Grab all the matching sessions
            results.addAll(getSessions());

            // Now we have a copy of the references so we can spend some time
            // doing the rest of the filtering without locking out session access
            // so let's iterate and filter each session one by one
            List<ClientSession> filteredResults = new ArrayList<>();
            for (ClientSession session : results) {
                // Now filter on creation date if needed
                filteredResults.add(session);
            }

            // Sort list.
            Collections.sort(filteredResults, filter.getSortComparator());

            int maxResults = filter.getNumResults();
            if (maxResults == SessionResultFilter.NO_RESULT_LIMIT) {
                maxResults = filteredResults.size();
            }

            // Now generate the final list. I believe it's faster to to build up a new
            // list than it is to remove items from head and tail of the sorted tree
            List<ClientSession> finalResults = new ArrayList<>();
            int startIndex = filter.getStartIndex();
            Iterator<ClientSession> sortedIter = filteredResults.iterator();
            for (int i = 0; sortedIter.hasNext() && finalResults.size() < maxResults; i++) {
                ClientSession result = sortedIter.next();
                if (i >= startIndex) {
                    finalResults.add(result);
                }
            }
            return finalResults;
        }
        return results;
    }

    /**
     * Returns the incoming server session hosted by this JVM that matches the specified stream ID.
     *
     * @param streamID the stream ID that identifies the incoming server session hosted by this JVM.
     * @return the incoming server session hosted by this JVM or null if none was found.
     */
    public LocalIncomingServerSession getIncomingServerSession(String streamID) {
        return localSessionManager.getIncomingServerSession(streamID);
    }

    /**
     * Returns the list of sessions that were originated by a remote server. The list will be
     * ordered chronologically.  IncomingServerSession can only receive packets from the remote
     * server but are not capable of sending packets to the remote server.
     *
     * @param hostname the name of the remote server.
     * @return the sessions that were originated by a remote server.
     */
    public List<IncomingServerSession> getIncomingServerSessions(String hostname) {
        List<String> streamIDs;
        // Get list of sockets/sessions coming from the remote hostname
        Lock lock = CacheFactory.getLock(hostname, hostnameSessionsCache);
        try {
            lock.lock();
            streamIDs = hostnameSessionsCache.get(hostname);
        }
        finally {
            lock.unlock();
        }

        if (streamIDs == null) {
            return Collections.emptyList();
        }
        else {
            // Collect the sessions associated to the found stream IDs
            List<IncomingServerSession> sessions = new ArrayList<>();
            for (String streamID : streamIDs) {
                // Search in local hosted sessions
                IncomingServerSession session = localSessionManager.getIncomingServerSession(streamID);
                RemoteSessionLocator locator = server.getRemoteSessionLocator();
                if (session == null && locator != null) {
                    // Get the node hosting this session
                    byte[] nodeID = incomingServerSessionsCache.get(streamID);
                    if (nodeID != null) {
                        session = locator.getIncomingServerSession(nodeID, streamID);
                    }
                }
                if (session != null) {
                    sessions.add(session);
                }
            }
            return sessions;
        }
    }

    /**
     * Returns a session that was originated from this server to a remote server.
     * OutgoingServerSession an only send packets to the remote server but are not capable of
     * receiving packets from the remote server.
     *
     * @param hostname the name of the remote server.
     * @return a session that was originated from this server to a remote server.
     */
    public OutgoingServerSession getOutgoingServerSession(String hostname) {
        return routingTable.getServerRoute(new JID(null, hostname, null, true));
    }

    public Collection<ClientSession> getSessions(String username) {
        List<ClientSession> sessionList = new ArrayList<>();
        if (username != null && serverName != null) {
            List<JID> addresses = routingTable.getRoutes(new JID(username, serverName, null, true), null);
            for (JID address : addresses) {
                sessionList.add(routingTable.getClientRoute(address));
            }
        }
        return sessionList;
    }

    /**
     * Returns number of client sessions that are connected to the server. Sessions that
     * are authenticated and not authenticated will be included
     *
     * @param onlyLocal true if only sessions connected to this JVM will be considered. Otherwise count cluster wise.
     * @return number of client sessions that are connected to the server.
     */
    public int getConnectionsCount(boolean onlyLocal) {
        int total = connectionsCounter.get();
        if (!onlyLocal) {
            Collection<Object> results =
                    CacheFactory.doSynchronousClusterTask(new GetSessionsCountTask(false), false);
            for (Object result : results) {
                if (result == null) {
                    continue;
                }
                total = total + (Integer) result;
            }
        }
        return total;
    }

    /**
     * Returns number of client sessions that are authenticated with the server. This includes
     * anonymous and non-anoymous users.
     *
     * @param onlyLocal true if only sessions connected to this JVM will be considered. Otherwise count cluster wise.
     * @return number of client sessions that are authenticated with the server.
     */
    public int getUserSessionsCount(boolean onlyLocal) {
        int total = routingTable.getClientsRoutes(true).size();
        if (!onlyLocal) {
            Collection<Object> results =
                    CacheFactory.doSynchronousClusterTask(new GetSessionsCountTask(true), false);
            for (Object result : results) {
                if (result == null) {
                    continue;
                }
                total = total + (Integer) result;
            }
        }
        return total;
    }

    /**
     * Returns number of sessions coming from remote servers. <i>Current implementation is only counting
     * sessions connected to this JVM and not adding up sessions connected to other cluster nodes.</i>
     *
     * @param onlyLocal true if only sessions connected to this JVM will be considered. Otherwise count cluster wise.
     * @return number of sessions coming from remote servers.
     */
    public int getIncomingServerSessionsCount(boolean onlyLocal) {
        int total = localSessionManager.getIncomingServerSessions().size();
        if (!onlyLocal) {
            // TODO Implement this when needed
        }
        return total;
    }

    /**
     * Returns the number of sessions for a user that are available. For the count
     * of all sessions for the user, including sessions that are just starting
     * or closed.
     *
     * @see #getConnectionsCount(boolean)
     * @param username the user.
     * @return number of available sessions for a user.
     */
    public int getActiveSessionCount(String username) {
        return routingTable.getRoutes(new JID(username, serverName, null, true), null).size();
    }

    public int getSessionCount(String username) {
        // TODO Count ALL sessions not only available
        return routingTable.getRoutes(new JID(username, serverName, null, true), null).size();
    }

    /**
     * Returns a collection with the established sessions from external components.
     *
     * @return a collection with the established sessions from external components.
     */
    public Collection<ComponentSession> getComponentSessions() {
        List<ComponentSession> sessions = new ArrayList<>();
        // Add sessions of external components connected to this JVM
        sessions.addAll(localSessionManager.getComponentsSessions());
        // Add sessions of external components connected to other cluster nodes
        RemoteSessionLocator locator = server.getRemoteSessionLocator();
        if (locator != null) {
            for (Map.Entry<String, byte[]> entry : componentSessionsCache.entrySet()) {
                if (!server.getNodeID().equals(entry.getValue())) {
                    sessions.add(locator.getComponentSession(entry.getValue(), new JID(entry.getKey())));
                }
            }
        }
        return sessions;
    }

    /**
     * Returns the session of the component whose domain matches the specified domain.
     *
     * @param domain the domain of the component session to look for.
     * @return the session of the component whose domain matches the specified domain.
     */
    public ComponentSession getComponentSession(String domain) {
        // Search in the external components connected to this JVM
        for (ComponentSession session : localSessionManager.getComponentsSessions()) {
            if (domain.equals(session.getAddress().getDomain())) {
                return session;
            }
        }
        // Search in the external components connected to other cluster nodes
        RemoteSessionLocator locator = server.getRemoteSessionLocator();
        if (locator != null) {
            byte[] nodeID = componentSessionsCache.get(domain);
            if (nodeID != null) {
                return locator.getComponentSession(nodeID, new JID(domain));
            }
        }
        return null;
    }

    /**
     * Returns a collection with the hostnames of the remote servers that currently have an
     * incoming server connection to this server.
     *
     * @return a collection with the hostnames of the remote servers that currently have an
     *         incoming server connection to this server.
     */
    public Collection<String> getIncomingServers() {
        return hostnameSessionsCache.keySet();
    }

    /**
     * Returns a collection with the hostnames of the remote servers that currently may receive
     * packets sent from this server.
     *
     * @return a collection with the hostnames of the remote servers that currently may receive
     *         packets sent from this server.
     */
    public Collection<String> getOutgoingServers() {
        return routingTable.getServerHostnames();
    }

    /**
     * Broadcasts the given data to all connected sessions. Excellent
     * for server administration messages.
     *
     * @param packet the packet to be broadcast.
     */
    public void broadcast(Message packet) {
        routingTable.broadcastPacket(packet, false);
    }

    /**
     * Broadcasts the given data to all connected sessions for a particular
     * user. Excellent for updating all connected resources for users such as
     * roster pushes.
     *
     * @param username the user to send the boradcast to.
     * @param packet the packet to be broadcast.
     * @throws PacketException if a packet exception occurs.
     */
    public void userBroadcast(String username, Packet packet) throws PacketException {
        // TODO broadcast to ALL sessions of the user and not only available
        for (JID address : routingTable.getRoutes(new JID(username, serverName, null), null)) {
            packet.setTo(address);
            routingTable.routePacket(address, packet, true);
        }
    }

    /**
     * Removes a session.
     *
     * @param session the session.
     * @return true if the requested session was successfully removed.
     */
    public boolean removeSession(LocalClientSession session) {
        // Do nothing if session is null or if the server is shutting down. Note: When the server
        // is shutting down the serverName will be null.
        if (session == null || serverName == null) {
            return false;
        }

        AuthToken authToken = session.getAuthToken();
        // Consider session anonymous (for this matter) if we are closing a session that never authenticated
        boolean anonymous = authToken == null || authToken.isAnonymous();
        return removeSession(session, session.getAddress(), anonymous, false);
    }

    /**
     * Removes a session.
     *
     * @param session the session or null when session is derived from fullJID.
     * @param fullJID the address of the session.
     * @param anonymous true if the authenticated user is anonymous.
     * @param forceUnavailable true if an unavailable presence must be created and routed.
     * @return true if the requested session was successfully removed.
     */
    public boolean removeSession(ClientSession session, JID fullJID, boolean anonymous, boolean forceUnavailable) {
        // Do nothing if server is shutting down. Note: When the server
        // is shutting down the serverName will be null.
        if (serverName == null) {
            return false;
        }

        if (session == null) {
            session = getSession(fullJID);
        }

        // Remove route to the removed session (anonymous or not)
        boolean removed = routingTable.removeClientRoute(fullJID);

        if (removed) {
            // Fire session event.
            if (anonymous) {
                SessionEventDispatcher
                        .dispatchEvent(session, SessionEventDispatcher.EventType.anonymous_session_destroyed);
            }
            else {
                SessionEventDispatcher.dispatchEvent(session, SessionEventDispatcher.EventType.session_destroyed);

            }
        }

        // Remove the session from the pre-Authenticated sessions list (if present)
        boolean preauth_removed =
                localSessionManager.getPreAuthenticatedSessions().remove(fullJID.getResource()) != null;
        // If the user is still available then send an unavailable presence
        if (forceUnavailable || session.getPresence().isAvailable()) {
            Presence offline = new Presence();
            offline.setFrom(fullJID);
            offline.setTo(new JID(null, serverName, null, true));
            offline.setType(Presence.Type.unavailable);
            router.route(offline);
        }

        // Stop tracking information about the session and share it with other cluster nodes
        sessionInfoCache.remove(fullJID.toString());

        if (removed || preauth_removed) {
            // Decrement the counter of user sessions
            connectionsCounter.decrementAndGet();
            return true;
        }
        return false;
    }

    public int getConflictKickLimit() {
        return conflictLimit;
    }

    /**
     * Returns the temporary keys used by the sessions that has not been authenticated yet. This
     * is an utility method useful for debugging situations.
     *
     * @return the temporary keys used by the sessions that has not been authenticated yet.
     */
    public Collection<String> getPreAuthenticatedKeys() {
        return localSessionManager.getPreAuthenticatedSessions().keySet();
    }

    /**
     * Returns true if the specified address belongs to a preauthenticated session. Preauthenticated
     * sessions are only available to the local cluster node when running inside of a cluster.
     *
     * @param address the address of the session.
     * @return true if the specified address belongs to a preauthenticated session.
     */
    public boolean isPreAuthenticatedSession(JID address) {
        return serverName.equals(address.getDomain()) &&
                localSessionManager.getPreAuthenticatedSessions().containsKey(address.getResource());
    }

    public void setConflictKickLimit(int limit) {
        conflictLimit = limit;
        JiveGlobals.setProperty("xmpp.session.conflict-limit", Integer.toString(conflictLimit));
    }
    /*
    @Override
    public Iterator<DiscoServerItem> getItems() {
        return Arrays.asList(new DiscoServerItem(serverAddress, null, null, null, this, this)).iterator();
    }

    @Override
    public Iterator<Element> getIdentities(String name, String node, JID senderJID) {
        return Collections.emptyIterator();
    }

    @Override
    public Iterator<String> getFeatures(String name, String node, JID senderJID) {
        return Collections.emptyIterator();
    }

    @Override
    public DataForm getExtendedInfo(String name, String node, JID senderJID) {
        return null;
    }

    @Override
    public boolean hasInfo(String name, String node, JID senderJID) {
        return false;
    }

    @Override
    public Iterator<DiscoItem> getItems(String name, String node, JID senderJID) {
        try {
            // If the requesting entity is the user itself or the requesting entity can probe the presence of the user.
            if (name != null && senderJID != null &&
            	server.getUserManager().isRegisteredUser(senderJID) &&
            	(name.equals(senderJID.getNode()) || server.getPresenceManager().canProbePresence(senderJID, name))) {
                Collection<DiscoItem> discoItems = new ArrayList<DiscoItem>();
                for (ClientSession clientSession : getSessions(name)) {
                    discoItems.add(new DiscoItem(clientSession.getAddress(), null, null, null));
                }
                return discoItems.iterator();
            }
            return Collections.emptyIterator();
        } catch (UserNotFoundException e) {
            return Collections.emptyIterator();
        }
    }
    */
    private class ClientSessionListener implements ConnectionCloseListener {
        /**
         * Handle a session that just closed.
         *
         * @param handback The session that just closed
         */
        @Override
        public void onConnectionClose(Object handback) {
            try {
                LocalClientSession session = (LocalClientSession) handback;
                try {
                    if ((session.getPresence().isAvailable() || !session.wasAvailable()) &&
                            routingTable.hasClientRoute(session.getAddress())) {
                        // Send an unavailable presence to the user's subscribers
                        // Note: This gives us a chance to send an unavailable presence to the
                        // entities that the user sent directed presences
                        Presence presence = new Presence();
                        presence.setType(Presence.Type.unavailable);
                        presence.setFrom(session.getAddress());
                        router.route(presence);
                    }

                    session.getStreamManager().onClose(router, serverAddress);
                }
                finally {
                    // Remove the session
                    removeSession(session);
                }
            }
            catch (Exception e) {
                // Can't do anything about this problem...
                Log.error(LocaleUtils.getLocalizedString("admin.error.close"), e);
            }
        }
    }

    private class ComponentSessionListener implements ConnectionCloseListener {
        /**
         * Handle a session that just closed.
         *
         * @param handback The session that just closed
         */
        @Override
        public void onConnectionClose(Object handback) {
            LocalComponentSession session = (LocalComponentSession)handback;
            try {
                // Unbind registered domains for this external component
                for (String domain : session.getExternalComponent().getSubdomains()) {
                    String subdomain = domain.substring(0, domain.indexOf(serverName) - 1);
                    InternalComponentManager.getInstance().removeComponent(subdomain, session.getExternalComponent());
                }
            }
            catch (Exception e) {
                // Can't do anything about this problem...
                Log.error(LocaleUtils.getLocalizedString("admin.error.close"), e);
            }
            finally {
                // Remove the session
                localSessionManager.getComponentsSessions().remove(session);

                // Remove track of the cluster node hosting the external component
                // if no more components are handling it.
                if (!InternalComponentManager.getInstance().hasComponent(session.getAddress())) {
                    componentSessionsCache.remove(session.getAddress().toString());
                }
            }
        }
    }

    private class IncomingServerSessionListener implements ConnectionCloseListener {
        /**
         * Handle a session that just closed.
         *
         * @param handback The session that just closed
         */
        @Override
        public void onConnectionClose(Object handback) {
            IncomingServerSession session = (IncomingServerSession)handback;
            // Remove all the hostnames that were registered for this server session
            for (String hostname : session.getValidatedDomains()) {
                unregisterIncomingServerSession(hostname, session);
            }
        }
    }

    private class OutgoingServerSessionListener implements ConnectionCloseListener {
        /**
         * Handle a session that just closed.
         *
         * @param handback The session that just closed
         */
        @Override
        public void onConnectionClose(Object handback) {
            OutgoingServerSession session = (OutgoingServerSession)handback;
            // Remove all the hostnames that were registered for this server session
            for (String hostname : session.getHostnames()) {
                // Remove the route to the session using the hostname
                server.getRoutingTable().removeServerRoute(new JID(hostname));
            }
        }
    }

    private class ConnectionMultiplexerSessionListener implements ConnectionCloseListener {
        /**
         * Handle a session that just closed.
         *
         * @param handback The session that just closed
         */
        @Override
        public void onConnectionClose(Object handback) {
            ConnectionMultiplexerSession session = (ConnectionMultiplexerSession)handback;
            // Remove all the hostnames that were registered for this server session
            String domain = session.getAddress().getDomain();
            localSessionManager.getConnnectionManagerSessions().remove(session.getAddress().toString());
            // Remove track of the cluster node hosting the CM connection
            multiplexerSessionsCache.remove(session.getAddress().toString());
            if (getConnectionMultiplexerSessions(domain).isEmpty()) {
                // Terminate ClientSessions originated from this connection manager
                // that are still active since the connection manager has gone down
                ConnectionMultiplexerManager.getInstance().multiplexerUnavailable(domain);
            }
        }
    }

    @Override
	public void initialize(XMPPServer server) {
        super.initialize(server);
        this.server = server;
        router = server.getPacketRouter();
        userManager = server.getUserManager();
        routingTable = server.getRoutingTable();
        serverName = server.getServerInfo().getXMPPDomain();
        serverAddress = new JID(serverName);

        if (JiveGlobals.getBooleanProperty("xmpp.audit.active")) {
            streamIDFactory = new AuditStreamIDFactory();
        }
        else {
            streamIDFactory = new BasicStreamIDFactory();
        }

        String conflictLimitProp = JiveGlobals.getProperty("xmpp.session.conflict-limit");
        if (conflictLimitProp == null) {
            conflictLimit = 0;
            JiveGlobals.setProperty("xmpp.session.conflict-limit", Integer.toString(conflictLimit));
        }
        else {
            try {
                conflictLimit = Integer.parseInt(conflictLimitProp);
            }
            catch (NumberFormatException e) {
                conflictLimit = 0;
                JiveGlobals.setProperty("xmpp.session.conflict-limit", Integer.toString(conflictLimit));
            }
        }

        // Initialize caches.
        componentSessionsCache = CacheFactory.createCache(COMPONENT_SESSION_CACHE_NAME);
        multiplexerSessionsCache = CacheFactory.createCache(CM_CACHE_NAME);
        incomingServerSessionsCache = CacheFactory.createCache(ISS_CACHE_NAME);
        hostnameSessionsCache = CacheFactory.createCache("Sessions by Hostname");
        validatedDomainsCache = CacheFactory.createCache("Validated Domains");
        sessionInfoCache = CacheFactory.createCache(C2S_INFO_CACHE_NAME);
        // Listen to cluster events
        ClusterManager.addListener(this);
        //server.getIQDiscoItemsHandler().addServerItemsProvider(this);
    }


    /**
     * Sends a message with a given subject and body to all the active user sessions in the server.
     *
     * @param subject the subject to broadcast.
     * @param body    the body to broadcast.
     */
    public void sendServerMessage(String subject, String body) {
        sendServerMessage(null, subject, body);
    }

    /**
     * Sends a message with a given subject and body to one or more user sessions related to the
     * specified address. If address is null or the address's node is null then the message will be
     * sent to all the user sessions. But if the address includes a node but no resource then
     * the message will be sent to all the user sessions of the requeted user (defined by the node).
     * Finally, if the address is a full JID then the message will be sent to the session associated
     * to the full JID. If no session is found then the message is not sent.
     *
     * @param address the address that defines the sessions that will receive the message.
     * @param subject the subject to broadcast.
     * @param body    the body to broadcast.
     */
    public void sendServerMessage(JID address, String subject, String body) {
        Message packet = createServerMessage(subject, body);
        if (address == null || address.getNode() == null || !userManager.isRegisteredUser(address)) {
            broadcast(packet);
        }
        else if (address.getResource() == null || address.getResource().length() < 1) {
            userBroadcast(address.getNode(), packet);
        }
        else {
            routingTable.routePacket(address, packet, true);
        }
    }

    private Message createServerMessage(String subject, String body) {
        Message message = new Message();
        message.setFrom(serverAddress);
        if (subject != null) {
            message.setSubject(subject);
        }
        message.setBody(body);
        return message;
    }

    @Override
	public void start() throws IllegalStateException {
        super.start();
        localSessionManager.start();
    }

    @Override
	public void stop() {
        Log.debug("SessionManager: Stopping server");
        // Stop threads that are sending packets to remote servers
        OutgoingSessionPromise.getInstance().shutdown();
        if (JiveGlobals.getBooleanProperty("shutdownMessage.enabled")) {
            sendServerMessage(null, LocaleUtils.getLocalizedString("admin.shutdown.now"));
        }
        localSessionManager.stop();
        serverName = null;
    }

    /**
     * Returns true if remote servers are allowed to have more than one connection to this
     * server. Having more than one connection may improve number of packets that can be
     * transfered per second. This setting only used by the server dialback mehod.<p>
     *
     * It is highly recommended that {@link #getServerSessionTimeout()} is enabled so that
     * dead connections to this server can be easily discarded.
     *
     * @return true if remote servers are allowed to have more than one connection to this
     *         server.
     */
    public boolean isMultipleServerConnectionsAllowed() {
        return JiveGlobals.getBooleanProperty("xmpp.server.session.allowmultiple", true);
    }

    /**
     * Sets if remote servers are allowed to have more than one connection to this
     * server. Having more than one connection may improve number of packets that can be
     * transfered per second. This setting only used by the server dialback mehod.<p>
     *
     * It is highly recommended that {@link #getServerSessionTimeout()} is enabled so that
     * dead connections to this server can be easily discarded.
     *
     * @param allowed true if remote servers are allowed to have more than one connection to this
     *        server.
     */
    public void setMultipleServerConnectionsAllowed(boolean allowed) {
        JiveGlobals.setProperty("xmpp.server.session.allowmultiple", Boolean.toString(allowed));
        if (allowed && JiveGlobals.getIntProperty("xmpp.server.session.idle", 10 * 60 * 1000) <= 0)
        {
            Log.warn("Allowing multiple S2S connections for each domain, without setting a " +
                    "maximum idle timeout for these connections, is unrecommended! Either " +
                    "set xmpp.server.session.allowmultiple to 'false' or change " +
                    "xmpp.server.session.idle to a (large) positive value.");
        }
    }

    /******************************************************
     * Clean up code
     *****************************************************/
    /**
     * Sets the number of milliseconds to elapse between clearing of idle server sessions.
     *
     * @param timeout the number of milliseconds to elapse between clearings.
     */
    public void setServerSessionTimeout(int timeout) {
        if (getServerSessionTimeout() == timeout) {
            return;
        }
        // Set the new property value
        JiveGlobals.setProperty("xmpp.server.session.timeout", Integer.toString(timeout));
    }

    /**
     * Returns the number of milliseconds to elapse between clearing of idle server sessions.
     *
     * @return the number of milliseconds to elapse between clearing of idle server sessions.
     */
    public int getServerSessionTimeout() {
        return JiveGlobals.getIntProperty("xmpp.server.session.timeout", 5 * 60 * 1000);
    }

    public void setServerSessionIdleTime(int idleTime) {
        if (getServerSessionIdleTime() == idleTime) {
            return;
        }
        // Set the new property value
        JiveGlobals.setProperty("xmpp.server.session.idle", Integer.toString(idleTime));

        if (idleTime <= 0 && isMultipleServerConnectionsAllowed() )
        {
            Log.warn("Allowing multiple S2S connections for each domain, without setting a " +
                "maximum idle timeout for these connections, is unrecommended! Either " +
                "set xmpp.server.session.allowmultiple to 'false' or change " +
                "xmpp.server.session.idle to a (large) positive value.");
        }
    }

    public int getServerSessionIdleTime() {
        return JiveGlobals.getIntProperty("xmpp.server.session.idle", 10 * 60 * 1000);
    }

    public Cache<String, ClientSessionInfo> getSessionInfoCache() {
        return sessionInfoCache;
    }

    @Override
    public void joinedCluster() {
        restoreCacheContent();
        // Track information about local sessions and share it with other cluster nodes
        for (ClientSession session : routingTable.getClientsRoutes(true)) {
            sessionInfoCache.put(session.getAddress().toString(), new ClientSessionInfo((LocalClientSession)session));
        }
    }

    @Override
    public void joinedCluster(byte[] nodeID) {
        // Do nothing
    }

    @Override
    public void leftCluster() {
        if (!XMPPServer.getInstance().isShuttingDown()) {
            // Add local sessions to caches
            restoreCacheContent();
        }
    }

    @Override
    public void leftCluster(byte[] nodeID) {
        // Do nothing
    }

    @Override
    public void markedAsSeniorClusterMember() {
        // Do nothing
    }

    private void restoreCacheContent() {
        // Add external component sessions hosted locally to the cache (using new nodeID)
        for (Session session : localSessionManager.getComponentsSessions()) {
            componentSessionsCache.put(session.getAddress().toString(), server.getNodeID().toByteArray());
        }

        // Add connection multiplexer sessions hosted locally to the cache (using new nodeID)
        for (String address : localSessionManager.getConnnectionManagerSessions().keySet()) {
            multiplexerSessionsCache.put(address, server.getNodeID().toByteArray());
        }

        // Add incoming server sessions hosted locally to the cache (using new nodeID)
        for (LocalIncomingServerSession session : localSessionManager.getIncomingServerSessions()) {
            String streamID = session.getStreamID().getID();
            incomingServerSessionsCache.put(streamID, server.getNodeID().toByteArray());
            for (String hostname : session.getValidatedDomains()) {
                // Update list of sockets/sessions coming from the same remote hostname
                Lock lock = CacheFactory.getLock(hostname, hostnameSessionsCache);
                try {
                    lock.lock();
                    List<String> streamIDs = hostnameSessionsCache.get(hostname);
                    if (streamIDs == null) {
                        streamIDs = new ArrayList<>();
                    }
                    streamIDs.add(streamID);
                    hostnameSessionsCache.put(hostname, streamIDs);
                }
                finally {
                    lock.unlock();
                }
                // Add to clustered cache
                lock = CacheFactory.getLock(streamID, validatedDomainsCache);
                try {
                    lock.lock();
                    Set<String> validatedDomains = validatedDomainsCache.get(streamID);
                    if (validatedDomains == null) {
                        validatedDomains = new HashSet<>();
                    }
                    boolean added = validatedDomains.add(hostname);
                    if (added) {
                        validatedDomainsCache.put(streamID, validatedDomains);
                    }
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}