Commit db7e648a authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

Refactoring work on ComponentEventListener. JM-1310 (review and testing is pending)

git-svn-id: http://svn.igniterealtime.org/svn/repos/openfire/trunk@10120 b35dd754-fafc-0310-a699-88a17e54d16e
parent 9402f3a8
package org.jivesoftware.openfire.clearspace;
import org.jivesoftware.openfire.muc.MUCEventDelegate;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.component.InternalComponentManager;
import org.xmpp.packet.JID;
import org.jivesoftware.openfire.muc.MUCEventDelegate;
import org.xmpp.packet.IQ;
import org.xmpp.component.ComponentException;
import org.xmpp.component.Component;
import org.xmpp.packet.JID;
import java.util.Map;
import java.util.HashMap;
import java.util.Map;
/**
* TODO: Comment me
......@@ -33,19 +31,16 @@ public class ClearspaceMUCEventDelegate extends MUCEventDelegate {
public Map<String, String> getRoomConfig(String roomName) {
Map<String, String> roomConfig = new HashMap<String, String>();
// TODO: Ensure the getComponent method gets implemented.
Component csComponent = ClearspaceManager.getInstance().getComponent();
// TODO: Get the config by connecting to CS through the component
InternalComponentManager internalComponentManager = InternalComponentManager.getInstance();
// TODO: Create query packet asking for the room config and in CS create a handler for that packet
IQ query = null;
IQ result;
try {
result = internalComponentManager.query(csComponent, query, 15000);
} catch (ComponentException e) {
//
IQ result = ClearspaceManager.getInstance().query(query, 15000);
if (result == null) {
// TODO No answer was received from Clearspace so return null
return null;
}
// TODO Check that the IQ is of type RESULT (and not ERROR) otherwise return null
// TODO: Setup roomConfig based on the result packet containing config values
JID roomJid = new JID(roomName);
......
......@@ -16,31 +16,33 @@ import org.apache.commons.httpclient.auth.AuthScope;
import org.apache.commons.httpclient.methods.*;
import org.dom4j.*;
import org.dom4j.io.XMPPPacketReader;
import org.jivesoftware.openfire.IQResultListener;
import org.jivesoftware.openfire.IQRouter;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.XMPPServerInfo;
import org.jivesoftware.openfire.muc.spi.MultiUserChatServiceImpl;
import org.jivesoftware.openfire.auth.AuthFactory;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import static org.jivesoftware.openfire.clearspace.ClearspaceManager.HttpType.GET;
import static org.jivesoftware.openfire.clearspace.ClearspaceManager.HttpType.POST;
import org.jivesoftware.openfire.component.ExternalComponentConfiguration;
import org.jivesoftware.openfire.component.ExternalComponentManager;
import org.jivesoftware.openfire.component.ExternalComponentManagerListener;
import org.jivesoftware.openfire.component.*;
import org.jivesoftware.openfire.container.BasicModule;
import org.jivesoftware.openfire.group.GroupNotFoundException;
import org.jivesoftware.openfire.muc.spi.MultiUserChatServiceImpl;
import org.jivesoftware.openfire.net.MXParser;
import org.jivesoftware.openfire.user.UserNotFoundException;
import org.jivesoftware.util.*;
import org.jivesoftware.util.cache.DefaultCache;
import org.xmlpull.v1.XmlPullParserException;
import org.xmlpull.v1.XmlPullParserFactory;
import org.xmpp.packet.IQ;
import org.xmpp.packet.JID;
import org.xmpp.component.Component;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.*;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
......@@ -54,50 +56,25 @@ import java.util.*;
*
* @author Daniel Henninger
*/
public class ClearspaceManager extends BasicModule implements ExternalComponentManagerListener {
private ConfigClearspaceTask configClearspaceTask;
/**
* Different kind of HTTP request types
*/
public enum HttpType {
/**
* Represents an HTTP Get request. And it's equivalent to a SQL SELECTE.
*/
GET,
/**
* Represents an HTTP Post request. And it's equivalent to a SQL UPDATE.
*/
POST,
/**
* Represents an HTTP Delete request. And it's equivalent to a SQL DELETE.
*/
DELETE,
/**
* Represents an HTTP Put requests.And it's equivalent to a SQL CREATE.
*/
PUT
}
public class ClearspaceManager extends BasicModule implements ExternalComponentManagerListener, ComponentEventListener {
/**
* This is the username of the user that Openfires uses to connect
* to Clearspace. It is fixed a well known by Openfire and Clearspace.
*/
private static final String OPENFIRE_USERNAME = "openfire_SHRJKZCNU53";
private static final String WEBSERVICES_PATH = "rpc/rest/";
protected static final String IM_URL_PREFIX = "imService/";
public static final String MUC_SUBDOMAIN = "clearspace-conference";
private static final String MUC_DESCRIPTION = "Clearspace Conference Services";
private static ThreadLocal<XMPPPacketReader> localParser = null;
private static XmlPullParserFactory factory = null;
/**
* This map is used to transale exceptions from CS to OF
*/
private static final Map<String, String> exceptionMap;
private static ClearspaceManager instance = new ClearspaceManager();
static {
try {
......@@ -116,13 +93,7 @@ public class ClearspaceManager extends BasicModule implements ExternalComponentM
return parser;
}
};
}
// This map is used to transale exceptions from CS to OF
private static final Map<String, String> exceptionMap;
static {
// Add a new exception map from CS to OF and it will be automatically translated.
exceptionMap = new HashMap<String, String>();
exceptionMap.put("com.jivesoftware.base.UserNotFoundException", "org.jivesoftware.openfire.user.UserNotFoundException");
......@@ -132,8 +103,7 @@ public class ClearspaceManager extends BasicModule implements ExternalComponentM
exceptionMap.put("org.acegisecurity.BadCredentialsException", "org.jivesoftware.openfire.auth.UnauthorizedException");
}
private static ClearspaceManager instance = new ClearspaceManager();
private ConfigClearspaceTask configClearspaceTask;
private Map<String, String> properties;
private String uri;
private String host;
......@@ -141,6 +111,10 @@ public class ClearspaceManager extends BasicModule implements ExternalComponentM
private String sharedSecret;
private Map<String, Long> userIDCache;
private Map<String, Long> groupIDCache;
/**
* Keep the domains of Clearspace components
*/
private List<String> clearspaces = new ArrayList<String>();
/**
* Provides singleton access to an instance of the ClearspaceManager class.
......@@ -412,6 +386,8 @@ public class ClearspaceManager extends BasicModule implements ExternalComponentM
}
// Listen for changes to external component settings
ExternalComponentManager.addListener(this);
// List for registration of new components
InternalComponentManager.getInstance().addListener(this);
// Set up custom clearspace MUC service
// Create service if it doesn't exist, load if it does.
MultiUserChatServiceImpl muc = (MultiUserChatServiceImpl)XMPPServer.getInstance().getMultiUserChatManager().getMultiUserChatService(MUC_SUBDOMAIN);
......@@ -856,13 +832,76 @@ public class ClearspaceManager extends BasicModule implements ExternalComponentM
}
/**
* Returns the Clearspace External XMPP Component.
* Sends an IQ packet to the Clearspace external component and returns the IQ packet
* returned by CS or <tt>null</tt> if no answer was received before the specified
* timeout.<p>
*
* @return the Clearspace External XMPP Component.
* The returned packet will be handled by the server and routed to the entity that sent
* the original IQ packet. Since this method block and listen to the replied IQ packet
* then the entity that sent the original IQ packet should ignore any reply related to
* the originating IQ packet.
*
* @param packet IQ packet to send.
* @param timeout milliseconds to wait before timing out.
* @return IQ packet returned by Clearspace responsing the packet we sent.
*/
protected Component getComponent() {
// TODO: Implement
return null;
public IQ query(final IQ packet, int timeout) {
// Complain if FROM is empty
if (packet.getFrom() == null) {
throw new IllegalStateException("IQ packets with no FROM cannot be sent to Clearspace");
}
// If CS is not connected then return null
if (clearspaces.isEmpty()) {
return null;
}
// Set the target address to the IQ packet
// TODO Use round robin to distribute load
packet.setTo(clearspaces.get(0));
final LinkedBlockingQueue<IQ> answer = new LinkedBlockingQueue<IQ>(8);
final IQRouter router = XMPPServer.getInstance().getIQRouter();
router.addIQResultListener(packet.getID(), new IQResultListener() {
public void receivedAnswer(IQ packet) {
answer.offer(packet);
}
public void answerTimeout(String packetId) {
Log.warn("No answer from Clearspace was received for IQ stanza: " + packet);
}
});
XMPPServer.getInstance().getIQRouter().route(packet);
IQ reply = null;
try {
reply = answer.poll(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// Ignore
}
return reply;
}
public void componentRegistered(JID componentJID) {
// Do nothing
}
public void componentUnregistered(JID componentJID) {
// Remove stored information about this component
clearspaces.remove(componentJID.getDomain());
}
public void componentInfoReceived(IQ iq) {
// Check if it's a Clearspace component
boolean isClearspace = false;
Element childElement = iq.getChildElement();
for (Iterator it = childElement.elementIterator("identity"); it.hasNext();) {
Element identity = (Element)it.next();
if ("component".equals(identity.attributeValue("category")) &&
"clearspace".equals(identity.attributeValue("type"))) {
isClearspace = true;
}
}
// If component is Clearspace then keep track of the component
if (isClearspace) {
clearspaces.add(iq.getFrom().getDomain());
}
}
private class ConfigClearspaceTask extends TimerTask {
......@@ -879,4 +918,30 @@ public class ClearspaceManager extends BasicModule implements ExternalComponentM
}
}
}
/**
* Different kind of HTTP request types
*/
public enum HttpType {
/**
* Represents an HTTP Get request. And it's equivalent to a SQL SELECTE.
*/
GET,
/**
* Represents an HTTP Post request. And it's equivalent to a SQL UPDATE.
*/
POST,
/**
* Represents an HTTP Delete request. And it's equivalent to a SQL DELETE.
*/
DELETE,
/**
* Represents an HTTP Put requests.And it's equivalent to a SQL CREATE.
*/
PUT
}
}
......@@ -11,14 +11,24 @@
package org.jivesoftware.openfire.component;
import org.xmpp.component.Component;
import org.xmpp.packet.IQ;
import org.xmpp.packet.JID;
/**
* Interface to listen for component events. Use the
* {@link InternalComponentManager#addListener(ComponentEventListener)}
* method to register for events.
* method to register for events.<p>
*
* The registered event will be triggered only once no matter how many
* times a component is physically connected to the server or to how
* many cluster nodes is connected. Likewise, the unregistered event
* will be triggered only when the last connection of the component
* is no longer available.<p>
*
* When running inside of a cluster each cluster node will get these
* event notifications. For instance, if you have a cluster of two nodes
* and a component connects to a node then both nodes will get the
* event notification.
*
* @author Gaston Dombiak
*/
......@@ -31,18 +41,17 @@ public interface ComponentEventListener {
* of the server since the component has not answered the disco#info request sent
* by the server.
*
* @param component the newly added component.
* @param componentJID address where the component can be located (e.g. search.myserver.com)
*/
public void componentRegistered(Component component, JID componentJID);
public void componentRegistered(JID componentJID);
/**
* A component was removed.
* A component was removed. This means that no other cluster node has this component
* and this was the last connection of the component.
*
* @param component the removed component.
* @param componentJID address where the component was located (e.g. search.myserver.com)
*/
public void componentUnregistered(Component component, JID componentJID);
public void componentUnregistered(JID componentJID);
/**
* The server has received a disco#info response from the component. Once a component
......@@ -50,8 +59,7 @@ public interface ComponentEventListener {
* component to discover if service discover is supported by the component. This event
* is triggered when the server received the response of the component.
*
* @param component the component that answered the disco#info request.
* @param iq the IQ packet with the disco#info sent by the component.
*/
public void componentInfoReceived(Component component, IQ iq);
public void componentInfoReceived(IQ iq);
}
......@@ -18,11 +18,15 @@ import org.jivesoftware.openfire.disco.IQDiscoItemsHandler;
import org.jivesoftware.openfire.session.ComponentSession;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.cache.CacheFactory;
import org.xmpp.component.Component;
import org.xmpp.component.ComponentException;
import org.xmpp.component.ComponentManager;
import org.xmpp.component.ComponentManagerFactory;
import org.xmpp.packet.*;
import org.xmpp.packet.IQ;
import org.xmpp.packet.JID;
import org.xmpp.packet.Packet;
import org.xmpp.packet.Presence;
import java.util.ArrayList;
import java.util.Collections;
......@@ -113,6 +117,7 @@ public class InternalComponentManager extends BasicModule implements ComponentMa
}
Log.debug("InternalComponentManager: Registering component for domain: " + subdomain);
JID componentJID = new JID(subdomain + "." + serverDomain);
boolean notifyListeners = false;
if (routable != null) {
routable.addComponent(component);
}
......@@ -120,8 +125,11 @@ public class InternalComponentManager extends BasicModule implements ComponentMa
routable = new RoutableComponents(componentJID, component);
routables.put(subdomain, routable);
if (!routingTable.hasComponentRoute(componentJID)) {
notifyListeners = true;
}
// Add the route to the new service provided by the component
XMPPServer.getInstance().getRoutingTable().addComponentRoute(componentJID, routable);
routingTable.addComponentRoute(componentJID, routable);
}
// Initialize the new component
......@@ -129,9 +137,11 @@ public class InternalComponentManager extends BasicModule implements ComponentMa
component.initialize(componentJID, this);
component.start();
// Notify listeners that a new component has been registered
for (ComponentEventListener listener : listeners) {
listener.componentRegistered(component, componentJID);
if (notifyListeners) {
// Notify listeners that a new component has been registered
notifyComponentRegistered(componentJID);
// Alert other nodes of new registered domain event
CacheFactory.doClusterTask(new NotifyComponentRegistered(componentJID));
}
// Check for potential interested users.
......@@ -162,6 +172,12 @@ public class InternalComponentManager extends BasicModule implements ComponentMa
}
}
void notifyComponentRegistered(JID componentJID) {
for (ComponentEventListener listener : listeners) {
listener.componentRegistered(componentJID);
}
}
/**
* Removes a component. The {@link Component#shutdown} method will be called on the
* component. Note that if the component was an external component that was connected
......@@ -186,6 +202,9 @@ public class InternalComponentManager extends BasicModule implements ComponentMa
* @param component specific component to remove.
*/
public void removeComponent(String subdomain, Component component) {
if (component == null) {
return;
}
synchronized (routables) {
Log.debug("InternalComponentManager: Unregistering component for domain: " + subdomain);
RoutableComponents routable = routables.get(subdomain);
......@@ -193,31 +212,25 @@ public class InternalComponentManager extends BasicModule implements ComponentMa
if (routable.numberOfComponents() == 0) {
routables.remove(subdomain);
// Remove any info stored with the component being removed
componentInfo.remove(subdomain);
JID componentJID = new JID(subdomain + "." + serverDomain);
// Remove the route for the service provided by the component
RoutingTable routingTable = XMPPServer.getInstance().getRoutingTable();
if (routingTable != null) {
routingTable.removeComponentRoute(componentJID);
}
// Remove the disco item from the server for the component that is being removed
IQDiscoItemsHandler iqDiscoItemsHandler = XMPPServer.getInstance().getIQDiscoItemsHandler();
if (iqDiscoItemsHandler != null) {
iqDiscoItemsHandler.removeComponentItem(componentJID.toBareJID());
}
routingTable.removeComponentRoute(componentJID);
// Ask the component to shutdown
if (component != null) {
component.shutdown();
}
component.shutdown();
// Notify listeners that an existing component has been unregistered
for (ComponentEventListener listener : listeners) {
listener.componentUnregistered(component, componentJID);
if (!routingTable.hasComponentRoute(componentJID)) {
// Remove the disco item from the server for the component that is being removed
IQDiscoItemsHandler iqDiscoItemsHandler = XMPPServer.getInstance().getIQDiscoItemsHandler();
if (iqDiscoItemsHandler != null) {
iqDiscoItemsHandler.removeComponentItem(componentJID.toBareJID());
}
removeComponentInfo(componentJID);
// Notify listeners that an existing component has been unregistered
notifyComponentUnregistered(componentJID);
// Alert other nodes of component removed event
CacheFactory.doClusterTask(new NotifyComponentUnregistered(componentJID));
}
Log.debug("InternalComponentManager: Component unregistered for domain: " + subdomain);
}
......@@ -227,6 +240,17 @@ public class InternalComponentManager extends BasicModule implements ComponentMa
}
}
void notifyComponentUnregistered(JID componentJID) {
for (ComponentEventListener listener : listeners) {
listener.componentUnregistered(componentJID);
}
}
void removeComponentInfo(JID componentJID) {
// Remove any info stored with the component being removed
componentInfo.remove(componentJID.getDomain());
}
public void sendPacket(Component component, Packet packet) {
if (packet != null && packet.getFrom() == null) {
throw new IllegalArgumentException("Packet with no FROM address was received from component.");
......@@ -274,17 +298,13 @@ public class InternalComponentManager extends BasicModule implements ComponentMa
public void addListener(ComponentEventListener listener) {
listeners.add(listener);
// Notify the new listener about existing components
for (Map.Entry<String, RoutableComponents> entry : routables.entrySet()) {
String subdomain = entry.getKey();
RoutableComponents routable = entry.getValue();
for (Component component : routable.getComponents()) {
JID componentJID = new JID(subdomain + "." + serverDomain);
listener.componentRegistered(component, componentJID);
// Check if there is disco#info stored for the component
IQ disco = componentInfo.get(subdomain);
if (disco != null) {
listener.componentInfoReceived(component, disco);
}
for (String domain : routingTable.getComponentsDomains()) {
JID componentJID = new JID(domain);
listener.componentRegistered(componentJID);
// Check if there is disco#info stored for the component
IQ disco = componentInfo.get(domain);
if (disco != null) {
listener.componentInfoReceived(disco);
}
}
}
......@@ -514,18 +534,27 @@ public class InternalComponentManager extends BasicModule implements ComponentMa
" - " + packet.toXML(), e);
}
// Store the IQ disco#info returned by the component
String subdomain = packet.getFrom().getDomain().replace("." + serverDomain, "");
componentInfo.put(subdomain, iq);
addComponentInfo(iq);
// Notify listeners that a component answered the disco#info request
for (ComponentEventListener listener : listeners) {
listener.componentInfoReceived(component, iq);
}
notifyComponentInfo(iq);
// Alert other cluster nodes
CacheFactory.doClusterTask(new NotifyComponentInfo(iq));
}
}
}
}
}
void notifyComponentInfo(IQ iq) {
for (ComponentEventListener listener : listeners) {
listener.componentInfoReceived(iq);
}
}
void addComponentInfo(IQ iq) {
componentInfo.put(iq.getFrom().getDomain(), iq);
}
/**
* Exposes a Component as a RoutableChannelHandler.
*/
......
/**
* $RCSfile$
* $Revision: $
* $Date: $
*
* Copyright (C) 2008 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.openfire.component;
import org.dom4j.Element;
import org.dom4j.tree.DefaultElement;
import org.jivesoftware.util.cache.ClusterTask;
import org.jivesoftware.util.cache.ExternalizableUtil;
import org.xmpp.packet.IQ;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
/**
* Task that will be executed on other cluster nodes to trigger the event that a component has
* send its disco#info to the server.
*
* @author Gaston Dombiak
*/
public class NotifyComponentInfo implements ClusterTask {
private IQ iq;
public NotifyComponentInfo() {
}
public NotifyComponentInfo(IQ iq) {
this.iq = iq;
}
public Object getResult() {
return null;
}
public void run() {
final InternalComponentManager manager = InternalComponentManager.getInstance();
manager.addComponentInfo(iq);
manager.notifyComponentInfo(iq);
}
public void writeExternal(ObjectOutput out) throws IOException {
ExternalizableUtil.getInstance().writeSerializable(out, (DefaultElement) iq.getElement());
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
Element packetElement = (Element) ExternalizableUtil.getInstance().readSerializable(in);
iq = new IQ(packetElement, true);
}
}
/**
* $RCSfile$
* $Revision: $
* $Date: $
*
* Copyright (C) 2008 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.openfire.component;
import org.jivesoftware.util.cache.ClusterTask;
import org.jivesoftware.util.cache.ExternalizableUtil;
import org.xmpp.packet.JID;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
/**
* Task that will be executed on other cluster nodes to trigger the event that a component was
* added to a cluster node.
*
* @author Gaston Dombiak
*/
public class NotifyComponentRegistered implements ClusterTask {
private JID componentJID;
public NotifyComponentRegistered() {
}
public NotifyComponentRegistered(JID componentJID) {
this.componentJID = componentJID;
}
public Object getResult() {
return null;
}
public void run() {
InternalComponentManager.getInstance().notifyComponentRegistered(componentJID);
}
public void writeExternal(ObjectOutput out) throws IOException {
ExternalizableUtil.getInstance().writeSafeUTF(out, componentJID.toString());
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
componentJID = new JID(ExternalizableUtil.getInstance().readSafeUTF(in));
}
}
/**
* $RCSfile$
* $Revision: $
* $Date: $
*
* Copyright (C) 2008 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.openfire.component;
import org.jivesoftware.util.cache.ClusterTask;
import org.jivesoftware.util.cache.ExternalizableUtil;
import org.xmpp.packet.JID;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
/**
* Task that will be executed on other cluster nodes to trigger the event that a component was
* removed from a cluster node.
*
* @author Gaston Dombiak
*/
public class NotifyComponentUnregistered implements ClusterTask {
private JID componentJID;
public NotifyComponentUnregistered() {
}
public NotifyComponentUnregistered(JID componentJID) {
this.componentJID = componentJID;
}
public Object getResult() {
return null;
}
public void run() {
final InternalComponentManager manager = InternalComponentManager.getInstance();
manager.removeComponentInfo(componentJID);
manager.notifyComponentUnregistered(componentJID);
}
public void writeExternal(ObjectOutput out) throws IOException {
ExternalizableUtil.getInstance().writeSafeUTF(out, componentJID.toString());
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
componentJID = new JID(ExternalizableUtil.getInstance().readSafeUTF(in));
}
}
......@@ -20,7 +20,6 @@ import org.jivesoftware.util.FastDateFormat;
import org.jivesoftware.util.JiveConstants;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.xmpp.component.Component;
import org.xmpp.packet.*;
import java.util.*;
......@@ -133,15 +132,15 @@ public class PacketCopier implements PacketInterceptor, ComponentEventListener {
}
}
public void componentInfoReceived(Component component, IQ iq) {
public void componentInfoReceived(IQ iq) {
//Ignore
}
public void componentRegistered(Component component, JID componentJID) {
public void componentRegistered(JID componentJID) {
//Ignore
}
public void componentUnregistered(Component component, JID componentJID) {
public void componentUnregistered(JID componentJID) {
//Remove component from the list of subscribers (if subscribed)
removeSubscriber(componentJID);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment