Commit 242d99c7 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

1) Connection manager work. JM-666

2) More performance improvement work by reducing or eliminating synch blocks

git-svn-id: http://svn.igniterealtime.org/svn/repos/wildfire/trunk@3876 b35dd754-fafc-0310-a699-88a17e54d16e
parent aeaab52a
......@@ -21,6 +21,9 @@ import org.jivesoftware.wildfire.component.InternalComponentManager;
import org.jivesoftware.wildfire.container.BasicModule;
import org.jivesoftware.wildfire.event.SessionEventDispatcher;
import org.jivesoftware.wildfire.handler.PresenceUpdateHandler;
import org.jivesoftware.wildfire.multiplex.ConnectionMultiplexerManager;
import org.jivesoftware.wildfire.multiplex.ConnectionMultiplexerSession;
import org.jivesoftware.wildfire.net.SocketConnection;
import org.jivesoftware.wildfire.server.IncomingServerSession;
import org.jivesoftware.wildfire.server.OutgoingServerSession;
import org.jivesoftware.wildfire.server.OutgoingSessionPromise;
......@@ -58,6 +61,7 @@ public class SessionManager extends BasicModule {
private ComponentSessionListener componentSessionListener = new ComponentSessionListener();
private IncomingServerSessionListener incomingServerListener = new IncomingServerSessionListener();
private OutgoingServerSessionListener outgoingServerListener = new OutgoingServerSessionListener();
private ConnectionMultiplexerSessionListener multiplexerSessionListener = new ConnectionMultiplexerSessionListener();
/**
* Map that holds sessions that has been created but haven't been authenticated yet. The Map
......@@ -85,6 +89,16 @@ public class SessionManager extends BasicModule {
*/
private List<ComponentSession> componentsSessions = new CopyOnWriteArrayList<ComponentSession>();
/**
* Map of connection multiplexer sessions grouped by connection managers. Each connection
* manager may have many connections to the server (i.e. connection pool). All connections
* originated from the same connection manager are grouped as a single entry in the map.
* Once all connections have been closed users that were logged using the connection manager
* will become unavailable.
*/
private Map<String, List<ConnectionMultiplexerSession>> connnectionManagerSessions =
new ConcurrentHashMap<String, List<ConnectionMultiplexerSession>>();
/**
* The sessions contained in this Map are server sessions originated by a remote server. These
* sessions can only receive packets from the remote server but are not capable of sending
......@@ -157,6 +171,91 @@ public class SessionManager extends BasicModule {
}
}
/**
* Returns the session originated from the specified address. If the address contains
* a resource then the exact session is going to be looked for and if none is found then
* <tt>null</tt> is going to be returned. On the other hand, if the address is just a domain
* then any session originated from the connection manager is going to be returned and if
* the connection manager has no sessions then <tt>null</tt> is going to be returned.
*
* @param address the address of the connection manager (no resource included) or a specific
* session of the connection manager (resource included).
* @return the session originated from the specified address.
*/
public ConnectionMultiplexerSession getConnectionMultiplexerSession(JID address) {
List<ConnectionMultiplexerSession> sessions =
connnectionManagerSessions.get(address.getDomain());
if (sessions == null || sessions.isEmpty()) {
return null;
}
if (address.getResource() != null) {
// Look for the exact session
for (ConnectionMultiplexerSession session : sessions) {
if (session.getAddress().equals(address)) {
return session;
}
}
return null;
}
else {
// Look for any session of the connection manager
return sessions.get(0);
}
}
/**
* Returns a collection with all the sessions originated from the connection manager
* whose domain matches the specified domain. If there is no connection manager with
* the specified domain then an empty list is going to be returned.
*
* @param domain the domain of the connection manager.
* @return a collection with all the sessions originated from the connection manager
* whose domain matches the specified domain.
*/
public List<ConnectionMultiplexerSession> getConnectionMultiplexerSessions(String domain) {
List<ConnectionMultiplexerSession> sessions = connnectionManagerSessions.get(domain);
if (sessions == null || sessions.isEmpty()) {
return Collections.emptyList();
}
return new ArrayList<ConnectionMultiplexerSession>(sessions);
}
/**
* Creates a new <tt>ConnectionMultiplexerSession</tt>.
*
* @param conn the connection to create the session from.
* @param address the JID (may include a resource) of the connection manager's session.
* @return a newly created session.
* @throws UnauthorizedException
*/
public ConnectionMultiplexerSession createMultiplexerSession(SocketConnection conn, JID address)
throws UnauthorizedException {
if (serverName == null) {
throw new UnauthorizedException("Server not initialized");
}
StreamID id = nextStreamID();
ConnectionMultiplexerSession session = new ConnectionMultiplexerSession(serverName, conn, id);
conn.init(session);
// Register to receive close notification on this session so we can
// figure out when users that were using this connection manager may become unavailable
conn.registerCloseListener(multiplexerSessionListener, session);
// Add to connection multiplexer session.
List<ConnectionMultiplexerSession> sessions =
connnectionManagerSessions.get(address.getDomain());
if (sessions == null) {
synchronized (address.getDomain().intern()) {
sessions = connnectionManagerSessions.get(address.getDomain());
if (sessions == null) {
sessions = new CopyOnWriteArrayList<ConnectionMultiplexerSession>();
connnectionManagerSessions.put(address.getDomain(), sessions);
}
}
}
sessions.add(session);
return session;
}
/**
* Simple data structure to track sessions for a single user (tracked by resource
* and priority).
......@@ -359,17 +458,30 @@ public class SessionManager extends BasicModule {
}
/**
* Creates a new <tt>ClientSession</tt>.
* Creates a new <tt>ClientSession</tt>. The new Client session will have a newly created
* stream ID.
*
* @param conn the connection to create the session from.
* @return a newly created session.
* @throws UnauthorizedException
*/
public ClientSession createClientSession(Connection conn) throws UnauthorizedException {
return createClientSession(conn, nextStreamID());
}
/**
* Creates a new <tt>ClientSession</tt> with the specified streamID.
*
* @param conn the connection to create the session from.
* @param id the streamID to use for the new session.
* @return a newly created session.
* @throws UnauthorizedException
*/
public Session createClientSession(Connection conn) throws UnauthorizedException {
public ClientSession createClientSession(Connection conn, StreamID id)
throws UnauthorizedException {
if (serverName == null) {
throw new UnauthorizedException("Server not initialized");
}
StreamID id = nextStreamID();
ClientSession session = new ClientSession(serverName, conn, id);
conn.init(session);
// Register to receive close notification on this session so we can
......@@ -536,9 +648,9 @@ public class SessionManager extends BasicModule {
sessions.put(username, resources);
}
resources.addSession(session);
// Remove the pre-Authenticated session but remember to use the temporary ID as the key
preAuthenticatedSessions.remove(session.getStreamID().toString());
}
// Remove the pre-Authenticated session but remember to use the temporary ID as the key
preAuthenticatedSessions.remove(session.getStreamID().toString());
// Fire session created event.
SessionEventDispatcher
.dispatchEvent(session, SessionEventDispatcher.EventType.session_created);
......@@ -682,21 +794,22 @@ public class SessionManager extends BasicModule {
// Do nothing if the session belongs to an anonymous user
return;
}
Session defaultSession;
String username = sender.getNode();
SessionMap resources = sessions.get(username);
if (resources == null) {
return;
}
synchronized (username.intern()) {
SessionMap resources = sessions.get(username);
if (resources == null) {
return;
}
resources.changePriority(sender, priority);
// Get the session with highest priority
Session defaultSession = resources.getDefaultSession(true);
// Update the route to the bareJID with the session with highest priority
routingTable.addRoute(new JID(defaultSession.getAddress().getNode(),
defaultSession.getAddress().getDomain(), ""),
defaultSession);
defaultSession = resources.getDefaultSession(true);
}
// Update the route to the bareJID with the session with highest priority
routingTable.addRoute(new JID(defaultSession.getAddress().getNode(),
defaultSession.getAddress().getDomain(), ""),
defaultSession);
}
......@@ -724,15 +837,17 @@ public class SessionManager extends BasicModule {
}
}
else {
synchronized (username.intern()) {
SessionMap sessionMap = sessions.get(username);
if (sessionMap != null) {
if (resource == null) {
SessionMap sessionMap = sessions.get(username);
if (sessionMap != null) {
if (resource == null) {
synchronized (username.intern()) {
session = sessionMap.getDefaultSession(false);
}
else {
session = sessionMap.getSession(resource);
if (session == null) {
}
else {
session = sessionMap.getSession(resource);
if (session == null) {
synchronized (username.intern()) {
session = sessionMap.getDefaultSession(false);
}
}
......@@ -765,16 +880,14 @@ public class SessionManager extends BasicModule {
// Check if there is a session for a registered user
username = username.toLowerCase();
Session session = null;
synchronized (username.intern()) {
SessionMap sessionMap = sessions.get(username);
if (sessionMap != null) {
if (resource == null) {
hasRoute = !sessionMap.isEmpty();
}
else {
if (sessionMap.hasSession(resource)) {
session = sessionMap.getSession(resource);
}
SessionMap sessionMap = sessions.get(username);
if (sessionMap != null) {
if (resource == null) {
hasRoute = !sessionMap.isEmpty();
}
else {
if (sessionMap.hasSession(resource)) {
session = sessionMap.getSession(resource);
}
}
}
......@@ -824,24 +937,21 @@ public class SessionManager extends BasicModule {
// Initially Check preAuthenticated Sessions
if (resource != null) {
session = preAuthenticatedSessions.get(resource);
if(session != null){
if (session != null) {
return session;
}
}
if (resource == null) {
if (resource == null || username == null) {
return null;
}
if (username == null || !userManager.isRegisteredUser(username)) {
session = anonymousSessions.get(resource);
SessionMap sessionMap = sessions.get(username);
if (sessionMap != null) {
session = sessionMap.getSession(resource);
}
else {
synchronized (username.intern()) {
SessionMap sessionMap = sessions.get(username);
if (sessionMap != null) {
session = sessionMap.getSession(resource);
}
}
else if (!userManager.isRegisteredUser(username)) {
session = anonymousSessions.get(resource);
}
return session;
}
......@@ -1208,15 +1318,14 @@ public class SessionManager extends BasicModule {
else {
// If this is a non-anonymous session then remove the session from the SessionMap
String username = session.getAddress().getNode();
if (session.getAddress() != null && userManager.isRegisteredUser(username)) {
SessionMap sessionMap;
synchronized (username.intern()) {
sessionMap = sessions.get(username);
if (sessionMap != null) {
if (username != null) {
SessionMap sessionMap = sessions.get(username);
if (sessionMap != null) {
synchronized (username.intern()) {
sessionMap.removeSession(session);
if (sessionMap.isEmpty()) {
sessions.remove(username);
}
}
if (sessionMap.isEmpty()) {
sessions.remove(username);
}
}
if (sessionMap != null) {
......@@ -1358,6 +1467,27 @@ public class SessionManager extends BasicModule {
}
}
private class ConnectionMultiplexerSessionListener implements ConnectionCloseListener {
/**
* Handle a session that just closed.
*
* @param handback The session that just closed
*/
public void onConnectionClose(Object handback) {
ConnectionMultiplexerSession session = (ConnectionMultiplexerSession)handback;
// Remove all the hostnames that were registered for this server session
String domain = session.getAddress().getDomain();
List<ConnectionMultiplexerSession> sessions = connnectionManagerSessions.get(domain);
sessions.remove(session);
if (sessions.isEmpty()) {
connnectionManagerSessions.remove(domain);
// Terminate ClientSessions originated from this connection manager
// that are still active since the connection manager has gone down
ConnectionMultiplexerManager.getInstance().closeClientSessions(domain);
}
}
}
public void initialize(XMPPServer server) {
super.initialize(server);
presenceHandler = server.getPresenceUpdateHandler();
......@@ -1466,6 +1596,10 @@ public class SessionManager extends BasicModule {
for (List<IncomingServerSession> incomingSessions : incomingServerSessions.values()) {
sessions.addAll(incomingSessions);
}
for (List<ConnectionMultiplexerSession> multiplexers : connnectionManagerSessions
.values()) {
sessions.addAll(multiplexers);
}
for (Session session : sessions) {
try {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment