Commit 3c0bd7d6 authored by guus's avatar guus

Partial rewrite of the PEP implementation. A cache is used that's regularly...

Partial rewrite of the PEP implementation. A cache is used that's regularly cleaned, which should improve concurrency and reduce the memory footprint (OF-82).

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@11498 b35dd754-fafc-0310-a699-88a17e54d16e
parent f3396022
...@@ -20,23 +20,19 @@ ...@@ -20,23 +20,19 @@
package org.jivesoftware.openfire.pep; package org.jivesoftware.openfire.pep;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.dom4j.DocumentHelper; import org.dom4j.DocumentHelper;
import org.dom4j.Element; import org.dom4j.Element;
import org.dom4j.QName; import org.dom4j.QName;
import org.jivesoftware.database.DbConnectionManager;
import org.jivesoftware.openfire.IQHandlerInfo; import org.jivesoftware.openfire.IQHandlerInfo;
import org.jivesoftware.openfire.XMPPServer; import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.auth.UnauthorizedException; import org.jivesoftware.openfire.auth.UnauthorizedException;
...@@ -57,13 +53,13 @@ import org.jivesoftware.openfire.roster.Roster; ...@@ -57,13 +53,13 @@ import org.jivesoftware.openfire.roster.Roster;
import org.jivesoftware.openfire.roster.RosterEventDispatcher; import org.jivesoftware.openfire.roster.RosterEventDispatcher;
import org.jivesoftware.openfire.roster.RosterEventListener; import org.jivesoftware.openfire.roster.RosterEventListener;
import org.jivesoftware.openfire.roster.RosterItem; import org.jivesoftware.openfire.roster.RosterItem;
import org.jivesoftware.openfire.roster.RosterManager;
import org.jivesoftware.openfire.session.ClientSession; import org.jivesoftware.openfire.session.ClientSession;
import org.jivesoftware.openfire.user.PresenceEventDispatcher; import org.jivesoftware.openfire.user.PresenceEventDispatcher;
import org.jivesoftware.openfire.user.PresenceEventListener; import org.jivesoftware.openfire.user.PresenceEventListener;
import org.jivesoftware.openfire.user.RemotePresenceEventDispatcher; import org.jivesoftware.openfire.user.RemotePresenceEventDispatcher;
import org.jivesoftware.openfire.user.RemotePresenceEventListener; import org.jivesoftware.openfire.user.RemotePresenceEventListener;
import org.jivesoftware.openfire.user.User; import org.jivesoftware.openfire.user.User;
import org.jivesoftware.openfire.user.UserManager;
import org.jivesoftware.openfire.user.UserNotFoundException; import org.jivesoftware.openfire.user.UserNotFoundException;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -102,7 +98,7 @@ import org.xmpp.packet.Presence; ...@@ -102,7 +98,7 @@ import org.xmpp.packet.Presence;
* </p> * </p>
* *
* @author Armando Jagucki * @author Armando Jagucki
* * @author Guus der Kinderen, guus.der.kinderen@gmail.com
*/ */
public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider, ServerFeaturesProvider, public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider, ServerFeaturesProvider,
UserIdentitiesProvider, UserItemsProvider, PresenceEventListener, RemotePresenceEventListener, UserIdentitiesProvider, UserItemsProvider, PresenceEventListener, RemotePresenceEventListener,
...@@ -110,22 +106,28 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider, ...@@ -110,22 +106,28 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider,
private static final Logger Log = LoggerFactory.getLogger(IQPEPHandler.class); private static final Logger Log = LoggerFactory.getLogger(IQPEPHandler.class);
final static String GET_PEP_SERVICE = "SELECT DISTINCT serviceID FROM ofPubsubNode WHERE serviceID=?";
/** /**
* Map of PEP services. Table, Key: bare JID (String); Value: PEPService * Metadata that relates to the IQ processing capabilities of this specific {@link IQHandler}.
*/ */
private Map<String, PEPService> pepServices; private final IQHandlerInfo info;
private IQHandlerInfo info; private PEPServiceManager pepServiceManager = null;
private PubSubEngine pubSubEngine = null;
/** /**
* Queue that will store the JID of the local users that came online. This queue * The managed thread pool that will do most of the processing. The amount
* will be consumed by another thread to improve performance of the server. * of worker threads in this pool should be kept low to avoid resource
* contention.
*/ */
private static BlockingQueue<JID> availableSessions = new LinkedBlockingQueue<JID>(10000); // There's room for future improvement here. If anywhere in the future,
// Openfire allows implementations to use dedicated resource pools, we can
// significantly increase the number of worker threads in this executor. The
// bottleneck for this particular executor is the database pool. During
// startup, PEP queries the database a lot, which causes all of the
// connections in the generic database pool to be used up by this PEP
// implementation. This can cause problems in other parts of Openfire that
// depend on database access (ideally, these should get dedicated resource
// pools too).
private ExecutorService executor = null;
/** /**
* A map of all known full JIDs that have sent presences from a remote server. * A map of all known full JIDs that have sent presences from a remote server.
...@@ -136,161 +138,181 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider, ...@@ -136,161 +138,181 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider,
*/ */
private Map<String, Set<JID>> knownRemotePresences = new ConcurrentHashMap<String, Set<JID>>(); private Map<String, Set<JID>> knownRemotePresences = new ConcurrentHashMap<String, Set<JID>>();
/**
* Constructs a new {@link IQPEPHandler} instance.
*/
public IQPEPHandler() { public IQPEPHandler() {
super("Personal Eventing Handler"); super("Personal Eventing Handler");
pepServices = new ConcurrentHashMap<String, PEPService>();
info = new IQHandlerInfo("pubsub", "http://jabber.org/protocol/pubsub"); info = new IQHandlerInfo("pubsub", "http://jabber.org/protocol/pubsub");
// Create a thread that will process the queued JIDs of the sessions that came online. We
// are processing the events one at a time so we no longer have the paralellism to the database
// that was slowing down the server
Thread thread = new Thread("PEP avaiable sessions handler ") {
public void run() {
final XMPPServer server = XMPPServer.getInstance();
while (!server.isShuttingDown()) {
try {
JID availableSessionJID = availableSessions.take();
// Send the last published items for the contacts on availableSessionJID's roster.
try {
Roster roster = server.getRosterManager().getRoster(availableSessionJID.getNode());
for (RosterItem item : roster.getRosterItems()) {
if (server.isLocal(item.getJid()) && (item.getSubStatus() == RosterItem.SUB_BOTH ||
item.getSubStatus() == RosterItem.SUB_TO)) {
PEPService pepService = getPEPService(item.getJid().toBareJID());
if (pepService != null) {
pepService.sendLastPublishedItems(availableSessionJID);
}
}
}
}
catch (UserNotFoundException e) {
// Do nothing
}
}
catch (Exception e) {
Log.error(e.getMessage(), e);
}
}
}
};
thread.setDaemon(true);
thread.start();
} }
/*
* (non-Javadoc)
* @see org.jivesoftware.openfire.handler.IQHandler#initialize(org.jivesoftware.openfire.XMPPServer)
*/
@Override @Override
public void initialize(XMPPServer server) { public void initialize(XMPPServer server) {
super.initialize(server); super.initialize(server);
pepServiceManager = new PEPServiceManager();
}
/*
* (non-Javadoc)
*
* @see org.jivesoftware.openfire.container.BasicModule#destroy()
*/
public void destroy() {
super.destroy();
}
/*
* (non-Javadoc)
*
* @see org.jivesoftware.openfire.container.BasicModule#start()
*/
public void start() {
super.start();
// start the service manager
pepServiceManager.start();
// start a new executor service
startExecutor();
// Listen to presence events to manage PEP auto-subscriptions. // Listen to presence events to manage PEP auto-subscriptions.
PresenceEventDispatcher.addListener(this); PresenceEventDispatcher.addListener(this);
// Listen to remote presence events to manage the knownRemotePresences map. // Listen to remote presence events to manage the knownRemotePresences map.
RemotePresenceEventDispatcher.addListener(this); RemotePresenceEventDispatcher.addListener(this);
// Listen to roster events for PEP subscription cancelling on contact deletion. // Listen to roster events for PEP subscription cancelling on contact deletion.
RosterEventDispatcher.addListener(this); RosterEventDispatcher.addListener(this);
// Listen to user events in order to destroy a PEP service when a user is deleted. // Listen to user events in order to destroy a PEP service when a user is deleted.
UserEventDispatcher.addListener(this); UserEventDispatcher.addListener(this);
pubSubEngine = new PubSubEngine(server.getPacketRouter());
}
/**
* Returns true if the PEP service is enabled in the server.
*
* @return true if the PEP service is enabled in the server.
*/
public boolean isEnabled() {
return JiveGlobals.getBooleanProperty("xmpp.pep.enabled", true);
} }
/** /*
* Loads a PEP service from the database, if it exists. * (non-Javadoc)
* *
* @param jid the JID of the owner of the PEP service. * @see org.jivesoftware.openfire.container.BasicModule#stop()
* @return the loaded PEP service, or null if not found.
*/ */
private PEPService loadPEPServiceFromDB(String jid) { public void stop() {
PEPService pepService = null; super.stop();
Connection con = null; // Remove listeners
PreparedStatement pstmt = null; PresenceEventDispatcher.removeListener(this);
try { RemotePresenceEventDispatcher.removeListener(this);
con = DbConnectionManager.getConnection(); RosterEventDispatcher.removeListener(this);
// Get all PEP services UserEventDispatcher.removeListener(this);
pstmt = con.prepareStatement(GET_PEP_SERVICE);
pstmt.setString(1, jid);
ResultSet rs = pstmt.executeQuery();
// Restore old PEPServices
while (rs.next()) {
String serviceID = rs.getString(1);
// Create a new PEPService // stop the executor service
pepService = new PEPService(XMPPServer.getInstance(), serviceID); stopExecutor();
pepServices.put(serviceID, pepService);
pubSubEngine.start(pepService);
if (Log.isDebugEnabled()) { // stop the pepservicemananger
Log.debug("PEP: Restored service for " + serviceID + " from the database."); pepServiceManager.stop();
}
} }
rs.close();
pstmt.close(); /**
} * Starts a new thread pool, unless an existing one is still running.
catch (SQLException sqle) { */
Log.error(sqle.getMessage(), sqle); private void startExecutor() {
} if (executor == null || executor.isShutdown()) {
finally { // keep the amount of workers low! See comment that goes with the
try { // field named 'executor'.
if (pstmt != null) Log.debug("Starting executor service...");
pstmt.close(); Executors.newScheduledThreadPool(2);
} }
catch (Exception e) {
Log.error(e.getMessage(), e);
} }
/**
* Shuts down the executor by dropping all tasks from the queue. This method
* will allow the executor to finish operations on running tasks for a
* period of two seconds. After that, tasks are forcefully stopped.
* <p>
* The order in which the various shutdown routines of the executor are
* called, is:
* <ol>
* <li>{@link ExecutorService#shutdown()}</li>
* <li>{@link ExecutorService#awaitTermination(long, TimeUnit)} (two
* seconds)</li>
* <li>{@link ExecutorService#shutdownNow()}</li>
* </ol>
*/
private void stopExecutor() {
Log.debug("Stopping executor service...");
/*
* This method gets called as part of the Component#shutdown() routine.
* If that method gets called, the component has already been removed
* from the routing tables. We don't need to worry about new packets to
* arrive - there won't be any.
*/
executor.shutdown();
try { try {
if (con != null) if (!executor.awaitTermination(2, TimeUnit.SECONDS)) {
con.close(); Log.debug("Forcing a shutdown for the executor service (after a two-second timeout has elapsed...");
executor.shutdownNow();
// Note that if any IQ request stanzas had been scheduled, they
// MUST be responded to with an error here. A list of tasks that
// have never been commenced by the executor is returned by the
// #shutdownNow() method of the ExecutorService.
} }
catch (Exception e) { } catch (InterruptedException e) {
Log.error(e.getMessage(), e); // ignore, as we're shutting down anyway.
} }
} }
return pepService; /*
* (non-Javadoc)
*
* @see org.jivesoftware.openfire.handler.IQHandler#getInfo()
*/
@Override
public IQHandlerInfo getInfo() {
return info;
} }
public void stop() { /**
super.stop(); * Implements ServerIdentitiesProvider and UserIdentitiesProvider, adding
for (PEPService service : pepServices.values()) { * the PEP identity to the respective disco#info results.
pubSubEngine.shutdown(service); */
} public Iterator<Element> getIdentities() {
ArrayList<Element> identities = new ArrayList<Element>();
Element identity = DocumentHelper.createElement("identity");
identity.addAttribute("category", "pubsub");
identity.addAttribute("type", "pep");
identities.add(identity);
return identities.iterator();
} }
public void destroy() { /**
super.destroy(); * Implements ServerFeaturesProvider to include all supported XEP-0060 features
// Remove listeners * in the server's disco#info result (as per section 4 of XEP-0163).
PresenceEventDispatcher.removeListener(this); */
RemotePresenceEventDispatcher.removeListener(this); public Iterator<String> getFeatures() {
RosterEventDispatcher.removeListener(this); return XMPPServer.getInstance().getPubSubModule().getFeatures(null, null, null);
UserEventDispatcher.removeListener(this);
} }
@Override
public IQHandlerInfo getInfo() {
return info;
}
/** /**
* Returns the knownRemotePresences map. * Returns true if the PEP service is enabled in the server.
* *
* @return the knownRemotePresences map * @return true if the PEP service is enabled in the server.
*/ */
public Map<String, Set<JID>> getKnownRemotePresenes() { // TODO: listen for property changes to stop/start this module.
return knownRemotePresences; public boolean isEnabled() {
return JiveGlobals.getBooleanProperty("xmpp.pep.enabled", true);
} }
// *****************************************************************
// *** Generic module management ends here. From this point on ***
// *** more specific PEP related implementation after this point ***
// *****************************************************************
/*
* (non-Javadoc)
*
* @see
* org.jivesoftware.openfire.handler.IQHandler#handleIQ(org.xmpp.packet.IQ)
*/
@Override @Override
public IQ handleIQ(IQ packet) throws UnauthorizedException { public IQ handleIQ(IQ packet) throws UnauthorizedException {
// Do nothing if server is not enabled // Do nothing if server is not enabled
...@@ -301,53 +323,52 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider, ...@@ -301,53 +323,52 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider,
return reply; return reply;
} }
JID senderJID = packet.getFrom(); final JID senderJID = packet.getFrom();
if (packet.getTo() == null) { if (packet.getTo() == null) {
if (packet.getType() == IQ.Type.set) { // packet addressed to service itself (not to a node/user)
String jidFrom = senderJID.toBareJID();
PEPService pepService = getPEPService(jidFrom); if (packet.getType() == IQ.Type.set) {
final String jidFrom = senderJID.toBareJID();
PEPService pepService = pepServiceManager.getPEPService(jidFrom);
// If no service exists yet for jidFrom, create one. // If no service exists yet for jidFrom, create one.
if (pepService == null) { if (pepService == null) {
// Return an error if the packet is from an anonymous, unregistered user try {
// or remote user pepService = pepServiceManager.create(senderJID);
if (!XMPPServer.getInstance().isLocal(senderJID) || !UserManager.getInstance().isRegisteredUser(senderJID.getNode())) { } catch (IllegalArgumentException ex) {
IQ reply = IQ.createResultIQ(packet); final IQ reply = IQ.createResultIQ(packet);
reply.setChildElement(packet.getChildElement().createCopy()); reply.setChildElement(packet.getChildElement().createCopy());
reply.setError(PacketError.Condition.not_allowed); reply.setError(PacketError.Condition.not_allowed);
return reply; return reply;
} }
pepService = new PEPService(XMPPServer.getInstance(), jidFrom);
pepServices.put(jidFrom, pepService);
// Probe presences // Probe presences
pubSubEngine.start(pepService); pepServiceManager.start(pepService);
if (Log.isDebugEnabled()) {
Log.debug("PEP: " + jidFrom + " had a PEPService created");
}
// Those who already have presence subscriptions to jidFrom // Those who already have presence subscriptions to jidFrom
// will now automatically be subscribed to this new PEPService. // will now automatically be subscribed to this new
// PEPService.
try { try {
Roster roster = XMPPServer.getInstance().getRosterManager().getRoster(senderJID.getNode()); final RosterManager rm = XMPPServer.getInstance()
for (RosterItem item : roster.getRosterItems()) { .getRosterManager();
if (item.getSubStatus() == RosterItem.SUB_BOTH || item.getSubStatus() == RosterItem.SUB_FROM) { final Roster roster = rm.getRoster(senderJID.getNode());
createSubscriptionToPEPService(pepService, item.getJid(), senderJID); for (final RosterItem item : roster.getRosterItems()) {
if (item.getSubStatus() == RosterItem.SUB_BOTH
|| item.getSubStatus() == RosterItem.SUB_FROM) {
createSubscriptionToPEPService(pepService, item
.getJid(), senderJID);
} }
} }
} } catch (UserNotFoundException e) {
catch (UserNotFoundException e) {
// Do nothing // Do nothing
} }
} }
// If publishing a node, and the node doesn't exist, create it. // If publishing a node, and the node doesn't exist, create it.
Element childElement = packet.getChildElement(); final Element childElement = packet.getChildElement();
Element publishElement = childElement.element("publish"); final Element publishElement = childElement.element("publish");
if (publishElement != null) { if (publishElement != null) {
String nodeID = publishElement.attributeValue("node"); final String nodeID = publishElement.attributeValue("node");
// Do not allow User Avatar nodes to be created. // Do not allow User Avatar nodes to be created.
// TODO: Implement XEP-0084 // TODO: Implement XEP-0084
...@@ -360,36 +381,35 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider, ...@@ -360,36 +381,35 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider,
if (pepService.getNode(nodeID) == null) { if (pepService.getNode(nodeID) == null) {
// Create the node // Create the node
JID creator = new JID(jidFrom); final JID creator = new JID(jidFrom);
LeafNode newNode = new LeafNode(pepService, pepService.getRootCollectionNode(), nodeID, creator); final LeafNode newNode = new LeafNode(pepService, pepService.getRootCollectionNode(), nodeID, creator);
newNode.addOwner(creator); newNode.addOwner(creator);
newNode.saveToDB(); newNode.saveToDB();
} }
} }
// Process with PubSub as usual. // Process with PubSub as usual.
pubSubEngine.process(pepService, packet); pepServiceManager.process(pepService, packet);
} }
} }
else if (packet.getType() == IQ.Type.get || packet.getType() == IQ.Type.set) { else if (packet.getType() == IQ.Type.get || packet.getType() == IQ.Type.set) {
String jidTo = packet.getTo().toBareJID(); // packet was addressed to a node.
PEPService pepService = getPEPService(jidTo); final String jidTo = packet.getTo().toBareJID();
final PEPService pepService = pepServiceManager.getPEPService(jidTo);
if (pepService != null) { if (pepService != null) {
pubSubEngine.process(pepService, packet); pepServiceManager.process(pepService, packet);
} } else {
else {
// Process with PubSub using a dummyService. In the case where an IQ packet is sent to // Process with PubSub using a dummyService. In the case where an IQ packet is sent to
// a user who does not have a PEP service, we wish to utilize the error reporting flow // a user who does not have a PEP service, we wish to utilize the error reporting flow
// already present in the PubSubEngine. This gives the illusion that every user has a // already present in the PubSubEngine. This gives the illusion that every user has a
// PEP service, as required by the specification. // PEP service, as required by the specification.
PEPService dummyService = new PEPService(XMPPServer.getInstance(), senderJID.toBareJID()); PEPService dummyService = new PEPService(XMPPServer.getInstance(), senderJID.toBareJID());
pubSubEngine.process(dummyService, packet); pepServiceManager.process(dummyService, packet);
} }
} else {
}
else {
// Ignore IQ packets of type 'error' or 'result'. // Ignore IQ packets of type 'error' or 'result'.
return null; return null;
} }
...@@ -399,22 +419,12 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider, ...@@ -399,22 +419,12 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider,
} }
/** /**
* Retrieves a PEP service -- attempting first from memory, then from the database. Note * Returns the knownRemotePresences map.
* that if no PEP service was found the next request of the PEP service will hit the
* database since we are not caching 'no PEP services'.
* *
* @param jid the bare JID of the user that owns the PEP service. * @return the knownRemotePresences map
* @return the requested PEP service if found or null if not found.
*/ */
public PEPService getPEPService(String jid) { public Map<String, Set<JID>> getKnownRemotePresenes() {
PEPService pepService = pepServices.get(jid); return knownRemotePresences;
if (pepService == null) {
pepService = loadPEPServiceFromDB(jid);
// TODO Cache that no PEP service was found so we do not look for it again. Remove from cache when created
}
return pepService;
} }
/** /**
...@@ -477,7 +487,7 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider, ...@@ -477,7 +487,7 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider,
formField.setVariable("pubsub#subscription_depth"); formField.setVariable("pubsub#subscription_depth");
formField.addValue("all"); formField.addValue("all");
pubSubEngine.process(pepService, subscriptionPacket); pepServiceManager.process(pepService, subscriptionPacket);
} }
/** /**
...@@ -488,7 +498,7 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider, ...@@ -488,7 +498,7 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider,
*/ */
private void cancelSubscriptionToPEPService(JID unsubscriber, JID serviceOwner) { private void cancelSubscriptionToPEPService(JID unsubscriber, JID serviceOwner) {
// Retrieve recipientJID's PEP service, if it exists. // Retrieve recipientJID's PEP service, if it exists.
PEPService pepService = getPEPService(serviceOwner.toBareJID()); PEPService pepService = pepServiceManager.getPEPService(serviceOwner.toBareJID());
if (pepService == null) { if (pepService == null) {
return; return;
} }
...@@ -501,27 +511,6 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider, ...@@ -501,27 +511,6 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider,
} }
} }
/**
* Implements ServerIdentitiesProvider and UserIdentitiesProvider, adding
* the PEP identity to the respective disco#info results.
*/
public Iterator<Element> getIdentities() {
ArrayList<Element> identities = new ArrayList<Element>();
Element identity = DocumentHelper.createElement("identity");
identity.addAttribute("category", "pubsub");
identity.addAttribute("type", "pep");
identities.add(identity);
return identities.iterator();
}
/**
* Implements ServerFeaturesProvider to include all supported XEP-0060 features
* in the server's disco#info result (as per section 4 of XEP-0163).
*/
public Iterator<String> getFeatures() {
return XMPPServer.getInstance().getPubSubModule().getFeatures(null, null, null);
}
/** /**
* Implements UserItemsProvider, adding PEP related items to a disco#items * Implements UserItemsProvider, adding PEP related items to a disco#items
* result. * result.
...@@ -530,7 +519,7 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider, ...@@ -530,7 +519,7 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider,
ArrayList<Element> items = new ArrayList<Element>(); ArrayList<Element> items = new ArrayList<Element>();
String recipientJID = XMPPServer.getInstance().createJID(name, null, true).toBareJID(); String recipientJID = XMPPServer.getInstance().createJID(name, null, true).toBareJID();
PEPService pepService = getPEPService(recipientJID); PEPService pepService = pepServiceManager.getPEPService(recipientJID);
if (pepService != null) { if (pepService != null) {
CollectionNode rootNode = pepService.getRootCollectionNode(); CollectionNode rootNode = pepService.getRootCollectionNode();
...@@ -557,7 +546,7 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider, ...@@ -557,7 +546,7 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider,
} }
public void subscribedToPresence(JID subscriberJID, JID authorizerJID) { public void subscribedToPresence(JID subscriberJID, JID authorizerJID) {
PEPService pepService = getPEPService(authorizerJID.toBareJID()); final PEPService pepService = pepServiceManager.getPEPService(authorizerJID.toBareJID());
if (pepService != null) { if (pepService != null) {
createSubscriptionToPEPService(pepService, subscriberJID, authorizerJID); createSubscriptionToPEPService(pepService, subscriberJID, authorizerJID);
...@@ -592,9 +581,9 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider, ...@@ -592,9 +581,9 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider,
if (newlyAvailableJID == null) { if (newlyAvailableJID == null) {
return; return;
} }
// Store the JID of the session that became online. The processing of this
// event will take place in another thread to improve performance of the server final GetNotificationsOnInitialPresence task = new GetNotificationsOnInitialPresence(newlyAvailableJID);
availableSessions.add(newlyAvailableJID); executor.submit(task);
} }
public void remoteUserAvailable(Presence presence) { public void remoteUserAvailable(Presence presence) {
...@@ -622,7 +611,7 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider, ...@@ -622,7 +611,7 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider,
// TODO Directed presences should be ignored when no presence subscription exists // TODO Directed presences should be ignored when no presence subscription exists
// Send the presence packet recipient's last published items to the remote user. // Send the presence packet recipient's last published items to the remote user.
PEPService pepService = getPEPService(jidTo.toBareJID()); PEPService pepService = pepServiceManager.getPEPService(jidTo.toBareJID());
if (pepService != null) { if (pepService != null) {
pepService.sendLastPublishedItems(jidFrom); pepService.sendLastPublishedItems(jidFrom);
} }
...@@ -634,11 +623,11 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider, ...@@ -634,11 +623,11 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider,
if (!isEnabled()) { if (!isEnabled()) {
return; return;
} }
JID jidFrom = presence.getFrom(); final JID jidFrom = presence.getFrom();
JID jidTo = presence.getTo(); final JID jidTo = presence.getTo();
// Manage the cache of remote presence resources. // Manage the cache of remote presence resources.
Set<JID> remotePresenceSet = knownRemotePresences.get(jidTo.toBareJID()); final Set<JID> remotePresenceSet = knownRemotePresences.get(jidTo.toBareJID());
if (remotePresenceSet != null) { if (remotePresenceSet != null) {
remotePresenceSet.remove(jidFrom); remotePresenceSet.remove(jidFrom);
...@@ -654,24 +643,15 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider, ...@@ -654,24 +643,15 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider,
} }
public void userDeleting(User user, Map<String, Object> params) { public void userDeleting(User user, Map<String, Object> params) {
JID bareJID = XMPPServer.getInstance().createJID(user.getUsername(), null); final JID bareJID = XMPPServer.getInstance().createJID(user.getUsername(), null);
PEPService pepService = getPEPService(bareJID.toString()); final PEPService pepService = pepServiceManager.getPEPService(bareJID.toString());
if (pepService == null) { if (pepService == null) {
return; return;
} }
// Delete the user's PEP nodes from memory and the database.
CollectionNode rootNode = pepService.getRootCollectionNode();
for (Node node : pepService.getNodes()) {
if (rootNode.isChildNode(node)) {
node.delete();
}
}
rootNode.delete();
// Remove the user's PEP service, finally. // Remove the user's PEP service, finally.
pepServices.remove(bareJID.toString()); pepServiceManager.remove(bareJID);
} }
/** /**
...@@ -679,12 +659,10 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider, ...@@ -679,12 +659,10 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider,
*/ */
public void unavailableSession(ClientSession session, Presence presence) { public void unavailableSession(ClientSession session, Presence presence) {
// Do nothing // Do nothing
} }
public void presenceChanged(ClientSession session, Presence presence) { public void presenceChanged(ClientSession session, Presence presence) {
// Do nothing // Do nothing
} }
public boolean addingContact(Roster roster, RosterItem item, boolean persistent) { public boolean addingContact(Roster roster, RosterItem item, boolean persistent) {
...@@ -694,27 +672,49 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider, ...@@ -694,27 +672,49 @@ public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider,
public void contactAdded(Roster roster, RosterItem item) { public void contactAdded(Roster roster, RosterItem item) {
// Do nothing // Do nothing
} }
public void contactUpdated(Roster roster, RosterItem item) { public void contactUpdated(Roster roster, RosterItem item) {
// Do nothing // Do nothing
} }
public void rosterLoaded(Roster roster) { public void rosterLoaded(Roster roster) {
// Do nothing // Do nothing
} }
public void userCreated(User user, Map<String, Object> params) { public void userCreated(User user, Map<String, Object> params) {
// Do nothing // Do nothing
} }
public void userModified(User user, Map<String, Object> params) { public void userModified(User user, Map<String, Object> params) {
// Do nothing // Do nothing
}
private class GetNotificationsOnInitialPresence implements Runnable {
private final JID availableSessionJID;
public GetNotificationsOnInitialPresence(final JID availableSessionJID) {
this.availableSessionJID = availableSessionJID;
} }
public void run() {
// Send the last published items for the contacts on availableSessionJID's roster.
try {
final XMPPServer server = XMPPServer.getInstance();
final Roster roster = server.getRosterManager().getRoster(availableSessionJID.getNode());
for (final RosterItem item : roster.getRosterItems()) {
if (server.isLocal(item.getJid()) && (item.getSubStatus() == RosterItem.SUB_BOTH ||
item.getSubStatus() == RosterItem.SUB_TO)) {
PEPService pepService = pepServiceManager.getPEPService(item.getJid().toBareJID());
if (pepService != null) {
pepService.sendLastPublishedItems(availableSessionJID);
}
}
}
}
catch (UserNotFoundException e) {
// Do nothing
}
}
}
} }
...@@ -20,6 +20,16 @@ ...@@ -20,6 +20,16 @@
package org.jivesoftware.openfire.pep; package org.jivesoftware.openfire.pep;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TimeZone;
import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.dom4j.DocumentHelper; import org.dom4j.DocumentHelper;
import org.dom4j.Element; import org.dom4j.Element;
import org.dom4j.QName; import org.dom4j.QName;
...@@ -29,7 +39,17 @@ import org.jivesoftware.openfire.XMPPServer; ...@@ -29,7 +39,17 @@ import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.commands.AdHocCommandManager; import org.jivesoftware.openfire.commands.AdHocCommandManager;
import org.jivesoftware.openfire.entitycaps.EntityCapabilities; import org.jivesoftware.openfire.entitycaps.EntityCapabilities;
import org.jivesoftware.openfire.entitycaps.EntityCapabilitiesManager; import org.jivesoftware.openfire.entitycaps.EntityCapabilitiesManager;
import org.jivesoftware.openfire.pubsub.*; import org.jivesoftware.openfire.pubsub.CollectionNode;
import org.jivesoftware.openfire.pubsub.DefaultNodeConfiguration;
import org.jivesoftware.openfire.pubsub.LeafNode;
import org.jivesoftware.openfire.pubsub.Node;
import org.jivesoftware.openfire.pubsub.NodeSubscription;
import org.jivesoftware.openfire.pubsub.PendingSubscriptionsCommand;
import org.jivesoftware.openfire.pubsub.PubSubEngine;
import org.jivesoftware.openfire.pubsub.PubSubPersistenceManager;
import org.jivesoftware.openfire.pubsub.PubSubService;
import org.jivesoftware.openfire.pubsub.PublishedItem;
import org.jivesoftware.openfire.pubsub.PublishedItemTask;
import org.jivesoftware.openfire.pubsub.models.AccessModel; import org.jivesoftware.openfire.pubsub.models.AccessModel;
import org.jivesoftware.openfire.pubsub.models.PublisherModel; import org.jivesoftware.openfire.pubsub.models.PublisherModel;
import org.jivesoftware.openfire.roster.Roster; import org.jivesoftware.openfire.roster.Roster;
...@@ -44,10 +64,6 @@ import org.xmpp.packet.Message; ...@@ -44,10 +64,6 @@ import org.xmpp.packet.Message;
import org.xmpp.packet.Packet; import org.xmpp.packet.Packet;
import org.xmpp.packet.PacketExtension; import org.xmpp.packet.PacketExtension;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
/** /**
* A PEPService is a {@link PubSubService} for use with XEP-0163: "Personal Eventing via * A PEPService is a {@link PubSubService} for use with XEP-0163: "Personal Eventing via
* Pubsub" Version 1.0 * Pubsub" Version 1.0
...@@ -165,7 +181,9 @@ public class PEPService implements PubSubService { ...@@ -165,7 +181,9 @@ public class PEPService implements PubSubService {
// Save or delete published items from the database every 2 minutes // Save or delete published items from the database every 2 minutes
// starting in 2 minutes (default values) // starting in 2 minutes (default values)
publishedItemTask = new PublishedItemTask(this); publishedItemTask = new PublishedItemTask(this) {
};
timer.schedule(publishedItemTask, items_task_timeout, items_task_timeout); timer.schedule(publishedItemTask, items_task_timeout, items_task_timeout);
// Load default configuration for leaf nodes // Load default configuration for leaf nodes
...@@ -530,12 +548,10 @@ public class PEPService implements PubSubService { ...@@ -530,12 +548,10 @@ public class PEPService implements PubSubService {
public void queueItemToAdd(PublishedItem newItem) { public void queueItemToAdd(PublishedItem newItem) {
PubSubEngine.queueItemToAdd(this, newItem); PubSubEngine.queueItemToAdd(this, newItem);
} }
public void queueItemToRemove(PublishedItem removedItem) { public void queueItemToRemove(PublishedItem removedItem) {
PubSubEngine.queueItemToRemove(this, removedItem); PubSubEngine.queueItemToRemove(this, removedItem);
} }
public Map<String, Map<String, String>> getBarePresences() { public Map<String, Map<String, String>> getBarePresences() {
......
/**
* Copyright (C) 2004-2008 Jive Software. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.openfire.pep;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.locks.Lock;
import org.jivesoftware.database.DbConnectionManager;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.pubsub.CollectionNode;
import org.jivesoftware.openfire.pubsub.Node;
import org.jivesoftware.openfire.pubsub.PubSubEngine;
import org.jivesoftware.openfire.user.UserManager;
import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.packet.IQ;
import org.xmpp.packet.JID;
/**
* Manages the creation, persistence and removal of {@link PEPService}
* instances.
*
* @author Guus der Kinderen, guus.der.kinderen@gmail.com
*
*/
public class PEPServiceManager {
public static final Logger Log = LoggerFactory
.getLogger(PEPServiceManager.class);
private final static String GET_PEP_SERVICE = "SELECT DISTINCT serviceID FROM ofPubsubNode WHERE serviceID=?";
/**
* Cache of PEP services. Table, Key: bare JID (String); Value: PEPService
*/
private final Cache<String, PEPService> pepServices = CacheFactory
.createCache("PEPServiceManager");
private PubSubEngine pubSubEngine = null;
/**
* Retrieves a PEP service -- attempting first from memory, then from the
* database.
*
* @param jid
* the bare JID of the user that owns the PEP service.
* @return the requested PEP service if found or null if not found.
*/
public PEPService getPEPService(String jid) {
PEPService pepService = null;
final Lock lock = CacheFactory.getLock(jid, pepServices);
try {
lock.lock();
if (pepServices.containsKey(jid)) {
// lookup in cache
pepService = pepServices.get(jid);
} else {
// lookup in database.
pepService = loadPEPServiceFromDB(jid);
// always add to the cache, even if it doesn't exist. This will
// prevent future database lookups.
pepServices.put(jid, pepService);
}
} finally {
lock.unlock();
}
return pepService;
}
public PEPService create(JID owner) {
// Return an error if the packet is from an anonymous, unregistered user
// or remote user
if (!XMPPServer.getInstance().isLocal(owner)
|| !UserManager.getInstance().isRegisteredUser(owner.getNode())) {
throw new IllegalArgumentException(
"Request must be initiated by a local, registered user, but is not: "
+ owner);
}
PEPService pepService = null;
final String bareJID = owner.toBareJID();
final Lock lock = CacheFactory.getLock(owner, pepServices);
try {
lock.lock();
pepService = pepServices.get(bareJID);
if (pepService == null) {
pepService = new PEPService(XMPPServer.getInstance(), bareJID);
pepServices.put(bareJID, pepService);
if (Log.isDebugEnabled()) {
Log.debug("PEPService created for : " + bareJID);
}
}
} finally {
lock.unlock();
}
return pepService;
}
/**
* Loads a PEP service from the database, if it exists.
*
* @param jid
* the JID of the owner of the PEP service.
* @return the loaded PEP service, or null if not found.
*/
private PEPService loadPEPServiceFromDB(String jid) {
PEPService pepService = null;
Connection con = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
con = DbConnectionManager.getConnection();
// Get all PEP services
pstmt = con.prepareStatement(GET_PEP_SERVICE);
pstmt.setString(1, jid);
rs = pstmt.executeQuery();
// Restore old PEPServices
while (rs.next()) {
String serviceID = rs.getString(1);
// Create a new PEPService
pepService = new PEPService(XMPPServer.getInstance(), serviceID);
pepServices.put(serviceID, pepService);
pubSubEngine.start(pepService);
if (Log.isDebugEnabled()) {
Log.debug("PEP: Restored service for " + serviceID
+ " from the database.");
}
}
} catch (SQLException sqle) {
Log.error(sqle.getMessage(), sqle);
} finally {
DbConnectionManager.closeConnection(rs, pstmt, con);
}
return pepService;
}
/**
* Deletes the {@link PEPService} belonging to the specified owner.
*
* @param owner
* The JID of the owner of the service to be deleted.
*/
public void remove(JID owner) {
PEPService service = null;
final Lock lock = CacheFactory.getLock(owner, pepServices);
try {
lock.lock();
service = pepServices.remove(owner.toBareJID());
} finally {
lock.unlock();
}
if (service == null) {
return;
}
// Delete the user's PEP nodes from memory and the database.
CollectionNode rootNode = service.getRootCollectionNode();
for (final Node node : service.getNodes()) {
if (rootNode.isChildNode(node)) {
node.delete();
}
}
rootNode.delete();
}
public void start(PEPService pepService) {
pubSubEngine.start(pepService);
}
public void start() {
pubSubEngine = new PubSubEngine(XMPPServer.getInstance()
.getPacketRouter());
}
public void stop() {
for (PEPService service : pepServices.values()) {
pubSubEngine.shutdown(service);
}
pubSubEngine = null;
}
public void process(PEPService service, IQ iq) {
pubSubEngine.process(service, iq);
}
public boolean hasCachedService(JID owner) {
return pepServices.get(owner) != null;
}
// mimics Shutdown, without killing the timer.
public void unload(PEPService service) {
pubSubEngine.shutdown(service);
}
}
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2005-2008 Jive Software. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.openfire.pep;
import org.jivesoftware.openfire.pubsub.PublishedItemTask;
/**
* TimerTask that unloads services from memory, after they have been expired
* from cache.
*
* @author Guus der Kinderen, guus.der.kinderen@gmail.com
*/
public class PublishedPEPServiceTask extends PublishedItemTask {
private final PEPServiceManager manager;
public PublishedPEPServiceTask(PEPService service, PEPServiceManager manager) {
super(service);
this.manager = manager;
}
@Override
public void run() {
// Somewhat of a hack to unload the PEPService after it has been removed
// from the cache. New scheduled packets will re-instate the service.
PEPService service = (PEPService) this.getService();
if (manager.hasCachedService(service.getAddress())) {
super.run();
} else {
manager.unload(service);
}
}
}
...@@ -1731,7 +1731,7 @@ public class PubSubEngine { ...@@ -1731,7 +1731,7 @@ public class PubSubEngine {
public void shutdown(PubSubService service) { public void shutdown(PubSubService service) {
// Stop the maintenance processes // Stop the maintenance processes
service.getTimer().cancel(); service.getPublishedItemTask().cancel();
// Delete from the database items contained in the itemsToDelete queue // Delete from the database items contained in the itemsToDelete queue
PublishedItem entry; PublishedItem entry;
while (!service.getItemsToDelete().isEmpty()) { while (!service.getItemsToDelete().isEmpty()) {
...@@ -1749,6 +1749,10 @@ public class PubSubEngine { ...@@ -1749,6 +1749,10 @@ public class PubSubEngine {
} }
// Stop executing ad-hoc commands // Stop executing ad-hoc commands
service.getManager().stop(); service.getManager().stop();
// clear all nodes for this service, to remove circular references back to the service instance.
service.getNodes().clear(); // FIXME: this is an ugly hack. getNodes() is documented to return an unmodifiable collection (but does not).
} }
/******************************************************************************* /*******************************************************************************
......
...@@ -23,6 +23,7 @@ package org.jivesoftware.openfire.pubsub; ...@@ -23,6 +23,7 @@ package org.jivesoftware.openfire.pubsub;
import java.util.Queue; import java.util.Queue;
import java.util.TimerTask; import java.util.TimerTask;
import org.jivesoftware.openfire.pep.PEPService;
import org.jivesoftware.util.LocaleUtils; import org.jivesoftware.util.LocaleUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -91,4 +92,8 @@ public class PublishedItemTask extends TimerTask { ...@@ -91,4 +92,8 @@ public class PublishedItemTask extends TimerTask {
Log.error(LocaleUtils.getLocalizedString("admin.error"), e); Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
} }
} }
protected PubSubService getService() {
return service;
}
} }
...@@ -128,6 +128,7 @@ public class CacheFactory { ...@@ -128,6 +128,7 @@ public class CacheFactory {
cacheNames.put("Entity Capabilities Users", "entityCapabilitiesUsers"); cacheNames.put("Entity Capabilities Users", "entityCapabilitiesUsers");
cacheNames.put("Entity Capabilities Pending Hashes", "entityCapabilitiesPendingHashes"); cacheNames.put("Entity Capabilities Pending Hashes", "entityCapabilitiesPendingHashes");
cacheNames.put("Clearspace SSO Nonce", "clearspaceSSONonce"); cacheNames.put("Clearspace SSO Nonce", "clearspaceSSONonce");
cacheNames.put("PEPServiceManager", "pepServiceManager");
cacheProps.put("cache.fileTransfer.size", 128 * 1024l); cacheProps.put("cache.fileTransfer.size", 128 * 1024l);
cacheProps.put("cache.fileTransfer.maxLifetime", 1000 * 60 * 10l); cacheProps.put("cache.fileTransfer.maxLifetime", 1000 * 60 * 10l);
...@@ -199,6 +200,8 @@ public class CacheFactory { ...@@ -199,6 +200,8 @@ public class CacheFactory {
cacheProps.put("cache.pluginCacheInfo.maxLifetime", -1l); cacheProps.put("cache.pluginCacheInfo.maxLifetime", -1l);
cacheProps.put("cache.clearspaceSSONonce.size", -1l); cacheProps.put("cache.clearspaceSSONonce.size", -1l);
cacheProps.put("cache.clearspaceSSONonce.maxLifetime", JiveConstants.MINUTE * 2); cacheProps.put("cache.clearspaceSSONonce.maxLifetime", JiveConstants.MINUTE * 2);
cacheProps.put("cache.pepServiceManager.size", 1024l * 1024 * 10);
cacheProps.put("cache.pepServiceManager.maxLifetime", JiveConstants.MINUTE * 30);
} }
private CacheFactory() { private CacheFactory() {
......
...@@ -883,6 +883,25 @@ http://www.tangosol.com/UserGuide-Reference-CacheConfig.jsp ...@@ -883,6 +883,25 @@ http://www.tangosol.com/UserGuide-Reference-CacheConfig.jsp
</init-params> </init-params>
</cache-mapping> </cache-mapping>
<cache-mapping>
<cache-name>PEPServiceManager</cache-name>
<scheme-name>near-distributed</scheme-name>
<init-params>
<init-param>
<param-name>back-size-high</param-name>
<param-value>4000</param-value>
</init-param>
<init-param>
<param-name>back-expiry</param-name>
<param-value>30m</param-value>
</init-param>
<init-param>
<param-name>back-size-low</param-name>
<param-value>3000</param-value>
</init-param>
</init-params>
</cache-mapping>
</caching-scheme-mapping> </caching-scheme-mapping>
</cache-config> </cache-config>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment