/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.internal.networking.nio;

import com.hazelcast.internal.metrics.MetricsRegistry;
import com.hazelcast.internal.metrics.ProbeLevel;
import com.hazelcast.internal.networking.Channel;
import com.hazelcast.internal.networking.ChannelCloseListener;
import com.hazelcast.internal.networking.ChannelErrorHandler;
import com.hazelcast.internal.networking.ChannelInitializer;
import com.hazelcast.internal.networking.EventLoopGroup;
import com.hazelcast.internal.networking.nio.NioChannel;
import com.hazelcast.internal.networking.nio.NioChannelReader;
import com.hazelcast.internal.networking.nio.NioChannelWriter;
import com.hazelcast.internal.networking.nio.NioThread;
import com.hazelcast.internal.networking.nio.SelectorMode;
import com.hazelcast.internal.networking.nio.iobalancer.IOBalancer;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.LoggingService;
import com.hazelcast.util.ExceptionUtil;
import com.hazelcast.util.HashUtil;
import com.hazelcast.util.Preconditions;
import com.hazelcast.util.ThreadUtil;
import com.hazelcast.util.concurrent.BackoffIdleStrategy;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;

public class NioEventLoopGroup
implements EventLoopGroup {
    private volatile NioThread[] inputThreads;
    private volatile NioThread[] outputThreads;
    private final AtomicInteger nextInputThreadIndex = new AtomicInteger();
    private final AtomicInteger nextOutputThreadIndex = new AtomicInteger();
    private final ILogger logger;
    private final MetricsRegistry metricsRegistry;
    private final LoggingService loggingService;
    private final String hzName;
    private final ChannelErrorHandler errorHandler;
    private final int balanceIntervalSeconds;
    private final ChannelInitializer channelInitializer;
    private final int inputThreadCount;
    private final int outputThreadCount;
    private final Set<NioChannel> channels = Collections.newSetFromMap(new ConcurrentHashMap());
    private final ChannelCloseListener channelCloseListener = new ChannelCloseListenerImpl();
    private SelectorMode selectorMode;
    private BackoffIdleStrategy idleStrategy;
    private volatile IOBalancer ioBalancer;
    private boolean selectorWorkaroundTest = Boolean.getBoolean("hazelcast.io.selector.workaround.test");

    public NioEventLoopGroup(LoggingService loggingService, MetricsRegistry metricsRegistry, String hzName, ChannelErrorHandler errorHandler, int inputThreadCount, int outputThreadCount, int balanceIntervalSeconds, ChannelInitializer channelInitializer) {
        this.hzName = hzName;
        this.metricsRegistry = metricsRegistry;
        this.loggingService = loggingService;
        this.inputThreadCount = inputThreadCount;
        this.outputThreadCount = outputThreadCount;
        this.logger = loggingService.getLogger(NioEventLoopGroup.class);
        this.errorHandler = errorHandler;
        this.balanceIntervalSeconds = balanceIntervalSeconds;
        this.channelInitializer = channelInitializer;
    }

    private SelectorMode getSelectorMode() {
        if (this.selectorMode == null) {
            this.selectorMode = SelectorMode.getConfiguredValue();
            String selectorModeString = SelectorMode.getConfiguredString();
            if (selectorModeString.startsWith("selectnow,")) {
                this.idleStrategy = BackoffIdleStrategy.createBackoffIdleStrategy(selectorModeString);
            }
        }
        return this.selectorMode;
    }

    public void setSelectorMode(SelectorMode mode) {
        this.selectorMode = mode;
    }

    void setSelectorWorkaroundTest(boolean selectorWorkaroundTest) {
        this.selectorWorkaroundTest = selectorWorkaroundTest;
    }

    @SuppressFBWarnings(value={"EI_EXPOSE_REP"}, justification="used only for testing")
    public NioThread[] getInputThreads() {
        return this.inputThreads;
    }

    @SuppressFBWarnings(value={"EI_EXPOSE_REP"}, justification="used only for testing")
    public NioThread[] getOutputThreads() {
        return this.outputThreads;
    }

    public IOBalancer getIOBalancer() {
        return this.ioBalancer;
    }

    @Override
    public void start() {
        NioThread thread;
        int i;
        if (this.logger.isFineEnabled()) {
            this.logger.fine("TcpIpConnectionManager configured with Non Blocking IO-threading model: " + this.inputThreadCount + " input threads and " + this.outputThreadCount + " output threads");
        }
        this.logger.log(this.getSelectorMode() != SelectorMode.SELECT ? Level.INFO : Level.FINE, "IO threads selector mode is " + (Object)((Object)this.getSelectorMode()));
        this.inputThreads = new NioThread[this.inputThreadCount];
        for (i = 0; i < this.inputThreads.length; ++i) {
            thread = new NioThread(ThreadUtil.createThreadPoolName(this.hzName, "IO") + "in-" + i, this.loggingService.getLogger(NioThread.class), this.errorHandler, this.selectorMode, this.idleStrategy);
            thread.id = i;
            thread.setSelectorWorkaroundTest(this.selectorWorkaroundTest);
            this.inputThreads[i] = thread;
            this.metricsRegistry.scanAndRegister(thread, "tcp.inputThread[" + thread.getName() + "]");
            thread.start();
        }
        this.outputThreads = new NioThread[this.outputThreadCount];
        for (i = 0; i < this.outputThreads.length; ++i) {
            thread = new NioThread(ThreadUtil.createThreadPoolName(this.hzName, "IO") + "out-" + i, this.loggingService.getLogger(NioThread.class), this.errorHandler, this.selectorMode, this.idleStrategy);
            thread.id = i;
            thread.setSelectorWorkaroundTest(this.selectorWorkaroundTest);
            this.outputThreads[i] = thread;
            this.metricsRegistry.scanAndRegister(thread, "tcp.outputThread[" + thread.getName() + "]");
            thread.start();
        }
        this.startIOBalancer();
        if (this.metricsRegistry.minimumLevel().isEnabled(ProbeLevel.DEBUG)) {
            this.metricsRegistry.scheduleAtFixedRate(new PublishAllTask(), 1L, TimeUnit.SECONDS);
        }
    }

    private void startIOBalancer() {
        this.ioBalancer = new IOBalancer(this.inputThreads, this.outputThreads, this.hzName, this.balanceIntervalSeconds, this.loggingService);
        this.ioBalancer.start();
        this.metricsRegistry.scanAndRegister(this.ioBalancer, "tcp.balancer");
    }

    @Override
    public void shutdown() {
        this.ioBalancer.stop();
        if (this.logger.isFinestEnabled()) {
            this.logger.finest("Shutting down IO Threads... Total: " + (this.inputThreads.length + this.outputThreads.length));
        }
        this.shutdown(this.inputThreads);
        this.inputThreads = null;
        this.shutdown(this.outputThreads);
        this.outputThreads = null;
    }

    private void shutdown(NioThread[] threads) {
        if (threads == null) {
            return;
        }
        for (NioThread thread : threads) {
            thread.shutdown();
        }
    }

    @Override
    public void register(Channel channel) {
        NioChannel nioChannel = Preconditions.checkInstanceOf(NioChannel.class, channel);
        try {
            nioChannel.socketChannel().configureBlocking(false);
        }
        catch (IOException e) {
            throw ExceptionUtil.rethrow(e);
        }
        NioChannelReader reader = this.newChannelReader(nioChannel);
        NioChannelWriter writer = this.newChannelWriter(nioChannel);
        this.channels.add(nioChannel);
        nioChannel.setReader(reader);
        nioChannel.setWriter(writer);
        this.ioBalancer.channelAdded(reader, writer);
        String metricsId = channel.getLocalSocketAddress() + "->" + channel.getRemoteSocketAddress();
        this.metricsRegistry.scanAndRegister(writer, "tcp.connection[" + metricsId + "].out");
        this.metricsRegistry.scanAndRegister(reader, "tcp.connection[" + metricsId + "].in");
        reader.start();
        writer.start();
        channel.addCloseListener(this.channelCloseListener);
    }

    private NioChannelWriter newChannelWriter(NioChannel channel) {
        int index = HashUtil.hashToIndex(this.nextOutputThreadIndex.getAndIncrement(), this.outputThreadCount);
        NioThread[] threads = this.outputThreads;
        if (threads == null) {
            throw new IllegalStateException("IO thread is closed!");
        }
        return new NioChannelWriter(channel, threads[index], this.loggingService.getLogger(NioChannelWriter.class), this.ioBalancer, this.channelInitializer);
    }

    private NioChannelReader newChannelReader(NioChannel channel) {
        int index = HashUtil.hashToIndex(this.nextInputThreadIndex.getAndIncrement(), this.inputThreadCount);
        NioThread[] threads = this.inputThreads;
        if (threads == null) {
            throw new IllegalStateException("IO thread is closed!");
        }
        return new NioChannelReader(channel, threads[index], this.loggingService.getLogger(NioChannelReader.class), this.ioBalancer, this.channelInitializer);
    }

    private class ChannelCloseListenerImpl
    implements ChannelCloseListener {
        private ChannelCloseListenerImpl() {
        }

        @Override
        public void onClose(Channel channel) {
            NioChannel nioChannel = (NioChannel)channel;
            NioEventLoopGroup.this.channels.remove(channel);
            NioEventLoopGroup.this.ioBalancer.channelRemoved(nioChannel.getReader(), nioChannel.getWriter());
            NioEventLoopGroup.this.metricsRegistry.deregister(nioChannel.getReader());
            NioEventLoopGroup.this.metricsRegistry.deregister(nioChannel.getWriter());
        }
    }

    private class PublishAllTask
    implements Runnable {
        private PublishAllTask() {
        }

        @Override
        public void run() {
            for (NioChannel channel : NioEventLoopGroup.this.channels) {
                NioChannelWriter writer;
                NioThread outputThread;
                final NioChannelReader reader = channel.getReader();
                NioThread inputThread = reader.getOwner();
                if (inputThread != null) {
                    inputThread.addTaskAndWakeup(new Runnable(){

                        @Override
                        public void run() {
                            reader.publish();
                        }
                    });
                }
                if ((outputThread = (writer = channel.getWriter()).getOwner()) == null) continue;
                outputThread.addTaskAndWakeup(new Runnable(){

                    @Override
                    public void run() {
                        writer.publish();
                    }
                });
            }
        }
    }
}

