Commit 0e03d9df authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

Refactoring work.

git-svn-id: http://svn.igniterealtime.org/svn/repos/wildfire/trunk@3988 b35dd754-fafc-0310-a699-88a17e54d16e
parent 33c65b66
...@@ -133,8 +133,7 @@ public class InternalComponentManager implements ComponentManager, RoutableChann ...@@ -133,8 +133,7 @@ public class InternalComponentManager implements ComponentManager, RoutableChann
} }
public void sendPacket(Component component, Packet packet) { public void sendPacket(Component component, Packet packet) {
PacketRouter router; PacketRouter router = XMPPServer.getInstance().getPacketRouter();
router = XMPPServer.getInstance().getPacketRouter();
if (router != null) { if (router != null) {
router.route(packet); router.route(packet);
} }
......
...@@ -15,6 +15,7 @@ import org.dom4j.Element; ...@@ -15,6 +15,7 @@ import org.dom4j.Element;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.wildfire.ClientSession; import org.jivesoftware.wildfire.ClientSession;
import org.jivesoftware.wildfire.PacketRouter; import org.jivesoftware.wildfire.PacketRouter;
import org.jivesoftware.wildfire.RoutingTable;
import org.jivesoftware.wildfire.auth.UnauthorizedException; import org.jivesoftware.wildfire.auth.UnauthorizedException;
import org.xmlpull.v1.XmlPullParserException; import org.xmlpull.v1.XmlPullParserException;
import org.xmpp.packet.IQ; import org.xmpp.packet.IQ;
...@@ -39,9 +40,9 @@ import java.net.Socket; ...@@ -39,9 +40,9 @@ import java.net.Socket;
*/ */
public class ClientSocketReader extends SocketReader { public class ClientSocketReader extends SocketReader {
public ClientSocketReader(PacketRouter router, String serverName, Socket socket, public ClientSocketReader(PacketRouter router, RoutingTable routingTable, String serverName,
SocketConnection connection, boolean useBlockingMode) { Socket socket, SocketConnection connection, boolean useBlockingMode) {
super(router, serverName, socket, connection, useBlockingMode); super(router, routingTable, serverName, socket, connection, useBlockingMode);
} }
protected void processIQ(IQ packet) throws UnauthorizedException { protected void processIQ(IQ packet) throws UnauthorizedException {
......
...@@ -14,6 +14,7 @@ package org.jivesoftware.wildfire.net; ...@@ -14,6 +14,7 @@ package org.jivesoftware.wildfire.net;
import org.dom4j.Element; import org.dom4j.Element;
import org.jivesoftware.util.Log; import org.jivesoftware.util.Log;
import org.jivesoftware.wildfire.PacketRouter; import org.jivesoftware.wildfire.PacketRouter;
import org.jivesoftware.wildfire.RoutingTable;
import org.jivesoftware.wildfire.auth.UnauthorizedException; import org.jivesoftware.wildfire.auth.UnauthorizedException;
import org.jivesoftware.wildfire.component.ComponentSession; import org.jivesoftware.wildfire.component.ComponentSession;
import org.jivesoftware.wildfire.component.InternalComponentManager; import org.jivesoftware.wildfire.component.InternalComponentManager;
...@@ -32,9 +33,9 @@ import java.net.Socket; ...@@ -32,9 +33,9 @@ import java.net.Socket;
*/ */
public class ComponentSocketReader extends SocketReader { public class ComponentSocketReader extends SocketReader {
public ComponentSocketReader(PacketRouter router, String serverName, Socket socket, public ComponentSocketReader(PacketRouter router, RoutingTable routingTable, String serverName,
SocketConnection connection, boolean useBlockingMode) { Socket socket, SocketConnection connection, boolean useBlockingMode) {
super(router, serverName, socket, connection, useBlockingMode); super(router, routingTable, serverName, socket, connection, useBlockingMode);
} }
/** /**
......
...@@ -13,13 +13,19 @@ package org.jivesoftware.wildfire.net; ...@@ -13,13 +13,19 @@ package org.jivesoftware.wildfire.net;
import org.dom4j.Element; import org.dom4j.Element;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log; import org.jivesoftware.wildfire.PacketRouter;
import org.jivesoftware.wildfire.*; import org.jivesoftware.wildfire.RoutingTable;
import org.jivesoftware.wildfire.Session;
import org.jivesoftware.wildfire.SessionManager;
import org.jivesoftware.wildfire.auth.UnauthorizedException; import org.jivesoftware.wildfire.auth.UnauthorizedException;
import org.jivesoftware.wildfire.multiplex.ConnectionMultiplexerSession; import org.jivesoftware.wildfire.multiplex.ConnectionMultiplexerSession;
import org.jivesoftware.wildfire.multiplex.MultiplexerPacketHandler; import org.jivesoftware.wildfire.multiplex.MultiplexerPacketHandler;
import org.jivesoftware.wildfire.multiplex.Route;
import org.xmlpull.v1.XmlPullParserException; import org.xmlpull.v1.XmlPullParserException;
import org.xmpp.packet.*; import org.xmpp.packet.IQ;
import org.xmpp.packet.Message;
import org.xmpp.packet.PacketError;
import org.xmpp.packet.Presence;
import java.io.IOException; import java.io.IOException;
import java.net.Socket; import java.net.Socket;
...@@ -62,15 +68,15 @@ public class ConnectionMultiplexerSocketReader extends SocketReader { ...@@ -62,15 +68,15 @@ public class ConnectionMultiplexerSocketReader extends SocketReader {
* Pool of threads that are available for processing the requests. * Pool of threads that are available for processing the requests.
*/ */
private ThreadPoolExecutor threadPool; private ThreadPoolExecutor threadPool;
private SessionManager sessionManager;
/** /**
* Handler of IQ packets sent from the Connection Manager to the server. * Handler of IQ packets sent from the Connection Manager to the server.
*/ */
private MultiplexerPacketHandler packetHandler; private MultiplexerPacketHandler packetHandler;
public ConnectionMultiplexerSocketReader(PacketRouter router, String serverName, Socket socket, public ConnectionMultiplexerSocketReader(PacketRouter router, RoutingTable routingTable,
SocketConnection connection, boolean useBlockingMode) { String serverName, Socket socket, SocketConnection connection,
super(router, serverName, socket, connection, useBlockingMode); boolean useBlockingMode) {
super(router, routingTable, serverName, socket, connection, useBlockingMode);
// Create a pool of threads that will process received packets. If more threads are // Create a pool of threads that will process received packets. If more threads are
// required then the command will be executed on the SocketReader process // required then the command will be executed on the SocketReader process
int coreThreads = JiveGlobals.getIntProperty("xmpp.multiplex.processing.core.threads", 10); int coreThreads = JiveGlobals.getIntProperty("xmpp.multiplex.processing.core.threads", 10);
...@@ -80,7 +86,6 @@ public class ConnectionMultiplexerSocketReader extends SocketReader { ...@@ -80,7 +86,6 @@ public class ConnectionMultiplexerSocketReader extends SocketReader {
new ThreadPoolExecutor(coreThreads, maxThreads, 60, TimeUnit.SECONDS, new ThreadPoolExecutor(coreThreads, maxThreads, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(queueSize), new LinkedBlockingQueue<Runnable>(queueSize),
new ThreadPoolExecutor.CallerRunsPolicy()); new ThreadPoolExecutor.CallerRunsPolicy());
sessionManager = XMPPServer.getInstance().getSessionManager();
} }
boolean createSession(String namespace) boolean createSession(String namespace)
...@@ -100,8 +105,7 @@ public class ConnectionMultiplexerSocketReader extends SocketReader { ...@@ -100,8 +105,7 @@ public class ConnectionMultiplexerSocketReader extends SocketReader {
protected void processIQ(final IQ packet) throws UnauthorizedException { protected void processIQ(final IQ packet) throws UnauthorizedException {
if (session.getStatus() != Session.STATUS_AUTHENTICATED) { if (session.getStatus() != Session.STATUS_AUTHENTICATED) {
// Session is not authenticated and IQ packet is not being sent to the // Session is not authenticated so return error
// connection manager
IQ reply = new IQ(); IQ reply = new IQ();
reply.setChildElement(packet.getChildElement().createCopy()); reply.setChildElement(packet.getChildElement().createCopy());
reply.setID(packet.getID()); reply.setID(packet.getID());
...@@ -114,31 +118,22 @@ public class ConnectionMultiplexerSocketReader extends SocketReader { ...@@ -114,31 +118,22 @@ public class ConnectionMultiplexerSocketReader extends SocketReader {
// Process the packet in another thread // Process the packet in another thread
threadPool.execute(new Runnable() { threadPool.execute(new Runnable() {
public void run() { public void run() {
try {
JID from = packet.getFrom();
if (from != null && from.equals(session.getAddress())) {
// IQ packets sent from the connection manager itself have a special
// processing logic. No route is created to the Connection Managers
// so we need to catch IQ packets here and process them
packetHandler.handle(packet); packetHandler.handle(packet);
} }
else {
// Increment packet counter of the client session
incrementClientPacketCount(from);
}
// Process and route the packet
ConnectionMultiplexerSocketReader.super.processIQ(packet);
}
catch (UnauthorizedException e) {
Log.error("Error processing packet", e);
}
}
}); });
} }
protected void processMessage(final Message packet) throws UnauthorizedException { /**
* Process stanza sent by a client that is connected to a connection manager. The
* original stanza is wrapped in the route element. Only a single stanza must be
* wrapped in the route element.
*
* @param packet the route element.
*/
private void processRoute(final Route packet) throws UnauthorizedException {
if (session.getStatus() != Session.STATUS_AUTHENTICATED) { if (session.getStatus() != Session.STATUS_AUTHENTICATED) {
Message reply = new Message(); // Session is not authenticated so return error
Route reply = new Route(packet.getStreamID());
reply.setID(packet.getID()); reply.setID(packet.getID());
reply.setTo(packet.getFrom()); reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo()); reply.setFrom(packet.getTo());
...@@ -149,60 +144,43 @@ public class ConnectionMultiplexerSocketReader extends SocketReader { ...@@ -149,60 +144,43 @@ public class ConnectionMultiplexerSocketReader extends SocketReader {
// Process the packet in another thread // Process the packet in another thread
threadPool.execute(new Runnable() { threadPool.execute(new Runnable() {
public void run() { public void run() {
try { packetHandler.route(packet);
// Increment packet counter of the client session
incrementClientPacketCount(packet.getFrom());
// Process and route the packet
ConnectionMultiplexerSocketReader.super.processMessage(packet);
}
catch (UnauthorizedException e) {
Log.error("Error processing packet", e);
}
} }
}); });
} }
protected void processMessage(final Message packet) throws UnauthorizedException {
throw new UnauthorizedException("Message packets are not supported. Original packets " +
"should be wrapped by IQ packets.");
}
protected void processPresence(final Presence packet) throws UnauthorizedException { protected void processPresence(final Presence packet) throws UnauthorizedException {
if (session.getStatus() != Session.STATUS_AUTHENTICATED) { throw new UnauthorizedException("Message packets are not supported. Original packets " +
Presence reply = new Presence(); "should be wrapped by IQ packets.");
reply.setID(packet.getID());
reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo());
reply.setError(PacketError.Condition.not_authorized);
session.process(reply);
return;
} }
// Process the packet in another thread
threadPool.execute(new Runnable() { boolean processUnknowPacket(Element doc) {
public void run() { String tag = doc.getName();
if ("route".equals(tag)) {
// Process stanza wrapped by the route packet
try { try {
// Increment packet counter of the client session processRoute(new Route(doc));
incrementClientPacketCount(packet.getFrom()); return true;
// Process and route the packet
ConnectionMultiplexerSocketReader.super.processPresence(packet);
} }
catch (UnauthorizedException e) { catch (UnauthorizedException e) {
Log.error("Error processing packet", e); // Should never happen
} }
} }
}); else if ("handshake".equals(tag)) {
}
boolean processUnknowPacket(Element doc) {
if ("handshake".equals(doc.getName())) {
open = ((ConnectionMultiplexerSession)session).authenticate(doc.getStringValue()); open = ((ConnectionMultiplexerSession)session).authenticate(doc.getStringValue());
return true; return true;
} }
return false; else if ("error".equals(tag) && "stream".equals(doc.getNamespacePrefix())) {
} session.getConnection().close();
open = false;
private void incrementClientPacketCount(JID from) { return true;
if (from != null) {
ClientSession originatingSession = sessionManager.getSession(from);
if (originatingSession != null) {
originatingSession.incrementClientPacketCount();
}
} }
return false;
} }
String getName() { String getName() {
......
...@@ -15,6 +15,7 @@ import org.dom4j.Element; ...@@ -15,6 +15,7 @@ import org.dom4j.Element;
import org.jivesoftware.util.JiveGlobals; import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log; import org.jivesoftware.util.Log;
import org.jivesoftware.wildfire.PacketRouter; import org.jivesoftware.wildfire.PacketRouter;
import org.jivesoftware.wildfire.RoutingTable;
import org.jivesoftware.wildfire.auth.UnauthorizedException; import org.jivesoftware.wildfire.auth.UnauthorizedException;
import org.jivesoftware.wildfire.interceptor.PacketRejectedException; import org.jivesoftware.wildfire.interceptor.PacketRejectedException;
import org.jivesoftware.wildfire.server.IncomingServerSession; import org.jivesoftware.wildfire.server.IncomingServerSession;
...@@ -49,9 +50,9 @@ public class ServerSocketReader extends SocketReader { ...@@ -49,9 +50,9 @@ public class ServerSocketReader extends SocketReader {
*/ */
private ThreadPoolExecutor threadPool; private ThreadPoolExecutor threadPool;
public ServerSocketReader(PacketRouter router, String serverName, Socket socket, public ServerSocketReader(PacketRouter router, RoutingTable routingTable, String serverName,
SocketConnection connection, boolean useBlockingMode) { Socket socket, SocketConnection connection, boolean useBlockingMode) {
super(router, serverName, socket, connection, useBlockingMode); super(router, routingTable, serverName, socket, connection, useBlockingMode);
// Create a pool of threads that will process received packets. If more threads are // Create a pool of threads that will process received packets. If more threads are
// required then the command will be executed on the SocketReader process // required then the command will be executed on the SocketReader process
int coreThreads = JiveGlobals.getIntProperty("xmpp.server.processing.core.threads", 2); int coreThreads = JiveGlobals.getIntProperty("xmpp.server.processing.core.threads", 2);
......
...@@ -46,18 +46,33 @@ public abstract class SocketReader implements Runnable { ...@@ -46,18 +46,33 @@ public abstract class SocketReader implements Runnable {
*/ */
private static XmlPullParserFactory factory = null; private static XmlPullParserFactory factory = null;
/**
* Session associated with the socket reader.
*/
protected Session session; protected Session session;
/**
* Reference to the physical connection.
*/
protected SocketConnection connection; protected SocketConnection connection;
/**
* Server name for which we are attending clients.
*/
protected String serverName; protected String serverName;
/** /**
* Router used to route incoming packets to the correct channels. * Router used to route incoming packets to the correct channels.
*/ */
private PacketRouter router; private PacketRouter router;
/**
* Routing table used for checking whether a domain is known or not.
*/
private RoutingTable routingTable;
/**
* Specifies whether the socket is using blocking or non-blocking connections.
*/
private SocketReadingMode readingMode; private SocketReadingMode readingMode;
XMPPPacketReader reader = null; XMPPPacketReader reader = null;
protected boolean open; protected boolean open;
private RoutingTable routingTable = XMPPServer.getInstance().getRoutingTable();
static { static {
try { try {
...@@ -72,15 +87,17 @@ public abstract class SocketReader implements Runnable { ...@@ -72,15 +87,17 @@ public abstract class SocketReader implements Runnable {
* Creates a dedicated reader for a socket. * Creates a dedicated reader for a socket.
* *
* @param router the router for sending packets that were read. * @param router the router for sending packets that were read.
* @param routingTable the table that keeps routes to registered services.
* @param serverName the name of the server this socket is working for. * @param serverName the name of the server this socket is working for.
* @param socket the socket to read from. * @param socket the socket to read from.
* @param connection the connection being read. * @param connection the connection being read.
* @param useBlockingMode true means that the server will use a thread per connection. * @param useBlockingMode true means that the server will use a thread per connection.
*/ */
public SocketReader(PacketRouter router, String serverName, Socket socket, public SocketReader(PacketRouter router, RoutingTable routingTable, String serverName,
SocketConnection connection, boolean useBlockingMode) { Socket socket, SocketConnection connection, boolean useBlockingMode) {
this.serverName = serverName; this.serverName = serverName;
this.router = router; this.router = router;
this.routingTable = routingTable;
this.connection = connection; this.connection = connection;
connection.setSocketReader(this); connection.setSocketReader(this);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment