Commit 3d59aed7 authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

Queue login events and process them in another thread. JM-1216

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@9655 b35dd754-fafc-0310-a699-88a17e54d16e
parent 2c54e239
...@@ -46,7 +46,9 @@ import java.sql.PreparedStatement; ...@@ -46,7 +46,9 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.*; import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
/** /**
* <p> * <p>
...@@ -92,6 +94,12 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider, ...@@ -92,6 +94,12 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider,
private PubSubEngine pubSubEngine = null; private PubSubEngine pubSubEngine = null;
/**
* Queue that will store the JID of the local users that came online. This queue
* will be consumed by another thread to improve performance of the server.
*/
private static BlockingQueue<JID> availableSessions = new LinkedBlockingQueue<JID>();
/** /**
* A map of all known full JIDs that have sent presences from a remote server. * A map of all known full JIDs that have sent presences from a remote server.
* table: key Bare JID (String); value Set of JIDs * table: key Bare JID (String); value Set of JIDs
...@@ -105,6 +113,40 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider, ...@@ -105,6 +113,40 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider,
super("Personal Eventing Handler"); super("Personal Eventing Handler");
pepServices = new ConcurrentHashMap<String, PEPService>(); pepServices = new ConcurrentHashMap<String, PEPService>();
info = new IQHandlerInfo("pubsub", "http://jabber.org/protocol/pubsub"); info = new IQHandlerInfo("pubsub", "http://jabber.org/protocol/pubsub");
// Create a thread that will process the queued JIDs of the sessions that came online. We
// are processing the events one at a time so we no longer have the paralellism to the database
// that was slowing down the server
Thread thread = new Thread("PEP avaiable sessions handler ") {
public void run() {
for (; ;) {
try {
JID availableSessionJID = availableSessions.take();
// Send the last published items for the contacts on availableSessionJID's roster.
try {
Roster roster = XMPPServer.getInstance().getRosterManager()
.getRoster(availableSessionJID.getNode());
for (RosterItem item : roster.getRosterItems()) {
if (item.getSubStatus() == RosterItem.SUB_BOTH) {
PEPService pepService = getPEPService(item.getJid().toBareJID());
if (pepService != null) {
pepService.sendLastPublishedItems(availableSessionJID);
}
}
}
}
catch (UserNotFoundException e) {
// Do nothing
}
}
catch (Exception e) {
Log.error(e);
}
}
}
};
thread.setDaemon(true);
thread.start();
} }
@Override @Override
...@@ -441,7 +483,7 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider, ...@@ -441,7 +483,7 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider,
public Iterator<Element> getUserItems(String name, JID senderJID) { public Iterator<Element> getUserItems(String name, JID senderJID) {
ArrayList<Element> items = new ArrayList<Element>(); ArrayList<Element> items = new ArrayList<Element>();
String recipientJID = XMPPServer.getInstance().createJID(name, null).toBareJID(); String recipientJID = XMPPServer.getInstance().createJID(name, null, true).toBareJID();
PEPService pepService = getPEPService(recipientJID); PEPService pepService = getPEPService(recipientJID);
if (pepService != null) { if (pepService != null) {
...@@ -500,22 +542,9 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider, ...@@ -500,22 +542,9 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider,
if (newlyAvailableJID == null) { if (newlyAvailableJID == null) {
return; return;
} }
// Store the JID of the session that became online. The processing of this
// Send the last published items for the contacts on newlyAvailableJID's roster. // event will take place in another thread to improve performance of the server
try { availableSessions.add(newlyAvailableJID);
Roster roster = XMPPServer.getInstance().getRosterManager().getRoster(newlyAvailableJID.getNode());
for (RosterItem item : roster.getRosterItems()) {
if (item.getSubStatus() == RosterItem.SUB_BOTH) {
PEPService pepService = getPEPService(item.getJid().toBareJID());
if (pepService != null) {
pepService.sendLastPublishedItems(newlyAvailableJID);
}
}
}
}
catch (UserNotFoundException e) {
// Do nothing
}
} }
public void remoteUserAvailable(Presence presence) { public void remoteUserAvailable(Presence presence) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment