Commit c6dd1ae4 authored by daryl herzmann's avatar daryl herzmann

Merge pull request #195 from guusdk/OF-885

OF-885: Use non-blocking, async API for BOSH servlet
parents 8bc8c539 5a0a2e6b
......@@ -6,6 +6,7 @@ work/
*.ipr
.idea
atlassian-ide-plugin.xml
out/
# Ignore Eclipse project files
.settings
......
......@@ -19,28 +19,22 @@
package org.jivesoftware.openfire.http;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.*;
import java.net.InetAddress;
import java.net.URLDecoder;
import java.security.cert.X509Certificate;
import java.util.Date;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.*;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang.StringEscapeUtils;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.dom4j.QName;
import org.dom4j.io.XMPPPacketReader;
import org.eclipse.jetty.continuation.ContinuationSupport;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.net.MXParser;
import org.jivesoftware.util.JiveGlobals;
......@@ -100,7 +94,23 @@ public class HttpBindServlet extends HttpServlet {
protected void service(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
// add CORS headers for all HTTP responses (errors, etc.)
setCORSHeaders(request, response);
if (boshManager.isCORSEnabled())
{
if (boshManager.isAllOriginsAllowed()) {
// Set the Access-Control-Allow-Origin header to * to allow all Origin to do the CORS
response.setHeader("Access-Control-Allow-Origin", HttpBindManager.HTTP_BIND_CORS_ALLOW_ORIGIN_DEFAULT);
} else {
// Get the Origin header from the request and check if it is in the allowed Origin Map.
// If it is allowed write it back to the Access-Control-Allow-Origin header of the respond.
final String origin = request.getHeader("Origin");
if (boshManager.isThisOriginAllowed(origin)) {
response.setHeader("Access-Control-Allow-Origin", origin);
}
}
response.setHeader("Access-Control-Allow-Methods", HttpBindManager.HTTP_BIND_CORS_ALLOW_METHODS_DEFAULT);
response.setHeader("Access-Control-Allow-Headers", HttpBindManager.HTTP_BIND_CORS_ALLOW_HEADERS_DEFAULT);
response.setHeader("Access-Control-Max-Age", HttpBindManager.HTTP_BIND_CORS_MAX_AGE_DEFAULT);
}
super.service(request, response);
}
......@@ -108,277 +118,170 @@ public class HttpBindServlet extends HttpServlet {
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException
{
final AsyncContext context = request.startAsync();
boolean isScriptSyntaxEnabled = boshManager.isScriptSyntaxEnabled();
if(!isScriptSyntaxEnabled) {
sendLegacyError(response, BoshBindingError.itemNotFound);
sendLegacyError(context, BoshBindingError.itemNotFound);
return;
}
if (isContinuation(request, response)) {
return;
}
String queryString = request.getQueryString();
if (queryString == null || "".equals(queryString)) {
sendLegacyError(response, BoshBindingError.badRequest);
sendLegacyError(context, BoshBindingError.badRequest);
return;
} else if ("isBoshAvailable".equals(queryString)) {
response.setStatus(HttpServletResponse.SC_OK);
context.complete();
return;
}
queryString = URLDecoder.decode(queryString, "UTF-8");
parseDocument(request, response, new ByteArrayInputStream(queryString.getBytes("UTF-8")));
}
private void sendLegacyError(HttpServletResponse response, BoshBindingError error)
throws IOException
{
response.sendError(error.getLegacyErrorCode());
processContent(context, queryString);
}
@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
if (isContinuation(request, response)) {
return;
}
final AsyncContext context = request.startAsync();
parseDocument(request, response, request.getInputStream());
// Asynchronously reads the POSTed input, then triggers #processContent.
request.getInputStream().setReadListener(new ReadListenerImpl(context));
}
private void parseDocument(HttpServletRequest request, HttpServletResponse response,
InputStream documentContent)
protected void processContent(AsyncContext context, String content)
throws IOException {
final String remoteAddress = getRemoteAddress(context);
// Parse document from the content.
Document document;
try {
document = createDocument(documentContent);
document = getPacketReader().read(new StringReader(content), "UTF-8");
} catch (Exception ex) {
Log.warn("Error parsing request data from [" + remoteAddress + "]", ex);
sendLegacyError(context, BoshBindingError.badRequest);
return;
}
catch (Exception e) {
Log.warn("Error parsing user request. [" + request.getRemoteAddr() + "]");
sendLegacyError(response, BoshBindingError.badRequest);
if (document == null) {
Log.info("The result of parsing request data from [" + remoteAddress + "] was a null-object.");
sendLegacyError(context, BoshBindingError.badRequest);
return;
}
Element node = document.getRootElement();
final Element node = document.getRootElement();
if (node == null || !"body".equals(node.getName())) {
Log.warn("Body missing from request content. [" + request.getRemoteAddr() + "]");
sendLegacyError(response, BoshBindingError.badRequest);
Log.info("Root element 'body' is missing from parsed request data from [" + remoteAddress + "]");
sendLegacyError(context, BoshBindingError.badRequest);
return;
}
String sid = node.attributeValue("sid");
final long rid = getLongAttribute(node.attributeValue("rid"), -1);
if (rid <= 0) {
Log.info("Root element 'body' does not contain a valid RID attribute value in parsed request data from [" + remoteAddress + "]");
sendLegacyError(context, BoshBindingError.badRequest, "Body-element is missing a RID (Request ID) value, or the provided value is a non-positive integer.");
return;
}
// Process the parsed document.
final String sid = node.attributeValue("sid");
if (sid == null) {
// When there's no Session ID, this should be a request to create a new session. If there's additional content,
// something is wrong.
if (node.elements().size() > 0) {
// invalid session request; missing sid
Log.warn("Invalid client request; SID is required. [" + request.getRemoteAddr() + "]");
sendLegacyError(response, BoshBindingError.badRequest);
Log.info("Root element 'body' does not contain a SID attribute value in parsed request data from [" + remoteAddress + "]");
sendLegacyError(context, BoshBindingError.badRequest);
return;
} else {
// We have a new session
createNewSession(request, response, node);
}
// We have a new session
createNewSession(context, node);
}
else {
handleSessionRequest(sid, request, response, node);
// When there exists a Session ID, new data for an existing session is being provided.
handleSessionRequest(sid, context, node);
}
}
private boolean isContinuation(HttpServletRequest request, HttpServletResponse response)
protected void createNewSession(AsyncContext context, Element rootNode)
throws IOException
{
HttpSession session = (HttpSession) request.getAttribute("request-session");
if (session == null) {
return false;
}
synchronized (session) {
try {
respond(session, request, response, session.consumeResponse((HttpConnection) request.getAttribute("connection")),
request.getMethod());
}
catch (HttpBindException e) {
sendError(request, response, e.getBindingError(), session);
}
}
return true;
}
final long rid = getLongAttribute(rootNode.attributeValue("rid"), -1);
private void sendError(HttpServletRequest request, HttpServletResponse response,
BoshBindingError bindingError, HttpSession session)
throws IOException
{
if (JiveGlobals.getBooleanProperty("log.httpbind.enabled", false)) {
System.out.println(new Date()+": HTTP ERR("+session.getStreamID().getID() + "): " + bindingError.getErrorType().getType() + ", " + bindingError.getCondition() + ".");
}
try {
if ((session.getMajorVersion() == 1 && session.getMinorVersion() >= 6) ||
session.getMajorVersion() > 1) {
respond(session, request, response, createErrorBody(bindingError.getErrorType().getType(),
bindingError.getCondition()), request.getMethod());
}
else {
sendLegacyError(response, bindingError);
}
final X509Certificate[] certificates = (X509Certificate[]) context.getRequest().getAttribute("javax.servlet.request.X509Certificate");
final HttpConnection connection = new HttpConnection(rid, context.getRequest().isSecure(), certificates, context);
final InetAddress address = InetAddress.getByName(context.getRequest().getRemoteAddr());
connection.setSession(sessionManager.createSession(address, rootNode, connection));
if (JiveGlobals.getBooleanProperty("log.httpbind.enabled", false)) {
Log.info(new Date() + ": HTTP RECV(" + connection.getSession().getStreamID().getID() + "): " + rootNode.asXML());
}
finally {
if (bindingError.getErrorType() == BoshBindingError.Type.terminate) {
session.close();
}
catch (UnauthorizedException e) {
// Server wasn't initialized yet.
sendLegacyError(context, BoshBindingError.internalServerError, "Server has not finished initialization." );
}
catch (HttpBindException e) {
sendLegacyError(context, BoshBindingError.internalServerError, "Server has not finished initialization." );
}
private String createErrorBody(String type, String condition) {
Element body = DocumentHelper.createElement("body");
body.addNamespace("", "http://jabber.org/protocol/httpbind");
body.addAttribute("type", type);
body.addAttribute("condition", condition);
return body.asXML();
}
private void handleSessionRequest(String sid, HttpServletRequest request,
HttpServletResponse response, Element rootNode)
private void handleSessionRequest(String sid, AsyncContext context, Element rootNode)
throws IOException
{
if (JiveGlobals.getBooleanProperty("log.httpbind.enabled", false)) {
System.out.println(new Date()+": HTTP RECV(" + sid + "): " + rootNode.asXML());
}
long rid = getLongAttribue(rootNode.attributeValue("rid"), -1);
if (rid <= 0) {
response.sendError(HttpServletResponse.SC_BAD_REQUEST, "Body missing RID (Request ID)");
return;
Log.info(new Date() + ": HTTP RECV(" + sid + "): " + rootNode.asXML());
}
HttpSession session = sessionManager.getSession(sid);
if (session == null) {
if (Log.isDebugEnabled()) {
Log.debug("Client provided invalid session: " + sid + ". [" +
request.getRemoteAddr() + "]");
context.getRequest().getRemoteAddr() + "]");
}
response.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid SID.");
sendLegacyError(context, BoshBindingError.itemNotFound, "Invalid SID value.");
return;
}
final long rid = getLongAttribute(rootNode.attributeValue("rid"), -1);
synchronized (session) {
HttpConnection connection;
try {
connection = sessionManager.forwardRequest(rid, session,
request.isSecure(), rootNode);
session.forwardRequest(rid, context.getRequest().isSecure(), rootNode, context);
}
catch (HttpBindException e) {
sendError(request, response, e.getBindingError(), session);
return;
sendError(session, context, e.getBindingError());
}
catch (HttpConnectionClosedException nc) {
Log.error("Error sending packet to client.", nc);
return;
}
String type = rootNode.attributeValue("type");
String restartStream = rootNode.attributeValue(new QName("restart", rootNode.getNamespaceForPrefix("xmpp")));
int pauseDuration = getIntAttribue(rootNode.attributeValue("pause"), -1);
if ("terminate".equals(type)) {
session.close();
respond(session, request, response, createEmptyBody(true), request.getMethod());
}
else if ("true".equals(restartStream) && rootNode.elements().size() == 0) {
try {
respond(session, request, response, createSessionRestartResponse(session), request.getMethod());
context.complete();
}
catch (DocumentException e) {
Log.error("Error sending session restart response to client.", e);
}
}
else if (pauseDuration > 0 && pauseDuration <= session.getMaxPause()) {
session.pause(pauseDuration);
respond(session, request, response, createEmptyBody(false), request.getMethod());
session.setLastResponseEmpty(true);
}
else {
session.resetInactivityTimeout();
connection.setContinuation(ContinuationSupport.getContinuation(request));
request.setAttribute("request-session", connection.getSession());
request.setAttribute("request", connection.getRequestId());
request.setAttribute("connection", connection);
try {
respond(session, request, response, session.consumeResponse(connection),
request.getMethod());
}
catch (HttpBindException e) {
sendError(request, response, e.getBindingError(), session);
}
}
}
}
private String createSessionRestartResponse(HttpSession session) throws DocumentException {
Element response = DocumentHelper.createElement("body");
response.addNamespace("", "http://jabber.org/protocol/httpbind");
response.addNamespace("stream", "http://etherx.jabber.org/streams");
Element features = response.addElement("stream:features");
for (Element feature : session.getAvailableStreamFeaturesElements()) {
features.add(feature);
}
return response.asXML();
}
private void createNewSession(HttpServletRequest request, HttpServletResponse response,
Element rootNode)
throws IOException
private XMPPPacketReader getPacketReader()
{
long rid = getLongAttribue(rootNode.attributeValue("rid"), -1);
if (rid <= 0) {
response.sendError(HttpServletResponse.SC_BAD_REQUEST, "Body missing RID (Request ID)");
return;
}
try {
X509Certificate[] certificates =
(X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate");
HttpConnection connection = new HttpConnection(rid, request.isSecure(), certificates);
InetAddress address = InetAddress.getByName(request.getRemoteAddr());
connection.setSession(sessionManager.createSession(address, rootNode, connection));
if (JiveGlobals.getBooleanProperty("log.httpbind.enabled", false)) {
System.out.println(new Date()+": HTTP RECV(" + connection.getSession().getStreamID().getID() + "): " + rootNode.asXML());
}
respond(request, response, connection, request.getMethod());
}
catch (UnauthorizedException e) {
// Server wasn't initialized yet.
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
"Server Not initialized");
}
catch (HttpBindException e) {
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
// Reader is associated with a new XMPPPacketReader
XMPPPacketReader reader = localReader.get();
if (reader == null) {
reader = new XMPPPacketReader();
reader.setXPPFactory(factory);
localReader.set(reader);
}
return reader;
}
// add request argument
private void respond(HttpServletRequest request, HttpServletResponse response, HttpConnection connection, String method)
throws IOException
public static void respond(HttpSession session, AsyncContext context, String content, boolean async) throws IOException
{
String content;
try {
content = connection.getResponse();
}
catch (HttpBindTimeoutException e) {
content = createEmptyBody(false);
connection.getSession().setLastResponseEmpty(true);
}
final HttpServletResponse response = ((HttpServletResponse) context.getResponse());
final HttpServletRequest request = ((HttpServletRequest) context.getRequest());
respond(connection.getSession(), request, response, content, method);
}
// add request argument
private void respond(HttpSession session, HttpServletRequest request, HttpServletResponse response, String content, String method)
throws IOException {
response.setStatus(HttpServletResponse.SC_OK);
response.setContentType("GET".equals(method) ? "text/javascript" : "text/xml");
response.setContentType("GET".equals(request.getMethod()) ? "text/javascript" : "text/xml");
response.setCharacterEncoding("UTF-8");
if ("GET".equals(method)) {
if ("GET".equals(request.getMethod())) {
if (JiveGlobals.getBooleanProperty("xmpp.httpbind.client.no-cache.enabled", true)) {
// Prevent caching of responses
response.addHeader("Cache-Control", "no-store");
......@@ -389,42 +292,76 @@ public class HttpBindServlet extends HttpServlet {
}
if (JiveGlobals.getBooleanProperty("log.httpbind.enabled", false)) {
System.out.println(new Date()+": HTTP SENT(" + session.getStreamID().getID() + "): " + content);
System.out.println(new Date() + ": HTTP SENT(" + session.getStreamID().getID() + "): " + content);
}
final byte[] byteContent = content.getBytes("UTF-8");
if (async) {
response.getOutputStream().setWriteListener(new WriteListenerImpl(context, byteContent));
} else {
context.getResponse().getOutputStream().write(byteContent);
context.getResponse().getOutputStream().flush();
context.complete();
}
byte[] byteContent = content.getBytes("UTF-8");
response.setContentLength(byteContent.length);
response.getOutputStream().write(byteContent);
response.getOutputStream().close();
}
private void setCORSHeaders(HttpServletRequest request, HttpServletResponse response) {
// set CORS headers
if (boshManager.isCORSEnabled()) {
if (boshManager.isAllOriginsAllowed())
// set the Access-Control-Allow-Origin header to * to allow all Origin to do the CORS
response.setHeader("Access-Control-Allow-Origin", HttpBindManager.HTTP_BIND_CORS_ALLOW_ORIGIN_DEFAULT);
else {
// get the Origin header from the request and check if it is in the allowed Origin Map.
// if it is allowed write it back to the Access-Control-Allow-Origin header of the respond.
String origin = request.getHeader("Origin");
if (boshManager.isThisOriginAllowed(origin)) {
response.setHeader("Access-Control-Allow-Origin", origin);
private void sendError(HttpSession session, AsyncContext context, BoshBindingError bindingError)
throws IOException
{
if (JiveGlobals.getBooleanProperty("log.httpbind.enabled", false)) {
System.out.println(new Date() + ": HTTP ERR(" + session.getStreamID().getID() + "): " + bindingError.getErrorType().getType() + ", " + bindingError.getCondition() + ".");
}
try {
if ((session.getMajorVersion() == 1 && session.getMinorVersion() >= 6) || session.getMajorVersion() > 1)
{
final String errorBody = createErrorBody(bindingError.getErrorType().getType(), bindingError.getCondition());
respond(session, context, errorBody, true);
} else {
sendLegacyError(context, bindingError);
}
}
response.setHeader("Access-Control-Allow-Methods", HttpBindManager.HTTP_BIND_CORS_ALLOW_METHODS_DEFAULT);
response.setHeader("Access-Control-Allow-Headers", HttpBindManager.HTTP_BIND_CORS_ALLOW_HEADERS_DEFAULT);
response.setHeader("Access-Control-Max-Age", HttpBindManager.HTTP_BIND_CORS_MAX_AGE_DEFAULT);
finally {
if (bindingError.getErrorType() == BoshBindingError.Type.terminate) {
session.close();
}
}
}
protected static void sendLegacyError(AsyncContext context, BoshBindingError error, String message)
throws IOException
{
final HttpServletResponse response = (HttpServletResponse) context.getResponse();
if (message == null || message.trim().length() == 0) {
response.sendError(error.getLegacyErrorCode());
} else {
response.sendError(error.getLegacyErrorCode(), message);
}
context.complete();
}
protected static void sendLegacyError(AsyncContext context, BoshBindingError error)
throws IOException
{
sendLegacyError(context, error, null);
}
private static String createEmptyBody(boolean terminate) {
Element body = DocumentHelper.createElement("body");
protected static String createEmptyBody(boolean terminate)
{
final Element body = DocumentHelper.createElement("body");
if (terminate) { body.addAttribute("type", "terminate"); }
body.addNamespace("", "http://jabber.org/protocol/httpbind");
return body.asXML();
}
private long getLongAttribue(String value, long defaultValue) {
protected static String createErrorBody(String type, String condition) {
final Element body = DocumentHelper.createElement("body");
body.addNamespace("", "http://jabber.org/protocol/httpbind");
body.addAttribute("type", type);
body.addAttribute("condition", condition);
return body.asXML();
}
protected static long getLongAttribute(String value, long defaultValue) {
if (value == null || "".equals(value)) {
return defaultValue;
}
......@@ -436,7 +373,7 @@ public class HttpBindServlet extends HttpServlet {
}
}
private int getIntAttribue(String value, int defaultValue) {
protected static int getIntAttribute(String value, int defaultValue) {
if (value == null || "".equals(value)) {
return defaultValue;
}
......@@ -448,20 +385,84 @@ public class HttpBindServlet extends HttpServlet {
}
}
private XMPPPacketReader getPacketReader() {
// Reader is associated with a new XMPPPacketReader
XMPPPacketReader reader = localReader.get();
if (reader == null) {
reader = new XMPPPacketReader();
reader.setXPPFactory(factory);
localReader.set(reader);
protected static String getRemoteAddress(AsyncContext context)
{
String remoteAddress = null;
if (context.getRequest() != null && context.getRequest().getRemoteAddr() != null) {
remoteAddress = context.getRequest().getRemoteAddr();
}
return reader;
if (remoteAddress == null || remoteAddress.trim().length() == 0) {
remoteAddress = "<UNKNOWN ADDRESS>";
}
private Document createDocument(InputStream request) throws
DocumentException, IOException, XmlPullParserException
{
return getPacketReader().read("UTF-8", request);
return remoteAddress;
}
class ReadListenerImpl implements ReadListener {
private final AsyncContext context;
private final StringBuilder buffer = new StringBuilder(512);
private final String remoteAddress;
ReadListenerImpl(AsyncContext context) {
this.context = context;
this.remoteAddress = getRemoteAddress(context);
}
@Override
public void onDataAvailable() throws IOException {
Log.trace("Data is available to be read from [" + remoteAddress + "]");
final ServletInputStream inputStream = context.getRequest().getInputStream();
byte b[] = new byte[1024];
int length;
while (inputStream.isReady() && (length = inputStream.read(b)) != -1) {
buffer.append(new String(b, 0, length));
}
}
@Override
public void onAllDataRead() throws IOException {
Log.trace("All data has been read from [" + remoteAddress + "]");
processContent(context, buffer.toString());
}
@Override
public void onError(Throwable throwable) {
Log.warn("Error reading request data from [" + remoteAddress + "]", throwable);
try {
sendLegacyError(context, BoshBindingError.badRequest);
} catch (IOException ex) {
Log.debug("Error while sending an error to ["+remoteAddress +"] in response to an earlier data-read failure.", ex);
}
}
}
static class WriteListenerImpl implements WriteListener {
private final AsyncContext context;
private final byte[] data;
private final String remoteAddress;
public WriteListenerImpl(AsyncContext context, byte[] data) {
this.context = context;
this.data = data;
this.remoteAddress = getRemoteAddress(context);
}
@Override
public void onWritePossible() throws IOException {
Log.trace("Data can be written to [" + remoteAddress + "]");
context.getResponse().getOutputStream().write(data);
context.complete();
}
@Override
public void onError(Throwable throwable) {
Log.warn("Error writing response data to [" + remoteAddress + "]", throwable);
context.complete();
}
}
}
/**
* $RCSfile$
* $Revision: $
* $Date: $
*
* Copyright (C) 2005-2008 Jive Software. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jivesoftware.openfire.http;
/**
* An exception which indicates that the maximum waiting time for a client response has been
* surpassed and an empty response should be returned to the requesting client.
*
* @author Alexander Wenckus
*/
class HttpBindTimeoutException extends Exception {
public HttpBindTimeoutException(String message) {
super(message);
}
public HttpBindTimeoutException() {
super();
}
}
......@@ -20,17 +20,16 @@
package org.jivesoftware.openfire.http;
import org.jivesoftware.util.JiveConstants;
import org.eclipse.jetty.continuation.Continuation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.AsyncContext;
import java.io.IOException;
import java.security.cert.X509Certificate;
/**
* Represents one HTTP connection with a client using the HTTP Binding service. The client will wait
* on {@link #getResponse()} until the server forwards a message to it or the wait time on the
* on a response until the server forwards a message to it or the wait time on the
* session timeout.
*
* @author Alexander Wenckus
......@@ -38,18 +37,16 @@ import java.security.cert.X509Certificate;
public class HttpConnection {
private static final Logger Log = LoggerFactory.getLogger(HttpConnection.class);
private static final String RESPONSE_BODY = "response-body";
private static final String CONNECTION_CLOSED = "connection closed";
private final long requestId;
private final X509Certificate[] sslCertificates;
private final boolean isSecure;
private String body;
private HttpSession session;
private Continuation continuation;
private boolean isClosed;
private final AsyncContext context;
/**
* Constructs an HTTP Connection.
*
......@@ -57,25 +54,30 @@ public class HttpConnection {
* @param isSecure true if this connection is using HTTPS
* @param sslCertificates list of certificates presented by the client.
*/
public HttpConnection(long requestId, boolean isSecure, X509Certificate[] sslCertificates) {
public HttpConnection(long requestId, boolean isSecure, X509Certificate[] sslCertificates, AsyncContext context) {
this.requestId = requestId;
this.isSecure = isSecure;
this.sslCertificates = sslCertificates;
this.context = context;
}
/**
* The connection should be closed without delivering a stanza to the requestor.
*/
public void close() {
synchronized (this) {
if (isClosed) {
return;
}
}
try {
deliverBody(CONNECTION_CLOSED);
deliverBody(null, true);
}
catch (HttpConnectionClosedException e) {
Log.warn("Unexpected exception occurred while trying to close an HttpException.", e);
} catch (IOException e) {
Log.warn("Unexpected exception occurred while trying to close an HttpException.", e);
}
}
......@@ -85,7 +87,7 @@ public class HttpConnection {
*
* @return true if this connection has been closed.
*/
public boolean isClosed() {
public synchronized boolean isClosed() {
return isClosed;
}
......@@ -104,59 +106,25 @@ public class HttpConnection {
* sent an empty body.
*
* @param body the XMPP content to be forwarded to the client inside of a body tag.
* @param async when false, this method blocks until the data has been delivered to the client.
*
* @throws HttpConnectionClosedException when this connection to the client has already received
* a deliverable to forward to the client
*/
public void deliverBody(String body) throws HttpConnectionClosedException {
public void deliverBody(String body, boolean async) throws HttpConnectionClosedException, IOException {
// We only want to use this function once so we will close it when the body is delivered.
synchronized (this) {
if (isClosed) {
throw new HttpConnectionClosedException("The http connection is no longer " +
"available to deliver content");
}
else {
isClosed = true;
}
}
if (body == null) {
body = CONNECTION_CLOSED;
}
if (isSuspended()) {
continuation.setAttribute(RESPONSE_BODY, body);
continuation.resume();
session.incrementServerPacketCount();
}
else {
this.body = body;
}
}
/**
* A call that will suspend the request if there is no deliverable currently available.
* Once the response becomes available, it is returned.
*
* @return the deliverable to send to the client
* @throws HttpBindTimeoutException to indicate that the maximum wait time requested by the
* client has been surpassed and an empty response should be returned.
*/
public String getResponse() throws HttpBindTimeoutException {
if (body == null && continuation != null) {
try {
body = waitForResponse();
}
catch (HttpBindTimeoutException e) {
this.isClosed = true;
throw e;
}
}
else if (body == null) {
throw new IllegalStateException("Continuation not set, cannot wait for deliverable.");
}
else if(CONNECTION_CLOSED.equals(body)) {
return null;
if (body == null) {
body = HttpBindServlet.createEmptyBody(false);
}
return body;
HttpBindServlet.respond(this.getSession(), this.context, body, async);
}
/**
......@@ -195,41 +163,6 @@ public class HttpConnection {
return sslCertificates;
}
void setContinuation(Continuation continuation) {
this.continuation = continuation;
}
public boolean isSuspended() {
return continuation != null && continuation.isSuspended();
}
public boolean isExpired() {
return continuation != null && continuation.isExpired();
}
private String waitForResponse() throws HttpBindTimeoutException {
// we enter this method when we have no messages pending delivery
// when we resume a suspended continuation, or when we time out
if (continuation.isInitial()) {
continuation.setTimeout(session.getWait() * JiveConstants.SECOND);
continuation.suspend();
continuation.undispatch();
} else if (continuation.isResumed()) {
// This will occur when the hold attribute of a session has been exceeded.
String deliverable = (String) continuation.getAttribute(RESPONSE_BODY);
if (deliverable == null) {
throw new HttpBindTimeoutException();
}
else if(CONNECTION_CLOSED.equals(deliverable)) {
return null;
}
return deliverable;
}
throw new HttpBindTimeoutException("Request " + requestId + " exceeded response time from " +
"server of " + session.getWait() + " seconds.");
}
@Override
public String toString() {
return (session != null ? session.toString() : "[Anonymous]")
......
......@@ -19,6 +19,7 @@
package org.jivesoftware.openfire.http;
import java.io.IOException;
import java.io.StringReader;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
......@@ -36,16 +37,9 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.dom4j.Namespace;
import org.dom4j.QName;
import org.dom4j.*;
import org.dom4j.io.XMPPPacketReader;
import org.jivesoftware.openfire.Connection;
import org.jivesoftware.openfire.PacketDeliverer;
import org.jivesoftware.openfire.SessionPacketRouter;
import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.*;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.multiplex.UnknownStanzaException;
import org.jivesoftware.openfire.net.MXParser;
......@@ -64,6 +58,10 @@ import org.xmpp.packet.Message;
import org.xmpp.packet.Packet;
import org.xmpp.packet.Presence;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
/**
* A session represents a series of interactions with an XMPP client sending packets using the HTTP
* Binding protocol specified in <a href="http://www.xmpp.org/extensions/xep-0124.html">XEP-0124</a>.
......@@ -458,7 +456,7 @@ public class HttpSession extends LocalClientSession {
synchronized (connectionQueue) {
for (HttpConnection connection : connectionQueue) {
// The session is currently active, set the last activity to the current time.
if (!(connection.isClosed() || connection.isExpired())) {
if (!(connection.isClosed())) {
lastActivity = System.currentTimeMillis();
break;
}
......@@ -567,53 +565,69 @@ public class HttpSession extends LocalClientSession {
}
/**
* Returns the response for a specific connection instance. It is possible for there to be multiple
* connections in the queue for the same rid so we need to be careful that we are accessing the correct
* connection.
* <p><b>Note that this method also removes the connection from the internal connection queue.</b>
* Sets whether the initial request on the session was secure.
*
* @param connection the connection for which to get the response.
* @return the response from the connection
* @throws HttpBindException
* @param isSecure true if the initial request was secure and false if it wasn't.
*/
protected String consumeResponse(HttpConnection connection) throws HttpBindException {
Log.debug("consumeResponse: " + connection);
if(connectionQueue.contains(connection)) {
String response = getResponse(connection);
connectionQueue.remove(connection);
fireConnectionClosed(connection);
return response;
}
throw new InternalError("Could not locate connection: " + connection);
protected void setSecure(boolean isSecure) {
this.isSecure = isSecure;
}
private String getResponse(HttpConnection connection) throws HttpBindException {
String response = null;
try {
response = connection.getResponse();
}
catch (HttpBindTimeoutException e) {
// This connection timed out we need to increment the request count
if (connection.getRequestId() != lastRequestID + 1) {
throw new HttpBindException("Unexpected RID error.",
BoshBindingError.itemNotFound);
/**
* Forwards a client request, which is related to a session, to the server. A connection is
* created and queued up in the provided session. When a connection reaches the top of a queue
* any pending packets bound for the client will be forwarded to the client through the
* connection.
*
* @param rid the unique, sequential, requestID sent from the client.
* @param isSecure true if the request was made over a secure channel, HTTPS, and false if it
* was not.
* @param rootNode the XML body of the request.
* @param context the context of the asynchronous servlet call leading up to this method call.
*
* @throws org.jivesoftware.openfire.http.HttpBindException for several reasons: if the encoding inside of an auth packet is
* not recognized by the server, or if the packet type is not recognized.
* @throws org.jivesoftware.openfire.http.HttpConnectionClosedException if the session is no longer available.
*/
public void forwardRequest(long rid, boolean isSecure, Element rootNode, AsyncContext context)
throws HttpBindException, HttpConnectionClosedException, IOException
{
List<Element> elements = rootNode.elements();
boolean isPoll = (elements.size() == 0);
if ("terminate".equals(rootNode.attributeValue("type")))
isPoll = false;
else if ("true".equals(rootNode.attributeValue(new QName("restart", rootNode.getNamespaceForPrefix("xmpp")))))
isPoll = false;
else if (rootNode.attributeValue("pause") != null)
isPoll = false;
HttpConnection connection = this.createConnection(rid, elements, isSecure, isPoll, context);
if (elements.size() > 0) {
// creates the runnable to forward the packets
new HttpPacketSender(this).init();
}
final String type = rootNode.attributeValue("type");
String restartStream = rootNode.attributeValue(new QName("restart", rootNode.getNamespaceForPrefix("xmpp")));
int pauseDuration = HttpBindServlet.getIntAttribute(rootNode.attributeValue("pause"), -1);
if ("terminate".equals(type)) {
connection.deliverBody(createEmptyBody(true), true);
close();
lastRequestID = connection.getRequestId();
}
else if ("true".equals(restartStream) && rootNode.elements().size() == 0) {
connection.deliverBody(createSessionRestartResponse(), true);
lastRequestID = connection.getRequestId();
}
if (response == null) {
response = createEmptyBody();
else if (pauseDuration > 0 && pauseDuration <= getMaxPause()) {
pause(pauseDuration);
connection.deliverBody(createEmptyBody(false), true);
lastRequestID = connection.getRequestId();
setLastResponseEmpty(true);
}
return response;
else {
resetInactivityTimeout();
}
/**
* Sets whether the initial request on the session was secure.
*
* @param isSecure true if the initial request was secure and false if it wasn't.
*/
protected void setSecure(boolean isSecure) {
this.isSecure = isSecure;
}
/**
......@@ -677,10 +691,52 @@ public class HttpSession extends LocalClientSession {
* protocol.
*/
synchronized HttpConnection createConnection(long rid, Collection<Element> packetsToBeSent,
boolean isSecure, boolean isPoll)
throws HttpConnectionClosedException, HttpBindException
boolean isSecure, boolean isPoll, AsyncContext context)
throws HttpConnectionClosedException, HttpBindException, IOException
{
HttpConnection connection = new HttpConnection(rid, isSecure, sslCertificates);
final HttpConnection connection = new HttpConnection(rid, isSecure, sslCertificates, context);
connection.setSession(this);
context.setTimeout(getWait() * JiveConstants.SECOND);
context.addListener(new AsyncListener() {
@Override
public void onComplete(AsyncEvent asyncEvent) throws IOException {
Log.debug("complete event " + asyncEvent);
connectionQueue.remove(connection);
fireConnectionClosed(connection);
}
@Override
public void onTimeout(AsyncEvent asyncEvent) throws IOException {
Log.debug("timeout event " + asyncEvent);
try {
// If onTimeout does not result in a complete(), the container falls back to default behavior.
// This is why this body is to be delivered in a non-async fashion.
connection.deliverBody(createEmptyBody(), false);
setLastResponseEmpty(true);
// This connection timed out we need to increment the request count
if (connection.getRequestId() != lastRequestID + 1) {
throw new IOException("Unexpected RID error.");
}
lastRequestID = connection.getRequestId();
} catch (HttpConnectionClosedException e) {
Log.warn("Unexpected exception while processing connection timeout.", e);
} finally {
connectionQueue.remove(connection);
fireConnectionClosed(connection);
}
}
@Override
public void onError(AsyncEvent asyncEvent) throws IOException {
Log.debug("error event " + asyncEvent);
Log.warn("Unhandled AsyncListener error: " + asyncEvent.getThrowable());
}
@Override
public void onStartAsync(AsyncEvent asyncEvent) throws IOException {}
});
if (rid <= lastRequestID) {
Delivered deliverable = retrieveDeliverable(rid);
if (deliverable == null) {
......@@ -688,7 +744,7 @@ public class HttpSession extends LocalClientSession {
throw new HttpBindException("Unexpected RID error.",
BoshBindingError.itemNotFound);
}
connection.deliverBody(createDeliverable(deliverable.deliverables));
connection.deliverBody(createDeliverable(deliverable.deliverables), true);
addConnection(connection, isPoll);
return connection;
}
......@@ -719,7 +775,7 @@ public class HttpSession extends LocalClientSession {
}
private void addConnection(HttpConnection connection, boolean isPoll) throws HttpBindException,
HttpConnectionClosedException {
HttpConnectionClosedException, IOException {
if (connection == null) {
throw new IllegalArgumentException("Connection cannot be null.");
}
......@@ -754,7 +810,7 @@ public class HttpSession extends LocalClientSession {
throw new HttpBindException("Unexpected RID error.",
BoshBindingError.itemNotFound);
}
connection.deliverBody(createDeliverable(deliverable.deliverables));
connection.deliverBody(createDeliverable(deliverable.deliverables), true);
} else {
if(Log.isDebugEnabled()) {
Log.debug("It's still open - calling close()");
......@@ -775,7 +831,6 @@ public class HttpSession extends LocalClientSession {
sslCertificates = connection.getPeerCertificates();
connection.setSession(this);
// We aren't supposed to hold connections open or we already have some packets waiting
// to be sent to the client.
if (isPollingSession() || (pendingElements.size() > 0 && connection.getRequestId() == lastRequestID + 1)) {
......@@ -838,8 +893,8 @@ public class HttpSession extends LocalClientSession {
}
private void deliver(HttpConnection connection, Collection<Deliverable> deliverable)
throws HttpConnectionClosedException {
connection.deliverBody(createDeliverable(deliverable));
throws HttpConnectionClosedException, IOException {
connection.deliverBody(createDeliverable(deliverable), true);
Delivered delivered = new Delivered(deliverable);
delivered.setRequestID(connection.getRequestId());
......@@ -934,7 +989,10 @@ public class HttpSession extends LocalClientSession {
}
}
catch (HttpConnectionClosedException e) {
/* Connection was closed, try the next one */
/* Connection was closed, try the next one. Indicates a (concurrency?) bug. */
Log.warn("Iterating over a connection that was closed. Openfire will recover from this problem, but it should not occur in the first place.");
} catch (IOException e) {
Log.warn("An unexpected exception occurred while iterating over connections. Openfire will attempt to recover by ignoring this connection.", e);
}
}
}
......@@ -988,11 +1046,14 @@ public class HttpSession extends LocalClientSession {
pendingElements.clear();
}
} else {
toClose.deliverBody(null);
toClose.deliverBody(null, true);
}
}
} catch (HttpConnectionClosedException e) {
/* ignore ... already closed */
} catch (IOException e) {
// Likely caused by closing a stale session / connection.
Log.debug("An unexpected exception occurred while closing a session.", e);
}
}
}
......@@ -1040,6 +1101,28 @@ public class HttpSession extends LocalClientSession {
return body.asXML();
}
protected static String createEmptyBody(boolean terminate)
{
final Element body = DocumentHelper.createElement("body");
if (terminate) { body.addAttribute("type", "terminate"); }
body.addNamespace("", "http://jabber.org/protocol/httpbind");
return body.asXML();
}
private String createSessionRestartResponse()
{
final Element response = DocumentHelper.createElement("body");
response.addNamespace("", "http://jabber.org/protocol/httpbind");
response.addNamespace("stream", "http://etherx.jabber.org/streams");
final Element features = response.addElement("stream:features");
for (Element feature : getAvailableStreamFeaturesElements()) {
features.add(feature);
}
return response.asXML();
}
/**
* A virtual server connection relates to a http session which its self can relate to many http
* connections.
......@@ -1087,10 +1170,9 @@ public class HttpSession extends LocalClientSession {
}
}
private class Deliverable implements Comparable<Deliverable> {
private class Deliverable {
private final String text;
private final Collection<String> packets;
private long requestID;
public Deliverable(String text) {
this.text = text;
......@@ -1103,22 +1185,13 @@ public class HttpSession extends LocalClientSession {
for (Packet packet : elements) {
// Rewrite packet namespace according XEP-0206
if (packet instanceof Presence) {
final StringBuilder sb = new StringBuilder();
sb.append("<presence xmlns=\"jabber:client\"");
sb.append(packet.toXML().substring(9));
this.packets.add(sb.toString());
this.packets.add("<presence xmlns=\"jabber:client\"" + packet.toXML().substring(9));
}
else if (packet instanceof IQ) {
final StringBuilder sb = new StringBuilder();
sb.append("<iq xmlns=\"jabber:client\"");
sb.append(packet.toXML().substring(3));
this.packets.add(sb.toString());
this.packets.add("<iq xmlns=\"jabber:client\"" + packet.toXML().substring(3));
}
else if (packet instanceof Message) {
final StringBuilder sb = new StringBuilder();
sb.append("<message xmlns=\"jabber:client\"");
sb.append(packet.toXML().substring(8));
this.packets.add(sb.toString());
this.packets.add("<message xmlns=\"jabber:client\"" + packet.toXML().substring(8));
}
else {
this.packets.add(packet.toXML());
......@@ -1139,14 +1212,6 @@ public class HttpSession extends LocalClientSession {
}
}
public void setRequestID(long requestID) {
this.requestID = requestID;
}
public long getRequestID() {
return requestID;
}
public Collection<Packet> getPackets() {
// Check if the Deliverable is about Packets or raw XML
if (packets == null) {
......@@ -1178,10 +1243,6 @@ public class HttpSession extends LocalClientSession {
}
return answer;
}
public int compareTo(Deliverable o) {
return (int) (o.getRequestID() - requestID);
}
}
private class Delivered {
......@@ -1212,4 +1273,24 @@ public class HttpSession extends LocalClientSession {
return packets;
}
}
/**
* A runner that guarantees that the packets per a session will be sent and
* processed in the order in which they were received.
*/
private class HttpPacketSender implements Runnable {
private HttpSession session;
HttpPacketSender(HttpSession session) {
this.session = session;
}
public void run() {
session.sendPendingPackets();
}
private void init() {
HttpBindManager.getInstance().getSessionManager().execute(this);
}
}
}
......@@ -19,8 +19,8 @@
package org.jivesoftware.openfire.http;
import java.io.IOException;
import java.net.InetAddress;
import java.util.List;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
......@@ -33,19 +33,17 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.dom4j.DocumentException;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.dom4j.QName;
import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.StreamID;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.util.JiveConstants;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.TaskEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Manages sessions for all users connecting to Openfire using the HTTP binding protocal,
* Manages sessions for all users connecting to Openfire using the HTTP binding protocol,
* <a href="http://www.xmpp.org/extensions/xep-0124.html">XEP-0124</a>.
*/
public class HttpSessionManager {
......@@ -78,7 +76,11 @@ public class HttpSessionManager {
JiveGlobals.migrateProperty("xmpp.httpbind.worker.timeout");
this.sessionManager = SessionManager.getInstance();
init();
}
public void init() {
Log.warn("HttpSessionManager.init() recreate sendPacketPool");
// Configure a pooled executor to handle async routing for incoming packets
// with a default size of 16 threads ("xmpp.httpbind.worker.threads"); also
// uses an unbounded task queue and configurable keep-alive (default: 60 secs)
......@@ -144,7 +146,7 @@ public class HttpSessionManager {
/**
* Creates an HTTP binding session which will allow a user to exchange packets with Openfire.
*
* @param address the internet address that was used to bind to Wildfie.
* @param address the internet address that was used to bind to Openfire.
* @param rootNode the body element that was sent containing the request for a new session.
* @param connection the HTTP connection object which abstracts the individual connections to
* Openfire over the HTTP binding protocol. The initial session creation response is returned to
......@@ -198,16 +200,20 @@ public class HttpSessionManager {
session.setMajorVersion(Integer.parseInt(versionString[0]));
session.setMinorVersion(Integer.parseInt(versionString[1]));
connection.setSession(session);
try {
connection.deliverBody(createSessionCreationResponse(session));
connection.deliverBody(createSessionCreationResponse(session), true);
}
catch (HttpConnectionClosedException e) {
/* This won't happen here. */
Log.error("Error creating session.", e);
throw new HttpBindException("Internal server error", BoshBindingError.internalServerError);
}
catch (DocumentException e) {
Log.error("Error creating document", e);
throw new HttpBindException("Internal server error",
BoshBindingError.internalServerError);
Log.error("Error creating session.", e);
throw new HttpBindException("Internal server error", BoshBindingError.internalServerError);
} catch (IOException e) {
Log.error("Error creating session.", e);
throw new HttpBindException("Internal server error", BoshBindingError.internalServerError);
}
return session;
}
......@@ -293,44 +299,6 @@ public class HttpSessionManager {
return JiveGlobals.getIntProperty("xmpp.httpbind.client.idle.polling", 60);
}
/**
* Forwards a client request, which is related to a session, to the server. A connection is
* created and queued up in the provided session. When a connection reaches the top of a queue
* any pending packets bound for the client will be forwarded to the client through the
* connection.
*
* @param rid the unique, sequential, requestID sent from the client.
* @param session the HTTP session of the client that made the request.
* @param isSecure true if the request was made over a secure channel, HTTPS, and false if it
* was not.
* @param rootNode the XML body of the request.
* @return the created HTTP connection.
*
* @throws HttpBindException for several reasons: if the encoding inside of an auth packet is
* not recognized by the server, or if the packet type is not recognized.
* @throws HttpConnectionClosedException if the session is no longer available.
*/
public HttpConnection forwardRequest(long rid, HttpSession session, boolean isSecure,
Element rootNode) throws HttpBindException,
HttpConnectionClosedException
{
//noinspection unchecked
List<Element> elements = rootNode.elements();
boolean isPoll = (elements.size() == 0);
if ("terminate".equals(rootNode.attributeValue("type")))
isPoll = false;
else if ("true".equals(rootNode.attributeValue(new QName("restart", rootNode.getNamespaceForPrefix("xmpp")))))
isPoll = false;
else if (rootNode.attributeValue("pause") != null)
isPoll = false;
HttpConnection connection = session.createConnection(rid, elements, isSecure, isPoll);
if (elements.size() > 0) {
// creates the runnable to forward the packets
new HttpPacketSender(session).init();
}
return connection;
}
private HttpSession createSession(long rid, InetAddress address, HttpConnection connection) throws UnauthorizedException {
// Create a ClientSession for this user.
StreamID streamID = SessionManager.getInstance().nextStreamID();
......@@ -354,19 +322,7 @@ public class HttpSessionManager {
}
}
private double getDoubleAttribute(String doubleValue, double defaultValue) {
if (doubleValue == null || "".equals(doubleValue.trim())) {
return defaultValue;
}
try {
return Double.parseDouble(doubleValue);
}
catch (Exception ex) {
return defaultValue;
}
}
private String createSessionCreationResponse(HttpSession session) throws DocumentException {
private static String createSessionCreationResponse(HttpSession session) throws DocumentException {
Element response = DocumentHelper.createElement("body");
response.addNamespace("", "http://jabber.org/protocol/httpbind");
response.addNamespace("stream", "http://etherx.jabber.org/streams");
......@@ -417,23 +373,7 @@ public class HttpSessionManager {
}
}
/**
* A runner that guarantees that the packets per a session will be sent and
* processed in the order in which they were received.
*/
private class HttpPacketSender implements Runnable {
private HttpSession session;
HttpPacketSender(HttpSession session) {
this.session = session;
}
public void run() {
session.sendPendingPackets();
}
private void init() {
sendPacketPool.execute(this);
}
protected void execute(Runnable runnable) {
this.sendPacketPool.execute(runnable);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment