/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.nio.tcp;

import com.hazelcast.cluster.BindOperation;
import com.hazelcast.config.SocketInterceptorConfig;
import com.hazelcast.instance.NodeInitializer;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.Connection;
import com.hazelcast.nio.ConnectionListener;
import com.hazelcast.nio.ConnectionManager;
import com.hazelcast.nio.IOService;
import com.hazelcast.nio.IOUtil;
import com.hazelcast.nio.MemberSocketInterceptor;
import com.hazelcast.nio.Packet;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.nio.serialization.PortableContext;
import com.hazelcast.nio.tcp.IOSelector;
import com.hazelcast.nio.tcp.IOSelectorOutOfMemoryHandler;
import com.hazelcast.nio.tcp.InSelectorImpl;
import com.hazelcast.nio.tcp.OutSelectorImpl;
import com.hazelcast.nio.tcp.PacketReader;
import com.hazelcast.nio.tcp.PacketWriter;
import com.hazelcast.nio.tcp.SocketAcceptor;
import com.hazelcast.nio.tcp.SocketChannelWrapper;
import com.hazelcast.nio.tcp.SocketChannelWrapperFactory;
import com.hazelcast.nio.tcp.SocketConnector;
import com.hazelcast.nio.tcp.TcpIpConnection;
import com.hazelcast.nio.tcp.TcpIpConnectionMonitor;
import com.hazelcast.util.ConcurrencyUtil;
import com.hazelcast.util.ConstructorFunction;
import com.hazelcast.util.executor.StripedRunnable;
import java.io.IOException;
import java.net.Socket;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;

public class TcpIpConnectionManager
implements ConnectionManager {
    private static final int DEFAULT_KILL_THREAD_MILLIS = 10000;
    final int socketReceiveBufferSize;
    final IOService ioService;
    final int socketSendBufferSize;
    private final ConstructorFunction<Address, TcpIpConnectionMonitor> monitorConstructor = new ConstructorFunction<Address, TcpIpConnectionMonitor>(){

        @Override
        public TcpIpConnectionMonitor createNew(Address endpoint) {
            return new TcpIpConnectionMonitor(TcpIpConnectionManager.this, endpoint);
        }
    };
    private final ILogger logger;
    private final int socketLingerSeconds;
    private final boolean socketKeepAlive;
    private final boolean socketNoDelay;
    private final ConcurrentMap<Address, Connection> connectionsMap = new ConcurrentHashMap<Address, Connection>(100);
    private final ConcurrentMap<Address, TcpIpConnectionMonitor> monitors = new ConcurrentHashMap<Address, TcpIpConnectionMonitor>(100);
    private final Set<Address> connectionsInProgress = Collections.newSetFromMap(new ConcurrentHashMap());
    private final Set<ConnectionListener> connectionListeners = new CopyOnWriteArraySet<ConnectionListener>();
    private final Set<SocketChannelWrapper> acceptedSockets = Collections.newSetFromMap(new ConcurrentHashMap());
    private final Set<TcpIpConnection> activeConnections = Collections.newSetFromMap(new ConcurrentHashMap());
    private final AtomicInteger allTextConnections = new AtomicInteger();
    private final AtomicInteger connectionIdGen = new AtomicInteger();
    private volatile boolean live;
    private final ServerSocketChannel serverSocketChannel;
    private final int selectorThreadCount;
    private final IOSelector[] inSelectors;
    private final IOSelector[] outSelectors;
    private final AtomicInteger nextSelectorIndex = new AtomicInteger();
    private final SocketChannelWrapperFactory socketChannelWrapperFactory;
    private final int outboundPortCount;
    private final LinkedList<Integer> outboundPorts = new LinkedList();
    private final PortableContext portableContext;
    private volatile Thread socketAcceptorThread;
    private final NodeInitializer initializer;

    public TcpIpConnectionManager(IOService ioService, ServerSocketChannel serverSocketChannel, NodeInitializer initializer) {
        this.initializer = initializer;
        this.ioService = ioService;
        this.serverSocketChannel = serverSocketChannel;
        this.logger = ioService.getLogger(TcpIpConnectionManager.class.getName());
        this.socketReceiveBufferSize = ioService.getSocketReceiveBufferSize() * 1024;
        this.socketSendBufferSize = ioService.getSocketSendBufferSize() * 1024;
        this.socketLingerSeconds = ioService.getSocketLingerSeconds();
        this.socketKeepAlive = ioService.getSocketKeepAlive();
        this.socketNoDelay = ioService.getSocketNoDelay();
        this.selectorThreadCount = ioService.getSelectorThreadCount();
        this.inSelectors = new IOSelector[this.selectorThreadCount];
        this.outSelectors = new IOSelector[this.selectorThreadCount];
        Collection<Integer> ports = ioService.getOutboundPorts();
        int n = this.outboundPortCount = ports == null ? 0 : ports.size();
        if (ports != null) {
            this.outboundPorts.addAll(ports);
        }
        this.socketChannelWrapperFactory = initializer.getSocketChannelWrapperFactory();
        this.portableContext = ioService.getPortableContext();
    }

    public void interceptSocket(Socket socket, boolean onAccept) throws IOException {
        if (!this.isSocketInterceptorEnabled()) {
            return;
        }
        MemberSocketInterceptor memberSocketInterceptor = this.initializer.getMemberSocketInterceptor();
        if (memberSocketInterceptor == null) {
            return;
        }
        if (onAccept) {
            memberSocketInterceptor.onAccept(socket);
        } else {
            memberSocketInterceptor.onConnect(socket);
        }
    }

    public boolean isSocketInterceptorEnabled() {
        SocketInterceptorConfig socketInterceptorConfig = this.ioService.getSocketInterceptorConfig();
        return socketInterceptorConfig != null && socketInterceptorConfig.isEnabled();
    }

    public PacketReader createPacketReader(TcpIpConnection connection) {
        return this.initializer.createPacketReader(connection, this.ioService);
    }

    public PacketWriter createPacketWriter(TcpIpConnection connection) {
        return this.initializer.createPacketWriter(connection, this.ioService);
    }

    @Override
    public int getActiveConnectionCount() {
        return this.activeConnections.size();
    }

    @Override
    public int getAllTextConnections() {
        return this.allTextConnections.get();
    }

    @Override
    public int getConnectionCount() {
        return this.connectionsMap.size();
    }

    public boolean isSSLEnabled() {
        return this.socketChannelWrapperFactory.isSSlEnabled();
    }

    public void incrementTextConnections() {
        this.allTextConnections.incrementAndGet();
    }

    public PortableContext getPortableContext() {
        return this.portableContext;
    }

    public IOService getIOHandler() {
        return this.ioService;
    }

    @Override
    public void addConnectionListener(ConnectionListener listener) {
        this.connectionListeners.add(listener);
    }

    public boolean bind(TcpIpConnection connection, Address remoteEndPoint, Address localEndpoint, boolean reply) {
        if (this.logger.isFinestEnabled()) {
            this.log(Level.FINEST, "Binding " + connection + " to " + remoteEndPoint + ", reply is " + reply);
        }
        Address thisAddress = this.ioService.getThisAddress();
        if (!connection.isClient() && !thisAddress.equals(localEndpoint)) {
            this.log(Level.WARNING, "Wrong bind request from " + remoteEndPoint + "! This node is not requested endpoint: " + localEndpoint);
            connection.close();
            return false;
        }
        connection.setEndPoint(remoteEndPoint);
        if (reply) {
            this.sendBindRequest(connection, remoteEndPoint, false);
        }
        if (this.checkAlreadyConnected(connection, remoteEndPoint)) {
            return false;
        }
        return this.registerConnection(remoteEndPoint, connection);
    }

    @Override
    public boolean registerConnection(final Address remoteEndPoint, final Connection connection) {
        if (remoteEndPoint.equals(this.ioService.getThisAddress())) {
            return false;
        }
        if (connection instanceof TcpIpConnection) {
            TcpIpConnection tcpConnection = (TcpIpConnection)connection;
            Address currentEndPoint = tcpConnection.getEndPoint();
            if (currentEndPoint != null && !currentEndPoint.equals(remoteEndPoint)) {
                throw new IllegalArgumentException(connection + " has already a different endpoint than: " + remoteEndPoint);
            }
            tcpConnection.setEndPoint(remoteEndPoint);
            if (!connection.isClient()) {
                TcpIpConnectionMonitor connectionMonitor = this.getConnectionMonitor(remoteEndPoint, true);
                tcpConnection.setMonitor(connectionMonitor);
            }
        }
        this.connectionsMap.put(remoteEndPoint, connection);
        this.connectionsInProgress.remove(remoteEndPoint);
        this.ioService.getEventService().executeEventCallback(new StripedRunnable(){

            @Override
            public void run() {
                for (ConnectionListener listener : TcpIpConnectionManager.this.connectionListeners) {
                    listener.connectionAdded(connection);
                }
            }

            @Override
            public int getKey() {
                return remoteEndPoint.hashCode();
            }
        });
        return true;
    }

    private boolean checkAlreadyConnected(TcpIpConnection connection, Address remoteEndPoint) {
        Connection existingConnection = (Connection)this.connectionsMap.get(remoteEndPoint);
        if (existingConnection != null && existingConnection.live()) {
            if (existingConnection != connection) {
                if (this.logger.isFinestEnabled()) {
                    this.log(Level.FINEST, existingConnection + " is already bound to " + remoteEndPoint + ", new one is " + connection);
                }
                this.activeConnections.add(connection);
            }
            return true;
        }
        return false;
    }

    void sendBindRequest(TcpIpConnection connection, Address remoteEndPoint, boolean replyBack) {
        connection.setEndPoint(remoteEndPoint);
        BindOperation bind = new BindOperation(this.ioService.getThisAddress(), remoteEndPoint, replyBack);
        Data bindData = this.ioService.toData(bind);
        Packet packet = new Packet(bindData, this.portableContext);
        packet.setHeader(0);
        connection.write(packet);
    }

    private int nextSelectorIndex() {
        return Math.abs(this.nextSelectorIndex.getAndIncrement()) % this.selectorThreadCount;
    }

    SocketChannelWrapper wrapSocketChannel(SocketChannel socketChannel, boolean client) throws Exception {
        SocketChannelWrapper wrapper = this.socketChannelWrapperFactory.wrapSocketChannel(socketChannel, client);
        this.acceptedSockets.add(wrapper);
        return wrapper;
    }

    TcpIpConnection assignSocketChannel(SocketChannelWrapper channel, Address endpoint) {
        int index = this.nextSelectorIndex();
        TcpIpConnection connection = new TcpIpConnection(this, this.inSelectors[index], this.outSelectors[index], this.connectionIdGen.incrementAndGet(), channel);
        connection.setEndPoint(endpoint);
        this.activeConnections.add(connection);
        this.acceptedSockets.remove(channel);
        connection.getReadHandler().register();
        this.log(Level.INFO, "Established socket connection between " + channel.socket().getLocalSocketAddress() + " and " + channel.socket().getRemoteSocketAddress());
        return connection;
    }

    void failedConnection(Address address, Throwable t, boolean silent) {
        this.connectionsInProgress.remove(address);
        this.ioService.onFailedConnection(address);
        if (!silent) {
            this.getConnectionMonitor(address, false).onError(t);
        }
    }

    @Override
    public Connection getConnection(Address address) {
        return (Connection)this.connectionsMap.get(address);
    }

    @Override
    public Connection getOrConnect(Address address) {
        return this.getOrConnect(address, false);
    }

    @Override
    public Connection getOrConnect(Address address, boolean silent) {
        Connection connection = (Connection)this.connectionsMap.get(address);
        if (connection == null && this.live && this.connectionsInProgress.add(address)) {
            this.ioService.shouldConnectTo(address);
            this.ioService.executeAsync(new SocketConnector(this, address, silent));
        }
        return connection;
    }

    private TcpIpConnectionMonitor getConnectionMonitor(Address endpoint, boolean reset) {
        TcpIpConnectionMonitor monitor = ConcurrencyUtil.getOrPutIfAbsent(this.monitors, endpoint, this.monitorConstructor);
        if (reset) {
            monitor.reset();
        }
        return monitor;
    }

    @Override
    public void destroyConnection(final Connection connection) {
        if (connection == null) {
            return;
        }
        if (this.logger.isFinestEnabled()) {
            this.log(Level.FINEST, "Destroying " + connection);
        }
        this.activeConnections.remove(connection);
        final Address endPoint = connection.getEndPoint();
        if (endPoint != null) {
            this.connectionsInProgress.remove(endPoint);
            Connection existingConn = (Connection)this.connectionsMap.get(endPoint);
            if (existingConn == connection && this.live) {
                this.connectionsMap.remove(endPoint);
                this.ioService.getEventService().executeEventCallback(new StripedRunnable(){

                    @Override
                    public void run() {
                        for (ConnectionListener listener : TcpIpConnectionManager.this.connectionListeners) {
                            listener.connectionRemoved(connection);
                        }
                    }

                    @Override
                    public int getKey() {
                        return endPoint.hashCode();
                    }
                });
            }
        }
        if (connection.live()) {
            connection.close();
        }
    }

    protected void initSocket(Socket socket) throws Exception {
        if (this.socketLingerSeconds > 0) {
            socket.setSoLinger(true, this.socketLingerSeconds);
        }
        socket.setKeepAlive(this.socketKeepAlive);
        socket.setTcpNoDelay(this.socketNoDelay);
        socket.setReceiveBufferSize(this.socketReceiveBufferSize);
        socket.setSendBufferSize(this.socketSendBufferSize);
    }

    @Override
    public synchronized void start() {
        if (this.live) {
            return;
        }
        this.live = true;
        this.log(Level.FINEST, "Starting ConnectionManager and IO selectors.");
        IOSelectorOutOfMemoryHandler oomeHandler = new IOSelectorOutOfMemoryHandler(){

            @Override
            public void handle(OutOfMemoryError error) {
                TcpIpConnectionManager.this.ioService.onOutOfMemory(error);
            }
        };
        for (int i = 0; i < this.inSelectors.length; ++i) {
            this.inSelectors[i] = new InSelectorImpl(this.ioService.getThreadGroup(), this.ioService.getThreadPrefix() + "in-" + i, this.ioService.getLogger(InSelectorImpl.class.getName()), oomeHandler);
            this.outSelectors[i] = new OutSelectorImpl(this.ioService.getThreadGroup(), this.ioService.getThreadPrefix() + "out-" + i, this.ioService.getLogger(OutSelectorImpl.class.getName()), oomeHandler);
            this.inSelectors[i].start();
            this.outSelectors[i].start();
        }
        if (this.socketAcceptorThread != null) {
            this.logger.warning("SocketAcceptor thread is already live! Shutting down old acceptor...");
            this.shutdownSocketAcceptor();
        }
        SocketAcceptor acceptRunnable = new SocketAcceptor(this.serverSocketChannel, this);
        this.socketAcceptorThread = new Thread(this.ioService.getThreadGroup(), acceptRunnable, this.ioService.getThreadPrefix() + "Acceptor");
        this.socketAcceptorThread.start();
    }

    @Override
    public synchronized void restart() {
        this.stop();
        this.start();
    }

    @Override
    public synchronized void shutdown() {
        if (!this.live) {
            return;
        }
        this.live = false;
        this.shutdownSocketAcceptor();
        this.closeServerSocket();
        this.stop();
        this.connectionListeners.clear();
    }

    private void closeServerSocket() {
        try {
            if (this.logger.isFinestEnabled()) {
                this.log(Level.FINEST, "Closing server socket channel: " + this.serverSocketChannel);
            }
            this.serverSocketChannel.close();
        }
        catch (IOException ignore) {
            this.logger.finest(ignore);
        }
    }

    private void stop() {
        this.live = false;
        this.log(Level.FINEST, "Stopping ConnectionManager");
        this.shutdownSocketAcceptor();
        for (SocketChannelWrapper socketChannelWrapper : this.acceptedSockets) {
            IOUtil.closeResource(socketChannelWrapper);
        }
        for (Connection connection : this.connectionsMap.values()) {
            try {
                this.destroyConnection(connection);
            }
            catch (Throwable ignore) {
                this.logger.finest(ignore);
            }
        }
        for (TcpIpConnection tcpIpConnection : this.activeConnections) {
            try {
                this.destroyConnection(tcpIpConnection);
            }
            catch (Throwable ignore) {
                this.logger.finest(ignore);
            }
        }
        this.shutdownIOSelectors();
        this.acceptedSockets.clear();
        this.connectionsInProgress.clear();
        this.connectionsMap.clear();
        this.monitors.clear();
        this.activeConnections.clear();
    }

    private synchronized void shutdownIOSelectors() {
        if (this.logger.isFinestEnabled()) {
            this.log(Level.FINEST, "Shutting down IO selectors... Total: " + this.selectorThreadCount);
        }
        for (int i = 0; i < this.selectorThreadCount; ++i) {
            IOSelector ioSelector = this.inSelectors[i];
            if (ioSelector != null) {
                ioSelector.shutdown();
            }
            this.inSelectors[i] = null;
            ioSelector = this.outSelectors[i];
            if (ioSelector != null) {
                ioSelector.shutdown();
            }
            this.outSelectors[i] = null;
        }
    }

    private void shutdownSocketAcceptor() {
        this.log(Level.FINEST, "Shutting down SocketAcceptor thread.");
        Thread killingThread = this.socketAcceptorThread;
        if (killingThread == null) {
            return;
        }
        this.socketAcceptorThread = null;
        killingThread.interrupt();
        try {
            killingThread.join(10000L);
        }
        catch (InterruptedException e) {
            this.logger.finest(e);
        }
    }

    @Override
    public int getCurrentClientConnections() {
        int count = 0;
        for (TcpIpConnection conn : this.activeConnections) {
            if (!conn.live() || !conn.isClient()) continue;
            ++count;
        }
        return count;
    }

    public boolean isLive() {
        return this.live;
    }

    private void log(Level level, String message) {
        this.logger.log(level, message);
    }

    boolean useAnyOutboundPort() {
        return this.outboundPortCount == 0;
    }

    int getOutboundPortCount() {
        return this.outboundPortCount;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    int acquireOutboundPort() {
        if (this.useAnyOutboundPort()) {
            return 0;
        }
        LinkedList<Integer> linkedList = this.outboundPorts;
        synchronized (linkedList) {
            Integer port = this.outboundPorts.removeFirst();
            this.outboundPorts.addLast(port);
            return port;
        }
    }

    public String toString() {
        StringBuilder sb = new StringBuilder("Connections {");
        for (Connection conn : this.connectionsMap.values()) {
            sb.append("\n");
            sb.append(conn);
        }
        sb.append("\nlive=");
        sb.append(this.live);
        sb.append("\n}");
        return sb.toString();
    }
}

