MediaProxySession.java 14.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/**
 * Copyright (C) 2004-2009 Jive Software. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

17
package org.jivesoftware.openfire.mediaproxy;
18 19

import java.io.IOException;
20 21 22 23
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.ServerSocket;
24 25 26 27 28 29 30 31
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
32 33

/**
 * A media proxy session enables two clients to exchange UDP traffic. Each client connects to
 * a UDP port and then the proxy is responsible for exchanging traffic. Each session uses
 * a total of four ports: two for traffic exchange, and two control ports.
 *
 * @author Thiago Camargo
 */
40
public abstract class MediaProxySession extends Thread implements ProxyCandidate, DatagramListener {
41

42 43
	private static final Logger Log = LoggerFactory.getLogger(MediaProxySession.class);

44 45 46 47 48 49 50
    /** Listeners notified when the session is stopped. */
    private List<SessionListener> sessionListeners = new ArrayList<SessionListener>();

    /** Session identifier passed to the constructor. */
    private String id;
    /** Randomly generated pass, created once in the constructor. */
    private String pass;
    /** Creator name or description passed to the constructor. */
    private String creator = "";
    /**
     * Time in milliseconds at which the last datagram was received (0 until one arrives).
     * Volatile because it is written by datagramReceived() and polled by the keep-alive
     * task, which runs on the Timer's own thread.
     */
    private volatile long timestamp = 0;

Matt Tucker's avatar
Matt Tucker committed
51
    protected InetAddress localAddress;
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
    protected InetAddress hostA;
    protected InetAddress hostB;

    protected int portA;
    protected int portB;

    protected int localPortA;
    protected int localPortB;

    protected DatagramSocket socketA;
    protected DatagramSocket socketAControl;
    protected DatagramSocket socketB;
    protected DatagramSocket socketBControl;

    protected Channel channelAtoB;
    protected Channel channelAtoBControl;
    protected Channel channelBtoA;
    protected Channel channelBtoAControl;

    protected Thread threadAtoB;
    protected Thread threadAtoBControl;
    protected Thread threadBtoA;
    protected Thread threadBtoAControl;

Matt Tucker's avatar
Matt Tucker committed
76
    private Timer idleTimer = null;
77
    private Timer lifeTimer = null;
78 79 80 81 82 83 84

    private int minPort = 10000;
    private int maxPort = 20000;

    /**
     * Creates a new static UDP channel between Host A and Host B.
     *
85
     * @param id           of the Session (Could be a Jingle session ID)
Thiago Camargo's avatar
Thiago Camargo committed
86
     * @param creator      the session creator name or description
Matt Tucker's avatar
Matt Tucker committed
87
     * @param localAddress the localhost IP that will listen for UDP packets
88 89 90 91
     * @param hostA        the hostname or IP of the point A of the Channel
     * @param portA        the port number point A of the Channel
     * @param hostB        the hostname or IP of the point B of the Channel
     * @param portB        the port number point B of the Channel
Thiago Camargo's avatar
Thiago Camargo committed
92 93
     * @param minPort      the minimal port value to be used by the server
     * @param maxPort      the maximun port value to be used by the server
94
     */
95 96
    public MediaProxySession(String id, String creator, String localAddress, String hostA, int portA, String hostB,
                             int portB, int minPort, int maxPort) {
97 98 99 100 101 102 103 104 105 106 107 108
        this.id = id;
        this.creator = creator;
        this.minPort = minPort;
        this.maxPort = maxPort;
        this.pass = String.valueOf(Math.abs(new Random().nextLong()));
        try {
            this.hostA = InetAddress.getByName(hostA);
            this.hostB = InetAddress.getByName(hostB);

            this.portA = portA;
            this.portB = portB;

Matt Tucker's avatar
Matt Tucker committed
109
            this.localAddress = InetAddress.getByName(localAddress);
110
            this.localPortA = getFreePort();
Matt Tucker's avatar
Matt Tucker committed
111 112
            this.socketA = new DatagramSocket(localPortA, this.localAddress);
            this.socketAControl = new DatagramSocket(localPortA + 1, this.localAddress);
113
            this.localPortB = getFreePort();
Matt Tucker's avatar
Matt Tucker committed
114 115
            this.socketB = new DatagramSocket(localPortB, this.localAddress);
            this.socketBControl = new DatagramSocket(localPortB + 1, this.localAddress);
116
            if (Log.isDebugEnabled()) {
117
                Log.debug("MediaProxySession: Session Created at: A " + localPortA + " : B " + localPortB);
118
            }
Matt Tucker's avatar
Matt Tucker committed
119 120
        }
        catch (Exception e) {
121
            Log.error(e.getMessage(), e);
122 123 124 125 126 127 128 129 130 131 132
        }
    }

    /**
     * Obtain a free port with a nested control port we can use.
     *
     * @return A free port number.
     */
    protected int getFreePort() {
        ServerSocket ss;
        int freePort = 0;
Thiago Camargo's avatar
Thiago Camargo committed
133
        int controlPort;
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148

        for (int i = 0; i < 10; i++) {
            freePort = (int) (minPort + Math.round(Math.random() * (maxPort - minPort)));
            freePort = freePort % 2 == 0 ? freePort : freePort + 1;
            try {
                ss = new ServerSocket(freePort);
                freePort = ss.getLocalPort();
                ss.close();
                ss = new ServerSocket(freePort + 1);
                controlPort = ss.getLocalPort();
                ss.close();
                if (controlPort == (freePort + 1))
                    return freePort;
            }
            catch (IOException e) {
149
                Log.error(e.getMessage(), e);
150 151 152 153 154 155 156 157
            }
        }
        try {
            ss = new ServerSocket(0);
            freePort = ss.getLocalPort();
            ss.close();
        }
        catch (IOException e) {
158
            Log.error(e.getMessage(), e);
159 160 161 162 163 164 165 166 167
        } finally {
            ss = null;
        }
        return freePort;
    }

    /**
     * Returns the identifier this session was created with.
     *
     * @return the ID of the session
     */
    public String getSID() {
        return this.id;
    }

    /**
     * Returns the pass of this session.
     * A pass can be used to authorize a session modification.
     *
     * @return the pass generated when the session was constructed
     */
    public String getPass() {
        return this.pass;
    }

    /**
     * Returns the agent creator.
     * This field is open to MediaProxy users and can only be set in the constructor.
     *
     * @return the session creator name or description
     */
    public String getCreator() {
        return this.creator;
    }

    /**
     * Returns the time at which the last packet arrived.
     *
     * @return the timestamp in milliseconds, or 0 if no packet has been received yet
     */
    public long getTimestamp() {
        return this.timestamp;
    }

    /**
     * Thread override method
     */
204 205
    @Override
	public void run() {
206 207
        // Create channels for parties
        createChannels();
208

209
        // Start a thread for each channel
210 211 212 213 214 215 216 217 218 219
        threadAtoB = new Thread(channelAtoB);
        threadAtoBControl = new Thread(channelAtoBControl);
        threadBtoA = new Thread(channelBtoA);
        threadBtoAControl = new Thread(channelBtoAControl);

        threadAtoB.start();
        threadAtoBControl.start();
        threadBtoA.start();
        threadBtoAControl.start();

220 221 222 223 224 225 226 227 228 229 230 231 232 233
        // Listen to channel events
        addChannelListeners();
    }

    /**
     * Creates 4 new channels for the two entities. We will create a channel between A and B and vice versa
     * and also a control channel between A and B and vice versa.
     * <p>
     * Called at the start of run(), which wraps the four channel fields in threads right after
     * this method returns, so implementations are expected to have assigned them by then.
     */
    abstract void createChannels();

    /**
     * Adds listener to channel events like receiving data.
     */
    void addChannelListeners() {
234 235 236 237 238 239 240 241 242 243 244 245
        channelAtoB.addListener(this);
        channelAtoBControl.addListener(this);
        channelBtoA.addListener(this);
        channelBtoAControl.addListener(this);
    }

    /**
     * Stop the Session
     */
    public void stopAgent() {

        try {
Matt Tucker's avatar
Matt Tucker committed
246 247 248 249
            if (idleTimer != null) {
                idleTimer.cancel();
                idleTimer.purge();
                idleTimer = null;
250 251
            }
        } catch (Exception e) {
252
            Log.error(e.getMessage(), e);
253 254
        }

255 256 257 258 259 260 261
        try {
            if (lifeTimer != null) {
                lifeTimer.cancel();
                lifeTimer.purge();
                lifeTimer = null;
            }
        } catch (Exception e) {
262
            Log.error(e.getMessage(), e);
263 264
        }

Thiago Camargo's avatar
Thiago Camargo committed
265 266 267 268
        channelAtoB.removeListeners();
        channelAtoBControl.removeListeners();
        channelBtoA.removeListeners();
        channelBtoAControl.removeListeners();
269 270

        try {
271 272 273 274
            channelAtoB.cancel();
            channelAtoBControl.cancel();
            channelBtoA.cancel();
            channelBtoAControl.cancel();
275
        } catch (Exception e) {
276
            Log.error(e.getMessage(), e);
277 278 279 280 281 282 283 284 285
        }

        socketA.close();
        socketAControl.close();
        socketB.close();
        socketBControl.close();

        dispatchAgentStopped();

286
        Log.debug("MediaProxySession: Session Stopped");
287 288 289 290 291
    }

    /**
     * Get localhost of the Session
     *
Thiago Camargo's avatar
Thiago Camargo committed
292
     * @return the localhost of the session
293 294
     */
    public InetAddress getLocalhost() {
Matt Tucker's avatar
Matt Tucker committed
295
        return localAddress;
296 297 298 299 300
    }

    /**
     * Returns the address of host A.
     *
     * @return the host A ip
     */
    public InetAddress getHostA() {
        return this.hostA;
    }

    /**
     * Returns the address of host B.
     *
     * @return the host B ip
     */
    public InetAddress getHostB() {
        return this.hostB;
    }

    /**
     * Set port A value
     *
Thiago Camargo's avatar
Thiago Camargo committed
319
     * @param portA the port number for A
320 321
     */
    public void setPortA(int portA) {
322
        if (Log.isDebugEnabled()) {
323
            Log.debug("MediaProxySession: PORT CHANGED(A):" + portA);
324
        }
325 326 327 328 329 330
        this.portA = portA;
    }

    /**
     * Set port B value
     *
Thiago Camargo's avatar
Thiago Camargo committed
331
     * @param portB the port number for B
332 333
     */
    public void setPortB(int portB) {
334
        if (Log.isDebugEnabled()) {
335
            Log.debug("MediaProxySession: PORT CHANGED(B):" + portB);
336
        }
337 338 339 340 341 342
        this.portB = portB;
    }

    /**
     * Sets the address of host A.
     *
     * @param hostA the host for A
     */
    public void setHostA(InetAddress hostA) {
        this.hostA = hostA;
    }

    /**
     * Sets the address of host B.
     *
     * @param hostB the host for B
     */
    public void setHostB(InetAddress hostB) {
        this.hostB = hostB;
    }

    /**
     * Returns the port of host A.
     *
     * @return the port for A
     */
    public int getPortA() {
        return this.portA;
    }

    /**
     * Returns the port of host B.
     *
     * @return the port for B
     */
    public int getPortB() {
        return this.portB;
    }

    /**
     * Returns the local port that listens for packets from host A.
     *
     * @return the local port for A
     */
    public int getLocalPortA() {
        return this.localPortA;
    }

    /**
     * Returns the local port that listens for packets from host B.
     *
     * @return the local port for B
     */
    public int getLocalPortB() {
        return this.localPortB;
    }

    public void sendFromPortA(String host, int port) {
        try {
            InetAddress address = InetAddress.getByName(host);
            channelAtoB.setHost(address);
            channelAtoB.setPort(port);
            channelAtoBControl.setHost(address);
            channelAtoBControl.setPort(port + 1);
Matt Tucker's avatar
Matt Tucker committed
401 402
        }
        catch (Exception e) {
403
            Log.error(e.getMessage(), e);
404 405 406 407 408 409 410 411 412 413
        }
    }

    public void sendFromPortB(String host, int port) {
        try {
            InetAddress address = InetAddress.getByName(host);
            channelBtoA.setHost(address);
            channelBtoA.setPort(port);
            channelBtoAControl.setHost(address);
            channelBtoAControl.setPort(port + 1);
Matt Tucker's avatar
Matt Tucker committed
414 415
        }
        catch (Exception e) {
416
            Log.error(e.getMessage(), e);
417 418 419 420 421 422 423 424 425
        }
    }

    /**
     * Implement DatagramListener to timestamp last packet arrived
     *
     * @param datagramPacket
     */
    public boolean datagramReceived(DatagramPacket datagramPacket) {
Matt Tucker's avatar
Matt Tucker committed
426
        timestamp = System.currentTimeMillis();
427 428 429 430 431
        return true;
    }

    /**
     * Add a keep alive detector.
432 433
     * If the packet still more than the keep alive delay without receiving any packets. The Session is
     * stoped and remove from agents List.
434 435 436 437
     *
     * @param delay delay time in millis to check if the channel is inactive
     */
    void addKeepAlive(long delay) {
Matt Tucker's avatar
Matt Tucker committed
438 439 440
        if (idleTimer != null) return;
        idleTimer = new Timer();
        idleTimer.scheduleAtFixedRate(new TimerTask() {
441 442
            long lastTimeStamp = getTimestamp();

443 444
            @Override
			public void run() {
445 446 447 448 449 450 451 452 453
                if (lastTimeStamp == getTimestamp()) {
                    stopAgent();
                    return;
                }
                lastTimeStamp = getTimestamp();
            }
        }, delay, delay);
    }

454 455 456 457 458 459 460 461 462 463 464 465
    /**
     * Add a limited life time to the Session.
     * The Session is stoped and remove from agents List after a certain time.
     * Prevents that network cycles, refreshes a Session forever.
     *
     * @param lifetime time in Seconds to kill the Session
     */
    void addLifeTime(long lifetime) {
        lifetime *= 1000;
        if (lifeTimer != null) return;
        lifeTimer = new Timer();
        lifeTimer.scheduleAtFixedRate(new TimerTask() {
466 467
            @Override
			public void run() {
468 469 470 471 472
                stopAgent();
            }
        }, lifetime, lifetime);
    }

473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501
    /**
     * Registers a listener for Session events.
     *
     * @param sessionListener the listener to add
     */
    public void addAgentListener(SessionListener sessionListener) {
        this.sessionListeners.add(sessionListener);
    }

    /**
     * Unregisters a listener for Session events.
     *
     * @param sessionListener the listener to remove
     */
    public void removeAgentListener(SessionListener sessionListener) {
        this.sessionListeners.remove(sessionListener);
    }

    /**
     * Unregisters every listener for Session events.
     */
    public void clearAgentListeners() {
        this.sessionListeners.clear();
    }

    /**
     * Dispatch Stop Event
     */
    public void dispatchAgentStopped() {
502
        for (SessionListener sessionListener : sessionListeners) {
503
            try {
Matt Tucker's avatar
Matt Tucker committed
504
                sessionListener.sessionClosed(this);
505
            } catch (Exception e) {
506
                Log.error(e.getMessage(), e);
507 508 509
            }
        }
    }
Matt Tucker's avatar
Matt Tucker committed
510
}