/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.nio.tcp;

import com.hazelcast.nio.SocketWritable;
import com.hazelcast.nio.ascii.SocketTextWriter;
import com.hazelcast.nio.tcp.AbstractSelectionHandler;
import com.hazelcast.nio.tcp.IOSelector;
import com.hazelcast.nio.tcp.SocketClientDataWriter;
import com.hazelcast.nio.tcp.SocketClientMessageWriter;
import com.hazelcast.nio.tcp.SocketPacketWriter;
import com.hazelcast.nio.tcp.SocketWriter;
import com.hazelcast.nio.tcp.TcpIpConnection;
import com.hazelcast.util.Clock;
import com.hazelcast.util.EmptyStatement;
import com.hazelcast.util.StringUtil;
import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.io.IOException;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;

public final class WriteHandler
extends AbstractSelectionHandler
implements Runnable {
    private static final long TIMEOUT = 3L;
    private final Queue<SocketWritable> writeQueue = new ConcurrentLinkedQueue<SocketWritable>();
    private final Queue<SocketWritable> urgentWriteQueue = new ConcurrentLinkedQueue<SocketWritable>();
    private final AtomicBoolean scheduled = new AtomicBoolean(false);
    private ByteBuffer outputBuffer;
    private SocketWritable currentPacket;
    private SocketWriter socketWriter;
    private volatile long lastHandle;
    private volatile long eventCount;
    private boolean shutdown;
    private IOSelector newOwner;

    WriteHandler(TcpIpConnection connection, IOSelector ioSelector) {
        super(connection, ioSelector, 4);
    }

    long getLastHandle() {
        return this.lastHandle;
    }

    public SocketWriter getSocketWriter() {
        return this.socketWriter;
    }

    void setProtocol(final String protocol) {
        final CountDownLatch latch = new CountDownLatch(1);
        this.ioSelector.addTaskAndWakeup(new Runnable(){

            @Override
            public void run() {
                WriteHandler.this.createWriter(protocol);
                latch.countDown();
            }
        });
        try {
            latch.await(3L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            this.logger.finest("CountDownLatch::await interrupted", e);
        }
    }

    private void createWriter(String protocol) {
        if (this.socketWriter == null) {
            if ("HZC".equals(protocol)) {
                this.configureBuffers(this.connectionManager.socketSendBufferSize);
                this.socketWriter = new SocketPacketWriter(this.connection);
                this.outputBuffer.put(StringUtil.stringToBytes("HZC"));
                this.registerOp(4);
            } else if ("CB1".equals(protocol)) {
                this.configureBuffers(this.connectionManager.socketClientSendBufferSize);
                this.socketWriter = new SocketClientDataWriter();
            } else if ("CB2".equals(protocol)) {
                this.configureBuffers(this.connectionManager.socketClientSendBufferSize);
                this.socketWriter = new SocketClientMessageWriter();
            } else {
                this.configureBuffers(this.connectionManager.socketClientSendBufferSize);
                this.socketWriter = new SocketTextWriter(this.connection);
            }
        }
    }

    private void configureBuffers(int size) {
        this.outputBuffer = ByteBuffer.allocate(size);
        try {
            this.connection.setSendBufferSize(size);
        }
        catch (SocketException e) {
            this.logger.finest("Failed to adjust TCP send buffer of " + this.connection + " to " + size + " B.", e);
        }
    }

    public void offer(SocketWritable packet) {
        if (packet.isUrgent()) {
            this.urgentWriteQueue.offer(packet);
        } else {
            this.writeQueue.offer(packet);
        }
        this.schedule();
    }

    private SocketWritable poll() {
        SocketWritable packet;
        while (true) {
            if ((packet = this.urgentWriteQueue.poll()) == null) {
                packet = this.writeQueue.poll();
            }
            if (!(packet instanceof TaskPacket)) break;
            ((TaskPacket)packet).run();
        }
        return packet;
    }

    private void schedule() {
        if (this.scheduled.get()) {
            return;
        }
        if (!this.scheduled.compareAndSet(false, true)) {
            return;
        }
        this.ioSelector.addTaskAndWakeup(this);
    }

    private void unschedule() {
        if (this.dirtyOutputBuffer() || this.currentPacket != null) {
            this.registerOp(4);
            return;
        }
        this.unregisterOp(4);
        this.scheduled.set(false);
        if (this.writeQueue.isEmpty() && this.urgentWriteQueue.isEmpty()) {
            return;
        }
        if (!this.scheduled.compareAndSet(false, true)) {
            return;
        }
        this.ioSelector.addTask(this);
    }

    @Override
    public long getEventCount() {
        return this.eventCount;
    }

    @Override
    @SuppressWarnings(value={"VO_VOLATILE_INCREMENT"}, justification="eventCount is accessed by a single thread only.")
    public void handle() {
        ++this.eventCount;
        this.lastHandle = Clock.currentTimeMillis();
        if (this.shutdown) {
            return;
        }
        if (this.socketWriter == null) {
            this.logger.log(Level.WARNING, "SocketWriter is not set, creating SocketWriter with CLUSTER protocol!");
            this.createWriter("HZC");
        }
        try {
            this.fillOutputBuffer();
            if (this.dirtyOutputBuffer()) {
                this.writeOutputBufferToSocket();
            }
        }
        catch (Throwable t) {
            this.logger.severe("Fatal Error at WriteHandler for endPoint: " + this.connection.getEndPoint(), t);
        }
        if (this.newOwner == null) {
            this.unschedule();
        } else {
            IOSelector newOwner = this.newOwner;
            this.newOwner = null;
            this.startMigration(newOwner);
        }
    }

    private boolean dirtyOutputBuffer() {
        return this.outputBuffer.position() > 0;
    }

    private void writeOutputBufferToSocket() throws Exception {
        this.outputBuffer.flip();
        try {
            this.socketChannel.write(this.outputBuffer);
        }
        catch (Exception e) {
            this.currentPacket = null;
            this.handleSocketException(e);
            return;
        }
        if (!this.outputBuffer.hasRemaining()) {
            this.outputBuffer.clear();
            return;
        }
        this.outputBuffer.compact();
    }

    private void fillOutputBuffer() throws Exception {
        while (this.outputBuffer.hasRemaining()) {
            if (this.currentPacket == null) {
                this.currentPacket = this.poll();
                if (this.currentPacket == null) {
                    return;
                }
            }
            if (!this.socketWriter.write(this.currentPacket, this.outputBuffer)) {
                return;
            }
            this.currentPacket = null;
        }
        return;
    }

    @Override
    public void run() {
        try {
            this.handle();
        }
        catch (Throwable e) {
            this.ioSelector.handleSelectionKeyFailure(e);
        }
    }

    public void shutdown() {
        this.writeQueue.clear();
        this.urgentWriteQueue.clear();
        ShutdownTask shutdownTask = new ShutdownTask();
        this.offer(shutdownTask);
        shutdownTask.awaitCompletion();
    }

    @Override
    public void requestMigration(IOSelector newOwner) {
        this.offer(new StartMigrationTask(newOwner));
    }

    private class ShutdownTask
    extends TaskPacket {
        private final CountDownLatch latch;

        private ShutdownTask() {
            this.latch = new CountDownLatch(1);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        void run() {
            WriteHandler.this.shutdown = true;
            try {
                WriteHandler.this.socketChannel.closeOutbound();
            }
            catch (IOException e) {
                WriteHandler.this.logger.finest("Error while closing outbound", e);
            }
            finally {
                this.latch.countDown();
            }
        }

        void awaitCompletion() {
            try {
                this.latch.await(3L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                EmptyStatement.ignore(e);
            }
        }
    }

    private class StartMigrationTask
    extends TaskPacket {
        private final IOSelector theNewOwner;

        public StartMigrationTask(IOSelector theNewOwner) {
            this.theNewOwner = theNewOwner;
        }

        @Override
        void run() {
            assert (WriteHandler.this.newOwner == null) : "No migration can be in progress";
            if (WriteHandler.this.ioSelector == this.theNewOwner) {
                return;
            }
            WriteHandler.this.newOwner = this.theNewOwner;
        }
    }

    private abstract class TaskPacket
    implements SocketWritable {
        private TaskPacket() {
        }

        abstract void run();

        @Override
        public boolean writeTo(ByteBuffer destination) {
            throw new UnsupportedOperationException();
        }

        @Override
        public boolean isUrgent() {
            return true;
        }
    }
}

