Commit 90e9058a authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

Initial version. Optimization. JM-925

git-svn-id: http://svn.igniterealtime.org/svn/repos/wildfire/trunk@6536 b35dd754-fafc-0310-a699-88a17e54d16e
parent e8606540
/**
* $Revision: $
* $Date: $
*
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.net;
import org.dom4j.Element;
import org.jivesoftware.wildfire.Connection;
import org.jivesoftware.wildfire.PacketRouter;
import org.jivesoftware.wildfire.auth.UnauthorizedException;
import org.jivesoftware.wildfire.multiplex.MultiplexerPacketHandler;
import org.jivesoftware.wildfire.multiplex.Route;
import org.jivesoftware.wildfire.session.ConnectionMultiplexerSession;
import org.jivesoftware.wildfire.session.Session;
import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException;
import org.xmpp.packet.IQ;
import org.xmpp.packet.Message;
import org.xmpp.packet.PacketError;
import org.xmpp.packet.Presence;
/**
* Handler of XML stanzas sent by Connection Managers.
*
* @author Gaston Dombiak
*/
public class MultiplexerStanzaHandler extends StanzaHandler {
/**
* Handler of IQ packets sent from the Connection Manager to the server.
*/
private MultiplexerPacketHandler packetHandler;
public MultiplexerStanzaHandler(PacketRouter router, String serverName, Connection connection) {
super(router, serverName, connection);
}
protected void processIQ(final IQ packet) {
if (session.getStatus() != Session.STATUS_AUTHENTICATED) {
// Session is not authenticated so return error
IQ reply = new IQ();
reply.setChildElement(packet.getChildElement().createCopy());
reply.setID(packet.getID());
reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo());
reply.setError(PacketError.Condition.not_authorized);
session.process(reply);
return;
}
// Process the packet
packetHandler.handle(packet);
}
protected void processMessage(final Message packet) throws UnauthorizedException {
throw new UnauthorizedException("Message packets are not supported. Original packets " +
"should be wrapped by route packets.");
}
protected void processPresence(final Presence packet) throws UnauthorizedException {
throw new UnauthorizedException("Message packets are not supported. Original packets " +
"should be wrapped by route packets.");
}
/**
* Process stanza sent by a client that is connected to a connection manager. The
* original stanza is wrapped in the route element. Only a single stanza must be
* wrapped in the route element.
*
* @param packet the route element.
*/
private void processRoute(final Route packet) {
if (session.getStatus() != Session.STATUS_AUTHENTICATED) {
// Session is not authenticated so return error
Route reply = new Route(packet.getStreamID());
reply.setID(packet.getID());
reply.setTo(packet.getFrom());
reply.setFrom(packet.getTo());
reply.setError(PacketError.Condition.not_authorized);
session.process(reply);
return;
}
// Process the packet
packetHandler.route(packet);
}
boolean processUnknowPacket(Element doc) {
String tag = doc.getName();
if ("route".equals(tag)) {
// Process stanza wrapped by the route packet
processRoute(new Route(doc));
return true;
}
else if ("handshake".equals(tag)) {
if (!((ConnectionMultiplexerSession)session).authenticate(doc.getStringValue())) {
session.getConnection().close();
}
return true;
}
else if ("error".equals(tag) && "stream".equals(doc.getNamespacePrefix())) {
session.getConnection().close();
return true;
}
return false;
}
String getNamespace() {
return "jabber:connectionmanager";
}
boolean validateHost() {
return false;
}
boolean createSession(String namespace, String serverName, XmlPullParser xpp, Connection connection)
throws XmlPullParserException {
if (getNamespace().equals(namespace)) {
// The connected client is a connection manager so create a ConnectionMultiplexerSession
session = ConnectionMultiplexerSession.createSession(serverName, xpp, connection);
if (session != null) {
packetHandler = new MultiplexerPacketHandler(session.getAddress().getDomain());
}
return true;
}
return false;
}
}
This diff is collapsed.
/**
* $Revision: $
* $Date: $
*
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.nio;
import org.apache.mina.common.ByteBuffer;
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.CharsetEncoder;
/**
* Wrapper on a MINA {@link ByteBuffer} that extends the Writer class.
*
* @author Gaston Dombia
*/
public class ByteBufferWriter extends Writer {
private CharsetEncoder encoder;
private ByteBuffer byteBuffer;
public ByteBufferWriter(ByteBuffer byteBuffer, CharsetEncoder encoder) {
this.encoder = encoder;
this.byteBuffer = byteBuffer;
}
public void write(char cbuf[], int off, int len) throws IOException {
byteBuffer.putString(new String(cbuf, off, len), encoder);
}
public void flush() throws IOException {
// Ignore
}
public void close() throws IOException {
// Ignore
}
}
/**
* $Revision: $
* $Date: $
*
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.nio;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.jivesoftware.util.Log;
import org.jivesoftware.wildfire.Connection;
import org.jivesoftware.wildfire.net.StanzaHandler;
import java.io.IOException;
/**
* A ConnectionHandler is responsible for creating new sessions, destroying sessions and delivering
* received XML stanzas to the proper StanzaHandler.
*
* @author Gaston Dombiak
*/
public abstract class ConnectionHandler extends IoHandlerAdapter {
/**
* The utf-8 charset for decoding and encoding Jabber packet streams.
*/
static final String CHARSET = "UTF-8";
static final String XML_PARSER = "XML-PARSER";
private static final String HANDLER = "HANDLER";
private static final String CONNECTION = "CONNECTION";
protected String serverName;
protected ConnectionHandler(String serverName) {
this.serverName = serverName;
}
public void sessionOpened(IoSession session) throws Exception {
// Create a new XML parser for the new connection. The parser will be used by the XMPPDecoder filter.
XMLLightweightParser parser = new XMLLightweightParser(CHARSET);
session.setAttribute(XML_PARSER, parser);
// Create a new NIOConnection for the new session
NIOConnection connection = createNIOConnection(session);
session.setAttribute(CONNECTION, connection);
session.setAttribute(HANDLER, createStanzaHandler(connection));
// Set the max time a connection can be idle before closing it
int idleTime = getMaxIdleTime();
if (idleTime > 0) {
session.setIdleTime(IdleStatus.BOTH_IDLE, idleTime);
}
}
public void sessionClosed(IoSession session) throws Exception {
// Get the connection for this session
Connection connection = (Connection) session.getAttribute(CONNECTION);
// Inform the connection that it was closed
connection.close();
}
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
// Get the connection for this session
Connection connection = (Connection) session.getAttribute(CONNECTION);
// Close idle connection
if (Log.isDebugEnabled()) {
Log.debug("Closing connection that has been idle: " + connection);
}
connection.close();
}
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
if (cause instanceof IOException) {
// TODO Verify if there were packets pending to be sent and decide what to do with them
Log.debug(cause);
}
else {
Log.error(cause);
}
}
public void messageReceived(IoSession session, Object message) throws Exception {
//System.out.println("RCVD: " + message);
// Get the stanza handler for this session
StanzaHandler handler = (StanzaHandler) session.getAttribute(HANDLER);
// Let the stanza handler process the received stanza
try {
handler.process( (String) message);
} catch (Exception e) {
Log.error("Closing connection due to error while processing message: " + message, e);
Connection connection = (Connection) session.getAttribute(CONNECTION);
connection.close();
}
}
abstract NIOConnection createNIOConnection(IoSession session);
abstract StanzaHandler createStanzaHandler(NIOConnection connection);
/**
* Returns the max number of seconds a connection can be idle (both ways) before
* being closed.<p>
*
* @return the max number of seconds a connection can be idle.
*/
abstract int getMaxIdleTime();
}
/**
* $Revision: $
* $Date: $
*
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.nio;
import org.apache.mina.common.IoSession;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.wildfire.XMPPServer;
import org.jivesoftware.wildfire.multiplex.MultiplexerPacketDeliverer;
import org.jivesoftware.wildfire.net.MultiplexerStanzaHandler;
import org.jivesoftware.wildfire.net.StanzaHandler;
/**
* ConnectionHandler that knows which subclass of {@link org.jivesoftware.wildfire.net.StanzaHandler} should
* be created and how to build and configure a {@link org.jivesoftware.wildfire.nio.NIOConnection}.
*
* @author Gaston Dombiak
*/
public class MultiplexerConnectionHandler extends ConnectionHandler {
public MultiplexerConnectionHandler(String serverName) {
super(serverName);
}
NIOConnection createNIOConnection(IoSession session) {
return new NIOConnection(session, new MultiplexerPacketDeliverer());
}
StanzaHandler createStanzaHandler(NIOConnection connection) {
return new MultiplexerStanzaHandler(XMPPServer.getInstance().getPacketRouter(), serverName, connection);
}
int getMaxIdleTime() {
return JiveGlobals.getIntProperty("xmpp.multiplex.idle", 5 * 60 * 1000) / 1000;
}
}
This diff is collapsed.
/**
* $Revision: $
* $Date: $
*
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.nio;
import org.apache.mina.common.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
/**
* This is a Light-Weight XML Parser.
* It read data from a channel and collect data until data are available in
* the channel.
* When a message is complete you can retrieve messages invoking the method
* getMsgs() and you can invoke the method areThereMsgs() to know if at least
* an message is presents.
*
* @author Daniele Piras
*/
class XMLLightweightParser {
// Chars that rappresent CDATA section start
protected static char[] CDATA_START = {'<', '!', '[', 'C', 'D', 'A', 'T', 'A', '['};
// Chars that rappresent CDATA section end
protected static char[] CDATA_END = {']', ']', '>'};
// Buffer with all data retrieved
protected StringBuilder buffer = new StringBuilder();
// ---- INTERNAL STATUS -------
// Initial status
protected static final int INIT = 0;
// Status used when the first tag name is retrieved
protected static final int HEAD = 2;
// Status used when robot is inside the xml and it looking for the tag conclusion
protected static final int INSIDE = 3;
// Status used when a '<' is found and try to find the conclusion tag.
protected static final int PRETAIL = 4;
// Status used when the ending tag is equal to the head tag
protected static final int TAIL = 5;
// Status used when robot is inside the main tag and found an '/' to check '/>'.
protected static final int VERIFY_CLOSE_TAG = 6;
// Status used when you are inside a parameter
protected static final int INSIDE_PARAM_VALUE = 7;
// Status used when you are inside a cdata section
protected static final int INSIDE_CDATA = 8;
// Current robot status
protected int status = XMLLightweightParser.INIT;
// Index to looking for a CDATA section start or end.
protected int cdataOffset = 0;
// Number of chars that machs with the head tag. If the tailCount is equal to
// the head length so a close tag is found.
protected int tailCount = 0;
// Indicate the starting point in the buffer for the next message.
protected int startLastMsg = 0;
// Flag used to discover tag in the form <tag />.
protected boolean insideRootTag = false;
// Object conteining the head tag
protected StringBuilder head = new StringBuilder(5);
// List with all finished messages found.
protected List<String> msgs = new ArrayList<String>();
protected boolean insideChildrenTag = false;
ByteBuffer byteBuffer;
Charset encoder;
public XMLLightweightParser(String charset) {
encoder = Charset.forName(charset);
}
/*
* true if the parser has found some complete xml message.
*/
public boolean areThereMsgs() {
return (msgs.size() > 0);
}
/*
* @return an array with all messages found
*/
public String[] getMsgs() {
String[] res = new String[msgs.size()];
for (int i = 0; i < res.length; i++) {
res[i] = msgs.get(i);
}
msgs.clear();
invalidateBuffer();
return res;
}
/*
* Method use to re-initialize the buffer
*/
protected void invalidateBuffer() {
if (buffer.length() > 0) {
String str = buffer.substring(startLastMsg);
buffer.delete(0, buffer.length());
buffer.append(str);
buffer.trimToSize();
}
startLastMsg = 0;
}
/*
* Method that add a message to the list and reinit parser.
*/
protected void foundMsg(String msg) {
// Add message to the complete message list
if (msg != null) {
msgs.add(msg);
}
// Move the position into the buffer
status = XMLLightweightParser.INIT;
tailCount = 0;
cdataOffset = 0;
head.setLength(0);
insideRootTag = false;
insideChildrenTag = false;
}
/*
* Main reading method
*/
public void read(ByteBuffer byteBuffer) throws Exception {
int readByte = byteBuffer.remaining();
invalidateBuffer();
CharBuffer charBuffer = encoder.decode(byteBuffer.buf());
//charBuffer.flip();
char[] buf = charBuffer.array();
buffer.append(buf);
// Robot.
char ch;
for (int i = 0; i < readByte; i++) {
//ch = rawByteBuffer[ i ];
ch = buf[i];
if (status == XMLLightweightParser.TAIL) {
// Looking for the close tag
if (ch == head.charAt(tailCount)) {
tailCount++;
if (tailCount == head.length()) {
// Close tag found!
// Calculate the correct start,end position of the message into the buffer
int end = buffer.length() - readByte + (i + 1);
String msg = buffer.substring(startLastMsg, end);
// Add message to the list
foundMsg(msg);
startLastMsg = end;
}
} else {
tailCount = 0;
status = XMLLightweightParser.INSIDE;
}
} else if (status == XMLLightweightParser.PRETAIL) {
if (ch == XMLLightweightParser.CDATA_START[cdataOffset]) {
cdataOffset++;
if (cdataOffset == XMLLightweightParser.CDATA_START.length) {
status = XMLLightweightParser.INSIDE_CDATA;
cdataOffset = 0;
continue;
}
} else {
cdataOffset = 0;
status = XMLLightweightParser.INSIDE;
}
if (ch == '/') {
status = XMLLightweightParser.TAIL;
}
} else if (status == XMLLightweightParser.VERIFY_CLOSE_TAG) {
if (ch == '>') {
// Found a tag in the form <tag />
int end = buffer.length() - readByte + (i + 1);
String msg = buffer.substring(startLastMsg, end);
// Add message to the list
foundMsg(msg);
startLastMsg = end;
} else {
status = XMLLightweightParser.INSIDE;
}
} else if (status == XMLLightweightParser.INSIDE_PARAM_VALUE) {
if (ch == '"') {
status = XMLLightweightParser.INSIDE;
continue;
}
} else if (status == XMLLightweightParser.INSIDE_CDATA) {
if (ch == XMLLightweightParser.CDATA_END[cdataOffset]) {
cdataOffset++;
if (cdataOffset == XMLLightweightParser.CDATA_END.length) {
status = XMLLightweightParser.INSIDE;
cdataOffset = 0;
continue;
}
} else {
cdataOffset = 0;
}
} else if (status == XMLLightweightParser.INSIDE) {
if (ch == XMLLightweightParser.CDATA_START[cdataOffset]) {
cdataOffset++;
if (cdataOffset == XMLLightweightParser.CDATA_START.length) {
status = XMLLightweightParser.INSIDE_CDATA;
cdataOffset = 0;
continue;
}
} else {
cdataOffset = 0;
}
if (ch == '"') {
status = XMLLightweightParser.INSIDE_PARAM_VALUE;
} else if (ch == '>') {
if (insideRootTag &&
("stream:stream>".equals(head.toString()) || ("?xml>".equals(head.toString())))) {
// Found closing stream:stream
int end = buffer.length() - readByte + (i + 1);
String msg = buffer.substring(startLastMsg, end);
foundMsg(msg);
startLastMsg = end;
}
insideRootTag = false;
} else if (ch == '<') {
status = XMLLightweightParser.PRETAIL;
insideChildrenTag = true;
} else if (ch == '/' && insideRootTag && !insideChildrenTag) {
status = XMLLightweightParser.VERIFY_CLOSE_TAG;
}
} else if (status == XMLLightweightParser.HEAD) {
if (ch == ' ' || ch == '>') {
// Append > to head to facility the research of </tag>
head.append(">");
status = XMLLightweightParser.INSIDE;
insideRootTag = true;
insideChildrenTag = false;
continue;
}
head.append(ch);
} else if (status == XMLLightweightParser.INIT) {
if (ch != ' ' && ch != '\r' && ch != '\n' && ch != '<') {
invalidateBuffer();
return;
}
if (ch == '<') {
status = XMLLightweightParser.HEAD;
}
}
}
if (head.length() > 0 && "/stream:stream>".equals(head.toString())) {
// Found closing stream:stream
foundMsg("</stream:stream>");
}
}
}
/**
* $Revision: $
* $Date: $
*
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.nio;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;
/**
* Factory that specifies the encode and decoder to use for parsing XMPP stanzas.
*
* @author Gaston Dombiak
*/
public class XMPPCodecFactory implements ProtocolCodecFactory {
private final XMPPEncoder encoder;
private final XMPPDecoder decoder;
public XMPPCodecFactory() {
encoder = new XMPPEncoder();
decoder = new XMPPDecoder();
}
public ProtocolEncoder getEncoder() throws Exception {
return encoder;
}
public ProtocolDecoder getDecoder() throws Exception {
return decoder;
}
}
/**
* $Revision: $
* $Date: $
*
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.nio;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
/**
* Decoder class that parses ByteBuffers and generates XML stanzas. Generated
* stanzas are then passed to the next filters.
*
* @author Gaston Dombiak
*/
public class XMPPDecoder extends CumulativeProtocolDecoder {
protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out)
throws Exception {
if (in.remaining() < 4) {
return false;
}
// Get the XML light parser from the IoSession
XMLLightweightParser parser =
(XMLLightweightParser) session.getAttribute(ConnectionHandler.XML_PARSER);
// Parse as many stanzas as possible from the received data
parser.read(in);
if (parser.areThereMsgs()) {
for (String stanza : parser.getMsgs()) {
out.write(stanza);
}
}
return true;
}
}
/**
* $Revision: $
* $Date: $
*
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.nio;
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
/**
* Encoder that does nothing. We are already writing ByteBuffers so there is no need
* to encode them.<p>
*
* This class exists as a counterpart of {@link XMPPDecoder}. Unlike that class this class does nothing.
*
* @author Gaston Dombiak
*/
public class XMPPEncoder extends ProtocolEncoderAdapter {
public void encode(IoSession session, Object message, ProtocolEncoderOutput out)
throws Exception {
// Ignore. Do nothing. Content being sent is already a bytebuffer (of strings)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment