Commit 3b998ea9 authored by Alex Wenckus's avatar Alex Wenckus Committed by alex

We can now compile... back-end is mostly code complete now binding needs to be handled in Jetty.

git-svn-id: http://svn.igniterealtime.org/svn/repos/wildfire/branches/httpbind_branch@5740 b35dd754-fafc-0310-a699-88a17e54d16e
parent ec866941
......@@ -30,6 +30,7 @@ import org.jivesoftware.wildfire.server.OutgoingSessionPromise;
import org.jivesoftware.wildfire.spi.BasicStreamIDFactory;
import org.jivesoftware.wildfire.user.UserManager;
import org.jivesoftware.wildfire.user.UserNotFoundException;
import org.jivesoftware.wildfire.http.HttpSession;
import org.xmpp.packet.JID;
import org.xmpp.packet.Message;
import org.xmpp.packet.Packet;
......@@ -541,6 +542,19 @@ public class SessionManager extends BasicModule {
return session;
}
public HttpSession createClientHttpSession(StreamID id) throws UnauthorizedException {
if (serverName == null) {
throw new UnauthorizedException("Server not initialized");
}
HttpSession session = new HttpSession(serverName, id);
Connection conn = session.getConnection();
conn.init(session);
conn.registerCloseListener(clientSessionListener, session);
preAuthenticatedSessions.put(session.getAddress().getResource(), session);
usersSessionsCounter.incrementAndGet();
return session;
}
public Session createComponentSession(Connection conn) throws UnauthorizedException {
if (serverName == null) {
throw new UnauthorizedException("Server not initialized");
......
......@@ -13,11 +13,11 @@ package org.jivesoftware.wildfire.http;
import org.jivesoftware.wildfire.ClientSession;
import org.jivesoftware.wildfire.StreamID;
import org.jivesoftware.wildfire.Connection;
import org.jivesoftware.wildfire.XMPPServer;
import org.jivesoftware.wildfire.net.VirtualConnection;
import org.jivesoftware.wildfire.auth.UnauthorizedException;
import org.dom4j.Element;
import org.dom4j.DocumentHelper;
import org.xmpp.packet.Packet;
import org.xmpp.packet.Message;
import java.util.*;
import java.net.InetAddress;
......@@ -36,7 +36,7 @@ public class HttpSession extends ClientSession {
private int hold = -1000;
private String language;
private final Queue<HttpConnection> connectionQueue = new LinkedList<HttpConnection>();
private final List<Packet> pendingElements = new ArrayList<Packet>();
private final List<Deliverable> pendingElements = new ArrayList<Deliverable>();
private boolean isSecure;
private int maxPollingInterval;
private long lastPoll = -1;
......@@ -44,7 +44,7 @@ public class HttpSession extends ClientSession {
private boolean isClosed;
private int inactivityTimeout;
protected HttpSession(String serverName, StreamID streamID) {
public HttpSession(String serverName, StreamID streamID) {
super(serverName, null, streamID);
conn = new HttpVirtualConnection();
}
......@@ -103,13 +103,41 @@ public class HttpSession extends ClientSession {
}
public String getAvailableStreamFeatures() {
return null;
StringBuilder sb = new StringBuilder(200);
// Include Stream Compression Mechanism
if (conn.getCompressionPolicy() != Connection.CompressionPolicy.disabled &&
!conn.isCompressed()) {
sb.append(
"<compression xmlns=\"http://jabber.org/features/compress\">" +
"<method>zlib</method></compression>");
}
if (getAuthToken() == null) {
// Advertise that the server supports Non-SASL Authentication
sb.append("<auth xmlns=\"http://jabber.org/features/iq-auth\"/>");
// Advertise that the server supports In-Band Registration
if (XMPPServer.getInstance().getIQRegisterHandler().isInbandRegEnabled()) {
sb.append("<register xmlns=\"http://jabber.org/features/iq-register\"/>");
}
}
else {
// If the session has been authenticated then offer resource binding
// and session establishment
sb.append("<bind xmlns=\"urn:ietf:params:xml:ns:xmpp-bind\"/>");
sb.append("<session xmlns=\"urn:ietf:params:xml:ns:xmpp-session\"/>");
}
return sb.toString();
}
public InetAddress getInetAddress() {
return null;
}
public void close() {
close(false);
}
public synchronized void close(boolean isServerShuttingDown) {
if(isClosed) {
return;
......@@ -129,10 +157,12 @@ public class HttpSession extends ClientSession {
}
private void failDelivery() {
ClientFailoverDeliverer deliverer = new ClientFailoverDeliverer();
deliverer.setStreamID(getStreamID());
for(Element element : pendingElements) {
deliverer.deliver(element);
for(Deliverable deliverable : pendingElements) {
Packet packet = deliverable.packet;
if (packet != null && packet instanceof Message) {
XMPPServer.getInstance().getOfflineMessageStrategy()
.storeOffline((Message) packet);
}
}
pendingElements.clear();
}
......@@ -141,10 +171,15 @@ public class HttpSession extends ClientSession {
return isClosed;
}
private void deliver(String text) {
private synchronized void deliver(String text) {
deliver(new Deliverable(text));
}
public synchronized void deliver(Packet stanza) {
deliver(new Deliverable(stanza));
}
private void deliver(Deliverable stanza) {
String deliverable = createDeliverable(Arrays.asList(stanza));
boolean delivered = false;
while(!delivered && connectionQueue.size() > 0) {
......@@ -172,14 +207,14 @@ public class HttpSession extends ClientSession {
}
}
private String createDeliverable(Collection<Packet> elements) {
Element body = DocumentHelper.createElement("body");
body.addAttribute("xmlns", "http://jabber.org/protocol/httpbind");
for(Packet child : elements) {
child = child.createCopy();
body.add(child.getElement());
private String createDeliverable(Collection<Deliverable> elements) {
StringBuilder builder = new StringBuilder();
builder.append("<body xmlns='" + "http://jabber.org/protocol/httpbind" + "'>");
for(Deliverable child : elements) {
builder.append(child.getDeliverable());
}
return body.asXML();
builder.append("</body>");
return builder.toString();
}
/**
......@@ -294,7 +329,7 @@ public class HttpSession extends ClientSession {
* A virtual server connection relates to a virtual session which its self can relate to many
* http connections.
*/
public class HttpVirtualConnection extends VirtualConnection {
public static class HttpVirtualConnection extends VirtualConnection {
public void closeVirtualConnection() {
((HttpSession)session).close(false);
......@@ -316,4 +351,29 @@ public class HttpSession extends ClientSession {
((HttpSession)session).deliver(text);
}
}
private class Deliverable {
private final String text;
private final Packet packet;
public Deliverable(String text) {
this.text = text;
this.packet = null;
}
public Deliverable(Packet element) {
this.text = null;
this.packet = element.createCopy();
}
public String getDeliverable() {
if(text == null) {
return packet.toXML();
}
else {
return text;
}
}
}
}
......@@ -11,16 +11,15 @@
package org.jivesoftware.wildfire.http;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.multiplexer.ServerSurrogate;
import org.jivesoftware.multiplexer.ConnectionManager;
import org.jivesoftware.multiplexer.Session;
import org.jivesoftware.wildfire.SessionManager;
import org.jivesoftware.wildfire.ConnectionManager;
import org.jivesoftware.wildfire.Session;
import org.jivesoftware.wildfire.StreamID;
import org.jivesoftware.wildfire.multiplex.MultiplexerPacketRouter;
import org.jivesoftware.wildfire.multiplex.UnknownStanzaException;
import org.jivesoftware.wildfire.auth.UnauthorizedException;
import org.dom4j.Element;
import java.util.*;
import java.io.UnsupportedEncodingException;
/**
*
......@@ -77,7 +76,9 @@ public class HttpSessionManager {
return sessionMap.get(streamID);
}
public HttpSession createSession(Element rootNode, HttpConnection connection) {
public HttpSession createSession(Element rootNode, HttpConnection connection)
throws UnauthorizedException
{
// TODO Check if IP address is allowed to connect to the server
// Default language is English ("en").
......@@ -89,9 +90,6 @@ public class HttpSessionManager {
int wait = getIntAttribute(rootNode.attributeValue("wait"), 60);
int hold = getIntAttribute(rootNode.attributeValue("hold"), 1);
// Indicate the compression policy to use for this connection
connection.setCompressionPolicy(serverSurrogate.getCompressionPolicy());
HttpSession session = createSession(serverName);
session.setWait(wait);
session.setHold(hold);
......@@ -111,34 +109,30 @@ public class HttpSessionManager {
return session;
}
private HttpSession createSession(String serverName) {
private HttpSession createSession(String serverName) throws UnauthorizedException {
// Create a ClientSession for this user.
StreamID streamID = SessionManager.getInstance().nextStreamID();
HttpSession session = new HttpSession(serverName, streamID);
// Send to the server that a new client session has been created
sessionManager.createClientHttpSession(streamID);
// Register that the new session is associated with the specified stream ID
sessionMap.put(streamID.getID(), session);
// Send to the server that a new client session has been created
serverSurrogate.clientSessionCreated(streamID);
session.addSessionCloseListener(new SessionListener() {
public void connectionOpened(HttpSession session, HttpConnection connection) {
if (session instanceof HttpSession) {
timer.stop((HttpSession) session);
timer.stop(session);
}
}
public void connectionClosed(HttpSession session, HttpConnection connection) {
if(session instanceof HttpSession) {
HttpSession http = (HttpSession) session;
if(http.getConnectionCount() <= 0) {
timer.reset(http);
}
if (session.getConnectionCount() <= 0) {
timer.reset(session);
}
}
public void sessionClosed(HttpSession session) {
sessionMap.remove(session.getStreamID());
timer.stop(session);
serverSurrogate.clientSessionClosed(session.getStreamID());
}
});
return session;
......@@ -170,9 +164,7 @@ public class HttpSessionManager {
.append(" wait='").append(String.valueOf(session.getWait())).append("'")
.append(">");
builder.append("<stream:features>");
builder.append(serverSurrogate.getSASLMechanismsElement(session).asXML());
builder.append("<bind xmlns=\"urn:ietf:params:xml:ns:xmpp-bind\"/>");
builder.append("<session xmlns=\"urn:ietf:params:xml:ns:xmpp-session\"/>");
builder.append(session.getAvailableStreamFeatures());
builder.append("</stream:features>");
builder.append("</body>");
......@@ -188,9 +180,18 @@ public class HttpSessionManager {
boolean isPoll = elements.size() <= 0;
HttpConnection connection = new HttpConnection(rid, isSecure);
session.addConnection(connection, isPoll);
MultiplexerPacketRouter router = new MultiplexerPacketRouter(session);
for (Element packet : elements) {
serverSurrogate.send(packet, session.getStreamID());
try {
router.route(packet);
}
catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
catch (UnknownStanzaException e) {
e.printStackTrace();
}
}
return connection;
......@@ -219,14 +220,14 @@ public class HttpSessionManager {
}
private class InactivityTimeoutTask extends TimerTask {
private Session session;
private HttpSession session;
public InactivityTimeoutTask(Session session) {
public InactivityTimeoutTask(HttpSession session) {
this.session = session;
}
public void run() {
session.close();
session.close(false);
}
}
}
......@@ -16,11 +16,7 @@ import org.dom4j.Element;
import org.dom4j.QName;
import org.jivesoftware.util.Log;
import org.jivesoftware.wildfire.ClientSession;
import org.jivesoftware.wildfire.PacketRouter;
import org.jivesoftware.wildfire.XMPPServer;
import org.jivesoftware.wildfire.interceptor.InterceptorManager;
import org.jivesoftware.wildfire.interceptor.PacketRejectedException;
import org.jivesoftware.wildfire.net.SASLAuthentication;
import org.xmpp.packet.*;
import java.io.UnsupportedEncodingException;
......@@ -40,12 +36,10 @@ import java.util.List;
public class MultiplexerPacketHandler {
private String connectionManagerDomain;
private PacketRouter router;
private final ConnectionMultiplexerManager multiplexerManager;
public MultiplexerPacketHandler(String connectionManagerDomain) {
this.connectionManagerDomain = connectionManagerDomain;
router = XMPPServer.getInstance().getPacketRouter();
multiplexerManager = ConnectionMultiplexerManager.getInstance();
}
......@@ -160,135 +154,20 @@ public class MultiplexerPacketHandler {
sendErrorPacket(route, PacketError.Condition.item_not_found, null);
return;
}
// Connection Manager wrapped a packet from a Client Session.
Element wrappedElement = route.getChildElement();
String tag = wrappedElement.getName();
try {
if ("auth".equals(tag) || "response".equals(tag)) {
SASLAuthentication.handle(session, wrappedElement);
}
else if ("iq".equals(tag)) {
processIQ(session, getIQ(wrappedElement));
}
else if ("message".equals(tag)) {
processMessage(session, new Message(wrappedElement));
}
else if ("presence".equals(tag)) {
processPresence(session, new Presence(wrappedElement));
}
else {
Element extraError = DocumentHelper.createElement(QName.get(
"unknown-stanza",
"http://jabber.org/protocol/connectionmanager#errors"));
sendErrorPacket(route, PacketError.Condition.bad_request, extraError);
}
}
catch (UnsupportedEncodingException e) {
Log.error("Error processing wrapped packet: " + wrappedElement.asXML(), e);
sendErrorPacket(route, PacketError.Condition.internal_server_error, null);
}
}
private void processIQ(ClientSession session, IQ packet) {
packet.setFrom(session.getAddress());
MultiplexerPacketRouter router = new MultiplexerPacketRouter(session);
try {
// Invoke the interceptors before we process the read packet
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
false);
router.route(packet);
// Invoke the interceptors after we have processed the read packet
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
true);
session.incrementClientPacketCount();
}
catch (PacketRejectedException e) {
// An interceptor rejected this packet so answer a not_allowed error
IQ reply = new IQ();
reply.setChildElement(packet.getChildElement().createCopy());
reply.setID(packet.getID());
reply.setTo(session.getAddress());
reply.setFrom(packet.getTo());
reply.setError(PacketError.Condition.not_allowed);
session.process(reply);
// Check if a message notifying the rejection should be sent
if (e.getRejectionMessage() != null && e.getRejectionMessage().trim().length() > 0) {
// A message for the rejection will be sent to the sender of the rejected packet
Message notification = new Message();
notification.setTo(session.getAddress());
notification.setFrom(packet.getTo());
notification.setBody(e.getRejectionMessage());
session.process(notification);
}
router.route(route.getChildElement());
}
}
private void processPresence(ClientSession session, Presence packet) {
packet.setFrom(session.getAddress());
try {
// Invoke the interceptors before we process the read packet
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
false);
router.route(packet);
// Invoke the interceptors after we have processed the read packet
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
true);
session.incrementClientPacketCount();
}
catch (PacketRejectedException e) {
// An interceptor rejected this packet so answer a not_allowed error
Presence reply = new Presence();
reply.setID(packet.getID());
reply.setTo(session.getAddress());
reply.setFrom(packet.getTo());
reply.setError(PacketError.Condition.not_allowed);
session.process(reply);
// Check if a message notifying the rejection should be sent
if (e.getRejectionMessage() != null && e.getRejectionMessage().trim().length() > 0) {
// A message for the rejection will be sent to the sender of the rejected packet
Message notification = new Message();
notification.setTo(session.getAddress());
notification.setFrom(packet.getTo());
notification.setBody(e.getRejectionMessage());
session.process(notification);
}
}
}
private void processMessage(ClientSession session, Message packet) {
packet.setFrom(session.getAddress());
try {
// Invoke the interceptors before we process the read packet
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
false);
router.route(packet);
// Invoke the interceptors after we have processed the read packet
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
true);
session.incrementClientPacketCount();
}
catch (PacketRejectedException e) {
// An interceptor rejected this packet
if (e.getRejectionMessage() != null && e.getRejectionMessage().trim().length() > 0) {
// A message for the rejection will be sent to the sender of the rejected packet
Message reply = new Message();
reply.setID(packet.getID());
reply.setTo(session.getAddress());
reply.setFrom(packet.getTo());
reply.setType(packet.getType());
reply.setThread(packet.getThread());
reply.setBody(e.getRejectionMessage());
session.process(reply);
}
}
}
private IQ getIQ(Element doc) {
Element query = doc.element("query");
if (query != null && "jabber:iq:roster".equals(query.getNamespaceURI())) {
return new Roster(doc);
catch (UnknownStanzaException use) {
Element extraError = DocumentHelper.createElement(QName.get(
"unknown-stanza",
"http://jabber.org/protocol/connectionmanager#errors"));
sendErrorPacket(route, PacketError.Condition.bad_request, extraError);
}
else {
return new IQ(doc);
catch (UnsupportedEncodingException e) {
Log.error("Error processing wrapped packet: " + route.getChildElement().asXML(), e);
sendErrorPacket(route, PacketError.Condition.internal_server_error, null);
}
}
......@@ -298,6 +177,7 @@ public class MultiplexerPacketHandler {
*
* @param packet the packet to be bounced.
* @param extraError application specific error or null if none.
* @param error the error.
*/
private void sendErrorPacket(IQ packet, PacketError.Condition error, Element extraError) {
IQ reply = IQ.createResultIQ(packet);
......@@ -316,6 +196,7 @@ public class MultiplexerPacketHandler {
*
* @param packet the packet to be bounced.
* @param extraError application specific error or null if none.
* @param error the error.
*/
private void sendErrorPacket(Route packet, PacketError.Condition error, Element extraError) {
Route reply = new Route(packet.getStreamID());
......
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2006 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.multiplex;
import org.jivesoftware.wildfire.PacketRouter;
import org.jivesoftware.wildfire.ClientSession;
import org.jivesoftware.wildfire.XMPPServer;
import org.jivesoftware.wildfire.interceptor.InterceptorManager;
import org.jivesoftware.wildfire.interceptor.PacketRejectedException;
import org.jivesoftware.wildfire.net.SASLAuthentication;
import org.xmpp.packet.*;
import org.dom4j.Element;
import java.io.UnsupportedEncodingException;
/**
* Handles the routing of packets to a particular session.
*
* @author Alexander Wenckus
*/
public class MultiplexerPacketRouter {
private ClientSession session;
private PacketRouter router;
public MultiplexerPacketRouter(ClientSession session) {
this.session = session;
router = XMPPServer.getInstance().getPacketRouter();
}
public void route(Element wrappedElement)
throws UnsupportedEncodingException, UnknownStanzaException
{
String tag = wrappedElement.getName();
if ("auth".equals(tag) || "response".equals(tag)) {
SASLAuthentication.handle(session, wrappedElement);
}
else if ("iq".equals(tag)) {
route(getIQ(wrappedElement));
}
else if ("message".equals(tag)) {
route(new Message(wrappedElement));
}
else if ("presence".equals(tag)) {
route(new Presence(wrappedElement));
}
else {
throw new UnknownStanzaException();
}
}
private IQ getIQ(Element doc) {
Element query = doc.element("query");
if (query != null && "jabber:iq:roster".equals(query.getNamespaceURI())) {
return new Roster(doc);
}
else {
return new IQ(doc);
}
}
public void route(IQ packet) {
packet.setFrom(session.getAddress());
try {
// Invoke the interceptors before we process the read packet
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
false);
router.route(packet);
// Invoke the interceptors after we have processed the read packet
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
true);
session.incrementClientPacketCount();
}
catch (PacketRejectedException e) {
// An interceptor rejected this packet so answer a not_allowed error
IQ reply = new IQ();
reply.setChildElement(packet.getChildElement().createCopy());
reply.setID(packet.getID());
reply.setTo(session.getAddress());
reply.setFrom(packet.getTo());
reply.setError(PacketError.Condition.not_allowed);
session.process(reply);
// Check if a message notifying the rejection should be sent
if (e.getRejectionMessage() != null && e.getRejectionMessage().trim().length() > 0) {
// A message for the rejection will be sent to the sender of the rejected packet
Message notification = new Message();
notification.setTo(session.getAddress());
notification.setFrom(packet.getTo());
notification.setBody(e.getRejectionMessage());
session.process(notification);
}
}
}
public void route(Message packet) {
packet.setFrom(session.getAddress());
try {
// Invoke the interceptors before we process the read packet
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
false);
router.route(packet);
// Invoke the interceptors after we have processed the read packet
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
true);
session.incrementClientPacketCount();
}
catch (PacketRejectedException e) {
// An interceptor rejected this packet
if (e.getRejectionMessage() != null && e.getRejectionMessage().trim().length() > 0) {
// A message for the rejection will be sent to the sender of the rejected packet
Message reply = new Message();
reply.setID(packet.getID());
reply.setTo(session.getAddress());
reply.setFrom(packet.getTo());
reply.setType(packet.getType());
reply.setThread(packet.getThread());
reply.setBody(e.getRejectionMessage());
session.process(reply);
}
}
}
public void route(Presence packet) {
packet.setFrom(session.getAddress());
try {
// Invoke the interceptors before we process the read packet
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
false);
router.route(packet);
// Invoke the interceptors after we have processed the read packet
InterceptorManager.getInstance().invokeInterceptors(packet, session, true,
true);
session.incrementClientPacketCount();
}
catch (PacketRejectedException e) {
// An interceptor rejected this packet so answer a not_allowed error
Presence reply = new Presence();
reply.setID(packet.getID());
reply.setTo(session.getAddress());
reply.setFrom(packet.getTo());
reply.setError(PacketError.Condition.not_allowed);
session.process(reply);
// Check if a message notifying the rejection should be sent
if (e.getRejectionMessage() != null && e.getRejectionMessage().trim().length() > 0) {
// A message for the rejection will be sent to the sender of the rejected packet
Message notification = new Message();
notification.setTo(session.getAddress());
notification.setFrom(packet.getTo());
notification.setBody(e.getRejectionMessage());
session.process(notification);
}
}
}
}
/**
* $RCSfile: $
* $Revision: $
* $Date: $
*
* Copyright (C) 2006 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.multiplex;
/**
*
*/
public class UnknownStanzaException extends Exception {
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment