Commit eab6196e authored by Derek DeMoro's avatar Derek DeMoro Committed by derek

Refactoring changes


git-svn-id: http://svn.igniterealtime.org/svn/repos/messenger/trunk@596 b35dd754-fafc-0310-a699-88a17e54d16e
parent fe5c6144
...@@ -13,11 +13,11 @@ package org.jivesoftware.messenger; ...@@ -13,11 +13,11 @@ package org.jivesoftware.messenger;
import org.jivesoftware.messenger.auth.UnauthorizedException; import org.jivesoftware.messenger.auth.UnauthorizedException;
import org.xmpp.packet.Packet; import org.xmpp.packet.Packet;
import org.dom4j.io.XMLWriter;
import org.xmlpull.v1.XmlPullParserException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;
/** /**
* <p>Represents a connection on the server.</p> * <p>Represents a connection on the server.</p>
...@@ -62,7 +62,7 @@ public interface Connection { ...@@ -62,7 +62,7 @@ public interface Connection {
* @return The XmlSerializer underlying this connection * @return The XmlSerializer underlying this connection
* @throws UnauthorizedException If caller doesn't have permission to access this resource * @throws UnauthorizedException If caller doesn't have permission to access this resource
*/ */
XMLStreamWriter getSerializer() throws UnauthorizedException; XMLWriter getSerializer() throws UnauthorizedException;
/** /**
* Close this session including associated socket connection. * Close this session including associated socket connection.
...@@ -128,7 +128,7 @@ public interface Connection { ...@@ -128,7 +128,7 @@ public interface Connection {
* *
* @param packet The packet to deliver. * @param packet The packet to deliver.
* @throws UnauthorizedException If caller doesn't have permission to access this resource * @throws UnauthorizedException If caller doesn't have permission to access this resource
* @throws XMLStreamException if there was a problem sending the packet * @throws org.xmlpull.v1.XmlPullParserException if there was a problem sending the packet
*/ */
void deliver(Packet packet) throws UnauthorizedException, XMLStreamException; void deliver(Packet packet) throws UnauthorizedException, XmlPullParserException;
} }
...@@ -133,8 +133,7 @@ public class OfflineMessageStrategy extends BasicModule { ...@@ -133,8 +133,7 @@ public class OfflineMessageStrategy extends BasicModule {
session.getConnection().deliver(response); session.getConnection().deliver(response);
Message errorResponse = message.createCopy(); Message errorResponse = message.createCopy();
errorResponse.setError(new PacketError(PacketError.Condition.item_not_found, errorResponse.setError(new PacketError(PacketError.Type.continue_processing, PacketError.Condition.item_not_found));
PacketError.Type.continue_processing));
session.getConnection().deliver(errorResponse); session.getConnection().deliver(errorResponse);
} }
catch (Exception e) { catch (Exception e) {
......
...@@ -13,8 +13,7 @@ package org.jivesoftware.messenger; ...@@ -13,8 +13,7 @@ package org.jivesoftware.messenger;
import org.jivesoftware.messenger.auth.UnauthorizedException; import org.jivesoftware.messenger.auth.UnauthorizedException;
import org.xmpp.packet.Packet; import org.xmpp.packet.Packet;
import org.xmlpull.v1.XmlPullParserException;
import javax.xml.stream.XMLStreamException;
/** /**
* Delivers packets to locally connected streams. This is the opposite * Delivers packets to locally connected streams. This is the opposite
...@@ -34,6 +33,5 @@ public interface PacketDeliverer { ...@@ -34,6 +33,5 @@ public interface PacketDeliverer {
* @param packet The packet to route * @param packet The packet to route
* @throws java.lang.NullPointerException If the packet is null or the packet could not be routed * @throws java.lang.NullPointerException If the packet is null or the packet could not be routed
*/ */
public void deliver(Packet packet) throws public void deliver(Packet packet) throws UnauthorizedException, PacketException, XmlPullParserException;
UnauthorizedException, PacketException, XMLStreamException;
} }
...@@ -25,7 +25,6 @@ import java.util.Random; ...@@ -25,7 +25,6 @@ import java.util.Random;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.xml.stream.XMLStreamException;
import org.jivesoftware.messenger.audit.AuditStreamIDFactory; import org.jivesoftware.messenger.audit.AuditStreamIDFactory;
import org.jivesoftware.messenger.auth.UnauthorizedException; import org.jivesoftware.messenger.auth.UnauthorizedException;
import org.jivesoftware.messenger.container.Container; import org.jivesoftware.messenger.container.Container;
...@@ -41,6 +40,7 @@ import org.xmpp.packet.JID; ...@@ -41,6 +40,7 @@ import org.xmpp.packet.JID;
import org.xmpp.packet.Message; import org.xmpp.packet.Message;
import org.xmpp.packet.Packet; import org.xmpp.packet.Packet;
import org.xmpp.packet.Presence; import org.xmpp.packet.Presence;
import org.xmlpull.v1.XmlPullParserException;
/** /**
* Manages the sessions associated with an account. The information * Manages the sessions associated with an account. The information
...@@ -261,7 +261,7 @@ public class SessionManager implements ConnectionCloseListener { ...@@ -261,7 +261,7 @@ public class SessionManager implements ConnectionCloseListener {
* *
* @param packet * @param packet
*/ */
private void broadcast(Packet packet) throws UnauthorizedException, PacketException, XMLStreamException { private void broadcast(Packet packet) throws UnauthorizedException, PacketException, XmlPullParserException {
Iterator entries = resources.values().iterator(); Iterator entries = resources.values().iterator();
while (entries.hasNext()) { while (entries.hasNext()) {
Session session = (Session)entries.next(); Session session = (Session)entries.next();
...@@ -746,7 +746,7 @@ public class SessionManager implements ConnectionCloseListener { ...@@ -746,7 +746,7 @@ public class SessionManager implements ConnectionCloseListener {
* *
* @param packet The packet to be broadcast * @param packet The packet to be broadcast
*/ */
public void broadcast(Packet packet) throws UnauthorizedException, PacketException, XMLStreamException { public void broadcast(Packet packet) throws UnauthorizedException, PacketException, XmlPullParserException {
sessionLock.readLock().lock(); sessionLock.readLock().lock();
try { try {
Iterator values = sessions.values().iterator(); Iterator values = sessions.values().iterator();
...@@ -776,7 +776,7 @@ public class SessionManager implements ConnectionCloseListener { ...@@ -776,7 +776,7 @@ public class SessionManager implements ConnectionCloseListener {
* *
* @param packet The packet to be broadcast * @param packet The packet to be broadcast
*/ */
public void userBroadcast(String username, Packet packet) throws UnauthorizedException, PacketException, XMLStreamException { public void userBroadcast(String username, Packet packet) throws UnauthorizedException, PacketException, XmlPullParserException {
sessionLock.readLock().lock(); sessionLock.readLock().lock();
try { try {
SessionMap sessionMap = (SessionMap)sessions.get(username); SessionMap sessionMap = (SessionMap)sessions.get(username);
......
...@@ -11,22 +11,31 @@ ...@@ -11,22 +11,31 @@
package org.jivesoftware.messenger.net; package org.jivesoftware.messenger.net;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.messenger.*;
import org.jivesoftware.messenger.audit.Auditor;
import org.jivesoftware.messenger.auth.UnauthorizedException;
import org.xmpp.packet.Presence;
import org.xmpp.packet.Message;
import org.xmpp.packet.IQ;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.net.Socket; import java.net.Socket;
import javax.xml.stream.*; import org.dom4j.Document;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.dom4j.io.XMLWriter;
import org.dom4j.io.XPPPacketReader;
import org.jivesoftware.messenger.Connection;
import org.jivesoftware.messenger.PacketRouter;
import org.jivesoftware.messenger.Session;
import org.jivesoftware.messenger.audit.Auditor;
import org.jivesoftware.messenger.auth.UnauthorizedException;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException;
import org.xmlpull.v1.XmlPullParserFactory;
import org.xmpp.packet.IQ;
import org.xmpp.packet.Message;
import org.xmpp.packet.Presence;
/** /**
* @author Iain Shigeoka * @author Derek DeMoro
*/ */
public class SocketReadThread extends Thread { public class SocketReadThread extends Thread {
...@@ -36,14 +45,10 @@ public class SocketReadThread extends Thread { ...@@ -36,14 +45,10 @@ public class SocketReadThread extends Thread {
*/ */
private String charset = "UTF-8"; private String charset = "UTF-8";
private XMLInputFactory xppFactory;
private XMLStreamReader xpp;
private Session session; private Session session;
private Connection connection; private Connection connection;
private static final String ETHERX_NAMESPACE = private static final String ETHERX_NAMESPACE = "http://etherx.jabber.org/streams";
"http://etherx.jabber.org/streams";
private String serverName; private String serverName;
/** /**
...@@ -56,9 +61,10 @@ public class SocketReadThread extends Thread { ...@@ -56,9 +61,10 @@ public class SocketReadThread extends Thread {
*/ */
private Auditor auditor; private Auditor auditor;
private PacketFactory packetFactory;
private boolean clearSignout = false; private boolean clearSignout = false;
XmlPullParserFactory factory = null;
XPPPacketReader reader = null;
/** /**
* Create dedicated read thread for this socket. * Create dedicated read thread for this socket.
...@@ -69,22 +75,15 @@ public class SocketReadThread extends Thread { ...@@ -69,22 +75,15 @@ public class SocketReadThread extends Thread {
* @param sock The socket to read from * @param sock The socket to read from
* @param session The session being read * @param session The session being read
*/ */
public SocketReadThread(PacketRouter router, public SocketReadThread(PacketRouter router, String serverName, Auditor auditor, Socket sock,
PacketFactory packetFactory,
String serverName,
Auditor auditor,
Socket sock,
Session session) { Session session) {
super("SRT reader"); super("SRT reader");
this.serverName = serverName; this.serverName = serverName;
this.router = router; this.router = router;
this.packetFactory = packetFactory;
this.auditor = auditor; this.auditor = auditor;
this.session = session; this.session = session;
connection = session.getConnection(); connection = session.getConnection();
this.sock = sock; this.sock = sock;
xppFactory = XMLInputFactory.newInstance();
xppFactory.setProperty(XMLInputFactory.IS_COALESCING, Boolean.TRUE);
} }
/** /**
...@@ -93,9 +92,16 @@ public class SocketReadThread extends Thread { ...@@ -93,9 +92,16 @@ public class SocketReadThread extends Thread {
*/ */
public void run() { public void run() {
try { try {
xpp = factory = XmlPullParserFactory.newInstance();
xppFactory.createXMLStreamReader(new InputStreamReader(sock.getInputStream(), // factory.setNamespaceAware(true);
charset));
reader = new XPPPacketReader();
reader.setXPPFactory(factory);
reader.getXPPParser().setInput(new InputStreamReader(sock.getInputStream(),
charset));
// Read in the opening tag and prepare for packet stream // Read in the opening tag and prepare for packet stream
createSession(); createSession();
...@@ -109,8 +115,8 @@ public class SocketReadThread extends Thread { ...@@ -109,8 +115,8 @@ public class SocketReadThread extends Thread {
catch (EOFException eof) { catch (EOFException eof) {
// Normal disconnect // Normal disconnect
} }
catch (XMLStreamException ie) { catch (XmlPullParserException ie) {
// Check if the user abruptly cut the connection without sending previously an // Check if the user abruptly cut the connection without sending previously an
// unavailable presence // unavailable presence
if (clearSignout == false) { if (clearSignout == false) {
if (session != null && session.getStatus() == Session.STATUS_AUTHENTICATED) { if (session != null && session.getStatus() == Session.STATUS_AUTHENTICATED) {
...@@ -119,7 +125,6 @@ public class SocketReadThread extends Thread { ...@@ -119,7 +125,6 @@ public class SocketReadThread extends Thread {
// Simulate an unavailable presence sent by the user. // Simulate an unavailable presence sent by the user.
Presence packet = presence.createCopy(); Presence packet = presence.createCopy();
packet.setType(Presence.Type.unavailable); packet.setType(Presence.Type.unavailable);
packet.setType(null);
packet.setFrom(session.getAddress()); packet.setFrom(session.getAddress());
router.route(packet); router.route(packet);
clearSignout = true; clearSignout = true;
...@@ -165,52 +170,41 @@ public class SocketReadThread extends Thread { ...@@ -165,52 +170,41 @@ public class SocketReadThread extends Thread {
* for incoming data. To prevent clients from stalling channel handlers, * for incoming data. To prevent clients from stalling channel handlers,
* a watch dog timer is used. Packets that take longer than the watch * a watch dog timer is used. Packets that take longer than the watch
* dog limit to read will cause the session to be closed. * dog limit to read will cause the session to be closed.
*
* @throws XMLStreamException if there is trouble reading from the socket
*/ */
private void readStream() throws UnauthorizedException, XMLStreamException { private void readStream() throws Exception {
while (true) { while (true) {
for (int eventType = xpp.next(); Document document = reader.parseDocument();
eventType != XMLStreamConstants.START_ELEMENT;
eventType = xpp.next()) {
if (eventType == XMLStreamConstants.CHARACTERS) {
if (!xpp.isWhiteSpace()) {
throw new XMLStreamException(LocaleUtils.getLocalizedString("admin.error.packet.text"));
}
}
else if (eventType == XMLStreamConstants.END_DOCUMENT) {
return;
}
}
String tag = xpp.getLocalName(); if (document != null) {
Element doc = document.getRootElement();
if ("message".equals(tag)) { String tag = doc.getName();
Message packet = packetFactory.getMessage(xpp); if ("message".equals(tag)) {
packet.setFrom(session.getAddress()); Message packet = new Message(doc);
auditor.audit(packet); packet.setFrom(session.getAddress());
router.route(packet); auditor.audit(packet);
session.incrementClientPacketCount(); router.route(packet);
} session.incrementClientPacketCount();
else if ("presence".equals(tag)) { }
Presence packet = packetFactory.getPresence(xpp); else if ("presence".equals(tag)) {
packet.setFrom(session.getAddress()); Presence packet = new Presence(doc);
auditor.audit(packet); packet.setFrom(session.getAddress());
router.route(packet); auditor.audit(packet);
session.incrementClientPacketCount(); router.route(packet);
// Update the flag that indicates if the user made a clean sign out session.incrementClientPacketCount();
clearSignout = (Presence.Type.unavailable == packet.getType() ? true : false); // Update the flag that indicates if the user made a clean sign out
} clearSignout = (Presence.Type.unavailable == packet.getType() ? true : false);
else if ("iq".equals(tag)) { }
IQ packet = packetFactory.getIQ(xpp); else if ("iq".equals(tag)) {
packet.setFrom(session.getAddress()); IQ packet = new IQ(doc);
auditor.audit(packet); packet.setFrom(session.getAddress());
router.route(packet); auditor.audit(packet);
session.incrementClientPacketCount(); router.route(packet);
} session.incrementClientPacketCount();
else { }
throw new XMLStreamException(LocaleUtils.getLocalizedString("admin.error.packet.tag") + tag); else {
throw new XmlPullParserException(LocaleUtils.getLocalizedString("admin.error.packet.tag") + tag);
}
} }
} }
} }
...@@ -226,34 +220,40 @@ public class SocketReadThread extends Thread { ...@@ -226,34 +220,40 @@ public class SocketReadThread extends Thread {
* *
* @throws UnauthorizedException If the caller did not have permission * @throws UnauthorizedException If the caller did not have permission
* to use this method. * to use this method.
* @throws XMLStreamException If the stream is not valid XML
*/ */
private void createSession() throws UnauthorizedException, XMLStreamException { private void createSession() throws UnauthorizedException, XmlPullParserException, IOException, Exception {
XmlPullParser xpp = reader.getXPPParser();
for (int eventType = xpp.getEventType(); for (int eventType = xpp.getEventType();
eventType != XMLStreamConstants.START_ELEMENT; eventType != XmlPullParser.START_TAG;
eventType = xpp.next()) { eventType = xpp.next()) {
} }
// Conduct error checking, the opening tag should be 'stream' // Conduct error checking, the opening tag should be 'stream'
// in the 'etherx' namespace // in the 'etherx' namespace
if (!xpp.getLocalName().equals("stream")) { if (!xpp.getName().equals("stream")) {
throw new XMLStreamException(LocaleUtils.getLocalizedString("admin.error.bad-stream")); throw new XmlPullParserException(LocaleUtils.getLocalizedString("admin.error.bad-stream"));
} }
if (!xpp.getNamespaceURI(xpp.getPrefix()).equals(ETHERX_NAMESPACE)) { if (!xpp.getNamespace(xpp.getPrefix()).equals(ETHERX_NAMESPACE)) {
throw new XMLStreamException(LocaleUtils.getLocalizedString("admin.error.bad-namespace")); throw new XmlPullParserException(LocaleUtils.getLocalizedString("admin.error.bad-namespace"));
} }
XMLStreamWriter xser = connection.getSerializer(); // Flush this to the Connection to start up.
xser.writeStartDocument(); XMLWriter xser = connection.getSerializer();
xser.writeStartElement("stream", "stream", "http://etherx.jabber.org/streams"); String s = "<stream:stream to=\"jivesoftware.com\" xmlns=\"jabber:client\" xmlns:stream=\"http://etherx.jabber.org/streams\" version=\"1.0\" >";
xser.writeNamespace("stream", "http://etherx.jabber.org/streams"); Element streamElement = null;
xser.writeDefaultNamespace("jabber:client"); try {
xser.writeAttribute("from", serverName); streamElement = DocumentHelper.createElement(s);
xser.writeAttribute("id", session.getStreamID().toString()); }
xser.writeCharacters(" "); catch (Exception e) {
e.printStackTrace();
}
streamElement.addAttribute("from", serverName);
streamElement.addAttribute("id", session.getStreamID().toString());
xser.write(streamElement);
xser.flush(); xser.flush();
// TODO: check for SASL support in opening stream tag // TODO: check for SASL support in opening stream tag
} }
} }
...@@ -29,7 +29,6 @@ import java.net.UnknownHostException; ...@@ -29,7 +29,6 @@ import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import javax.xml.stream.XMLStreamException;
public class ConnectionManagerImpl extends BasicModule implements ConnectionManager { public class ConnectionManagerImpl extends BasicModule implements ConnectionManager {
...@@ -109,7 +108,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -109,7 +108,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
public XMPPServer server; public XMPPServer server;
public PacketFactory packetFactory; public PacketFactory packetFactory;
public void addSocket(Socket sock, boolean isSecure) throws XMLStreamException { public void addSocket(Socket sock, boolean isSecure) {
try { try {
// the order of these calls is critical (stupid huh?) // the order of these calls is critical (stupid huh?)
Connection conn = new SocketConnection(deliverer, Connection conn = new SocketConnection(deliverer,
...@@ -118,7 +117,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana ...@@ -118,7 +117,7 @@ public class ConnectionManagerImpl extends BasicModule implements ConnectionMana
isSecure); isSecure);
Session session = sessionManager.createSession(conn); Session session = sessionManager.createSession(conn);
SocketReadThread reader = new SocketReadThread(router, SocketReadThread reader = new SocketReadThread(router,
packetFactory, serverName, auditManager.getAuditor(), serverName, auditManager.getAuditor(),
sock, session); sock, session);
reader.setDaemon(true); reader.setDaemon(true);
reader.start(); reader.start();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment