Commit 1ff31a5a authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

Initial version. JM-687

git-svn-id: http://svn.igniterealtime.org/svn/repos/wildfire/trunk@3903 b35dd754-fafc-0310-a699-88a17e54d16e
parent 036e029d
/**
* $RCSfile$
* $Revision: $
* $Date: $
*
* Copyright (C) 2006 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.net;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.wildfire.ConnectionManager;
import org.jivesoftware.wildfire.ServerPort;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
/**
* Accepts new socket connections and uses a thread for each new connection.
*
* @author Gaston Dombiak
*/
class BlockingAcceptingMode extends SocketAcceptingMode {
protected BlockingAcceptingMode(ConnectionManager connManager, ServerPort serverPort,
InetAddress bindInterface) throws IOException {
super(connManager, serverPort);
serverSocket = new ServerSocket(serverPort.getPort(), -1, bindInterface);
}
/**
* About as simple as it gets. The thread spins around an accept
* call getting sockets and creating new reading threads for each new connection.
*/
public void run() {
while (notTerminated) {
try {
Socket sock = serverSocket.accept();
if (sock != null) {
Log.debug("Connect " + sock.toString());
SocketReader reader =
connManager.createSocketReader(sock, false, serverPort, true);
Thread thread = new Thread(reader, reader.getName());
thread.setDaemon(true);
thread.setPriority(Thread.NORM_PRIORITY);
thread.start();
}
}
catch (IOException ie) {
if (notTerminated) {
Log.error(LocaleUtils.getLocalizedString("admin.error.accept"),
ie);
}
}
catch (Throwable e) {
Log.error(LocaleUtils.getLocalizedString("admin.error.accept"), e);
}
}
}
}
/**
* $RCSfile$
* $Revision: $
* $Date: $
*
* Copyright (C) 2006 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.net;
import com.jcraft.jzlib.JZlib;
import com.jcraft.jzlib.ZInputStream;
import org.dom4j.Element;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.net.SocketException;
import java.nio.channels.AsynchronousCloseException;
/**
* Process incoming packets using a blocking model. Once a session has been created
* an endless loop is used to process incoming packets. Packets are processed
* sequentially.
*
* @author Gaston Dombiak
*/
class BlockingReadingMode extends SocketReadingMode {
public BlockingReadingMode(Socket socket, SocketReader socketReader) {
super(socket, socketReader);
}
/**
* A dedicated thread loop for reading the stream and sending incoming
* packets to the appropriate router.
*/
public void run() {
try {
socketReader.reader.getXPPParser().setInput(new InputStreamReader(socket.getInputStream(),
CHARSET));
// Read in the opening tag and prepare for packet stream
try {
socketReader.createSession();
}
catch (IOException e) {
Log.debug("Error creating session", e);
throw e;
}
// Read the packet stream until it ends
if (socketReader.session != null) {
readStream();
}
}
catch (EOFException eof) {
// Normal disconnect
}
catch (SocketException se) {
// The socket was closed. The server may close the connection for several
// reasons (e.g. user requested to remove his account). Do nothing here.
}
catch (AsynchronousCloseException ace) {
// The socket was closed.
}
catch (XmlPullParserException ie) {
// It is normal for clients to abruptly cut a connection
// rather than closing the stream document. Since this is
// normal behavior, we won't log it as an error.
// Log.error(LocaleUtils.getLocalizedString("admin.disconnect"),ie);
}
catch (Exception e) {
if (socketReader.session != null) {
Log.warn(LocaleUtils.getLocalizedString("admin.error.stream") + ". Session: " +
socketReader.session, e);
}
}
finally {
if (socketReader.session != null) {
if (Log.isDebugEnabled()) {
Log.debug("Logging off " + socketReader.session.getAddress() + " on " + socketReader.connection);
}
try {
socketReader.session.getConnection().close();
}
catch (Exception e) {
Log.warn(LocaleUtils.getLocalizedString("admin.error.connection")
+ "\n" + socket.toString());
}
}
else {
// Close and release the created connection
socketReader.connection.close();
Log.error(LocaleUtils.getLocalizedString("admin.error.connection")
+ "\n" + socket.toString());
}
socketReader.shutdown();
}
}
/**
* Read the incoming stream until it ends.
*/
private void readStream() throws Exception {
socketReader.open = true;
while (socketReader.open) {
Element doc = socketReader.reader.parseDocument().getRootElement();
if (doc == null) {
// Stop reading the stream since the client has sent an end of
// stream element and probably closed the connection.
return;
}
String tag = doc.getName();
if ("starttls".equals(tag)) {
// Negotiate TLS
if (negotiateTLS()) {
tlsNegotiated();
}
else {
socketReader.open = false;
socketReader.session = null;
}
}
else if ("auth".equals(tag)) {
// User is trying to authenticate using SASL
if (authenticateClient(doc)) {
// SASL authentication was successful so open a new stream and offer
// resource binding and session establishment (to client sessions only)
saslSuccessful();
}
else if (socketReader.connection.isClosed()) {
socketReader.open = false;
socketReader.session = null;
}
}
else if ("compress".equals(tag))
{
// Client is trying to initiate compression
if (compressClient(doc)) {
// Compression was successful so open a new stream and offer
// resource binding and session establishment (to client sessions only)
compressionSuccessful();
}
}
else {
socketReader.process(doc);
}
}
}
protected void tlsNegotiated() throws XmlPullParserException, IOException {
XmlPullParser xpp = socketReader.reader.getXPPParser();
// Reset the parser to use the new reader
xpp.setInput(new InputStreamReader(
socketReader.connection.getTLSStreamHandler().getInputStream(), CHARSET));
// Skip new stream element
for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) {
eventType = xpp.next();
}
super.tlsNegotiated();
}
protected void saslSuccessful() throws XmlPullParserException, IOException {
MXParser xpp = socketReader.reader.getXPPParser();
// Reset the parser since a new stream header has been sent from the client
xpp.resetInput();
// Skip the opening stream sent by the client
for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) {
eventType = xpp.next();
}
super.saslSuccessful();
}
protected boolean compressClient(Element doc) throws XmlPullParserException, IOException {
boolean answer = super.compressClient(doc);
if (answer) {
XmlPullParser xpp = socketReader.reader.getXPPParser();
// Reset the parser since a new stream header has been sent from the client
if (socketReader.connection.getTLSStreamHandler() == null) {
ZInputStream in = new ZInputStream(socket.getInputStream());
in.setFlushMode(JZlib.Z_PARTIAL_FLUSH);
xpp.setInput(new InputStreamReader(in, CHARSET));
}
else {
ZInputStream in = new ZInputStream(
socketReader.connection.getTLSStreamHandler().getInputStream());
in.setFlushMode(JZlib.Z_PARTIAL_FLUSH);
xpp.setInput(new InputStreamReader(in, CHARSET));
}
}
return answer;
}
protected void compressionSuccessful() throws XmlPullParserException, IOException {
XmlPullParser xpp = socketReader.reader.getXPPParser();
// Skip the opening stream sent by the client
for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) {
eventType = xpp.next();
}
super.compressionSuccessful();
}
}
/**
* $RCSfile$
* $Revision: $
* $Date: $
*
* Copyright (C) 2006 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.net;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
/**
* Class that simulate an InputStream given a un-blocking channel.
*
* @author Daniele Piras
*/
class ChannelInputStream extends InputStream
{
ByteBuffer buf = ByteBuffer.allocate(1024);
ReadableByteChannel inputChannel;
public ChannelInputStream(ReadableByteChannel ic)
{
inputChannel = ic;
}
private void doRead() throws IOException
{
final int cnt = inputChannel.read(buf);
if (cnt > 0)
{
buf.flip();
}
else
{
if (cnt == -1)
{
buf.flip();
}
}
}
public synchronized int read(byte[] bytes, int off, int len)
throws IOException
{
if (buf.position() == 0)
{
doRead();
}
else
{
buf.flip();
}
len = Math.min(len, buf.remaining());
if (len == 0)
{
return -1;
}
buf.get(bytes, off, len);
if (buf.hasRemaining())
{
// Discard read data and move unread data to the begining of the buffer.
// Leave
// the position at the end of the buffer as a way to indicate that there
// is
// unread data
buf.compact();
}
else
{
buf.clear();
}
return len;
}
@Override
public int read() throws IOException
{
byte[] tmpBuf = new byte[1];
int byteRead = read(tmpBuf, 0, 1);
if (byteRead < 1)
{
return -1;
}
else
{
return tmpBuf[0];
}
}
}
/**
* $RCSfile$
* $Revision: $
* $Date: $
*
* Copyright (C) 2006 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.net;
import org.jivesoftware.util.JiveGlobals;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Thread pool to be used for processing incoming packets when using non-blocking
* connections.
*
* // TODO Change thead pool configuration. Would be nice to have something that can be
* // TODO dynamically adjusted to demand and circumstances.
*
* @author Daniele Piras
*/
class IOExecutor {
// SingleTon ...
protected static IOExecutor instance = new IOExecutor();
// Pool obj
protected ThreadPoolExecutor executeMsgPool;
// Internal queue for the pool
protected LinkedBlockingQueue<Runnable> executeQueue;
/*
* Simple constructor that initialize the main executor structure.
*
*/
protected IOExecutor() {
// Read poolsize parameter...
int poolSize = JiveGlobals.getIntProperty("tiscali.pool.size", 15);
// Create queue for executor
executeQueue = new LinkedBlockingQueue<Runnable>();
// Create executor
executeMsgPool =
new ThreadPoolExecutor(poolSize, poolSize, 60, TimeUnit.SECONDS, executeQueue);
}
public static void execute(Runnable task) {
instance.executeMsgPool.execute(task);
}
}
/**
* $RCSfile$
* $Revision: $
* $Date: $
*
* Copyright (C) 2006 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.net;
import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.wildfire.ConnectionManager;
import org.jivesoftware.wildfire.ServerPort;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* Accepts new socket connections using a non-blocking model. A single selector is
* used for all connected clients and also for accepting new connections.
*
* @author Daniele Piras
*/
class NonBlockingAcceptingMode extends SocketAcceptingMode {
// Time (in ms) to sleep from a reading-cycle to another
private static final long CYCLE_TIME = 10;
// Selector to collect messages from client connections.
private Selector selector;
protected NonBlockingAcceptingMode(ConnectionManager connManager, ServerPort serverPort,
InetAddress bindInterface) throws IOException {
super(connManager, serverPort);
// Chaning server to use NIO
// Open selector...
selector = Selector.open();
// Create a new ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// Retrieve socket and bind socket with specified address
this.serverSocket = serverSocketChannel.socket();
this.serverSocket.bind(new InetSocketAddress(bindInterface, serverPort.getPort()));
// Configure Blocking to unblocking
serverSocketChannel.configureBlocking(false);
// Registering connection with selector.
SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
AcceptConnection acceptConnection = new AcceptConnection();
sk.attach(acceptConnection);
}
/**
* DANIELE:
* This thread use the selector NIO features to retrieve client connections
* and messages.
*/
public void run() {
while (notTerminated && !Thread.interrupted()) {
try {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
SelectionKey key = (SelectionKey) it.next();
it.remove();
SelectorAction action = (SelectorAction) key.attachment();
if (action == null) {
continue;
}
if (key.isAcceptable()) {
action.connect(key);
}
else if (key.isReadable()) {
action.read(key);
}
}
Thread.sleep(CYCLE_TIME);
}
catch (IOException ie) {
if (notTerminated) {
Log.error(LocaleUtils.getLocalizedString("admin.error.accept"),
ie);
}
}
catch (Exception e) {
Log.error(LocaleUtils.getLocalizedString("admin.error.accept"), e);
}
}
}
/*
* InnerClass that is use when a new client arrive.
* It's use the reactor pattern to register an abstract action
* to the selector.
*/
class AcceptConnection implements SelectorAction {
public void read(SelectionKey key) throws IOException {
}
/*
* A client arrive...
*/
public void connect(SelectionKey key) throws IOException {
// Retrieve the server socket channel...
ServerSocketChannel sChannel = (ServerSocketChannel) key.channel();
// Accept the connection
SocketChannel socketChannel = sChannel.accept();
// Retrieve socket for incoming connection
Socket sock = socketChannel.socket();
socketChannel.configureBlocking(false);
// Registering READING operation into the selector
SelectionKey sockKey = socketChannel.register(selector, SelectionKey.OP_READ);
if (sock != null) {
System.out.println("Connect " + sock.toString());
Log.debug("Connect " + sock.toString());
try {
SocketReader reader =
connManager.createSocketReader(sock, false, serverPort, false);
SelectorAction action = new ReadAction(reader);
sockKey.attach(action);
}
catch (Exception e) {
// There is an exception...
Log.error(LocaleUtils.getLocalizedString("admin.error.accept"), e);
}
}
}
}
class ReadAction implements SelectorAction {
SocketReader reader;
public ReadAction(SocketReader reader) {
this.reader = reader;
}
public void read(SelectionKey key) throws IOException {
// Socket reader (using non-blocking mode) will read the stream and process, in
// another thread, any number of stanzas found in the stream.
reader.run();
}
public void connect(SelectionKey key) throws IOException {
}
}
}
/**
* $RCSfile$
* $Revision: $
* $Date: $
*
* Copyright (C) 2006 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.net;
import java.io.IOException;
import java.nio.channels.SelectionKey;
/**
* @author Daniele Piras
*/
interface SelectorAction
{
public abstract void read( SelectionKey key ) throws IOException;
public abstract void connect( SelectionKey key ) throws IOException;
}
/**
* $RCSfile$
* $Revision: $
* $Date: $
*
* Copyright (C) 2006 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.net;
import org.jivesoftware.wildfire.ConnectionManager;
import org.jivesoftware.wildfire.ServerPort;
import java.io.IOException;
import java.net.ServerSocket;
/**
* Abstract class for {@link BlockingAcceptingMode} and {@link NonBlockingAcceptingMode}.
*
* @author Gaston Dombiak
*/
abstract class SocketAcceptingMode {
/**
* True while this thread should continue running.
*/
protected boolean notTerminated = true;
/**
* Holds information about the port on which the server will listen for connections.
*/
protected ServerPort serverPort;
/**
* socket that listens for connections.
*/
protected ServerSocket serverSocket;
protected ConnectionManager connManager;
protected SocketAcceptingMode(ConnectionManager connManager, ServerPort serverPort) {
this.connManager = connManager;
this.serverPort = serverPort;
}
public abstract void run();
public void shutdown() {
notTerminated = false;
try {
ServerSocket sSock = serverSocket;
serverSocket = null;
if (sSock != null) {
sSock.close();
}
}
catch (IOException e) {
// we don't care, no matter what, the socket should be dead
}
}
}
This diff is collapsed.
/**
* $RCSfile$
* $Revision: $
* $Date: $
*
* Copyright (C) 2006 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.net;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
/**
* This is a Light-Weight XML Parser.
* It read data from a channel and collect data until data are available in
* the channel.
* When a message is complete you can retrieve messages invoking the method
* getMsgs() and you can invoke the method areThereMsgs() to know if at least
* an message is presents.
*
* @author Daniele Piras
*
*/
class XMLLightweightParser
{
// Chars that rappresent CDATA section start
protected static char[] CDATA_START = {'<','!','[','C','D','A','T','A','['};
// Chars that rappresent CDATA section end
protected static char[] CDATA_END = {']',']','>'};
// Buffer with all data retrieved
protected StringBuilder buffer = new StringBuilder();
// ---- INTERNAL STATUS -------
// Initial status
protected static final int INIT = 0;
// Status used when the first tag name is retrieved
protected static final int HEAD = 2;
// Status used when robot is inside the xml and it looking for the tag conclusion
protected static final int INSIDE = 3;
// Status used when a '<' is found and try to find the conclusion tag.
protected static final int PRETAIL = 4;
// Status used when the ending tag is equal to the head tag
protected static final int TAIL = 5;
// Status used when robot is inside the main tag and found an '/' to check '/>'.
protected static final int VERIFY_CLOSE_TAG = 6;
// Status used when you are inside a parameter
protected static final int INSIDE_PARAM_VALUE = 7;
// Status used when you are inside a cdata section
protected static final int INSIDE_CDATA = 8;
// Current robot status
protected int status = INIT;
// Index to looking for a CDATA section start or end.
protected int cdataOffset = 0;
// Number of chars that machs with the head tag. If the tailCount is equal to
// the head length so a close tag is found.
protected int tailCount = 0;
// Indicate the starting point in the buffer for the next message.
protected int startLastMsg = 0;
// Flag used to discover tag in the form <tag />.
protected boolean insideRootTag = false;
// Object conteining the head tag
protected StringBuilder head = new StringBuilder( 5 );
// List with all finished messages found.
protected List<String> msgs = new ArrayList<String>();
private ReadableByteChannel inputChannel;
byte[] rawByteBuffer;
ByteBuffer byteBuffer;
Charset encoder;
public ReadableByteChannel getChannel()
{
return inputChannel;
}
public XMLLightweightParser( ReadableByteChannel channel, String charset )
{
rawByteBuffer = new byte[1024];
byteBuffer = ByteBuffer.wrap( rawByteBuffer );
setInput( channel, charset );
}
public XMLLightweightParser( InputStream is , String charset)
{
rawByteBuffer = new byte[1024];
byteBuffer = ByteBuffer.wrap( rawByteBuffer );
setInput( is, charset );
}
public void setInput( InputStream is, String charset )
{
inputChannel = Channels.newChannel( is );
encoder = Charset.forName( charset );
invalidateBuffer();
}
public void setInput( ReadableByteChannel channel, String charset )
{
inputChannel = channel;
encoder = Charset.forName( charset );
invalidateBuffer();
}
/*
* true if the parser has found some complete xml message.
*/
public boolean areThereMsgs()
{
return ( msgs.size() > 0 );
}
/*
* @return an array with all messages found
*/
public String[] getMsgs()
{
String[] res = new String[ msgs.size() ];
for ( int i = 0; i < res.length; i++ )
{
res[ i ] = msgs.get( i );
}
msgs.clear();
invalidateBuffer();
return res;
}
/*
* Method use to re-initialize the buffer
*/
protected void invalidateBuffer()
{
if ( buffer.length() > 0 )
{
String str = buffer.substring( startLastMsg ).toString().trim();
buffer.delete( 0, buffer.length() );
buffer.append( str );
buffer.trimToSize();
}
startLastMsg = 0;
}
/*
* Method that add a message to the list and reinit parser.
*/
protected void foundMsg( String msg )
{
// Add message to the complete message list
if ( msg != null )
{
msgs.add( msg.trim() );
}
// Move the position into the buffer
status = INIT;
tailCount = 0;
cdataOffset = 0;
head.setLength( 0 );
insideRootTag = false;
}
/*
* Main reading method
*/
public void read() throws Exception
{
// Reset buffer
byteBuffer.limit( rawByteBuffer.length );
byteBuffer.rewind();
int readByte = inputChannel.read( byteBuffer );
if ( readByte == -1 )
{
// ERROR ON SOCKET!!
throw new IOException( "ReadByte == -1.Socket Close" );
}
else if ( readByte <= 0 )
{
return;
}
else if ( readByte == 1 && rawByteBuffer[ 0 ] == ' ' )
{
// Heart bit! Ignore it.
return;
}
byteBuffer.flip();
byte[] bhs = byteBuffer.array();
byteBuffer.rewind();
CharBuffer charBuffer = encoder.decode( byteBuffer );
charBuffer.flip();
char[] buf = charBuffer.array();
buffer.append( buf );
// Robot.
char ch;
for ( int i = 0; i < readByte; i++ )
{
//ch = rawByteBuffer[ i ];
ch = buf[ i ];
if ( status == TAIL )
{
// Looking for the close tag
if ( ch == head.charAt( tailCount ) )
{
tailCount++;
if ( tailCount == head.length() )
{
// Close tag found!
// Calculate the correct start,end position of the message into the buffer
int end = buffer.length() - readByte + ( i + 1 );
String msg = buffer.substring( startLastMsg, end );
// Add message to the list
foundMsg( msg );
startLastMsg = end;
}
}
else
{
tailCount = 0;
status = INSIDE;
}
}
else if ( status == PRETAIL )
{
if ( ch == CDATA_START[ cdataOffset ] )
{
cdataOffset++;
if ( cdataOffset == CDATA_START.length )
{
status = INSIDE_CDATA;
cdataOffset = 0;
continue;
}
}
else
{
cdataOffset = 0;
status = INSIDE;
}
if ( ch == '/' )
{
status = TAIL;
}
}
else if ( status == VERIFY_CLOSE_TAG )
{
if ( ch == '>' )
{
// Found a tag in the form <tag />
int end = buffer.length() - readByte + ( i + 1 );
String msg = buffer.substring( startLastMsg, end );
// Add message to the list
foundMsg( msg );
startLastMsg = end;
}
else
{
status = INSIDE;
}
}
else if ( status == INSIDE_PARAM_VALUE )
{
if ( ch == '"' )
{
status = INSIDE;
continue;
}
}
else if ( status == INSIDE_CDATA )
{
if ( ch == CDATA_END[ cdataOffset ] )
{
cdataOffset++;
if ( cdataOffset == CDATA_END.length )
{
status = INSIDE;
cdataOffset = 0;
continue;
}
}
else
{
cdataOffset = 0;
}
}
else if ( status == INSIDE )
{
if ( ch == CDATA_START[ cdataOffset ] )
{
cdataOffset++;
if ( cdataOffset == CDATA_START.length )
{
status = INSIDE_CDATA;
cdataOffset = 0;
continue;
}
}
else
{
cdataOffset = 0;
}
if ( ch == '"' )
{
status = INSIDE_PARAM_VALUE;
}
else if ( ch == '>' )
{
if ( insideRootTag && "stream:stream>".equals( head.toString() ) )
{
// Found closing stream:stream
int end = buffer.length() - readByte + ( i + 1 );
String msg = buffer.substring( startLastMsg, end );
foundMsg( msg );
startLastMsg = end;
}
insideRootTag = false;
}
else if ( ch == '<' )
{
status = PRETAIL;
}
else if ( ch == '/' && insideRootTag )
{
status = VERIFY_CLOSE_TAG;
}
}
else if ( status == HEAD )
{
if ( ch == ' ' || ch == '>' )
{
// Append > to head to facility the research of </tag>
head.append( ">" );
status = INSIDE;
insideRootTag = true;
continue;
}
head.append( (char)ch );
}
else if ( status == INIT )
{
if ( ch != ' ' && ch != '\r' && ch != '\n' && ch != '<' )
{
invalidateBuffer();
return;
}
if ( ch == '<' )
{
status = HEAD;
}
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment