NonBlockingAcceptingMode.java 5.59 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
/**
 * $RCSfile$
 * $Revision: $
 * $Date: $
 *
 * Copyright (C) 2006 Jive Software. All rights reserved.
 *
 * This software is published under the terms of the GNU Public License (GPL),
 * a copy of which is included in this distribution.
 */

package org.jivesoftware.wildfire.net;

import org.jivesoftware.util.LocaleUtils;
import org.jivesoftware.util.Log;
import org.jivesoftware.wildfire.ConnectionManager;
import org.jivesoftware.wildfire.ServerPort;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Accepts new socket connections using a non-blocking model. A single selector is
 * used for all connected clients and also for accepting new connections.
 *
 * @author Daniele Piras
 */
class NonBlockingAcceptingMode extends SocketAcceptingMode {

    // Time (in ms) to sleep from a reading-cycle to another
    private static final long CYCLE_TIME = 10;

    // Selector to collect messages from client connections.
    private Selector selector;

    protected NonBlockingAcceptingMode(ConnectionManager connManager, ServerPort serverPort,
            InetAddress bindInterface) throws IOException {
        super(connManager, serverPort);

        // Chaning server to use NIO
        // Open selector...
        selector = Selector.open();
        // Create a new ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // Retrieve socket and bind socket with specified address
        this.serverSocket = serverSocketChannel.socket();
        this.serverSocket.bind(new InetSocketAddress(bindInterface, serverPort.getPort()));
        // Configure Blocking to unblocking
        serverSocketChannel.configureBlocking(false);
        // Registering connection with selector.
        SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        AcceptConnection acceptConnection = new AcceptConnection();
        sk.attach(acceptConnection);
    }

    /**
     * DANIELE:
     * This thread use the selector NIO features to retrieve client connections
     * and messages.
     */
    public void run() {
        while (notTerminated && !Thread.interrupted()) {
            try {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while (it.hasNext()) {
                    SelectionKey key = (SelectionKey) it.next();
                    it.remove();
                    SelectorAction action = (SelectorAction) key.attachment();
                    if (action == null) {
                        continue;
                    }
                    if (key.isAcceptable()) {
                        action.connect(key);
                    }
                    else if (key.isReadable()) {
                        action.read(key);
                    }
                }
                Thread.sleep(CYCLE_TIME);
            }
            catch (IOException ie) {
                if (notTerminated) {
                    Log.error(LocaleUtils.getLocalizedString("admin.error.accept"),
                            ie);
                }
            }
            catch (Exception e) {
                Log.error(LocaleUtils.getLocalizedString("admin.error.accept"), e);
            }
        }
    }

    /*
    * InnerClass that is use when a new client arrive.
    * It's use the reactor pattern to register an abstract action
    * to the selector.
    */
    class AcceptConnection implements SelectorAction {


        public void read(SelectionKey key) throws IOException {
        }

        /*
        * A client arrive...
        */
        public void connect(SelectionKey key) throws IOException {
            // Retrieve the server socket channel...
            ServerSocketChannel sChannel = (ServerSocketChannel) key.channel();
            // Accept the connection
            SocketChannel socketChannel = sChannel.accept();
            // Retrieve socket for incoming connection
            Socket sock = socketChannel.socket();
            socketChannel.configureBlocking(false);
            // Registering READING operation into the selector
            SelectionKey sockKey = socketChannel.register(selector, SelectionKey.OP_READ);
            if (sock != null) {
                System.out.println("Connect " + sock.toString());
                Log.debug("Connect " + sock.toString());
                try {
                    SocketReader reader =
                            connManager.createSocketReader(sock, false, serverPort, false);
                    SelectorAction action = new ReadAction(reader);
                    sockKey.attach(action);
                }
                catch (Exception e) {
                    // There is an exception...
                    Log.error(LocaleUtils.getLocalizedString("admin.error.accept"), e);
                }
            }
        }
    }

    class ReadAction implements SelectorAction {

        SocketReader reader;

        public ReadAction(SocketReader reader) {
            this.reader = reader;
        }

        public void read(SelectionKey key) throws IOException {
            // Socket reader (using non-blocking mode) will read the stream and process, in
            // another thread, any number of stanzas found in the stream.
            reader.run();
        }

        public void connect(SelectionKey key) throws IOException {
        }
    }
}