Commit 82bbbc3a authored by Gaston Dombiak's avatar Gaston Dombiak Committed by gato

Refactoring work that includes: 1) Concurrency fixes, 2) small redesign, 3)...

Refactoring work that includes: 1) Concurrency fixes, 2) small redesign, 3) optimizations, 4) code cleanup.

git-svn-id: http://svn.igniterealtime.org/svn/repos/wildfire/trunk@6713 b35dd754-fafc-0310-a699-88a17e54d16e
parent dc3f660d
/**
* $Revision$
* $Date$
*
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.mediaproxy;
import org.jivesoftware.util.Log;
import java.io.IOException;
import java.net.*;
import java.util.ArrayList;
import java.util.List;
/**
* Listen packets from defined dataSocket and send packets to the defined host.
*
* @author Thiago Camargo
*/
abstract class Channel implements Runnable {
protected byte[] buf = new byte[5000];
protected DatagramSocket dataSocket;
protected DatagramPacket packet;
protected boolean enabled = true;
List<DatagramListener> listeners = new ArrayList<DatagramListener>();
protected InetAddress host;
protected int port;
/**
* Creates a Channel according to the parameters.
*
* @param dataSocket
* @param host
* @param port
*/
public Channel(DatagramSocket dataSocket, InetAddress host, int port) {
this.dataSocket = dataSocket;
this.host = host;
this.port = port;
}
/**
* Get the host that the packet will be sent to.
*
* @return remote host address
*/
public InetAddress getHost() {
return host;
}
/**
* Set the host that the packet will be sent to.
*/
protected void setHost(InetAddress host) {
this.host = host;
}
/**
* Get the port that the packet will be sent to.
*
* @return The remote port number
*/
public int getPort() {
return port;
}
/**
* Set the port that the packet will be sent to.
*
* @param port
*/
protected void setPort(int port) {
this.port = port;
}
/**
* Adds a DatagramListener to the Channel
*
* @param datagramListener
*/
public void addListener(DatagramListener datagramListener) {
listeners.add(datagramListener);
}
/**
* Remove a DatagramListener from the Channel
*
* @param datagramListener
*/
public void removeListener(DatagramListener datagramListener) {
listeners.remove(datagramListener);
}
/**
* Remove every Listeners
*/
public void removeListener() {
listeners.removeAll(listeners);
}
public void cancel() {
this.enabled = false;
dataSocket.close();
}
/**
* Thread override method
*/
public void run() {
try {
while (enabled) {
// Block until a datagram appears:
packet = new DatagramPacket(buf, buf.length);
dataSocket.receive(packet);
if (handle(packet)) {
boolean resend = true;
for (DatagramListener dl : listeners) {
boolean send = dl.datagramReceived(packet);
if (resend && !send) {
resend = false;
}
}
if (resend) {
relayPacket(packet);
}
}
}
}
catch (UnknownHostException uhe) {
if (enabled) {
Log.error("Unknown Host", uhe);
}
}
catch (SocketException se) {
if (enabled) {
Log.error("Socket closed", se);
}
}
catch (IOException ioe) {
if (enabled) {
Log.error("Communication error", ioe);
}
}
}
public void relayPacket(DatagramPacket packet) {
try {
DatagramPacket echo = new DatagramPacket(packet.getData(), packet.getLength(), host, port);
dataSocket.send(echo);
}
catch (IOException e) {
Log.error(e);
}
}
/**
* Handles received packet and returns true if the packet should be processed by the channel.
*
* @param packet received datagram packet
* @return true if listeners will be alerted that a new packet was received.
*/
abstract boolean handle(DatagramPacket packet);
}
\ No newline at end of file
......@@ -14,6 +14,8 @@ import java.net.DatagramPacket;
/**
* Listener for datagram packets received.
*
* @author Thiago Camargo
*/
public interface DatagramListener {
......
/**
* $Revision$
* $Date$
*
* Copyright (C) 2007 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution.
*/
package org.jivesoftware.wildfire.mediaproxy;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
/**
* Listen packets from defined dataSocket and send packets to the defined host.
* But also provides a mechanism to dynamic bind host and port implementing DatagramListener methods to
* change the host and port values according to the received packets.
*
* @author Thiago Camargo
*/
public class DynamicAddressChannel extends Channel implements Runnable, DatagramListener {
private int c = 0;
/**
* Default Channel Constructor
*
* @param dataSocket datasocket to used to send and receive packets
* @param host default destination host for received packets
* @param port default destination port for received packets
*/
public DynamicAddressChannel(DatagramSocket dataSocket, InetAddress host, int port) {
super(dataSocket, host, port);
}
boolean handle(DatagramPacket packet) {
// Relay Destination
if (c++ < 100) { // 100 packets are enough to discover relay address
this.setHost(packet.getAddress());
this.setPort(packet.getPort());
return true;
} else {
c = 1000; // Prevents long overflow
// Check Source Address. If its different, discard packet.
return this.getHost().equals(packet.getAddress());
}
}
/**
* Implement DatagramListener method.
* Set the host and port value to the host and port value from the received packet.
*
* @param datagramPacket the received packet
*/
public boolean datagramReceived(DatagramPacket datagramPacket) {
this.relayPacket(datagramPacket);
return false;
}
}
\ No newline at end of file
......@@ -12,8 +12,9 @@ package org.jivesoftware.wildfire.mediaproxy;
import org.jivesoftware.util.Log;
import java.util.ArrayList;
import java.util.List;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* A Media Proxy relays UDP traffic between two IPs to provide connectivity between
......@@ -23,12 +24,14 @@ import java.util.List;
*
* Each connection relay between two parties is called a session. You can setup a MediaProxy
* for all network interfaces with an empty constructor, or bind it to a specific interface
* with the MediaProxy(String localhost) constructor. <i>The media proxy ONLY works if your
* with the MediaProxy(String localhost) constructor. <i>The media proxy ONLY works if you
* are directly connected to the Internet with a valid IP address.</i>.
*
* @author Thiago Camargo
*/
public class MediaProxy implements SessionListener {
final private List<MediaProxySession> sessions = new ArrayList<MediaProxySession>();
final private Map<String, MediaProxySession> sessions = new ConcurrentHashMap<String, MediaProxySession>();
private String localhost;
......@@ -40,14 +43,6 @@ public class MediaProxy implements SessionListener {
// Lifetime of a Channel in Seconds
private long lifetime = 9000;
/**
* Contruct a MediaProxy instance that will listen from every Network Interface.
* Recommended.
*/
public MediaProxy() {
this.localhost = "localhost";
}
/**
* Contruct a MediaProxy instance that will listen on a specific network interface.
*
......@@ -58,7 +53,7 @@ public class MediaProxy implements SessionListener {
}
/**
* Get the public IP of this RTP Proxy that listen for the incomming packets
* Get the public IP of this media proxy that listen for incomming packets.
*
* @return the host that listens for incomming packets.
*/
......@@ -93,8 +88,8 @@ public class MediaProxy implements SessionListener {
*
* @return List of the Agents
*/
public List<MediaProxySession> getSessions() {
return sessions;
public Collection<MediaProxySession> getSessions() {
return sessions.values();
}
/**
......@@ -160,10 +155,11 @@ public class MediaProxy implements SessionListener {
* @return the session or <tt>null</tt> if the session doesn't exist.
*/
public MediaProxySession getSession(String sid) {
for (MediaProxySession session : sessions) {
if (session.getSID().equals(sid)) {
System.out.println("SID: " + sid + " agentSID: " + session.getSID());
return session;
MediaProxySession proxySession = sessions.get(sid);
if (proxySession != null) {
if (Log.isDebugEnabled()) {
Log.debug("SID: " + sid + " agentSID: " + proxySession.getSID());
return proxySession;
}
}
return null;
......@@ -176,41 +172,21 @@ public class MediaProxy implements SessionListener {
* @param session the session that stopped
*/
public void sessionClosed(MediaProxySession session) {
sessions.remove(session);
Log.debug("Session: " + session.getSID() + " removed.");
}
/**
* Add a new Static Session to the mediaproxy for defined IPs and ports.
* Create a channel between two IPs. ( Point A - Point B )
*
* @param id id of the candidate returned (Could be a Jingle session ID)
* @param creator the agent creator name or description
* @param hostA the hostname or IP of the point A of the Channel
* @param portA the port number point A of the Channel
* @param hostB the hostname or IP of the point B of the Channel
* @param portB the port number point B of the Channel
* @return the added ProxyCandidate
*/
public ProxyCandidate addAgent(String id, String creator, String hostA, int portA, String hostB,
int portB)
{
MediaProxySession session = new MediaProxySession(
id, creator, localhost, hostA, portA, hostB, portB, minPort, maxPort);
sessions.add(session);
session.addKeepAlive(idleTime);
session.addLifeTime(lifetime);
session.addAgentListener(this);
return session;
sessions.remove(session.getSID());
if (Log.isDebugEnabled()) {
Log.debug("Session: " + session.getSID() + " removed.");
}
}
/**
* Add a new Dynamic Session to the mediaproxy for defined IPs and ports.
* The IP and port pairs can change depending of the Senders IP and port.
* Which means that the IP and port values of the points can dynamic change after the Channel is opened.
* When the agent receives a packet from Point A, the channel set the point A IP and port according to the received packet sender IP and port.
* When the agent receives a packet from Point A, the channel set the point A IP and port according to
* the received packet sender IP and port.
* Every packet received from Point B will be relayed to the new Point A IP and port.
* When the agent receives a packet from Point B, the channel set the point B IP and port according to the received packet sender IP and port.
* When the agent receives a packet from Point B, the channel set the point B IP and port according to
* the received packet sender IP and port.
* Every packet received from Point A will be relayed to the new Point B IP and port.
* Create a dynamic channel between two IPs. ( Dynamic Point A - Dynamic Point B )
*
......@@ -222,12 +198,11 @@ public class MediaProxy implements SessionListener {
* @param portB the port number point B of the Channel
* @return the added ProxyCandidate
*/
public ProxyCandidate addSmartAgent(String id, String creator, String hostA, int portA,
public ProxyCandidate addRelayAgent(String id, String creator, String hostA, int portA,
String hostB, int portB)
{
SmartSession session = new SmartSession(id, creator, localhost, hostA, portA, hostB, portB,
minPort, maxPort);
sessions.add(session);
RelaySession session = new RelaySession(id, creator, localhost, hostA, portA, hostB, portB, minPort, maxPort);
sessions.put(id, session);
session.addKeepAlive(idleTime);
session.addLifeTime(lifetime);
session.addAgentListener(this);
......@@ -237,10 +212,13 @@ public class MediaProxy implements SessionListener {
/**
* Add a new Dynamic Session to the mediaproxy WITHOUT defined IPs and ports.
* The IP and port pairs WILL change depending of the Senders IP and port.
* Which means that the IP and port values of the points will dynamic change after the Channel is opened and received packet from both points.
* When the agent receives a packet from Point A, the channel set the point A IP and port according to the received packet sender IP and port.
* Which means that the IP and port values of the points will dynamic change after the Channel is opened
* and received packet from both points.
* When the agent receives a packet from Point A, the channel set the point A IP and port according to
* the received packet sender IP and port.
* Every packet received from Point B will be relayed to the new Point A IP and port.
* When the agent receives a packet from Point B, the channel set the point B IP and port according to the received packet sender IP and port.
* When the agent receives a packet from Point B, the channel set the point B IP and port according to
* the received packet sender IP and port.
* Every packet received from Point A will be relayed to the new Point B IP and port.
* Create a dynamic channel between two IPs. ( Dynamic Point A - Dynamic Point B )
*
......@@ -248,21 +226,21 @@ public class MediaProxy implements SessionListener {
* @param creator the agent creator name or description
* @return the added ProxyCandidate
*/
public ProxyCandidate addSmartAgent(String id, String creator) {
return addSmartAgent(id, creator, localhost, 40000, localhost, 40004);
public ProxyCandidate addRelayAgent(String id, String creator) {
return addRelayAgent(id, creator, localhost, 40000, localhost, 40004);
}
/**
* Stop every running sessions.
*/
public void stopProxy() {
void stopProxy() {
for (MediaProxySession session : getSessions()) {
try {
session.clearAgentListeners();
session.stopAgent();
}
catch (Exception e) {
e.printStackTrace();
Log.error("Error cleaning up media proxy sessions", e);
}
}
sessions.clear();
......
......@@ -28,10 +28,7 @@ import org.xmpp.packet.JID;
import org.xmpp.packet.Packet;
import org.xmpp.packet.PacketError;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.*;
/**
* A proxy service for UDP traffic such as RTP. It provides Jingle transport candidates
......@@ -40,14 +37,14 @@ import java.util.List;
*
* @author Thiago Camargo
*/
public class MediaProxyService extends BasicModule implements ServerItemsProvider, RoutableChannelHandler, DiscoInfoProvider, DiscoItemsProvider {
public class MediaProxyService extends BasicModule
implements ServerItemsProvider, RoutableChannelHandler, DiscoInfoProvider, DiscoItemsProvider {
private String serviceName;
private RoutingTable routingTable;
private PacketRouter router;
private MediaProxy mediaProxy = null;
private String name = "rtpbridge";
private boolean enabled = false;
public static final String NAMESPACE = "http://www.jivesoftware.com/protocol/rtpbridge";
......@@ -59,84 +56,34 @@ public class MediaProxyService extends BasicModule implements ServerItemsProvide
super("Media Proxy Service");
}
/**
* Load config using JiveGlobals
*/
private void loadRTPProxyConfig() {
try {
long idleTime =
Long.valueOf(JiveGlobals.getProperty("mediaproxy.idleTimeout"));
mediaProxy.setIdleTime(idleTime);
}
catch (NumberFormatException e) {
// Do nothing let the default values to be used.
}
try {
long lifetime =
Long.valueOf(JiveGlobals.getProperty("mediaproxy.lifetime"));
mediaProxy.setLifetime(lifetime);
}
catch (NumberFormatException e) {
// Do nothing let the default values to be used.
}
try {
int minPort = Integer.valueOf(JiveGlobals.getProperty("mediaproxy.portMin"));
mediaProxy.setMinPort(minPort);
}
catch (NumberFormatException e) {
// Do nothing let the default values to be used.
}
int maxPort = JiveGlobals.getIntProperty("mediaproxy.portMax", mediaProxy.getMaxPort());
mediaProxy.setMaxPort(maxPort);
this.enabled = JiveGlobals.getBooleanProperty("mediaproxy.enabled");
}
public void destroy() {
super.destroy();
// Unregister component.
try {
mediaProxy.stopProxy();
}
catch (Exception e) {
Log.error(e);
}
mediaProxy = null;
}
public void initialize(XMPPServer server) {
super.initialize(server);
String hostname = JiveGlobals.getProperty("xmpp.domain",
JiveGlobals.getProperty("network.interface", "localhost"));
mediaProxy = new MediaProxy(hostname);
serviceName = JiveGlobals.getProperty("mediaproxy.serviceName", name);
serviceName = serviceName == null ? name : serviceName.equals("") ? name : serviceName;
mediaProxy = new MediaProxy(server.getServerInfo().getName());
String defaultName = "rtpbridge";
serviceName = JiveGlobals.getProperty("mediaproxy.serviceName", defaultName);
serviceName = serviceName.equals("") ? defaultName : serviceName;
routingTable = server.getRoutingTable();
router = server.getPacketRouter();
loadRTPProxyConfig();
initMediaProxy();
}
public void start() {
if (isEnabled()) {
startProxy();
routingTable.addRoute(getAddress(), this);
XMPPServer.getInstance().getIQDiscoItemsHandler().addServerItemsProvider(this);
} else {
XMPPServer.getInstance().getIQDiscoItemsHandler().removeServerItemsProvider(this);
XMPPServer.getInstance().getIQDiscoItemsHandler().removeComponentItem(getAddress().toString());
}
}
public void startProxy() {
routingTable.addRoute(getAddress(), this);
XMPPServer server = XMPPServer.getInstance();
server.getIQDiscoItemsHandler().addServerItemsProvider(this);
}
public void stop() {
super.stop();
mediaProxy.stopProxy();
XMPPServer.getInstance().getIQDiscoItemsHandler()
.removeComponentItem(getAddress().toString());
XMPPServer.getInstance().getIQDiscoItemsHandler().removeComponentItem(getAddress().toString());
routingTable.removeRoute(getAddress());
}
......@@ -172,39 +119,29 @@ public class MediaProxyService extends BasicModule implements ServerItemsProvide
String namespace = childElement.getNamespaceURI();
Element childElementCopy = iq.getChildElement().createCopy();
reply.setChildElement(childElementCopy);
Log.debug("RECEIVED:" + iq.toXML());
if (Log.isDebugEnabled()) {
Log.debug("RECEIVED:" + iq.toXML());
}
if ("http://jabber.org/protocol/disco#info".equals(namespace)) {
try {
reply = XMPPServer.getInstance().getIQDiscoInfoHandler().handleIQ(iq);
router.route(reply);
return;
}
catch (UnauthorizedException e) {
// Do nothing. This error should never happen
}
reply = XMPPServer.getInstance().getIQDiscoInfoHandler().handleIQ(iq);
router.route(reply);
return;
} else if ("http://jabber.org/protocol/disco#items".equals(namespace)) {
try {
// a component
reply = XMPPServer.getInstance().getIQDiscoItemsHandler().handleIQ(iq);
router.route(reply);
return;
}
catch (UnauthorizedException e) {
// Do nothing. This error should never happen
}
// a component
reply = XMPPServer.getInstance().getIQDiscoItemsHandler().handleIQ(iq);
router.route(reply);
return;
} else if (NAMESPACE.equals(namespace) && enabled) {
Element c = childElementCopy.element("candidate");
if (c != null) {
Element candidateElement = childElementCopy.element("candidate");
String sid = childElementCopy.attribute("sid").getValue() + "-" + iq.getFrom();
childElementCopy.remove(c);
if (candidateElement != null) {
childElementCopy.remove(candidateElement);
Element candidate = childElementCopy.addElement("candidate ");
ProxyCandidate proxyCandidate = mediaProxy.addSmartAgent(
childElementCopy.attribute("sid").getValue() + "-" + iq.getFrom(),
iq.getFrom().toString());
Log.debug(childElementCopy.attribute("sid").getValue() + "-" + iq.getFrom());
ProxyCandidate proxyCandidate = mediaProxy.addRelayAgent(sid, iq.getFrom().toString());
Log.debug(sid);
proxyCandidate.start();
candidate.addAttribute("name", "voicechannel");
candidate.addAttribute("ip", mediaProxy.getPublicIP());
......@@ -213,76 +150,91 @@ public class MediaProxyService extends BasicModule implements ServerItemsProvide
candidate.addAttribute("pass", proxyCandidate.getPass());
} else {
c = childElementCopy.element("relay");
if (c != null) {
MediaProxySession session = mediaProxy.getSession(
childElementCopy.attribute("sid").getValue() + "-" + iq.getFrom());
Log.debug(
childElementCopy.attribute("sid").getValue() + "-" + iq.getFrom());
candidateElement = childElementCopy.element("relay");
if (candidateElement != null) {
MediaProxySession session = mediaProxy.getSession(sid);
Log.debug(sid);
if (session != null) {
Attribute pass = c.attribute("pass");
Attribute pass = candidateElement.attribute("pass");
if (pass != null && pass.getValue().trim().equals(session.getPass().trim())) {
Log.debug("RIGHT PASS");
Attribute portA = c.attribute("porta");
Attribute portB = c.attribute("portb");
Attribute hostA = c.attribute("hosta");
Attribute hostB = c.attribute("hostb");
Attribute portA = candidateElement.attribute("porta");
Attribute portB = candidateElement.attribute("portb");
Attribute hostA = candidateElement.attribute("hosta");
Attribute hostB = candidateElement.attribute("hostb");
try {
if (hostA != null) {
if (portA != null) {
for (int i = 0; i < 2; i++) {
session.sendFromPortA(hostB.getValue(),
Integer.parseInt(portB.getValue()));
}
if (hostA != null && portA != null) {
for (int i = 0; i < 2; i++) {
session.sendFromPortA(hostB.getValue(), Integer.parseInt(portB.getValue()));
}
}
}
catch (Exception e) {
Log.error(e);
}
//System.out.println(session.getLocalPortA() + "->" + session.getPortA());
//System.out.println(session.getLocalPortB() + "->" + session.getPortB());
//componentManager.getLog().debug(session.getLocalPortA() + "->" + session.getPortA());
//componentManager.getLog().debug(session.getLocalPortB() + "->" + session.getPortB());
} else {
reply.setError(PacketError.Condition.forbidden);
}
}
childElementCopy.remove(c);
childElementCopy.remove(candidateElement);
}
}
} else {
// Answer an error since the server can't handle the requested namespace
reply.setError(PacketError.Condition.service_unavailable);
}
try {
Log.debug("RETURNED:" + reply.toXML());
if (Log.isDebugEnabled()) {
Log.debug("RETURNED:" + reply.toXML());
}
router.route(reply);
}
catch (Exception e) {
Log.error(e);
}
}
/**
* Load config using JiveGlobals
*/
private void initMediaProxy() {
try {
long idleTime =
Long.valueOf(JiveGlobals.getProperty("mediaproxy.idleTimeout"));
mediaProxy.setIdleTime(idleTime);
}
catch (NumberFormatException e) {
// Do nothing let the default values to be used.
}
try {
long lifetime =
Long.valueOf(JiveGlobals.getProperty("mediaproxy.lifetime"));
mediaProxy.setLifetime(lifetime);
}
catch (NumberFormatException e) {
// Do nothing let the default values to be used.
}
try {
int minPort = Integer.valueOf(JiveGlobals.getProperty("mediaproxy.portMin"));
mediaProxy.setMinPort(minPort);
}
catch (NumberFormatException e) {
// Do nothing let the default values to be used.
}
try {
int maxPort = JiveGlobals.getIntProperty("mediaproxy.portMax", mediaProxy.getMaxPort());
mediaProxy.setMaxPort(maxPort);
}
catch (NumberFormatException e) {
// Do nothing let the default values to be used.
}
this.enabled = JiveGlobals.getBooleanProperty("mediaproxy.enabled");
}
/**
* Returns the fully-qualifed domain name of this chat service.
* The domain is composed by the service name and the
......@@ -363,7 +315,7 @@ public class MediaProxyService extends BasicModule implements ServerItemsProvide
*
* @return list of active agents
*/
public List<MediaProxySession> getAgents() {
public Collection<MediaProxySession> getAgents() {
return mediaProxy.getSessions();
}
......@@ -440,7 +392,7 @@ public class MediaProxyService extends BasicModule implements ServerItemsProvide
public void setEnabled(boolean enabled) {
this.enabled = enabled;
if (isEnabled()) {
startProxy();
start();
} else {
stop();
}
......
......@@ -3,7 +3,10 @@ package org.jivesoftware.wildfire.mediaproxy;
import org.jivesoftware.util.Log;
import java.io.IOException;
import java.net.*;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.*;
/**
......@@ -13,7 +16,7 @@ import java.util.*;
*
* @author Thiago Camargo
*/
public class MediaProxySession extends Thread implements ProxyCandidate, DatagramListener {
public abstract class MediaProxySession extends Thread implements ProxyCandidate, DatagramListener {
private List<SessionListener> sessionListeners = new ArrayList<SessionListener>();
......@@ -66,7 +69,8 @@ public class MediaProxySession extends Thread implements ProxyCandidate, Datagra
* @param minPort the minimal port value to be used by the server
* @param maxPort the maximun port value to be used by the server
*/
public MediaProxySession(String id, String creator, String localAddress, String hostA, int portA, String hostB, int portB, int minPort, int maxPort) {
public MediaProxySession(String id, String creator, String localAddress, String hostA, int portA, String hostB,
int portB, int minPort, int maxPort) {
this.id = id;
this.creator = creator;
this.minPort = minPort;
......@@ -86,11 +90,12 @@ public class MediaProxySession extends Thread implements ProxyCandidate, Datagra
this.localPortB = getFreePort();
this.socketB = new DatagramSocket(localPortB, this.localAddress);
this.socketBControl = new DatagramSocket(localPortB + 1, this.localAddress);
Log.debug("Session Created at: A " + localPortA + " : B " + localPortB);
if (Log.isDebugEnabled()) {
Log.debug("Session Created at: A " + localPortA + " : B " + localPortB);
}
}
catch (Exception e) {
e.printStackTrace();
Log.error(e);
}
}
......@@ -118,7 +123,7 @@ public class MediaProxySession extends Thread implements ProxyCandidate, Datagra
return freePort;
}
catch (IOException e) {
e.printStackTrace();
Log.error(e);
}
}
try {
......@@ -127,7 +132,7 @@ public class MediaProxySession extends Thread implements ProxyCandidate, Datagra
ss.close();
}
catch (IOException e) {
e.printStackTrace();
Log.error(e);
} finally {
ss = null;
}
......@@ -174,12 +179,10 @@ public class MediaProxySession extends Thread implements ProxyCandidate, Datagra
* Thread override method
*/
public void run() {
// Create channels for parties
createChannels();
channelAtoB = new Channel(socketA, hostB, portB);
channelAtoBControl = new Channel(socketAControl, hostB, portB + 1);
channelBtoA = new Channel(socketB, hostA, portA);
channelBtoAControl = new Channel(socketBControl, hostA, portA + 1);
// Start a thread for each channel
threadAtoB = new Thread(channelAtoB);
threadAtoBControl = new Thread(channelAtoBControl);
threadBtoA = new Thread(channelBtoA);
......@@ -190,12 +193,24 @@ public class MediaProxySession extends Thread implements ProxyCandidate, Datagra
threadBtoA.start();
threadBtoAControl.start();
// Listen to channel events
addChannelListeners();
}
/**
* Creates 4 new channels for the two entities. We will create a channel between A and B and vice versa
* and also a control channel betwwen A and B and vice versa.
*/
abstract void createChannels();
/**
* Adds listener to channel events like receiving data.
*/
void addChannelListeners() {
channelAtoB.addListener(this);
channelAtoBControl.addListener(this);
channelBtoA.addListener(this);
channelBtoAControl.addListener(this);
//System.out.println("Session running between " + hostA + " and " + hostB);
}
/**
......@@ -210,7 +225,7 @@ public class MediaProxySession extends Thread implements ProxyCandidate, Datagra
idleTimer = null;
}
} catch (Exception e) {
e.printStackTrace();
Log.error(e);
}
try {
......@@ -220,7 +235,7 @@ public class MediaProxySession extends Thread implements ProxyCandidate, Datagra
lifeTimer = null;
}
} catch (Exception e) {
e.printStackTrace();
Log.error(e);
}
channelAtoB.removeListener();
......@@ -234,7 +249,7 @@ public class MediaProxySession extends Thread implements ProxyCandidate, Datagra
channelBtoA.cancel();
channelBtoAControl.cancel();
} catch (Exception e) {
e.printStackTrace();
Log.error(e);
}
socketA.close();
......@@ -244,7 +259,7 @@ public class MediaProxySession extends Thread implements ProxyCandidate, Datagra
dispatchAgentStopped();
System.out.println("Session Stopped");
Log.debug("Session Stopped");
}
/**
......@@ -280,7 +295,9 @@ public class MediaProxySession extends Thread implements ProxyCandidate, Datagra
* @param portA the port number for A
*/
public void setPortA(int portA) {
System.out.println("PORT CHANGED(A):" + portA);
if (Log.isDebugEnabled()) {
Log.debug("PORT CHANGED(A):" + portA);
}
this.portA = portA;
}
......@@ -290,7 +307,9 @@ public class MediaProxySession extends Thread implements ProxyCandidate, Datagra
* @param portB the port number for B
*/
public void setPortB(int portB) {
System.out.println("PORT CHANGED(B):" + portB);
if (Log.isDebugEnabled()) {
Log.debug("PORT CHANGED(B):" + portB);
}
this.portB = portB;
}
......@@ -386,7 +405,8 @@ public class MediaProxySession extends Thread implements ProxyCandidate, Datagra
/**
* Add a keep alive detector.
* If the packet still more than the keep alive delay without receiving any packets. The Session is stoped and remove from agents List.
* If the packet still more than the keep alive delay without receiving any packets. The Session is
* stoped and remove from agents List.
*
* @param delay delay time in millis to check if the channel is inactive
*/
......@@ -420,7 +440,6 @@ public class MediaProxySession extends Thread implements ProxyCandidate, Datagra
lifeTimer.scheduleAtFixedRate(new TimerTask() {
public void run() {
stopAgent();
return;
}
}, lifetime, lifetime);
}
......@@ -454,161 +473,10 @@ public class MediaProxySession extends Thread implements ProxyCandidate, Datagra
* Dispatch Stop Event
*/
public void dispatchAgentStopped() {
for (SessionListener sessionListener : sessionListeners)
for (SessionListener sessionListener : sessionListeners) {
try {
sessionListener.sessionClosed(this);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* Protected Class Channel.
* Listen packets from defined dataSocket and send packets to the defined host.
*/
protected class Channel implements Runnable {
protected byte[] buf = new byte[5000];
protected DatagramSocket dataSocket;
protected DatagramPacket packet;
protected boolean enabled = true;
List<DatagramListener> listeners = new ArrayList<DatagramListener>();
protected InetAddress host;
protected int port;
/**
* Creates a Channel according to the parameters.
*
* @param dataSocket
* @param host
* @param port
*/
public Channel(DatagramSocket dataSocket, InetAddress host, int port) {
this.dataSocket = dataSocket;
this.host = host;
this.port = port;
}
/**
* Get the host that the packet will be sent to.
*
* @return remote host address
*/
public InetAddress getHost() {
return host;
}
/**
* Set the host that the packet will be sent to.
*/
protected void setHost(InetAddress host) {
this.host = host;
}
/**
* Get the port that the packet will be sent to.
*
* @return The remote port number
*/
public int getPort() {
return port;
}
/**
* Set the port that the packet will be sent to.
*
* @param port
*/
protected void setPort(int port) {
this.port = port;
}
/**
* Adds a DatagramListener to the Channel
*
* @param datagramListener
*/
public void addListener(DatagramListener datagramListener) {
listeners.add(datagramListener);
}
/**
* Remove a DatagramListener from the Channel
*
* @param datagramListener
*/
public void removeListener(DatagramListener datagramListener) {
listeners.remove(datagramListener);
}
/**
* Remove every Listeners
*/
public void removeListener() {
listeners.removeAll(listeners);
}
public void cancel() {
this.enabled = false;
dataSocket.close();
}
/**
* Thread override method
*/
public void run() {
try {
long c = 0;
while (true) {
// Block until a datagram appears:
packet = new DatagramPacket(buf, buf.length);
dataSocket.receive(packet);
if (this.getPort() != packet.getPort())
System.out.println(dataSocket.getLocalAddress().getHostAddress() + ":" + dataSocket.getLocalPort() + " relay to: " + packet.getAddress().getHostAddress() + ":" + packet.getPort());
if (c++ < 5) {
System.out.println("Received:" + dataSocket.getLocalAddress().getHostAddress() + ":" + dataSocket.getLocalPort());
System.out.println("Addr: " + packet.getAddress().getHostName());
}
boolean resend = true;
for (DatagramListener dl : listeners) {
boolean send = dl.datagramReceived(packet);
if (resend)
if (!send)
resend = false;
}
if (resend) relayPacket(packet);
}
}
catch (UnknownHostException uhe) {
if (enabled)
Log.error(uhe);
}
catch (SocketException se) {
if (enabled)
Log.error(se);
}
catch (IOException ioe) {
if (enabled)
Log.error(ioe);
}
}
public void relayPacket(DatagramPacket packet) {
try {
DatagramPacket echo = new DatagramPacket(packet.getData(), packet.getLength(),
host, port);
dataSocket.send(echo);
}
catch (IOException e) {
Log.error(e);
}
}
......
......@@ -14,6 +14,8 @@ import java.net.InetAddress;
/**
* Basic interface to access a Candidate provided by a Session
*
* @author Thiago Camargo
*/
public interface ProxyCandidate {
......@@ -50,5 +52,4 @@ public interface ProxyCandidate {
public void start();
public void stopAgent();
}
......@@ -10,23 +10,25 @@
package org.jivesoftware.wildfire.mediaproxy;
import java.io.IOException;
import java.net.*;
/**
* A Session Class will control "receive and relay" proccess.
* It creates UDP channels from Host A to Host B and from Host B to Host A using or NOT the specified hosts and ports.
* It creates UDP channels from Host A to Host B and from Host B to Host A using or NOT the specified
* hosts and ports.
* The IP and port pairs can change depending of the Senders IP and port.
* Which means that the IP and port values of the points can dynamic change after the Channel is opened.
* When the agent receives a packet from Point A, the channel set the point A IP and port according to the received packet sender IP and port.
* When the agent receives a packet from Point A, the channel set the point A IP and port according to the
* received packet sender IP and port.
* Every packet received from Point B will be relayed to the new Point A IP and port.
* When the agent receives a packet from Point B, the channel set the point B IP and port according to the received packet sender IP and port.
* When the agent receives a packet from Point B, the channel set the point B IP and port according to the
* received packet sender IP and port.
* Every packet received from Point A will be relayed to the new Point B IP and port.
* Create a dynamic channel between two IPs. ( Dynamic Point A - Dynamic Point B )
* It has 4 Channels. 2 for data and 2 for control.
*
* @author Thiago Camargo
*/
public class SmartSession extends MediaProxySession {
public class RelaySession extends MediaProxySession {
/**
* Creates a new Smart Session to provide connectivity between Host A and Host B.
......@@ -41,7 +43,8 @@ public class SmartSession extends MediaProxySession {
* @param minPort the minimal port number to be used by the proxy
* @param maxPort the maximun port number to be used by the proxy
*/
public SmartSession(String id, String creator, String localhost, String hostA, int portA, String hostB, int portB, int minPort, int maxPort) {
public RelaySession(String id, String creator, String localhost, String hostA, int portA, String hostB, int portB,
int minPort, int maxPort) {
super(id, creator, localhost, hostA, portA, hostB, portB, minPort, maxPort);
}
......@@ -56,123 +59,24 @@ public class SmartSession extends MediaProxySession {
* @param portB the port number point B of the Channel
* @param creator the created name or description of the Channel
*/
public SmartSession(String id, String creator, String localhost, String hostA, int portA, String hostB, int portB) {
public RelaySession(String id, String creator, String localhost, String hostA, int portA, String hostB, int portB) {
super(id, creator, localhost, hostA, portA, hostB, portB, 10000, 20000);
}
/**
* Thread override method
*/
public void run() {
channelAtoB = new SmartChannel(socketA, hostB, portB);
channelAtoBControl = new SmartChannel(socketAControl, hostB, portB + 1);
channelBtoA = new SmartChannel(socketB, hostA, portA);
channelBtoAControl = new SmartChannel(socketBControl, hostA, portA + 1);
channelAtoB.addListener((SmartChannel) channelBtoA);
channelAtoBControl.addListener((SmartChannel) channelBtoAControl);
channelBtoA.addListener((SmartChannel) channelAtoB);
channelBtoAControl.addListener((SmartChannel) channelAtoBControl);
threadAtoB = new Thread(channelAtoB);
threadAtoBControl = new Thread(channelAtoBControl);
threadBtoA = new Thread(channelBtoA);
threadBtoAControl = new Thread(channelBtoAControl);
threadAtoB.start();
threadAtoBControl.start();
threadBtoA.start();
threadBtoAControl.start();
channelAtoB.addListener(this);
channelAtoBControl.addListener(this);
channelBtoA.addListener(this);
channelBtoAControl.addListener(this);
void createChannels() {
channelAtoB = new DynamicAddressChannel(socketA, hostB, portB);
channelAtoBControl = new DynamicAddressChannel(socketAControl, hostB, portB + 1);
channelBtoA = new DynamicAddressChannel(socketB, hostA, portA);
channelBtoAControl = new DynamicAddressChannel(socketBControl, hostA, portA + 1);
}
/**
* Protected Class Channel.
* Listen packets from defined dataSocket and send packets to the defined host.
* But also provides a mechanism to dynamic bind host and port implementing DatagramListener methods to change the host and port values according to the received packets.
*/
protected class SmartChannel extends Channel implements Runnable, DatagramListener {
int c = 0;
/**
* Default Channel Constructor
*
* @param dataSocket datasocket to used to send and receive packets
* @param host default destination host for received packets
* @param port default destination port for received packets
*/
public SmartChannel(DatagramSocket dataSocket, InetAddress host, int port) {
super(dataSocket, host, port);
}
/**
* Thread override method
*/
public void run() {
try {
long c = 0;
long band = System.currentTimeMillis();
while (true) {
// Block until a datagram appears:
packet = new DatagramPacket(buf, buf.length);
dataSocket.receive(packet);
// Relay Destination
if (c++ < 100) { // 100 packets are enough to discover relay address
this.setHost(packet.getAddress());
this.setPort(packet.getPort());
} else {
c = 1000; // Prevents long overflow
// Check Source Address. If its different, discard packet.
if (!this.getHost().equals(packet.getAddress())) continue;
}
boolean resend = true;
for (DatagramListener dl : listeners) {
boolean send = dl.datagramReceived(packet);
if (resend)
if (!send)
resend = false;
}
if (resend) relayPacket(packet);
}
} catch (UnknownHostException e) {
if (enabled)
System.err.println("Unknown Host");
}
catch (SocketException e) {
if (enabled)
System.err.println("Socket closed");
} catch (IOException e) {
if (enabled)
System.err.println("Communication error");
e.printStackTrace();
}
}
/**
* Implement DatagramListener method.
* Set the host and port value to the host and port value from the received packet.
*
* @param datagramPacket the received packet
*/
public boolean datagramReceived(DatagramPacket datagramPacket) {
//InetAddress host = datagramPacket.getAddress();
//this.setHost(host);
//int port = datagramPacket.getPort();
//this.setPort(port);
this.relayPacket(datagramPacket);
return false;
}
void addChannelListeners() {
super.addChannelListeners();
// Add channel as listeners
channelAtoB.addListener((DynamicAddressChannel) channelBtoA);
channelAtoBControl.addListener((DynamicAddressChannel) channelBtoAControl);
channelBtoA.addListener((DynamicAddressChannel) channelAtoB);
channelBtoAControl.addListener((DynamicAddressChannel) channelAtoBControl);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment