/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.spi.impl;

import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.MemberLeftException;
import com.hazelcast.instance.MemberImpl;
import com.hazelcast.instance.Node;
import com.hazelcast.instance.OutOfMemoryErrorDispatcher;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.Connection;
import com.hazelcast.nio.IOUtil;
import com.hazelcast.nio.Packet;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.partition.InternalPartition;
import com.hazelcast.partition.InternalPartitionService;
import com.hazelcast.partition.ReplicaErrorLogger;
import com.hazelcast.spi.BackupAwareOperation;
import com.hazelcast.spi.ExecutionService;
import com.hazelcast.spi.ExecutionTracingService;
import com.hazelcast.spi.InternalCompletableFuture;
import com.hazelcast.spi.InvocationBuilder;
import com.hazelcast.spi.Notifier;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.OperationAccessor;
import com.hazelcast.spi.OperationFactory;
import com.hazelcast.spi.OperationService;
import com.hazelcast.spi.PartitionAwareOperation;
import com.hazelcast.spi.ReadonlyOperation;
import com.hazelcast.spi.ResponseHandler;
import com.hazelcast.spi.UrgentSystemOperation;
import com.hazelcast.spi.WaitSupport;
import com.hazelcast.spi.annotation.PrivateApi;
import com.hazelcast.spi.exception.CallerNotMemberException;
import com.hazelcast.spi.exception.PartitionMigratingException;
import com.hazelcast.spi.exception.WrongTargetException;
import com.hazelcast.spi.impl.Backup;
import com.hazelcast.spi.impl.BackupResponse;
import com.hazelcast.spi.impl.BasicBackPressureService;
import com.hazelcast.spi.impl.BasicDispatcher;
import com.hazelcast.spi.impl.BasicInvocation;
import com.hazelcast.spi.impl.BasicInvocationBuilder;
import com.hazelcast.spi.impl.BasicOperationScheduler;
import com.hazelcast.spi.impl.BasicPartitionInvocation;
import com.hazelcast.spi.impl.BasicTargetInvocation;
import com.hazelcast.spi.impl.CallTimeoutResponse;
import com.hazelcast.spi.impl.InternalOperationService;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.NormalResponse;
import com.hazelcast.spi.impl.PartitionIteratingOperation;
import com.hazelcast.spi.impl.RemoteOperationExceptionHandler;
import com.hazelcast.spi.impl.RemotePropagatable;
import com.hazelcast.spi.impl.Response;
import com.hazelcast.spi.impl.ResponseHandlerFactory;
import com.hazelcast.util.Clock;
import com.hazelcast.util.EmptyStatement;
import com.hazelcast.util.ExceptionUtil;
import com.hazelcast.util.executor.ExecutorType;
import com.hazelcast.util.executor.ManagedExecutorService;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class BasicOperationService
implements InternalOperationService {
    // Matches the initial capacity (1000) given to the executingCalls and invocations maps in the constructor.
    private static final int INITIAL_CAPACITY = 1000;
    // Matches the load factor (0.75f) given to those same maps in the constructor.
    private static final float LOAD_FACTOR = 0.75f;
    // Matches the delay, in milliseconds, of the task scheduled by onMemberLeft.
    private static final long SCHEDULE_DELAY = 1111L;
    // Matches the processor-count threshold (8) the constructor uses to pick a map concurrency level.
    private static final int CORE_SIZE_CHECK = 8;
    // Matches the multiplier (4) applied to the processor count when that threshold is reached.
    private static final int CORE_SIZE_FACTOR = 4;
    // Matches the map concurrency level (16) used when the processor count is below the threshold.
    private static final int CONCURRENCY_LEVEL = 16;
    // Matches the third argument (100000) passed when registering the "hz:async" executor;
    // presumably that executor's queue capacity -- confirm against ExecutionService.register.
    private static final int ASYNC_QUEUE_CAPACITY = 100000;
    // Registered invocations keyed by the call id assigned in registerInvocation.
    final ConcurrentMap<Long, BasicInvocation> invocations;
    // Scheduler that every execute/executeOperation call in this class delegates to.
    final BasicOperationScheduler scheduler;
    // Logger obtained for BasicInvocation.class; not used by the code visible in this class.
    final ILogger invocationLogger;
    // Executor registered under the name "hz:async".
    final ManagedExecutorService asyncExecutor;
    // Incremented once per OperationHandler.handle call, whatever the outcome.
    private final AtomicLong executedOperationsCount = new AtomicLong();
    private final NodeEngineImpl nodeEngine;
    private final Node node;
    private final ILogger logger;
    // Source of call ids; starts at 1, and the code in this class treats a call id of 0 as "not set".
    private final AtomicLong callIdGen = new AtomicLong(1L);
    // Calls currently being handled: an entry is added before an operation runs and removed afterwards.
    private final Map<RemoteCallKey, RemoteCallKey> executingCalls;
    private final long defaultCallTimeoutMillis;
    // Passed to BasicInvocation.handleBackupTimeout by the cleanup thread.
    private final long backupOperationTimeoutMillis;
    private final ExecutionService executionService;
    private final OperationHandler operationHandler;
    private final OperationBackupHandler operationBackupHandler;
    private final OperationPacketHandler operationPacketHandler;
    private final ResponsePacketHandler responsePacketHandler;
    private final BasicBackPressureService backPressureService;
    // Set to true by shutdown(); polled by the cleanup thread's loop, hence volatile.
    private volatile boolean shutdown;

    /**
     * Wires the service to the given node engine, creates the call-tracking maps,
     * the scheduler, the handlers and the "hz:async" executor, and finally starts
     * the cleanup thread.
     *
     * <p>The statement order is significant and deliberately left untouched: the
     * scheduler is created before the handlers, and the cleanup thread is started
     * last, once every field has been assigned.
     *
     * @param nodeEngine the node engine this service belongs to
     */
    BasicOperationService(NodeEngineImpl nodeEngine) {
        this.nodeEngine = nodeEngine;
        this.node = nodeEngine.getNode();
        this.logger = node.getLogger(OperationService.class);
        this.invocationLogger = nodeEngine.getLogger(BasicInvocation.class);
        this.defaultCallTimeoutMillis = node.getGroupProperties().OPERATION_CALL_TIMEOUT_MILLIS.getLong();
        this.backupOperationTimeoutMillis = node.getGroupProperties().OPERATION_BACKUP_TIMEOUT_MILLIS.getLong();
        this.executionService = nodeEngine.getExecutionService();
        this.backPressureService = new BasicBackPressureService(node.getGroupProperties(), logger);

        // Scale the maps' concurrency level with the processor count on larger machines.
        final int processorCount = Runtime.getRuntime().availableProcessors();
        final int mapConcurrency;
        if (processorCount >= CORE_SIZE_CHECK) {
            mapConcurrency = processorCount * CORE_SIZE_FACTOR;
        } else {
            mapConcurrency = CONCURRENCY_LEVEL;
        }

        this.executingCalls = new ConcurrentHashMap<RemoteCallKey, RemoteCallKey>(
                INITIAL_CAPACITY, LOAD_FACTOR, mapConcurrency);
        this.invocations = new ConcurrentHashMap<Long, BasicInvocation>(
                INITIAL_CAPACITY, LOAD_FACTOR, mapConcurrency);
        this.scheduler = new BasicOperationScheduler(node, executionService, new BasicDispatcherImpl());
        this.operationHandler = new OperationHandler();
        this.operationBackupHandler = new OperationBackupHandler();
        this.operationPacketHandler = new OperationPacketHandler();
        this.responsePacketHandler = new ResponsePacketHandler();
        this.asyncExecutor = executionService.register(
                "hz:async", processorCount, ASYNC_QUEUE_CAPACITY, ExecutorType.CONCRETE);
        startCleanupThread();
    }

    /**
     * Creates a new {@link CleanupThread} and starts it.
     */
    private void startCleanupThread() {
        new CleanupThread().start();
    }

    /**
     * Delegates to the scheduler, which appends its performance metrics to the buffer.
     *
     * @param sb the buffer handed to the scheduler
     */
    @Override
    public void dumpPerformanceMetrics(StringBuffer sb) {
        scheduler.dumpPerformanceMetrics(sb);
    }

    /**
     * @return the length of the scheduler's {@code partitionOperationThreads} array
     */
    @Override
    public int getPartitionOperationThreadCount() {
        final int threadCount = scheduler.partitionOperationThreads.length;
        return threadCount;
    }

    /**
     * @return the length of the scheduler's {@code genericOperationThreads} array
     */
    @Override
    public int getGenericOperationThreadCount() {
        final int threadCount = scheduler.genericOperationThreads.length;
        return threadCount;
    }

    /**
     * @return the current number of entries in the executing-calls map
     */
    @Override
    public int getRunningOperationsCount() {
        final int running = executingCalls.size();
        return running;
    }

    /**
     * @return the current value of the executed-operations counter
     */
    @Override
    public long getExecutedOperationCount() {
        final long executed = executedOperationsCount.get();
        return executed;
    }

    /**
     * @return the current number of registered invocations
     */
    @Override
    public int getRemoteOperationsCount() {
        final int registered = invocations.size();
        return registered;
    }

    /**
     * @return the response queue size as reported by the scheduler
     */
    @Override
    public int getResponseQueueSize() {
        final int queueSize = scheduler.getResponseQueueSize();
        return queueSize;
    }

    /**
     * @return the operation executor queue size as reported by the scheduler
     */
    @Override
    public int getOperationExecutorQueueSize() {
        final int queueSize = scheduler.getOperationExecutorQueueSize();
        return queueSize;
    }

    /**
     * @return the priority operation executor queue size as reported by the scheduler
     */
    @Override
    public int getPriorityOperationExecutorQueueSize() {
        final int queueSize = scheduler.getPriorityOperationExecutorQueueSize();
        return queueSize;
    }

    /**
     * Hands the task and its partition id to the scheduler.
     *
     * @param task        the task to execute
     * @param partitionId the partition id passed along to the scheduler
     */
    @Override
    public void execute(Runnable task, int partitionId) {
        scheduler.execute(task, partitionId);
    }

    /**
     * Creates an invocation builder addressed to a partition.
     *
     * @param serviceName the service name passed to the builder
     * @param op          the operation passed to the builder
     * @param partitionId the target partition id; must not be negative
     * @return a new {@link BasicInvocationBuilder}
     * @throws IllegalArgumentException if {@code partitionId} is negative
     */
    @Override
    public InvocationBuilder createInvocationBuilder(String serviceName, Operation op, int partitionId) {
        if (partitionId >= 0) {
            return new BasicInvocationBuilder(nodeEngine, serviceName, op, partitionId);
        }
        throw new IllegalArgumentException("Partition id cannot be negative!");
    }

    /**
     * Creates an invocation builder addressed to a specific member address.
     *
     * @param serviceName the service name passed to the builder
     * @param op          the operation passed to the builder
     * @param target      the target address; must not be {@code null}
     * @return a new {@link BasicInvocationBuilder}
     * @throws IllegalArgumentException if {@code target} is {@code null}
     */
    @Override
    public InvocationBuilder createInvocationBuilder(String serviceName, Operation op, Address target) {
        if (target != null) {
            return new BasicInvocationBuilder(nodeEngine, serviceName, op, target);
        }
        throw new IllegalArgumentException("Target cannot be null!");
    }

    /**
     * Hands a received packet to the scheduler for execution.
     *
     * @param packet the packet to execute
     */
    @Override
    @PrivateApi
    public void executeOperation(Packet packet) {
        scheduler.execute(packet);
    }

    /**
     * Runs the operation synchronously on the calling thread via the operation handler.
     *
     * @param op the operation to run
     * @throws IllegalThreadStateException if the scheduler does not allow {@code op}
     *                                     to run in the current thread
     */
    @Override
    public void runOperationOnCallingThread(Operation op) {
        final boolean allowedHere = scheduler.isAllowedToRunInCurrentThread(op);
        if (allowedHere) {
            operationHandler.handle(op);
            return;
        }
        throw new IllegalThreadStateException("Operation: " + op + " cannot be run in current thread! -> " + Thread.currentThread());
    }

    /**
     * Hands the operation to the scheduler for execution.
     *
     * @param op the operation to execute
     */
    @Override
    public void executeOperation(Operation op) {
        scheduler.execute(op);
    }

    /**
     * Asks the scheduler whether the operation may run in the current thread.
     *
     * @param op the operation to check
     * @return the scheduler's answer
     */
    @Override
    public boolean isAllowedToRunOnCallingThread(Operation op) {
        final boolean allowed = scheduler.isAllowedToRunInCurrentThread(op);
        return allowed;
    }

    /**
     * Builds a {@link BasicPartitionInvocation} for the given partition and invokes it.
     *
     * @param serviceName the service name passed to the invocation
     * @param op          the operation to invoke
     * @param partitionId the target partition id
     * @return the future returned by the invocation's {@code invoke()}
     */
    @Override
    public <E> InternalCompletableFuture<E> invokeOnPartition(String serviceName, Operation op, int partitionId) {
        // NOTE(review): the trailing literals look like the invocation defaults (replica index,
        // try count, try pause, call timeout, callback, executor name, result deserialization)
        // inlined by the decompiler -- confirm against the BasicPartitionInvocation constructor.
        BasicPartitionInvocation invocation = new BasicPartitionInvocation(
                nodeEngine, serviceName, op, partitionId, 0, 250, 500L, -1L, null, null, true);
        return invocation.invoke();
    }

    /**
     * Builds a {@link BasicTargetInvocation} for the given address and invokes it.
     *
     * @param serviceName the service name passed to the invocation
     * @param op          the operation to invoke
     * @param target      the target address
     * @return the future returned by the invocation's {@code invoke()}
     */
    @Override
    public <E> InternalCompletableFuture<E> invokeOnTarget(String serviceName, Operation op, Address target) {
        // NOTE(review): the trailing literals look like the invocation defaults (try count,
        // try pause, call timeout, callback, executor name, result deserialization) inlined
        // by the decompiler -- confirm against the BasicTargetInvocation constructor.
        BasicTargetInvocation invocation = new BasicTargetInvocation(
                nodeEngine, serviceName, op, target, 250, 500L, -1L, null, null, true);
        return invocation.invoke();
    }

    /**
     * Signals one completed backup on the invocation registered under the call id.
     * Does nothing when no such invocation is registered; any {@link Exception}
     * raised on the way is passed to {@link ReplicaErrorLogger} instead of propagating.
     *
     * @param callId the call id of the invocation to signal
     */
    @Override
    public void notifyBackupCall(long callId) {
        try {
            final BasicInvocation registered = invocations.get(callId);
            if (registered == null) {
                return;
            }
            registered.signalOneBackupComplete();
        } catch (Exception e) {
            ReplicaErrorLogger.log(e, logger);
        }
    }

    /**
     * Tells whether an operation's call has already expired according to cluster time.
     *
     * <p>Only operations that return a response and carry a non-zero call id can
     * time out. The expiry time is invocation time plus call timeout; an expiry
     * time that is not positive or equals {@code Long.MAX_VALUE} is never
     * considered expired.
     *
     * @param op the operation to check
     * @return {@code true} if the expiry time lies before the current cluster time
     */
    @Override
    @PrivateApi
    public boolean isCallTimedOut(Operation op) {
        if (!op.returnsResponse() || op.getCallId() == 0L) {
            return false;
        }
        final long timeout = op.getCallTimeout();
        final long invokedAt = op.getInvocationTime();
        final long expiresAt = invokedAt + timeout;
        // A sum that wrapped around to a non-positive value, or that equals MAX_VALUE,
        // is treated as "never expires".
        if (expiresAt <= 0L || expiresAt == Long.MAX_VALUE) {
            return false;
        }
        return expiresAt < nodeEngine.getClusterTime();
    }

    /**
     * Invokes operations created by the factory on every partition listed in the
     * partition service's member-to-partitions map.
     *
     * @param serviceName      the service name passed to the invocations
     * @param operationFactory the factory passed to {@code InvokeOnPartitions}
     * @return the result map produced by {@code InvokeOnPartitions.invoke()}
     * @throws Exception whatever {@code InvokeOnPartitions.invoke()} throws
     */
    @Override
    public Map<Integer, Object> invokeOnAllPartitions(String serviceName, OperationFactory operationFactory) throws Exception {
        final InternalPartitionService partitionService = nodeEngine.getPartitionService();
        final Map<Address, List<Integer>> partitionsByMember = partitionService.getMemberPartitionsMap();
        return new InvokeOnPartitions(serviceName, operationFactory, partitionsByMember).invoke();
    }

    /**
     * Invokes operations created by the factory on the given partitions.
     *
     * <p>The partitions are first grouped by their owner address, obtained through
     * {@code getPartitionOwnerOrWait}, and the grouping is then handed to
     * {@code InvokeOnPartitions}.
     *
     * @param serviceName      the service name passed to the invocations
     * @param operationFactory the factory passed to {@code InvokeOnPartitions}
     * @param partitions       the ids of the partitions to invoke on
     * @return the result map produced by {@code InvokeOnPartitions.invoke()}
     * @throws Exception whatever {@code InvokeOnPartitions.invoke()} throws
     */
    @Override
    public Map<Integer, Object> invokeOnPartitions(String serviceName, OperationFactory operationFactory, Collection<Integer> partitions) throws Exception {
        // Parameterized types replace the raw HashMap/ArrayList and the unchecked cast.
        Map<Address, List<Integer>> memberPartitions = new HashMap<Address, List<Integer>>(3);
        InternalPartitionService partitionService = this.nodeEngine.getPartitionService();
        for (int partition : partitions) {
            Address owner = partitionService.getPartitionOwnerOrWait(partition);
            List<Integer> ownedPartitions = memberPartitions.get(owner);
            if (ownedPartitions == null) {
                ownedPartitions = new ArrayList<Integer>();
                memberPartitions.put(owner, ownedPartitions);
            }
            ownedPartitions.add(partition);
        }
        InvokeOnPartitions invokeOnPartitions = new InvokeOnPartitions(serviceName, operationFactory, memberPartitions);
        return invokeOnPartitions.invoke();
    }

    /**
     * Serializes an operation into a packet and sends it to a remote member.
     *
     * @param op     the operation to send
     * @param target the remote address; must not be {@code null} or this node's own address
     * @return the result of {@code nodeEngine.send(packet, connection)}
     * @throws IllegalArgumentException if {@code target} is {@code null} or is this node
     */
    @Override
    public boolean send(Operation op, Address target) {
        if (target == null) {
            throw new IllegalArgumentException("Target is required!");
        }
        final boolean targetIsLocal = nodeEngine.getThisAddress().equals(target);
        if (targetIsLocal) {
            throw new IllegalArgumentException("Target is this node! -> " + target + ", op: " + op);
        }

        final Data payload = nodeEngine.toData(op);
        final int partitionId = scheduler.getPartitionIdForExecution(op);
        final Packet packet = new Packet(payload, partitionId, nodeEngine.getPortableContext());
        // NOTE(review): header 0 presumably marks an operation packet and header 4 an urgent
        // one -- confirm against the header constants declared in Packet.
        packet.setHeader(0);
        final boolean urgent = op instanceof UrgentSystemOperation;
        if (urgent) {
            packet.setHeader(4);
        }

        final Connection connection = node.getConnectionManager().getOrConnect(target);
        return nodeEngine.send(packet, connection);
    }

    /**
     * Serializes a response into a packet and sends it to a remote member.
     *
     * @param response the response to send
     * @param target   the remote address; must not be {@code null} or this node's own address
     * @return the result of {@code nodeEngine.send(packet, connection)}
     * @throws IllegalArgumentException if {@code target} is {@code null} or is this node
     */
    @Override
    public boolean send(Response response, Address target) {
        if (target == null) {
            throw new IllegalArgumentException("Target is required!");
        }
        final boolean targetIsLocal = nodeEngine.getThisAddress().equals(target);
        if (targetIsLocal) {
            throw new IllegalArgumentException("Target is this node! -> " + target + ", response: " + response);
        }

        final Data payload = nodeEngine.toData(response);
        final Packet packet = new Packet(payload, nodeEngine.getPortableContext());
        // NOTE(review): headers 0 and 1 presumably mark an operation-channel response and
        // header 4 an urgent one -- confirm against the header constants declared in Packet.
        packet.setHeader(0);
        packet.setHeader(1);
        final boolean urgent = response.isUrgent();
        if (urgent) {
            packet.setHeader(4);
        }

        final Connection connection = node.getConnectionManager().getOrConnect(target);
        return nodeEngine.send(packet, connection);
    }

    /**
     * Registers an invocation under a freshly generated call id and stores that id
     * on the invocation's operation.
     *
     * <p>If the operation already carries a non-zero call id, the entry registered
     * under that old id is removed first.
     *
     * @param invocation the invocation to register
     */
    public void registerInvocation(BasicInvocation invocation) {
        final long freshCallId = callIdGen.getAndIncrement();
        final Operation operation = invocation.op;
        final long previousCallId = operation.getCallId();
        if (previousCallId != 0L) {
            invocations.remove(previousCallId);
        }
        // The invocation is published in the map before the id is written to the operation,
        // exactly as in the original ordering.
        invocations.put(freshCallId, invocation);
        OperationAccessor.setCallId(invocation.op, freshCallId);
    }

    /**
     * Removes the entry registered under the call id of the invocation's operation.
     *
     * @param invocation the invocation to deregister
     */
    public void deregisterInvocation(BasicInvocation invocation) {
        final long callId = invocation.op.getCallId();
        invocations.remove(callId);
    }

    /**
     * @return the default call timeout in milliseconds, as read from the group
     *         properties when this service was constructed
     */
    @PrivateApi
    long getDefaultCallTimeoutMillis() {
        return defaultCallTimeoutMillis;
    }

    /**
     * Tells whether a call from the given caller with the given call id is currently
     * recorded as executing on this node.
     *
     * @param callerAddress   the address of the caller; must not be {@code null}
     * @param callerUuid      not used by this overload
     * @param operationCallId the call id to look up
     * @return {@code true} if a matching entry exists in the executing-calls map
     */
    @PrivateApi
    boolean isOperationExecuting(Address callerAddress, String callerUuid, long operationCallId) {
        final RemoteCallKey lookupKey = new RemoteCallKey(callerAddress, operationCallId);
        return executingCalls.containsKey(lookupKey);
    }

    /**
     * Asks the named service whether an operation identified by {@code identifier}
     * is executing for the given caller.
     *
     * <p>The question can only be answered by services implementing
     * {@link ExecutionTracingService}. An unknown service, or one that does not
     * implement that interface, is logged at severe level and yields {@code false}.
     *
     * @param callerAddress the address of the caller
     * @param callerUuid    the uuid of the caller
     * @param serviceName   the name of the service to ask
     * @param identifier    the service-specific operation identifier
     * @return the service's answer, or {@code false} if it could not be asked
     */
    @PrivateApi
    boolean isOperationExecuting(Address callerAddress, String callerUuid, String serviceName, Object identifier) {
        final Object service = nodeEngine.getService(serviceName);
        if (service instanceof ExecutionTracingService) {
            final ExecutionTracingService tracingService = (ExecutionTracingService) service;
            return tracingService.isOperationExecuting(callerAddress, callerUuid, identifier);
        }
        // Either no such service (log its name) or a service of the wrong kind (log the service).
        final Object offender = service == null ? serviceName : service;
        logger.severe("Not able to find operation execution info. Invalid service: " + offender);
        return false;
    }

    /**
     * Schedules a task, delayed by {@code SCHEDULE_DELAY} milliseconds, that removes
     * every registered invocation targeting the departed member and notifies it
     * with a {@link MemberLeftException}.
     *
     * @param member the member that left
     */
    @Override
    public void onMemberLeft(final MemberImpl member) {
        final Runnable failInvocationsOfLeftMember = new Runnable() {
            @Override
            public void run() {
                for (Iterator<BasicInvocation> it = invocations.values().iterator(); it.hasNext(); ) {
                    final BasicInvocation invocation = it.next();
                    if (invocation.isCallTarget(member)) {
                        // Deregister first, then notify, as in the original ordering.
                        it.remove();
                        invocation.notify(new MemberLeftException(member));
                    }
                }
            }
        };
        nodeEngine.getExecutionService().schedule(failInvocationsOfLeftMember, SCHEDULE_DELAY, TimeUnit.MILLISECONDS);
    }

    /**
     * Shuts the service down.
     *
     * <p>Sets the shutdown flag (which ends the cleanup thread's loop), notifies
     * every registered invocation with a {@link HazelcastInstanceNotActiveException},
     * clears the invocation map and shuts the scheduler down. A failure to notify
     * one invocation is logged and does not stop the remaining ones from being notified.
     */
    @Override
    public void shutdown() {
        this.shutdown = true;
        this.logger.finest("Stopping operation threads...");
        for (BasicInvocation invocation : this.invocations.values()) {
            try {
                invocation.notify(new HazelcastInstanceNotActiveException());
            }
            catch (Throwable e) {
                // Pass the throwable along so the stack trace is not lost; the message text is unchanged.
                this.logger.warning(invocation + " could not be notified with shutdown message -> " + e.getMessage(), e);
            }
        }
        this.invocations.clear();
        this.scheduler.shutdown();
    }

    /**
     * Background thread that, until the service is shut down, repeatedly lets every
     * registered invocation check its operation and backup timeouts, asks the
     * back-pressure service to clean up, and then sleeps for {@link #DELAY_MILLIS}.
     */
    private final class CleanupThread extends Thread {
        public static final int DELAY_MILLIS = 1000;

        private CleanupThread() {
            super(BasicOperationService.this.node.getThreadNamePrefix("CleanupThread"));
        }

        /**
         * Runs the scan/cleanup/sleep cycle until the shutdown flag is set. Any
         * throwable escaping the cycle is inspected for out-of-memory, logged, and
         * ends the thread.
         */
        @Override
        public void run() {
            try {
                while (!shutdown) {
                    scanHandleOperationTimeout();
                    backPressureService.cleanup();
                    sleepBetweenScans();
                }
            } catch (Throwable failure) {
                OutOfMemoryErrorDispatcher.inspectOutputMemoryError(failure);
                logger.severe("Failed to run", failure);
            }
        }

        /**
         * Sleeps for one delay period; an interruption is deliberately ignored.
         */
        private void sleepBetweenScans() {
            try {
                Thread.sleep(DELAY_MILLIS);
            } catch (InterruptedException ignore) {
                EmptyStatement.ignore(ignore);
            }
        }

        /**
         * Lets each registered invocation handle its operation timeout and then its
         * backup timeout. The two steps are guarded separately so that a failure in
         * one does not prevent the other, or the remaining invocations, from being processed.
         */
        private void scanHandleOperationTimeout() {
            if (invocations.isEmpty()) {
                return;
            }
            for (BasicInvocation current : invocations.values()) {
                try {
                    current.handleOperationTimeout();
                } catch (Throwable failure) {
                    OutOfMemoryErrorDispatcher.inspectOutputMemoryError(failure);
                    logger.severe("Failed to handle operation timeout of invocation:" + current, failure);
                }

                try {
                    current.handleBackupTimeout(backupOperationTimeoutMillis);
                } catch (Throwable failure) {
                    OutOfMemoryErrorDispatcher.inspectOutputMemoryError(failure);
                    logger.severe("Failed to handle backup timeout of invocation:" + current, failure);
                }
            }
        }
    }

    /**
     * Key identifying a remote call by caller address and call id.
     *
     * <p>The creation time is recorded for {@link #toString()} only; it takes no
     * part in {@link #equals(Object)} or {@link #hashCode()}.
     */
    private static final class RemoteCallKey {
        private final long time = Clock.currentTimeMillis();
        private final Address callerAddress;
        private final long callId;

        /**
         * @param callerAddress the address of the caller; must not be {@code null}
         * @param callId        the call id
         * @throws IllegalArgumentException if {@code callerAddress} is {@code null}
         */
        private RemoteCallKey(Address callerAddress, long callId) {
            if (callerAddress == null) {
                throw new IllegalArgumentException("Caller address is required!");
            }
            this.callerAddress = callerAddress;
            this.callId = callId;
        }

        /**
         * Builds the key from an operation's caller address and call id.
         *
         * @param op the operation to take caller address and call id from
         * @throws IllegalArgumentException if the operation has no caller address
         */
        private RemoteCallKey(Operation op) {
            final Address caller = op.getCallerAddress();
            if (caller == null) {
                throw new IllegalArgumentException("Caller address is required! -> " + op);
            }
            this.callerAddress = caller;
            this.callId = op.getCallId();
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            // The class is final, so instanceof is equivalent to an exact class comparison.
            if (!(o instanceof RemoteCallKey)) {
                return false;
            }
            final RemoteCallKey other = (RemoteCallKey) o;
            return callId == other.callId && callerAddress.equals(other.callerAddress);
        }

        @Override
        public int hashCode() {
            final int callIdHash = (int) (callId ^ (callId >>> 32));
            return 31 * callerAddress.hashCode() + callIdHash;
        }

        @Override
        public String toString() {
            return "RemoteCallKey"
                    + "{callerAddress=" + callerAddress
                    + ", callId=" + callId
                    + ", time=" + time
                    + '}';
        }
    }

    /**
     * Creates and sends the backup operations of a {@link BackupAwareOperation} to
     * the partition's replica addresses.
     */
    private final class OperationBackupHandler {
        private OperationBackupHandler() {
        }

        /**
         * Works out how many sync and async backups to make for the operation and sends them.
         *
         * <p>The partition replica versions are incremented as soon as at least one
         * backup is requested, even if the partition service's maximum backup count
         * later reduces the effective number of backups to zero.
         *
         * @param backupAwareOp the operation to back up; also cast to {@link Operation}
         * @return the number of backups that were sent as sync backups
         * @throws Exception declared by the original signature
         */
        public int backup(BackupAwareOperation backupAwareOp) throws Exception {
            // NOTE(review): 6 is presumably the maximum backup count per partition -- confirm.
            int requestedSync = 0;
            if (backupAwareOp.getSyncBackupCount() > 0) {
                requestedSync = Math.min(6, backupAwareOp.getSyncBackupCount());
            }
            int requestedAsync = 0;
            if (backupAwareOp.getAsyncBackupCount() > 0) {
                requestedAsync = Math.min(6 - requestedSync, backupAwareOp.getAsyncBackupCount());
            }
            final int requestedTotal = requestedSync + requestedAsync;
            if (requestedTotal == 0) {
                return 0;
            }

            final Operation op = (Operation) backupAwareOp;
            final InternalPartitionService partitionService = node.getPartitionService();
            final long[] replicaVersions =
                    partitionService.incrementPartitionReplicaVersions(op.getPartitionId(), requestedTotal);

            // Cap the requested counts by what the partition service reports as possible.
            final int maxPossible = partitionService.getMaxBackupCount();
            int syncCount = Math.min(maxPossible, requestedSync);
            final int asyncCount = Math.min(maxPossible - syncCount, requestedAsync);
            final int totalCount = syncCount + asyncCount;
            if (totalCount == 0) {
                return 0;
            }

            // Without a response nobody waits for sync backups, so treat all of them as async.
            if (!op.returnsResponse()) {
                syncCount = 0;
            }
            return makeBackups(backupAwareOp, op.getPartitionId(), replicaVersions, syncCount, totalCount);
        }

        /**
         * Sends one backup to each replica index from 1 to {@code totalBackupCount}
         * that has an address assigned.
         *
         * <p>Replica indexes up to {@code syncBackupCount} always get a sync backup.
         * Higher indexes get a sync backup only if the back-pressure service says
         * back pressure is needed; that question is asked at most once per call.
         *
         * @return the number of backups sent as sync backups
         */
        private int makeBackups(BackupAwareOperation backupAwareOp, int partitionId, long[] replicaVersions, int syncBackupCount, int totalBackupCount) {
            final InternalPartition partition = node.getPartitionService().getPartition(partitionId);
            Boolean forceSync = null;
            int syncBackupsSent = 0;

            int replicaIndex = 1;
            while (replicaIndex <= totalBackupCount) {
                final Address replicaAddress = partition.getReplicaAddress(replicaIndex);
                if (replicaAddress != null) {
                    assertNoBackupOnPrimaryMember(partition, replicaAddress);

                    final boolean syncBackup;
                    if (replicaIndex <= syncBackupCount) {
                        syncBackup = true;
                    } else {
                        if (forceSync == null) {
                            forceSync = backPressureService.isBackPressureNeeded((Operation) backupAwareOp);
                        }
                        syncBackup = forceSync.booleanValue();
                    }

                    final Backup backup = newBackup(backupAwareOp, replicaVersions, replicaIndex, syncBackup);
                    BasicOperationService.this.send(backup, replicaAddress);
                    if (syncBackup) {
                        syncBackupsSent++;
                    }
                }
                replicaIndex++;
            }
            return syncBackupsSent;
        }

        /**
         * Wraps the serialized backup operation in a {@link Backup} that carries the
         * original operation's partition id, service name and call id, the given
         * replica index, and the local member's uuid as caller uuid.
         */
        private Backup newBackup(BackupAwareOperation backupAwareOp, long[] replicaVersions, int replicaIndex, boolean isSyncBackup) {
            final Operation original = (Operation) backupAwareOp;
            final Operation backupOp = initBackupOperation(backupAwareOp, replicaIndex);
            final Data serializedBackupOp = (Data) nodeEngine.getSerializationService().toData(backupOp);

            final Backup backup = new Backup(serializedBackupOp, original.getCallerAddress(), replicaVersions, isSyncBackup);
            backup.setPartitionId(original.getPartitionId())
                    .setReplicaIndex(replicaIndex)
                    .setServiceName(original.getServiceName())
                    .setCallerUuid(nodeEngine.getLocalMember().getUuid());
            OperationAccessor.setCallId(backup, original.getCallId());
            return backup;
        }

        /**
         * Obtains the backup operation and copies partition id and service name from
         * the original operation onto it, together with the given replica index.
         *
         * @throws IllegalArgumentException if the operation supplies no backup operation
         */
        private Operation initBackupOperation(BackupAwareOperation backupAwareOp, int replicaIndex) {
            final Operation backupOp = backupAwareOp.getBackupOperation();
            if (backupOp == null) {
                throw new IllegalArgumentException("Backup operation should not be null!");
            }
            final Operation original = (Operation) backupAwareOp;
            backupOp.setPartitionId(original.getPartitionId())
                    .setReplicaIndex(replicaIndex)
                    .setServiceName(original.getServiceName());
            return backupOp;
        }

        /**
         * @throws IllegalStateException if the backup target is this node itself
         */
        private void assertNoBackupOnPrimaryMember(InternalPartition partition, Address target) {
            final boolean targetIsThisNode = target.equals(node.getThisAddress());
            if (targetIsThisNode) {
                throw new IllegalStateException("Normally shouldn't happen! Owner node and backup node are the same! " + partition);
            }
        }
    }

    private final class OperationHandler {
        private OperationHandler() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void handle(Operation op) {
            RemoteCallKey callKey;
            block7: {
                block6: {
                    BasicOperationService.this.executedOperationsCount.incrementAndGet();
                    callKey = null;
                    if (!this.timeout(op)) break block6;
                    this.afterCallExecution(op, callKey);
                    return;
                }
                callKey = this.beforeCallExecution(op);
                this.ensureNoPartitionProblems(op);
                op.beforeRun();
                if (!this.waitingNeeded(op)) break block7;
                this.afterCallExecution(op, callKey);
                return;
            }
            try {
                op.run();
                this.handleResponse(op);
                this.afterRun(op);
                this.afterCallExecution(op, callKey);
            }
            catch (Throwable e) {
                try {
                    this.handleOperationError(op, e);
                    this.afterCallExecution(op, callKey);
                }
                catch (Throwable throwable) {
                    this.afterCallExecution(op, callKey);
                    throw throwable;
                }
            }
        }

        private boolean waitingNeeded(Operation op) {
            WaitSupport waitSupport;
            if (op instanceof WaitSupport && (waitSupport = (WaitSupport)((Object)op)).shouldWait()) {
                ((BasicOperationService)BasicOperationService.this).nodeEngine.waitNotifyService.await(waitSupport);
                return true;
            }
            return false;
        }

        private boolean timeout(Operation op) {
            if (BasicOperationService.this.isCallTimedOut(op)) {
                op.getResponseHandler().sendResponse(new CallTimeoutResponse(op.getCallId(), op.isUrgent()));
                return true;
            }
            return false;
        }

        private void handleResponse(Operation op) throws Exception {
            boolean returnsResponse = op.returnsResponse();
            Object response = null;
            if (op instanceof BackupAwareOperation) {
                BackupAwareOperation backupAwareOp = (BackupAwareOperation)((Object)op);
                int syncBackupCount = 0;
                if (backupAwareOp.shouldBackup()) {
                    syncBackupCount = BasicOperationService.this.operationBackupHandler.backup(backupAwareOp);
                }
                if (returnsResponse) {
                    response = new NormalResponse(op.getResponse(), op.getCallId(), syncBackupCount, op.isUrgent());
                }
            }
            if (returnsResponse) {
                ResponseHandler responseHandler;
                if (response == null) {
                    response = op.getResponse();
                }
                if ((responseHandler = op.getResponseHandler()) == null) {
                    throw new IllegalStateException("ResponseHandler should not be null!");
                }
                responseHandler.sendResponse(response);
            }
        }

        private void afterRun(Operation op) {
            try {
                Notifier notifier;
                op.afterRun();
                if (op instanceof Notifier && (notifier = (Notifier)((Object)op)).shouldNotify()) {
                    ((BasicOperationService)BasicOperationService.this).nodeEngine.waitNotifyService.notify(notifier);
                }
            }
            catch (Throwable e) {
                this.logOperationError(op, e);
            }
        }

        private void ensureNoPartitionProblems(Operation op) {
            if (!(op instanceof PartitionAwareOperation)) {
                return;
            }
            int partitionId = op.getPartitionId();
            if (partitionId < 0) {
                throw new IllegalArgumentException("Partition id cannot be negative! -> " + partitionId);
            }
            InternalPartition internalPartition = BasicOperationService.this.nodeEngine.getPartitionService().getPartition(partitionId);
            if (this.retryDuringMigration(op) && internalPartition.isMigrating()) {
                throw new PartitionMigratingException(BasicOperationService.this.node.getThisAddress(), partitionId, op.getClass().getName(), op.getServiceName());
            }
            Address owner = internalPartition.getReplicaAddress(op.getReplicaIndex());
            if (op.validatesTarget() && !BasicOperationService.this.node.getThisAddress().equals(owner)) {
                throw new WrongTargetException(BasicOperationService.this.node.getThisAddress(), owner, partitionId, op.getReplicaIndex(), op.getClass().getName(), op.getServiceName());
            }
        }

        private boolean retryDuringMigration(Operation op) {
            return !(op instanceof ReadonlyOperation) && !OperationAccessor.isMigrationOperation(op);
        }

        private RemoteCallKey beforeCallExecution(Operation op) {
            RemoteCallKey callKey = null;
            if (op.getCallId() != 0L && op.returnsResponse()) {
                callKey = new RemoteCallKey(op);
                RemoteCallKey current = BasicOperationService.this.executingCalls.put(callKey, callKey);
                if (current != null) {
                    BasicOperationService.this.logger.warning("Duplicate Call record! -> " + callKey + " / " + current + " == " + op.getClass().getName());
                }
            }
            return callKey;
        }

        private void afterCallExecution(Operation op, RemoteCallKey callKey) {
            if (callKey != null && op.getCallId() != 0L && op.returnsResponse() && BasicOperationService.this.executingCalls.remove(callKey) == null) {
                BasicOperationService.this.logger.severe("No Call record has been found: -> " + callKey + " == " + op.getClass().getName());
            }
        }

        /**
         * Logs a failure and, where possible, reports it back through the response handler.
         *
         * <p>An {@link OutOfMemoryError} is first forwarded to the
         * {@link OutOfMemoryErrorDispatcher}. The error is then logged via
         * {@code remotePropagatable.logError}. A response is sent only when the
         * propagatable returns a response and has a response handler:
         * <ul>
         *   <li>while the node is active, {@code e} itself is sent;</li>
         *   <li>otherwise, if the handler is local, a
         *       {@link HazelcastInstanceNotActiveException} is sent;</li>
         *   <li>otherwise nothing is sent.</li>
         * </ul>
         * Any throwable raised while sending is logged as a warning and not rethrown.
         *
         * @param remotePropagatable the failed operation (or its stand-in)
         * @param e                  the failure to log and propagate
         */
        private void handleOperationError(RemotePropagatable remotePropagatable, Throwable e) {
            if (e instanceof OutOfMemoryError) {
                OutOfMemoryErrorDispatcher.onOutOfMemory((OutOfMemoryError) e);
            }
            remotePropagatable.logError(e);

            ResponseHandler handler = remotePropagatable.getResponseHandler();
            if (!remotePropagatable.returnsResponse() || handler == null) {
                return;
            }

            try {
                if (!BasicOperationService.this.node.isActive()) {
                    // The node is shutting down: only a local caller can still be answered.
                    if (handler.isLocal()) {
                        handler.sendResponse(new HazelcastInstanceNotActiveException());
                    }
                } else {
                    handler.sendResponse(e);
                }
            } catch (Throwable sendFailure) {
                BasicOperationService.this.logger.warning("While sending op error... op: " + remotePropagatable + ", error: " + e, sendFailure);
            }
        }

        /**
         * Logs a failure of {@code op} without sending any response.
         *
         * <p>An {@link OutOfMemoryError} is forwarded to the
         * {@link OutOfMemoryErrorDispatcher} before the error is logged via
         * {@code op.logError}.
         *
         * @param op the operation that failed
         * @param e  the failure to log
         */
        private void logOperationError(Operation op, Throwable e) {
            boolean outOfMemory = e instanceof OutOfMemoryError;
            if (outOfMemory) {
                OutOfMemoryError oome = (OutOfMemoryError) e;
                OutOfMemoryErrorDispatcher.onOutOfMemory(oome);
            }
            op.logError(e);
        }
    }

    private final class OperationPacketHandler {
        private OperationPacketHandler() {
        }

        private void handle(Packet packet) {
            try {
                Operation op = this.loadOperation(packet);
                if (!this.ensureValidMember(op)) {
                    return;
                }
                this.handle(op);
            }
            catch (Throwable e) {
                BasicOperationService.this.logger.severe(e);
            }
        }

        private Operation loadOperation(Packet packet) throws Exception {
            Connection conn = packet.getConn();
            Address caller = conn.getEndPoint();
            Data data = packet.getData();
            try {
                Object object = BasicOperationService.this.nodeEngine.toObject(data);
                Operation op = (Operation)object;
                op.setNodeEngine(BasicOperationService.this.nodeEngine);
                OperationAccessor.setCallerAddress(op, caller);
                OperationAccessor.setConnection(op, conn);
                this.setCallerUuidIfNotSet(caller, op);
                ResponseHandlerFactory.setRemoteResponseHandler(BasicOperationService.this.nodeEngine, op);
                return op;
            }
            catch (Throwable throwable) {
                long callId = IOUtil.extractOperationCallId(data, BasicOperationService.this.node.getSerializationService());
                RemoteOperationExceptionHandler exceptionHandler = new RemoteOperationExceptionHandler(callId);
                exceptionHandler.setNodeEngine(BasicOperationService.this.nodeEngine);
                exceptionHandler.setCallerAddress(caller);
                exceptionHandler.setConnection(conn);
                ResponseHandlerFactory.setRemoteResponseHandler(BasicOperationService.this.nodeEngine, exceptionHandler);
                BasicOperationService.this.operationHandler.handleOperationError(exceptionHandler, throwable);
                throw ExceptionUtil.rethrow(throwable);
            }
        }

        private void setCallerUuidIfNotSet(Address caller, Operation op) {
            if (op.getCallerUuid() != null) {
                return;
            }
            MemberImpl callerMember = ((BasicOperationService)BasicOperationService.this).node.clusterService.getMember(caller);
            if (callerMember != null) {
                op.setCallerUuid(callerMember.getUuid());
            }
        }

        private boolean ensureValidMember(Operation op) {
            if (!OperationAccessor.isJoinOperation(op) && ((BasicOperationService)BasicOperationService.this).node.clusterService.getMember(op.getCallerAddress()) == null) {
                CallerNotMemberException error = new CallerNotMemberException(op.getCallerAddress(), op.getPartitionId(), op.getClass().getName(), op.getServiceName());
                BasicOperationService.this.operationHandler.handleOperationError(op, error);
                return false;
            }
            return true;
        }

        private void handle(Operation op) {
            String executorName = op.getExecutorName();
            if (executorName == null) {
                BasicOperationService.this.operationHandler.handle(op);
            } else {
                this.offloadOperationHandling(op);
            }
        }

        private void offloadOperationHandling(final Operation op) {
            String executorName = op.getExecutorName();
            ManagedExecutorService executor = BasicOperationService.this.executionService.getExecutor(executorName);
            if (executor == null) {
                throw new IllegalStateException("Could not found executor with name: " + executorName);
            }
            executor.execute(new Runnable(){

                @Override
                public void run() {
                    BasicOperationService.this.operationHandler.handle(op);
                }
            });
        }
    }

    /**
     * Deserializes incoming response packets and notifies the matching
     * invocation or backup call.
     */
    private final class ResponsePacketHandler {
        private ResponsePacketHandler() {
        }

        /**
         * Deserializes the {@code Response} in {@code packet} and routes it by type:
         * {@code NormalResponse} and {@code CallTimeoutResponse} go to
         * {@link #notifyRemoteCall}, {@link BackupResponse} goes to
         * {@code notifyBackupCall}. Any other type is treated as an error. All
         * throwables raised here are logged at severe level and not rethrown.
         *
         * @param packet the packet carrying a serialized response
         */
        private void handle(Packet packet) {
            try {
                Data payload = packet.getData();
                Response response = (Response) nodeEngine.toObject(payload);
                if (response instanceof NormalResponse || response instanceof CallTimeoutResponse) {
                    notifyRemoteCall(response);
                } else if (response instanceof BackupResponse) {
                    notifyBackupCall(response.getCallId());
                } else {
                    throw new IllegalStateException("Unrecognized response type: " + response);
                }
            } catch (Throwable failure) {
                logger.severe("While processing response...", failure);
            }
        }

        /**
         * Passes {@code response} to the invocation registered under its call id.
         *
         * @param response the response to deliver
         * @throws HazelcastException if no invocation is registered for the call id
         *                            while the node engine is active; when the node
         *                            engine is not active the response is dropped silently
         */
        private void notifyRemoteCall(Response response) {
            BasicInvocation invocation = (BasicInvocation) invocations.get(response.getCallId());
            if (invocation != null) {
                invocation.notify(response);
                return;
            }
            if (nodeEngine.isActive()) {
                throw new HazelcastException("No invocation for response: " + response);
            }
        }
    }

    /**
     * {@link BasicDispatcher} that routes a task to the operation handler, one of
     * the packet handlers, or runs it directly, depending on its runtime type.
     */
    public final class BasicDispatcherImpl
    implements BasicDispatcher {
        /**
         * Dispatches {@code task} by type, checked in this order: {@link Operation},
         * {@link Packet}, {@link Runnable}.
         *
         * @param task the task to dispatch
         * @throws IllegalArgumentException if {@code task} is {@code null} or of an unsupported type
         */
        @Override
        public void dispatch(Object task) {
            if (task == null) {
                throw new IllegalArgumentException();
            }
            if (task instanceof Operation) {
                operationHandler.handle((Operation) task);
                return;
            }
            if (task instanceof Packet) {
                dispatchPacket((Packet) task);
                return;
            }
            if (task instanceof Runnable) {
                ((Runnable) task).run();
                return;
            }
            throw new IllegalArgumentException("Unrecognized task:" + task);
        }

        /**
         * Sends {@code packet} to the response packet handler when
         * {@code isHeaderSet(1)} is true (presumably the response header flag;
         * confirm against {@code Packet}), and to the operation packet handler otherwise.
         */
        private void dispatchPacket(Packet packet) {
            if (packet.isHeaderSet(1)) {
                responsePacketHandler.handle(packet);
            } else {
                operationPacketHandler.handle(packet);
            }
        }
    }

    /**
     * Runs operations created by an {@link OperationFactory} on a set of
     * partitions, grouped by the member address they are invoked on.
     *
     * <p>One {@code PartitionIteratingOperation} is invoked per member. Once every
     * member invocation has completed, each partition whose result is a
     * {@link Throwable} is retried individually through a partition-targeted invocation.
     */
    private final class InvokeOnPartitions {
        public static final int TRY_COUNT = 10;
        public static final int TRY_PAUSE_MILLIS = 300;
        private final String serviceName;
        private final OperationFactory operationFactory;
        private final Map<Address, List<Integer>> memberPartitions;
        private final Map<Address, Future> futures;
        private final Map<Integer, Object> partitionResults;

        /**
         * @param serviceName      name of the service the invocations are created for
         * @param operationFactory factory for the operations to run
         * @param memberPartitions partition ids grouped by the address to invoke them on
         */
        private InvokeOnPartitions(String serviceName, OperationFactory operationFactory, Map<Address, List<Integer>> memberPartitions) {
            this.serviceName = serviceName;
            this.operationFactory = operationFactory;
            this.memberPartitions = memberPartitions;
            this.futures = new HashMap<Address, Future>(memberPartitions.size());
            this.partitionResults = new HashMap<Integer, Object>(nodeEngine.getPartitionService().getPartitionCount());
        }

        /**
         * Invokes on every member, waits for all member invocations, then retries
         * the partitions that failed.
         *
         * @return the results keyed by partition id
         * @throws Exception if the calling thread is an operation thread or a retry fails
         */
        private Map<Integer, Object> invoke() throws Exception {
            ensureNotCallingFromOperationThread();
            invokeOnAllPartitions();
            awaitCompletion();
            retryFailedPartitions();
            return partitionResults;
        }

        /**
         * @throws IllegalThreadStateException if the current thread is a
         *         {@code BasicOperationScheduler.OperationThread}
         */
        private void ensureNotCallingFromOperationThread() {
            Thread caller = Thread.currentThread();
            if (caller instanceof BasicOperationScheduler.OperationThread) {
                throw new IllegalThreadStateException(caller + " cannot make invocation on multiple partitions!");
            }
        }

        /**
         * Starts one {@code PartitionIteratingOperation} per member address and
         * stores the resulting future under that address.
         */
        private void invokeOnAllPartitions() {
            for (Map.Entry<Address, List<Integer>> entry : memberPartitions.entrySet()) {
                Address target = entry.getKey();
                List<Integer> partitionIds = entry.getValue();
                PartitionIteratingOperation operation = new PartitionIteratingOperation(partitionIds, operationFactory);
                InternalCompletableFuture pending = createInvocationBuilder(serviceName, (Operation) operation, target)
                        .setTryCount(TRY_COUNT)
                        .setTryPauseMillis(TRY_PAUSE_MILLIS)
                        .invoke();
                futures.put(target, pending);
            }
        }

        /**
         * Waits for every member invocation and merges its per-partition results
         * into {@code partitionResults}. A failing member invocation does not abort
         * the wait; see {@link #recordMemberFailure}.
         */
        private void awaitCompletion() {
            for (Map.Entry<Address, Future> entry : futures.entrySet()) {
                try {
                    Future pending = entry.getValue();
                    Object raw = pending.get();
                    PartitionIteratingOperation.PartitionResponse response = (PartitionIteratingOperation.PartitionResponse) nodeEngine.toObject(raw);
                    partitionResults.putAll(response.asMap());
                } catch (Throwable failure) {
                    recordMemberFailure(entry.getKey(), failure);
                }
            }
        }

        /**
         * Logs {@code failure} (at finest level when enabled, otherwise its message
         * as a warning) and stores it as the result of every partition assigned to
         * {@code member}, which marks those partitions for retry.
         */
        private void recordMemberFailure(Address member, Throwable failure) {
            if (logger.isFinestEnabled()) {
                logger.finest(failure);
            } else {
                logger.warning(failure.getMessage());
            }
            for (Integer partitionId : memberPartitions.get(member)) {
                partitionResults.put(partitionId, failure);
            }
        }

        /**
         * Re-invokes a freshly created operation on each partition whose current
         * result is a {@link Throwable}. All retries are started before any of
         * them is awaited, and each retry's result replaces the stored failure.
         *
         * @throws InterruptedException if interrupted while waiting for a retry
         * @throws ExecutionException   if a retry fails
         */
        private void retryFailedPartitions() throws InterruptedException, ExecutionException {
            List<Integer> failed = new LinkedList<Integer>();
            for (Map.Entry<Integer, Object> entry : partitionResults.entrySet()) {
                int partitionId = entry.getKey();
                if (entry.getValue() instanceof Throwable) {
                    failed.add(partitionId);
                }
            }

            // First pass: fire all retries, parking each future in the result map.
            for (Integer partitionId : failed) {
                InternalCompletableFuture retry = createInvocationBuilder(serviceName, operationFactory.createOperation(), partitionId).invoke();
                partitionResults.put(partitionId, retry);
            }

            // Second pass: swap each parked future for its outcome.
            for (Integer partitionId : failed) {
                Future retry = (Future) partitionResults.get(partitionId);
                Object outcome = retry.get();
                partitionResults.put(partitionId, outcome);
            }
        }
    }
}

