/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.partition.impl;

import com.hazelcast.cluster.MemberInfo;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.MigrationEvent;
import com.hazelcast.core.MigrationListener;
import com.hazelcast.instance.MemberImpl;
import com.hazelcast.instance.Node;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.partition.InternalPartition;
import com.hazelcast.partition.InternalPartitionService;
import com.hazelcast.partition.MigrationEndpoint;
import com.hazelcast.partition.MigrationInfo;
import com.hazelcast.partition.PartitionInfo;
import com.hazelcast.partition.PartitionRuntimeState;
import com.hazelcast.partition.PartitionServiceProxy;
import com.hazelcast.partition.impl.AssignPartitions;
import com.hazelcast.partition.impl.ClearReplicaOperation;
import com.hazelcast.partition.impl.FinalizeMigrationOperation;
import com.hazelcast.partition.impl.HasOngoingMigration;
import com.hazelcast.partition.impl.InternalPartitionImpl;
import com.hazelcast.partition.impl.IsReplicaVersionSync;
import com.hazelcast.partition.impl.MigrationRequestOperation;
import com.hazelcast.partition.impl.PartitionListener;
import com.hazelcast.partition.impl.PartitionReplicaChangeEvent;
import com.hazelcast.partition.impl.PartitionReplicaVersions;
import com.hazelcast.partition.impl.PartitionStateGenerator;
import com.hazelcast.partition.impl.PartitionStateGeneratorImpl;
import com.hazelcast.partition.impl.PartitionStateOperation;
import com.hazelcast.partition.impl.PromoteFromBackupOperation;
import com.hazelcast.partition.impl.ReplicaSyncInfo;
import com.hazelcast.partition.impl.ReplicaSyncRequest;
import com.hazelcast.partition.impl.SyncReplicaVersion;
import com.hazelcast.partition.membergroup.MemberGroup;
import com.hazelcast.partition.membergroup.MemberGroupFactory;
import com.hazelcast.partition.membergroup.MemberGroupFactoryFactory;
import com.hazelcast.spi.Callback;
import com.hazelcast.spi.EventPublishingService;
import com.hazelcast.spi.EventRegistration;
import com.hazelcast.spi.EventService;
import com.hazelcast.spi.ExecutionService;
import com.hazelcast.spi.InternalCompletableFuture;
import com.hazelcast.spi.InvocationBuilder;
import com.hazelcast.spi.ManagedService;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.OperationService;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.ResponseHandlerFactory;
import com.hazelcast.util.Clock;
import com.hazelcast.util.FutureUtil;
import com.hazelcast.util.scheduler.CoalescingDelayedTrigger;
import com.hazelcast.util.scheduler.EntryTaskScheduler;
import com.hazelcast.util.scheduler.EntryTaskSchedulerFactory;
import com.hazelcast.util.scheduler.ScheduleType;
import com.hazelcast.util.scheduler.ScheduledEntry;
import com.hazelcast.util.scheduler.ScheduledEntryProcessor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;

public class InternalPartitionServiceImpl
implements InternalPartitionService,
ManagedService,
EventPublishingService<MigrationEvent, MigrationListener> {
    private static final String EXCEPTION_MSG_PARTITION_STATE_SYNC_TIMEOUT = "Partition state sync invocation timed out";
    private static final int DEFAULT_PAUSE_MILLIS = 1000;
    private static final int DEFAULT_SLEEP_MILLIS = 10;
    private static final float DEFAULT_MIGRATION_TIMEOUT_MULTIPLICATOR = 1.5f;
    private final Node node;
    private final NodeEngineImpl nodeEngine;
    private final ILogger logger;
    private final int partitionCount;
    private final InternalPartitionImpl[] partitions;
    private final PartitionReplicaVersions[] replicaVersions;
    private final AtomicReferenceArray<ReplicaSyncInfo> replicaSyncRequests;
    private final EntryTaskScheduler<Integer, ReplicaSyncInfo> replicaSyncScheduler;
    private final AtomicInteger replicaSyncProcessCount = new AtomicInteger();
    private final MigrationThread migrationThread;
    private final long partitionMigrationInterval;
    private final long partitionMigrationTimeout;
    private final PartitionStateGenerator partitionStateGenerator;
    private final MemberGroupFactory memberGroupFactory;
    private final PartitionServiceProxy proxy;
    private final Lock lock = new ReentrantLock();
    private final AtomicInteger stateVersion = new AtomicInteger();
    private final BlockingQueue<Runnable> migrationQueue = new LinkedBlockingQueue<Runnable>();
    private final AtomicBoolean migrationActive = new AtomicBoolean(true);
    private final AtomicLong lastRepartitionTime = new AtomicLong();
    private final CoalescingDelayedTrigger delayedResumeMigrationTrigger;
    private final FutureUtil.ExceptionHandler partitionStateSyncTimeoutHandler;
    private volatile int memberGroupsSize;
    private volatile boolean initialized;
    private final ConcurrentMap<Integer, MigrationInfo> activeMigrations = new ConcurrentHashMap<Integer, MigrationInfo>(3, 0.75f, 1);
    private final LinkedList<MigrationInfo> completedMigrations = new LinkedList();

    public InternalPartitionServiceImpl(Node node) {
        int i;
        this.partitionCount = node.groupProperties.PARTITION_COUNT.getInteger();
        this.node = node;
        this.nodeEngine = node.nodeEngine;
        this.logger = node.getLogger(InternalPartitionService.class);
        this.partitionStateSyncTimeoutHandler = FutureUtil.logAllExceptions(this.logger, EXCEPTION_MSG_PARTITION_STATE_SYNC_TIMEOUT, Level.FINEST);
        this.partitions = new InternalPartitionImpl[this.partitionCount];
        LocalPartitionListener partitionListener = new LocalPartitionListener(this, node.getThisAddress());
        for (i = 0; i < this.partitionCount; ++i) {
            this.partitions[i] = new InternalPartitionImpl(i, partitionListener);
        }
        this.replicaVersions = new PartitionReplicaVersions[this.partitionCount];
        for (i = 0; i < this.replicaVersions.length; ++i) {
            this.replicaVersions[i] = new PartitionReplicaVersions(i);
        }
        this.memberGroupFactory = MemberGroupFactoryFactory.newMemberGroupFactory(node.getConfig().getPartitionGroupConfig());
        this.partitionStateGenerator = new PartitionStateGeneratorImpl();
        this.partitionMigrationInterval = node.groupProperties.PARTITION_MIGRATION_INTERVAL.getLong() * 1000L;
        long defaultMigrationTimeout = node.groupProperties.PARTITION_MIGRATION_TIMEOUT.getLong();
        this.partitionMigrationTimeout = (long)((float)defaultMigrationTimeout * 1.5f);
        this.migrationThread = new MigrationThread(node);
        this.proxy = new PartitionServiceProxy(this);
        this.replicaSyncRequests = new AtomicReferenceArray<ReplicaSyncInfo>(new ReplicaSyncInfo[this.partitionCount]);
        ExecutionService executionService = this.nodeEngine.getExecutionService();
        ScheduledExecutorService scheduledExecutor = executionService.getDefaultScheduledExecutor();
        this.replicaSyncScheduler = EntryTaskSchedulerFactory.newScheduler(scheduledExecutor, new ReplicaSyncEntryProcessor(this), ScheduleType.SCHEDULE_IF_NEW);
        long maxMigrationDelayMs = this.calculateMaxMigrationDelayOnMemberRemoved();
        long minMigrationDelayMs = this.calculateMigrationDelayOnMemberRemoved(maxMigrationDelayMs);
        this.delayedResumeMigrationTrigger = new CoalescingDelayedTrigger(executionService, minMigrationDelayMs, maxMigrationDelayMs, new Runnable(){

            @Override
            public void run() {
                InternalPartitionServiceImpl.this.resumeMigration();
            }
        });
    }

    private long calculateMaxMigrationDelayOnMemberRemoved() {
        return this.node.groupProperties.OPERATION_CALL_TIMEOUT_MILLIS.getLong() / 2L;
    }

    private long calculateMigrationDelayOnMemberRemoved(long maxDelayMs) {
        long migrationDelayMs = this.node.groupProperties.MIGRATION_MIN_DELAY_ON_MEMBER_REMOVED_SECONDS.getLong() * 1000L;
        long connectionErrorDetectionIntervalMs = this.node.groupProperties.CONNECTION_MONITOR_INTERVAL.getLong() * (long)this.node.groupProperties.CONNECTION_MONITOR_MAX_FAULTS.getInteger() * 5L;
        migrationDelayMs = Math.max(migrationDelayMs, connectionErrorDetectionIntervalMs);
        long heartbeatIntervalMs = this.node.groupProperties.HEARTBEAT_INTERVAL_SECONDS.getLong() * 1000L;
        migrationDelayMs = Math.max(migrationDelayMs, heartbeatIntervalMs * 3L);
        migrationDelayMs = Math.min(migrationDelayMs, maxDelayMs);
        return migrationDelayMs;
    }

    @Override
    public void init(NodeEngine nodeEngine, Properties properties) {
        this.migrationThread.start();
        int partitionTableSendInterval = this.node.groupProperties.PARTITION_TABLE_SEND_INTERVAL.getInteger();
        if (partitionTableSendInterval <= 0) {
            partitionTableSendInterval = 1;
        }
        ExecutionService executionService = nodeEngine.getExecutionService();
        executionService.scheduleAtFixedRate(new SendClusterStateTask(), partitionTableSendInterval, partitionTableSendInterval, TimeUnit.SECONDS);
        int backupSyncCheckInterval = this.node.groupProperties.PARTITION_BACKUP_SYNC_INTERVAL.getInteger();
        if (backupSyncCheckInterval <= 0) {
            backupSyncCheckInterval = 1;
        }
        executionService.scheduleWithFixedDelay(new SyncReplicaVersionTask(), backupSyncCheckInterval, backupSyncCheckInterval, TimeUnit.SECONDS);
    }

    @Override
    public Address getPartitionOwner(int partitionId) {
        if (!this.initialized) {
            this.firstArrangement();
        }
        if (this.partitions[partitionId].getOwnerOrNull() == null && !this.node.isMaster() && this.node.joined()) {
            this.notifyMasterToAssignPartitions();
        }
        return this.partitions[partitionId].getOwnerOrNull();
    }

    @Override
    public Address getPartitionOwnerOrWait(int partition) throws InterruptedException {
        Address owner = this.getPartitionOwner(partition);
        while (owner == null) {
            Thread.sleep(100L);
            owner = this.getPartitionOwner(partition);
        }
        return owner;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void notifyMasterToAssignPartitions() {
        if (this.lock.tryLock()) {
            try {
                if (!this.initialized && !this.node.isMaster() && this.node.getMasterAddress() != null && this.node.joined()) {
                    InternalCompletableFuture f = this.nodeEngine.getOperationService().createInvocationBuilder("hz:core:partitionService", (Operation)new AssignPartitions(), this.node.getMasterAddress()).setTryCount(1).invoke();
                    f.get(1L, TimeUnit.SECONDS);
                }
            }
            catch (Exception e) {
                this.logger.finest(e);
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void firstArrangement() {
        if (!this.node.isMaster() || !this.node.isActive()) {
            return;
        }
        if (!this.initialized) {
            this.lock.lock();
            try {
                if (this.initialized) {
                    return;
                }
                PartitionStateGenerator psg = this.partitionStateGenerator;
                Collection members = this.node.getClusterService().getMembers();
                Collection<MemberGroup> memberGroups = this.memberGroupFactory.createMemberGroups(members);
                if (memberGroups.isEmpty()) {
                    this.logger.warning("No member group is available to assign partition ownership...");
                    return;
                }
                this.logger.info("Initializing cluster partition table first arrangement...");
                Address[][] newState = psg.initialize(memberGroups, this.partitionCount);
                if (newState.length != this.partitionCount) {
                    throw new HazelcastException("Invalid partition count! Expected: " + this.partitionCount + ", Actual: " + newState.length);
                }
                for (int partitionId = 0; partitionId < this.partitionCount; ++partitionId) {
                    InternalPartitionImpl partition = this.partitions[partitionId];
                    Address[] replicas = newState[partitionId];
                    partition.setPartitionInfo(replicas);
                }
                this.initialized = true;
                this.publishPartitionRuntimeState();
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    private void updateMemberGroupsSize() {
        Collection members = this.node.getClusterService().getMembers();
        Collection<MemberGroup> groups = this.memberGroupFactory.createMemberGroups(members);
        int size = 0;
        for (MemberGroup group : groups) {
            if (group.size() <= 0) continue;
            ++size;
        }
        this.memberGroupsSize = size;
    }

    @Override
    public int getMemberGroupsSize() {
        int size = this.memberGroupsSize;
        return size > 0 ? size : 1;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void memberAdded(MemberImpl member) {
        if (!member.localMember()) {
            this.updateMemberGroupsSize();
        }
        if (this.node.isMaster() && this.node.isActive()) {
            this.lock.lock();
            try {
                this.migrationQueue.clear();
                if (this.initialized) {
                    this.migrationQueue.add(new RepartitioningTask());
                    Collection<MemberImpl> members = this.node.clusterService.getMemberList();
                    PartitionStateOperation op = new PartitionStateOperation(this.createPartitionState(members));
                    this.nodeEngine.getOperationService().send(op, member.getAddress());
                }
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void memberRemoved(MemberImpl member) {
        this.updateMemberGroupsSize();
        Address deadAddress = member.getAddress();
        Address thisAddress = this.node.getThisAddress();
        if (deadAddress == null || deadAddress.equals(thisAddress)) {
            return;
        }
        this.lock.lock();
        try {
            this.migrationQueue.clear();
            if (!this.activeMigrations.isEmpty()) {
                if (this.node.isMaster()) {
                    this.rollbackActiveMigrationsFromPreviousMaster(this.node.getLocalMember().getUuid());
                }
                for (MigrationInfo migrationInfo : this.activeMigrations.values()) {
                    if (!deadAddress.equals(migrationInfo.getSource()) && !deadAddress.equals(migrationInfo.getDestination())) continue;
                    migrationInfo.invalidate();
                }
            }
            this.pauseMigration();
            this.promoteFromBackups(deadAddress, thisAddress);
            if (this.node.isMaster() && this.initialized) {
                this.migrationQueue.add(new RepartitioningTask());
            }
            this.resumeMigrationEventually();
        }
        finally {
            this.lock.unlock();
        }
    }

    private void resumeMigrationEventually() {
        this.delayedResumeMigrationTrigger.executeWithDelay();
    }

    private void promoteFromBackups(Address deadAddress, Address thisAddress) {
        for (InternalPartitionImpl partition : this.partitions) {
            boolean promote = false;
            if (deadAddress.equals(partition.getOwnerOrNull()) && thisAddress.equals(partition.getReplicaAddress(1))) {
                promote = true;
                partition.setMigrating(true);
            }
            partition.onDeadAddress(deadAddress);
            if (partition.onDeadAddress(deadAddress)) {
                throw new IllegalStateException("Duplicate address found in partition replicas!");
            }
            if (!promote) continue;
            PromoteFromBackupOperation op = new PromoteFromBackupOperation();
            op.setPartitionId(partition.getPartitionId()).setNodeEngine(this.nodeEngine).setValidateTarget(false).setService(this);
            this.nodeEngine.getOperationService().executeOperation(op);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void rollbackActiveMigrationsFromPreviousMaster(String currentMasterUuid) {
        this.lock.lock();
        try {
            if (!this.activeMigrations.isEmpty()) {
                for (MigrationInfo migrationInfo : this.activeMigrations.values()) {
                    if (currentMasterUuid.equals(migrationInfo.getMasterUuid())) continue;
                    this.logger.info("Rolling-back migration initiated by the old master -> " + migrationInfo);
                    this.finalizeActiveMigration(migrationInfo);
                }
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private PartitionRuntimeState createPartitionState(Collection<MemberImpl> members) {
        this.lock.lock();
        try {
            ArrayList<MemberInfo> memberInfos = new ArrayList<MemberInfo>(members.size());
            for (MemberImpl member : members) {
                MemberInfo memberInfo = new MemberInfo(member.getAddress(), member.getUuid(), member.getAttributes());
                memberInfos.add(memberInfo);
            }
            ArrayList<MigrationInfo> migrationInfos = new ArrayList<MigrationInfo>(this.completedMigrations);
            long clusterTime = this.node.getClusterService().getClusterTime();
            ILogger logger = this.node.getLogger(PartitionRuntimeState.class);
            PartitionRuntimeState partitionRuntimeState = new PartitionRuntimeState(logger, memberInfos, this.partitions, migrationInfos, clusterTime, this.stateVersion.get());
            return partitionRuntimeState;
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void publishPartitionRuntimeState() {
        if (!this.initialized) {
            return;
        }
        if (!(this.node.isMaster() && this.node.isActive() && this.node.joined())) {
            return;
        }
        if (!this.migrationActive.get()) {
            return;
        }
        this.lock.lock();
        try {
            Collection<MemberImpl> members = this.node.clusterService.getMemberList();
            PartitionRuntimeState partitionState = this.createPartitionState(members);
            PartitionStateOperation op = new PartitionStateOperation(partitionState);
            OperationService operationService = this.nodeEngine.getOperationService();
            for (MemberImpl member : members) {
                if (member.localMember()) continue;
                try {
                    operationService.send(op, member.getAddress());
                }
                catch (Exception e) {
                    this.logger.finest(e);
                }
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    private void syncPartitionRuntimeState() {
        this.syncPartitionRuntimeState(this.node.clusterService.getMemberList());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void syncPartitionRuntimeState(Collection<MemberImpl> members) {
        if (!this.initialized) {
            return;
        }
        if (!(this.node.isMaster() && this.node.isActive() && this.node.joined())) {
            return;
        }
        this.lock.lock();
        try {
            PartitionRuntimeState partitionState = this.createPartitionState(members);
            OperationService operationService = this.nodeEngine.getOperationService();
            List<Future> calls = this.firePartitionStateOperation(members, partitionState, operationService);
            FutureUtil.waitWithDeadline(calls, 3L, TimeUnit.SECONDS, this.partitionStateSyncTimeoutHandler);
        }
        finally {
            this.lock.unlock();
        }
    }

    private List<Future> firePartitionStateOperation(Collection<MemberImpl> members, PartitionRuntimeState partitionState, OperationService operationService) {
        ArrayList<Future> calls = new ArrayList<Future>(members.size());
        for (MemberImpl member : members) {
            if (member.localMember()) continue;
            try {
                Address address = member.getAddress();
                PartitionStateOperation operation = new PartitionStateOperation(partitionState, true);
                InternalCompletableFuture f = operationService.invokeOnTarget("hz:core:partitionService", operation, address);
                calls.add(f);
            }
            catch (Exception e) {
                this.logger.finest(e);
            }
        }
        return calls;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void processPartitionRuntimeState(PartitionRuntimeState partitionState) {
        this.lock.lock();
        try {
            if (!this.node.isActive() || !this.node.joined()) {
                if (this.logger.isFinestEnabled()) {
                    this.logger.finest("Node should be active(" + this.node.isActive() + ") and joined(" + this.node.joined() + ") to be able to process partition table!");
                }
                return;
            }
            Address sender = partitionState.getEndpoint();
            Address master = this.node.getMasterAddress();
            if (this.node.isMaster()) {
                this.logger.warning("This is the master node and received a PartitionRuntimeState from " + sender + ". Ignoring incoming state! ");
                return;
            }
            if (sender == null || !sender.equals(master)) {
                if (this.node.clusterService.getMember(sender) == null) {
                    this.logger.severe("Received a ClusterRuntimeState from an unknown member! => Sender: " + sender + ", Master: " + master + "! ");
                    return;
                }
                this.logger.warning("Received a ClusterRuntimeState, but its sender doesn't seem to be master! => Sender: " + sender + ", Master: " + master + "! " + "(Ignore if master node has changed recently.)");
            }
            this.stateVersion.set(partitionState.getVersion());
            this.initialized = true;
            PartitionInfo[] state = partitionState.getPartitions();
            this.filterAndLogUnknownAddressesInPartitionTable(sender, state);
            this.finalizeOrRollbackMigration(partitionState, state);
        }
        finally {
            this.lock.unlock();
        }
    }

    private void finalizeOrRollbackMigration(PartitionRuntimeState partitionState, PartitionInfo[] state) {
        Collection<MigrationInfo> completedMigrations = partitionState.getCompletedMigrations();
        for (MigrationInfo completedMigration : completedMigrations) {
            this.addCompletedMigration(completedMigration);
            this.finalizeActiveMigration(completedMigration);
        }
        if (!this.activeMigrations.isEmpty()) {
            MemberImpl masterMember = this.getMasterMember();
            this.rollbackActiveMigrationsFromPreviousMaster(masterMember.getUuid());
        }
        this.allocateReplicas(state);
    }

    private void allocateReplicas(PartitionInfo[] state) {
        for (int partitionId = 0; partitionId < this.partitionCount; ++partitionId) {
            InternalPartitionImpl partition = this.partitions[partitionId];
            Address[] replicas = state[partitionId].getReplicaAddresses();
            partition.setPartitionInfo(replicas);
        }
    }

    private void filterAndLogUnknownAddressesInPartitionTable(Address sender, PartitionInfo[] state) {
        HashSet<Address> unknownAddresses = new HashSet<Address>();
        for (int partitionId = 0; partitionId < state.length; ++partitionId) {
            PartitionInfo partitionInfo = state[partitionId];
            InternalPartitionImpl currentPartition = this.partitions[partitionId];
            this.searchUnknownAddressesInPartitionTable(sender, unknownAddresses, partitionId, partitionInfo);
            currentPartition.setOwner(partitionInfo.getReplicaAddress(0));
        }
        this.logUnknownAddressesInPartitionTable(sender, unknownAddresses);
    }

    private void logUnknownAddressesInPartitionTable(Address sender, Set<Address> unknownAddresses) {
        if (!unknownAddresses.isEmpty() && this.logger.isLoggable(Level.WARNING)) {
            StringBuilder s = new StringBuilder("Following unknown addresses are found in partition table").append(" sent from master[").append(sender).append("].").append(" (Probably they have recently joined or left the cluster.)").append(" {");
            for (Address address : unknownAddresses) {
                s.append("\n\t").append(address);
            }
            s.append("\n}");
            this.logger.warning(s.toString());
        }
    }

    private void searchUnknownAddressesInPartitionTable(Address sender, Set<Address> unknownAddresses, int partitionId, PartitionInfo partitionInfo) {
        for (int index = 0; index < 7; ++index) {
            Address address = partitionInfo.getReplicaAddress(index);
            if (address == null || this.getMember(address) != null) continue;
            if (this.logger.isFinestEnabled()) {
                this.logger.finest("Unknown " + address + " found in partition table sent from master " + sender + ". It has probably already left the cluster. Partition: " + partitionId);
            }
            unknownAddresses.add(address);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void finalizeActiveMigration(final MigrationInfo migrationInfo) {
        if (this.activeMigrations.containsKey(migrationInfo.getPartitionId())) {
            this.lock.lock();
            try {
                if (this.activeMigrations.containsValue(migrationInfo)) {
                    if (migrationInfo.startProcessing()) {
                        this.processMigrationInfo(migrationInfo);
                    } else {
                        this.logger.info("Scheduling finalization of " + migrationInfo + ", because migration process is currently running.");
                        this.nodeEngine.getExecutionService().schedule(new Runnable(){

                            @Override
                            public void run() {
                                InternalPartitionServiceImpl.this.finalizeActiveMigration(migrationInfo);
                            }
                        }, 3L, TimeUnit.SECONDS);
                    }
                }
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processMigrationInfo(MigrationInfo migrationInfo) {
        try {
            Address thisAddress = this.node.getThisAddress();
            boolean source = thisAddress.equals(migrationInfo.getSource());
            boolean destination = thisAddress.equals(migrationInfo.getDestination());
            if (source || destination) {
                int partitionId = migrationInfo.getPartitionId();
                InternalPartitionImpl migratingPartition = this.getPartitionImpl(partitionId);
                Address ownerAddress = migratingPartition.getOwnerOrNull();
                boolean success = migrationInfo.getDestination().equals(ownerAddress);
                MigrationEndpoint endpoint = source ? MigrationEndpoint.SOURCE : MigrationEndpoint.DESTINATION;
                FinalizeMigrationOperation op = new FinalizeMigrationOperation(endpoint, success);
                op.setPartitionId(partitionId).setNodeEngine(this.nodeEngine).setValidateTarget(false).setService(this);
                this.nodeEngine.getOperationService().executeOperation(op);
            }
        }
        catch (Exception e) {
            this.logger.warning(e);
        }
        finally {
            migrationInfo.doneProcessing();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void addActiveMigration(MigrationInfo migrationInfo) {
        this.lock.lock();
        try {
            int partitionId = migrationInfo.getPartitionId();
            this.partitions[partitionId].setMigrating(true);
            MigrationInfo currentMigrationInfo = this.activeMigrations.putIfAbsent(partitionId, migrationInfo);
            if (currentMigrationInfo != null) {
                MigrationInfo newMigration;
                MigrationInfo oldMigration;
                boolean oldMaster = false;
                MemberImpl masterMember = this.getMasterMember();
                String master = masterMember.getUuid();
                if (!master.equals(currentMigrationInfo.getMasterUuid())) {
                    oldMigration = currentMigrationInfo;
                    newMigration = migrationInfo;
                    oldMaster = true;
                } else if (!master.equals(migrationInfo.getMasterUuid())) {
                    oldMigration = migrationInfo;
                    newMigration = currentMigrationInfo;
                    oldMaster = true;
                } else if (!currentMigrationInfo.isProcessing() && migrationInfo.isProcessing()) {
                    oldMigration = currentMigrationInfo;
                    newMigration = migrationInfo;
                } else {
                    String message = "Something is seriously wrong! There are two migration requests for the same partition! First-> " + currentMigrationInfo + ", Second -> " + migrationInfo;
                    IllegalStateException error = new IllegalStateException(message);
                    this.logger.severe(message, error);
                    throw error;
                }
                if (oldMaster) {
                    this.logger.info("Finalizing migration instantiated by the old master -> " + oldMigration);
                } else if (this.logger.isFinestEnabled()) {
                    this.logger.finest("Finalizing previous migration -> " + oldMigration);
                }
                this.finalizeActiveMigration(oldMigration);
                this.activeMigrations.put(partitionId, newMigration);
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    private MemberImpl getMasterMember() {
        return this.node.clusterService.getMember(this.node.getMasterAddress());
    }

    MigrationInfo getActiveMigration(int partitionId) {
        return (MigrationInfo)this.activeMigrations.get(partitionId);
    }

    MigrationInfo removeActiveMigration(int partitionId) {
        this.partitions[partitionId].setMigrating(false);
        return (MigrationInfo)this.activeMigrations.remove(partitionId);
    }

    @Override
    public Collection<MigrationInfo> getActiveMigrations() {
        return Collections.unmodifiableCollection(this.activeMigrations.values());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void addCompletedMigration(MigrationInfo migrationInfo) {
        this.lock.lock();
        try {
            if (this.completedMigrations.size() > 25) {
                this.completedMigrations.removeFirst();
            }
            this.completedMigrations.add(migrationInfo);
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void evictCompletedMigrations() {
        this.lock.lock();
        try {
            if (!this.completedMigrations.isEmpty()) {
                this.completedMigrations.removeFirst();
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    private void clearPartitionReplica(int partitionId, int replicaIndex) {
        ClearReplicaOperation op = new ClearReplicaOperation();
        op.setPartitionId(partitionId).setNodeEngine(this.nodeEngine).setService(this);
        this.nodeEngine.getOperationService().executeOperation(op);
    }

    void triggerPartitionReplicaSync(int partitionId, int replicaIndex) {
        this.syncPartitionReplica(partitionId, replicaIndex, 0L, false);
    }

    void forcePartitionReplicaSync(int partitionId, int replicaIndex) {
        this.syncPartitionReplica(partitionId, replicaIndex, 0L, true);
    }

    void schedulePartitionReplicaSync(int partitionId, int replicaIndex, long delayMillis) {
        this.syncPartitionReplica(partitionId, replicaIndex, delayMillis, true);
    }

    private void syncPartitionReplica(int partitionId, int replicaIndex, long delayMillis, boolean force) {
        if (replicaIndex < 0 || replicaIndex > 7) {
            throw new IllegalArgumentException("Invalid replica index: " + replicaIndex);
        }
        InternalPartitionImpl partitionImpl = this.getPartition(partitionId);
        Address target = partitionImpl.getOwnerOrNull();
        if (target != null) {
            if (this.checkSyncPartitionTarget(partitionId, replicaIndex, force, partitionImpl, target)) {
                return;
            }
            ReplicaSyncRequest syncRequest = new ReplicaSyncRequest();
            syncRequest.setPartitionId(partitionId).setReplicaIndex(replicaIndex);
            ReplicaSyncInfo currentSyncInfo = this.replicaSyncRequests.get(partitionId);
            ReplicaSyncInfo syncInfo = new ReplicaSyncInfo(partitionId, replicaIndex, target);
            boolean sendRequest = false;
            if (currentSyncInfo == null) {
                sendRequest = this.replicaSyncRequests.compareAndSet(partitionId, null, syncInfo);
            } else if (currentSyncInfo.requestTime < Clock.currentTimeMillis() - 10000L || this.nodeEngine.getClusterService().getMember(currentSyncInfo.target) == null) {
                sendRequest = this.replicaSyncRequests.compareAndSet(partitionId, currentSyncInfo, syncInfo);
            } else if (force) {
                this.replicaSyncRequests.set(partitionId, syncInfo);
                sendRequest = true;
            }
            if (sendRequest) {
                this.fireSyncReplicaRequest(partitionId, replicaIndex, delayMillis, target, syncRequest, syncInfo);
            }
        } else {
            this.logger.warning("Sync replica target is null, no need to sync -> partition: " + partitionId + ", replica: " + replicaIndex);
        }
    }

    private void fireSyncReplicaRequest(int partitionId, int replicaIndex, long delayMillis, Address target, ReplicaSyncRequest syncRequest, ReplicaSyncInfo syncInfo) {
        if (this.logger.isFinestEnabled()) {
            this.logger.finest("Sending sync replica request to -> " + target + "; for partition: " + partitionId + ", replica: " + replicaIndex);
        }
        this.replicaSyncScheduler.cancel(partitionId);
        if (delayMillis <= 0L) {
            this.replicaSyncScheduler.schedule(15000L, partitionId, syncInfo);
            this.nodeEngine.getOperationService().send(syncRequest, target);
        } else {
            this.replicaSyncScheduler.schedule(delayMillis, partitionId, syncInfo);
        }
    }

    private boolean checkSyncPartitionTarget(int partitionId, int replicaIndex, boolean force, InternalPartitionImpl partitionImpl, Address target) {
        if (target.equals(this.nodeEngine.getThisAddress())) {
            if (force) {
                Address thisAddress = this.node.nodeEngine.getThisAddress();
                throw new IllegalStateException("Replica target cannot be this node -> thisNode: " + thisAddress + " partitionId: " + partitionId + ", replicaIndex: " + replicaIndex + ", partition-info: " + partitionImpl);
            }
            if (this.logger.isFinestEnabled()) {
                this.logger.finest("This node is now owner of partition, cannot sync replica -> partitionId: " + partitionId + ", replicaIndex: " + replicaIndex + ", partition-info: " + partitionImpl);
            }
            return true;
        }
        return false;
    }

    @Override
    public InternalPartition[] getPartitions() {
        InternalPartition[] result = new InternalPartition[this.partitions.length];
        System.arraycopy(this.partitions, 0, result, 0, this.partitions.length);
        return result;
    }

    @Override
    public MemberImpl getMember(Address address) {
        return this.node.clusterService.getMember(address);
    }

    private InternalPartitionImpl getPartitionImpl(int partitionId) {
        return this.partitions[partitionId];
    }

    @Override
    public InternalPartitionImpl getPartition(int partitionId) {
        return this.getPartition(partitionId, true);
    }

    @Override
    public InternalPartitionImpl getPartition(int partitionId, boolean triggerOwnerAssignment) {
        InternalPartitionImpl p = this.getPartitionImpl(partitionId);
        if (triggerOwnerAssignment && p.getOwnerOrNull() == null) {
            this.getPartitionOwner(partitionId);
        }
        return p;
    }

    @Override
    public boolean prepareToSafeShutdown(long timeout, TimeUnit unit) {
        long timeoutInMillis = unit.toMillis(timeout);
        int sleep = 1000;
        while (timeoutInMillis > 0L) {
            while (timeoutInMillis > 0L && this.shouldWaitMigrationOrBackups(Level.INFO)) {
                timeoutInMillis = this.sleepWithBusyWait(timeoutInMillis, sleep);
            }
            if (timeoutInMillis <= 0L) break;
            if (this.node.isMaster()) {
                this.syncPartitionRuntimeState();
            } else if ((timeoutInMillis = this.waitForOngoingMigrations(timeoutInMillis, sleep)) <= 0L) break;
            long start = Clock.currentTimeMillis();
            boolean ok = this.checkReplicaSyncState();
            timeoutInMillis -= Clock.currentTimeMillis() - start;
            if (ok) {
                this.logger.finest("Replica sync state before shutdown is OK");
                return true;
            }
            if (timeoutInMillis <= 0L) break;
            this.logger.info("Some backup replicas are inconsistent with primary, waiting for synchronization. Timeout: " + timeoutInMillis + "ms");
            timeoutInMillis = this.sleepWithBusyWait(timeoutInMillis, sleep);
        }
        return false;
    }

    private long waitForOngoingMigrations(long timeoutInMillis, int sleep) {
        long timeout = timeoutInMillis;
        while (timeout > 0L && this.hasOnGoingMigrationMaster(Level.WARNING)) {
            this.logger.info("Waiting for the master node to complete remaining migrations!");
            timeout = this.sleepWithBusyWait(timeout, sleep);
        }
        return timeout;
    }

    private long sleepWithBusyWait(long timeoutInMillis, int sleep) {
        try {
            Thread.sleep(sleep);
        }
        catch (InterruptedException ie) {
            Logger.getLogger(InternalPartitionServiceImpl.class).finest("Busy wait interrupted", ie);
        }
        return timeoutInMillis - (long)sleep;
    }

    @Override
    public boolean isMemberStateSafe() {
        if (this.hasOnGoingMigrationLocal()) {
            return false;
        }
        if (!this.node.isMaster() && this.hasOnGoingMigrationMaster(Level.OFF)) {
            return false;
        }
        return this.isReplicaInSyncState();
    }

    @Override
    public boolean hasOnGoingMigration() {
        return this.hasOnGoingMigrationLocal() || !this.node.isMaster() && this.hasOnGoingMigrationMaster(Level.FINEST);
    }

    private boolean hasOnGoingMigrationMaster(Level level) {
        HasOngoingMigration operation = new HasOngoingMigration();
        Address masterAddress = this.node.getMasterAddress();
        OperationService operationService = this.nodeEngine.getOperationService();
        InvocationBuilder invocationBuilder = operationService.createInvocationBuilder("hz:core:partitionService", (Operation)operation, masterAddress);
        InternalCompletableFuture future = invocationBuilder.setTryCount(100).setTryPauseMillis(100L).invoke();
        try {
            return (Boolean)future.get(1L, TimeUnit.MINUTES);
        }
        catch (InterruptedException ie) {
            Logger.getLogger(InternalPartitionServiceImpl.class).finest("Future wait interrupted", ie);
        }
        catch (Exception e) {
            this.logger.log(level, "Could not get a response from master about migrations! -> " + e.toString());
        }
        return false;
    }

    boolean hasOnGoingMigrationLocal() {
        return !this.activeMigrations.isEmpty() || !this.migrationQueue.isEmpty() || !this.migrationActive.get() || this.migrationThread.isMigrating() || this.shouldWaitMigrationOrBackups(Level.OFF);
    }

    private boolean isReplicaInSyncState() {
        if (!this.initialized || this.getMemberGroupsSize() < 2) {
            return true;
        }
        boolean replicaIndex = true;
        ArrayList<Future> futures = new ArrayList<Future>();
        Address thisAddress = this.node.getThisAddress();
        for (InternalPartitionImpl partition : this.partitions) {
            Address owner = partition.getOwnerOrNull();
            if (!thisAddress.equals(owner) || partition.getReplicaAddress(1) == null) continue;
            int partitionId = partition.getPartitionId();
            long replicaVersion = this.getCurrentReplicaVersion(1, partitionId);
            Operation operation = this.createReplicaSyncStateOperation(replicaVersion, partitionId);
            Future future = this.invoke(operation, 1, partitionId);
            futures.add(future);
        }
        if (futures.isEmpty()) {
            return true;
        }
        for (int i = 0; i < futures.size(); ++i) {
            Future future = (Future)futures.get(i);
            boolean sync = this.getFutureResult(future, 10L, TimeUnit.SECONDS);
            if (sync) continue;
            return false;
        }
        return true;
    }

    private long getCurrentReplicaVersion(int replicaIndex, int partitionId) {
        long[] versions = this.getPartitionReplicaVersions(partitionId);
        return versions[replicaIndex - 1];
    }

    private boolean getFutureResult(Future future, long seconds, TimeUnit unit) {
        boolean sync;
        try {
            sync = (Boolean)future.get(seconds, unit);
        }
        catch (Throwable t) {
            sync = false;
            this.logger.warning("Exception while getting future", t);
        }
        return sync;
    }

    private Future invoke(Operation operation, int replicaIndex, int partitionId) {
        OperationService operationService = this.nodeEngine.getOperationService();
        return operationService.createInvocationBuilder("hz:core:partitionService", operation, partitionId).setTryCount(3).setTryPauseMillis(250L).setReplicaIndex(replicaIndex).invoke();
    }

    private Operation createReplicaSyncStateOperation(long replicaVersion, int partitionId) {
        IsReplicaVersionSync op = new IsReplicaVersionSync(replicaVersion);
        op.setService(this);
        op.setNodeEngine(this.nodeEngine);
        op.setResponseHandler(ResponseHandlerFactory.createErrorLoggingResponseHandler(this.node.getLogger(IsReplicaVersionSync.class)));
        op.setPartitionId(partitionId);
        return op;
    }

    private boolean checkReplicaSyncState() {
        if (!this.initialized) {
            return true;
        }
        if (this.getMemberGroupsSize() < 2) {
            return true;
        }
        Address thisAddress = this.node.getThisAddress();
        final Semaphore s = new Semaphore(0);
        final AtomicBoolean ok = new AtomicBoolean(true);
        Callback<Object> callback = new Callback<Object>(){

            @Override
            public void notify(Object object) {
                if (Boolean.FALSE.equals(object)) {
                    ok.compareAndSet(true, false);
                } else if (object instanceof Throwable) {
                    ok.compareAndSet(true, false);
                }
                s.release();
            }
        };
        int notOwnedCount = this.syncReplicaVersion(thisAddress, s, ok, callback);
        s.release(notOwnedCount);
        try {
            if (ok.get()) {
                return s.tryAcquire(this.partitionCount, 10L, TimeUnit.SECONDS) && ok.get();
            }
            return false;
        }
        catch (InterruptedException ignored) {
            return false;
        }
    }

    private int syncReplicaVersion(Address thisAddress, Semaphore s, AtomicBoolean ok, Callback<Object> callback) {
        int notOwnedCount = 0;
        ILogger responseLogger = this.node.getLogger(SyncReplicaVersion.class);
        for (InternalPartitionImpl partition : this.partitions) {
            Address owner = partition.getOwnerOrNull();
            if (thisAddress.equals(owner)) {
                if (partition.getReplicaAddress(1) != null) {
                    SyncReplicaVersion op = new SyncReplicaVersion(1, callback);
                    op.setService(this);
                    op.setNodeEngine(this.nodeEngine);
                    op.setResponseHandler(ResponseHandlerFactory.createErrorLoggingResponseHandler(responseLogger));
                    op.setPartitionId(partition.getPartitionId());
                    this.nodeEngine.getOperationService().executeOperation(op);
                    continue;
                }
                ok.set(false);
                s.release();
                continue;
            }
            if (owner == null) {
                ok.set(false);
            }
            ++notOwnedCount;
        }
        return notOwnedCount;
    }

    private boolean shouldWaitMigrationOrBackups(Level level) {
        if (!this.preCheckShouldWaitMigrationOrBackups()) {
            return false;
        }
        if (this.checkForActiveMigrations(level)) {
            return true;
        }
        for (InternalPartitionImpl partition : this.partitions) {
            if (partition.getReplicaAddress(1) == null) {
                if (this.logger.isLoggable(level)) {
                    this.logger.log(level, "Should take backup of partition: " + partition.getPartitionId());
                }
                return true;
            }
            int replicaSyncProcesses = this.replicaSyncProcessCount.get();
            if (replicaSyncProcesses <= 0) continue;
            if (this.logger.isLoggable(level)) {
                this.logger.log(level, "Processing replica sync requests: " + replicaSyncProcesses);
            }
            return true;
        }
        return false;
    }

    private boolean preCheckShouldWaitMigrationOrBackups() {
        if (!this.initialized) {
            return false;
        }
        return this.getMemberGroupsSize() >= 2;
    }

    private boolean checkForActiveMigrations(Level level) {
        int activeSize = this.activeMigrations.size();
        if (activeSize != 0) {
            if (this.logger.isLoggable(level)) {
                this.logger.log(level, "Waiting for active migration tasks: " + activeSize);
            }
            return true;
        }
        int queueSize = this.migrationQueue.size();
        if (queueSize != 0) {
            if (this.logger.isLoggable(level)) {
                this.logger.log(level, "Waiting for cluster migration tasks: " + queueSize);
            }
            return true;
        }
        return false;
    }

    @Override
    public final int getPartitionId(Data key) {
        int hash = key.getPartitionHash();
        if (hash == Integer.MIN_VALUE) {
            return 0;
        }
        return Math.abs(hash) % this.partitionCount;
    }

    @Override
    public final int getPartitionId(Object key) {
        return this.getPartitionId(this.nodeEngine.toData(key));
    }

    @Override
    public final int getPartitionCount() {
        return this.partitionCount;
    }

    @Override
    public long[] incrementPartitionReplicaVersions(int partitionId, int backupCount) {
        PartitionReplicaVersions replicaVersion = this.replicaVersions[partitionId];
        return replicaVersion.incrementAndGet(backupCount);
    }

    @Override
    public void updatePartitionReplicaVersions(int partitionId, long[] versions, int replicaIndex) {
        PartitionReplicaVersions partitionVersion = this.replicaVersions[partitionId];
        if (!partitionVersion.update(versions, replicaIndex)) {
            this.triggerPartitionReplicaSync(partitionId, replicaIndex);
        }
    }

    @Override
    public long[] getPartitionReplicaVersions(int partitionId) {
        return this.replicaVersions[partitionId].get();
    }

    @Override
    public void setPartitionReplicaVersions(int partitionId, long[] versions) {
        this.replicaVersions[partitionId].reset(versions);
    }

    @Override
    public void clearPartitionReplicaVersions(int partitionId) {
        this.replicaVersions[partitionId].clear();
    }

    void finalizeReplicaSync(int partitionId, long[] versions) {
        this.setPartitionReplicaVersions(partitionId, versions);
        this.replicaSyncRequests.set(partitionId, null);
        this.replicaSyncScheduler.cancel(partitionId);
    }

    boolean incrementReplicaSyncProcessCount() {
        int c = this.replicaSyncProcessCount.get();
        if (c >= 4) {
            return false;
        }
        c = this.replicaSyncProcessCount.incrementAndGet();
        if (c >= 4) {
            this.replicaSyncProcessCount.decrementAndGet();
            return false;
        }
        return true;
    }

    void decrementReplicaSyncProcessCount() {
        this.replicaSyncProcessCount.decrementAndGet();
    }

    @Override
    public Map<Address, List<Integer>> getMemberPartitionsMap() {
        int members = this.node.getClusterService().getSize();
        HashMap<Address, List<Integer>> memberPartitions = new HashMap<Address, List<Integer>>(members);
        for (int i = 0; i < this.partitionCount; ++i) {
            Address owner;
            while ((owner = this.getPartitionOwner(i)) == null) {
                try {
                    Thread.sleep(10L);
                }
                catch (InterruptedException e) {
                    throw new HazelcastException(e);
                }
            }
            ArrayList<Integer> ownedPartitions = (ArrayList<Integer>)memberPartitions.get(owner);
            if (ownedPartitions == null) {
                ownedPartitions = new ArrayList<Integer>();
                memberPartitions.put(owner, ownedPartitions);
            }
            ownedPartitions.add(i);
        }
        return memberPartitions;
    }

    @Override
    public List<Integer> getMemberPartitions(Address target) {
        LinkedList<Integer> ownedPartitions = new LinkedList<Integer>();
        for (int i = 0; i < this.partitionCount; ++i) {
            Address owner = this.getPartitionOwner(i);
            if (!target.equals(owner)) continue;
            ownedPartitions.add(i);
        }
        return ownedPartitions;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void reset() {
        this.migrationQueue.clear();
        for (int k = 0; k < this.replicaSyncRequests.length(); ++k) {
            this.replicaSyncRequests.set(k, null);
        }
        this.replicaSyncScheduler.cancelAll();
        this.lock.lock();
        try {
            this.initialized = false;
            for (InternalPartitionImpl partition : this.partitions) {
                for (int i = 0; i < 7; ++i) {
                    partition.setReplicaAddress(i, null);
                    partition.setMigrating(false);
                }
            }
            this.activeMigrations.clear();
            this.completedMigrations.clear();
            this.stateVersion.set(0);
        }
        finally {
            this.lock.unlock();
        }
    }

    @Override
    public void pauseMigration() {
        this.migrationActive.set(false);
    }

    @Override
    public void resumeMigration() {
        this.migrationActive.set(true);
    }

    @Override
    public void shutdown(boolean terminate) {
        this.logger.finest("Shutting down the partition service");
        this.migrationThread.stopNow();
        this.reset();
    }

    @Override
    public long getMigrationQueueSize() {
        return this.migrationQueue.size();
    }

    @Override
    public PartitionServiceProxy getPartitionServiceProxy() {
        return this.proxy;
    }

    private void sendMigrationEvent(MigrationInfo migrationInfo, MigrationEvent.MigrationStatus status) {
        MemberImpl current = this.getMember(migrationInfo.getSource());
        MemberImpl newOwner = this.getMember(migrationInfo.getDestination());
        MigrationEvent event = new MigrationEvent(migrationInfo.getPartitionId(), current, newOwner, status);
        EventService eventService = this.nodeEngine.getEventService();
        Collection<EventRegistration> registrations = eventService.getRegistrations("hz:core:partitionService", "hz:core:partitionService");
        eventService.publishEvent("hz:core:partitionService", registrations, (Object)event, event.getPartitionId());
    }

    @Override
    public String addMigrationListener(MigrationListener listener) {
        EventService eventService = this.nodeEngine.getEventService();
        EventRegistration registration = eventService.registerListener("hz:core:partitionService", "hz:core:partitionService", listener);
        return registration.getId();
    }

    @Override
    public boolean removeMigrationListener(String registrationId) {
        EventService eventService = this.nodeEngine.getEventService();
        return eventService.deregisterListener("hz:core:partitionService", "hz:core:partitionService", registrationId);
    }

    @Override
    public void dispatchEvent(MigrationEvent migrationEvent, MigrationListener migrationListener) {
        MigrationEvent.MigrationStatus status = migrationEvent.getStatus();
        switch (status) {
            case STARTED: {
                migrationListener.migrationStarted(migrationEvent);
                break;
            }
            case COMPLETED: {
                migrationListener.migrationCompleted(migrationEvent);
                break;
            }
            case FAILED: {
                migrationListener.migrationFailed(migrationEvent);
                break;
            }
            default: {
                throw new IllegalArgumentException("Not a known MigrationStatus: " + (Object)((Object)status));
            }
        }
    }

    public String toString() {
        StringBuilder sb = new StringBuilder("PartitionManager[" + this.stateVersion + "] {\n");
        sb.append("\n");
        sb.append("migrationQ: ").append(this.migrationQueue.size());
        sb.append("\n}");
        return sb.toString();
    }

    public Node getNode() {
        return this.node;
    }

    private static class ReplicaSyncEntryProcessor
    implements ScheduledEntryProcessor<Integer, ReplicaSyncInfo> {
        private InternalPartitionServiceImpl partitionService;

        public ReplicaSyncEntryProcessor(InternalPartitionServiceImpl partitionService) {
            this.partitionService = partitionService;
        }

        @Override
        public void process(EntryTaskScheduler<Integer, ReplicaSyncInfo> scheduler, Collection<ScheduledEntry<Integer, ReplicaSyncInfo>> entries) {
            for (ScheduledEntry<Integer, ReplicaSyncInfo> entry : entries) {
                ReplicaSyncInfo syncInfo = entry.getValue();
                if (!this.partitionService.replicaSyncRequests.compareAndSet(entry.getKey(), syncInfo, null)) continue;
                this.logRendingSyncReplicaRequest(syncInfo);
                this.partitionService.triggerPartitionReplicaSync(syncInfo.partitionId, syncInfo.replicaIndex);
            }
        }

        private void logRendingSyncReplicaRequest(ReplicaSyncInfo syncInfo) {
            ILogger logger = this.partitionService.logger;
            if (logger.isFinestEnabled()) {
                logger.finest("Re-sending sync replica request for partition: " + syncInfo.partitionId + ", replica: " + syncInfo.replicaIndex);
            }
        }
    }

    private static final class LocalPartitionListener
    implements PartitionListener {
        final Address thisAddress;
        private InternalPartitionServiceImpl partitionService;

        private LocalPartitionListener(InternalPartitionServiceImpl partitionService, Address thisAddress) {
            this.thisAddress = thisAddress;
            this.partitionService = partitionService;
        }

        @Override
        public void replicaChanged(PartitionReplicaChangeEvent event) {
            int replicaIndex = event.getReplicaIndex();
            Address newAddress = event.getNewAddress();
            if (replicaIndex > 0) {
                int partitionId = event.getPartitionId();
                if (this.thisAddress.equals(event.getOldAddress())) {
                    InternalPartitionImpl partition = this.partitionService.partitions[partitionId];
                    if (!partition.isOwnerOrBackup(this.thisAddress)) {
                        this.partitionService.clearPartitionReplica(partitionId, replicaIndex);
                    }
                } else if (this.thisAddress.equals(newAddress)) {
                    this.partitionService.forcePartitionReplicaSync(partitionId, replicaIndex);
                }
            }
            Node node = this.partitionService.node;
            if (replicaIndex == 0 && newAddress == null && node.isActive() && node.joined()) {
                this.logOwnerOfPartitionIsRemoved(event);
            }
            if (this.partitionService.node.isMaster()) {
                this.partitionService.stateVersion.incrementAndGet();
            }
        }

        private void logOwnerOfPartitionIsRemoved(PartitionReplicaChangeEvent event) {
            String warning = "Owner of partition is being removed! Possible data loss for partition[" + event.getPartitionId() + "]. " + event;
            this.partitionService.logger.warning(warning);
        }
    }

    private class MigrationThread
    extends Thread
    implements Runnable {
        private final long sleepTime;
        private volatile boolean migrating;

        MigrationThread(Node node) {
            super(node.threadGroup, node.getThreadNamePrefix("migration"));
            this.sleepTime = Math.max(250L, InternalPartitionServiceImpl.this.partitionMigrationInterval);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                while (!this.isInterrupted()) {
                    this.doRun();
                }
            }
            catch (InterruptedException e) {
                if (InternalPartitionServiceImpl.this.logger.isFinestEnabled()) {
                    InternalPartitionServiceImpl.this.logger.finest("MigrationThread is interrupted: " + e.getMessage());
                }
            }
            finally {
                InternalPartitionServiceImpl.this.migrationQueue.clear();
            }
        }

        private void doRun() throws InterruptedException {
            Runnable r;
            while (InternalPartitionServiceImpl.this.migrationActive.get() && (r = (Runnable)InternalPartitionServiceImpl.this.migrationQueue.poll(1L, TimeUnit.SECONDS)) != null) {
                this.processTask(r);
                if (InternalPartitionServiceImpl.this.partitionMigrationInterval <= 0L) continue;
                Thread.sleep(InternalPartitionServiceImpl.this.partitionMigrationInterval);
            }
            boolean hasNoTasks = InternalPartitionServiceImpl.this.migrationQueue.isEmpty();
            if (hasNoTasks) {
                if (this.migrating) {
                    this.migrating = false;
                    InternalPartitionServiceImpl.this.logger.info("All migration tasks have been completed, queues are empty.");
                }
                InternalPartitionServiceImpl.this.evictCompletedMigrations();
                Thread.sleep(this.sleepTime);
            } else if (!InternalPartitionServiceImpl.this.migrationActive.get()) {
                Thread.sleep(this.sleepTime);
            }
        }

        boolean processTask(Runnable r) {
            if (r == null || this.isInterrupted()) {
                return false;
            }
            try {
                this.migrating = r instanceof MigrateTask;
                r.run();
            }
            catch (Throwable t) {
                InternalPartitionServiceImpl.this.logger.warning(t);
            }
            return true;
        }

        void stopNow() {
            InternalPartitionServiceImpl.this.migrationQueue.clear();
            this.interrupt();
        }

        boolean isMigrating() {
            return this.migrating;
        }
    }

    private class MigrateTask
    implements Runnable {
        final MigrationInfo migrationInfo;
        final BackupMigrationTask backupTask;

        MigrateTask(MigrationInfo migrationInfo, BackupMigrationTask backupTask) {
            this.migrationInfo = migrationInfo;
            this.backupTask = backupTask;
            MemberImpl masterMember = InternalPartitionServiceImpl.this.getMasterMember();
            if (masterMember != null) {
                migrationInfo.setMasterUuid(masterMember.getUuid());
                migrationInfo.setMaster(masterMember.getAddress());
            }
        }

        @Override
        public void run() {
            if (!InternalPartitionServiceImpl.this.node.isActive() || !InternalPartitionServiceImpl.this.node.isMaster()) {
                return;
            }
            MigrationRequestOperation migrationRequestOp = new MigrationRequestOperation(this.migrationInfo);
            try {
                MigrationInfo info = this.migrationInfo;
                InternalPartitionImpl partition = InternalPartitionServiceImpl.this.partitions[info.getPartitionId()];
                Address owner = partition.getOwnerOrNull();
                if (owner == null) {
                    InternalPartitionServiceImpl.this.logger.severe("ERROR: partition owner is not set! -> " + partition + " -VS- " + info);
                    return;
                }
                if (!owner.equals(info.getSource())) {
                    InternalPartitionServiceImpl.this.logger.severe("ERROR: partition owner is not the source of migration! -> " + partition + " -VS- " + info + " found owner:" + owner);
                    return;
                }
                InternalPartitionServiceImpl.this.sendMigrationEvent(this.migrationInfo, MigrationEvent.MigrationStatus.STARTED);
                Boolean result = Boolean.FALSE;
                MemberImpl fromMember = InternalPartitionServiceImpl.this.getMember(this.migrationInfo.getSource());
                if (InternalPartitionServiceImpl.this.logger.isFinestEnabled()) {
                    InternalPartitionServiceImpl.this.logger.finest("Started Migration : " + this.migrationInfo);
                }
                if (fromMember == null) {
                    InternalPartitionServiceImpl.this.logger.warning("Partition is lost! Assign new owner and exit...");
                    result = Boolean.TRUE;
                } else {
                    result = this.executeMigrateOperation(migrationRequestOp, fromMember);
                }
                this.processMigrationResult(result);
            }
            catch (Throwable t) {
                Level level = this.migrationInfo.isValid() ? Level.WARNING : Level.FINEST;
                InternalPartitionServiceImpl.this.logger.log(level, "Error [" + t.getClass() + ": " + t.getMessage() + "] while executing " + migrationRequestOp);
                InternalPartitionServiceImpl.this.logger.finest(t);
                this.migrationTaskFailed();
            }
        }

        private void processMigrationResult(Boolean result) {
            if (Boolean.TRUE.equals(result)) {
                String message = "Finished Migration: " + this.migrationInfo;
                if (InternalPartitionServiceImpl.this.logger.isFinestEnabled()) {
                    InternalPartitionServiceImpl.this.logger.finest(message);
                }
                this.processMigrationResult();
            } else {
                Level level = this.migrationInfo.isValid() ? Level.WARNING : Level.FINEST;
                InternalPartitionServiceImpl.this.logger.log(level, "Migration failed: " + this.migrationInfo);
                this.migrationTaskFailed();
            }
        }

        private Boolean executeMigrateOperation(MigrationRequestOperation migrationRequestOp, MemberImpl fromMember) {
            InternalCompletableFuture future = InternalPartitionServiceImpl.this.nodeEngine.getOperationService().createInvocationBuilder("hz:core:partitionService", (Operation)migrationRequestOp, this.migrationInfo.getSource()).setTryPauseMillis(1000L).invoke();
            try {
                Object response = future.get(InternalPartitionServiceImpl.this.partitionMigrationTimeout, TimeUnit.SECONDS);
                return (Boolean)InternalPartitionServiceImpl.this.nodeEngine.toObject(response);
            }
            catch (Throwable e) {
                Level level = InternalPartitionServiceImpl.this.node.isActive() && this.migrationInfo.isValid() ? Level.WARNING : Level.FINEST;
                InternalPartitionServiceImpl.this.logger.log(level, "Failed migration from " + fromMember, e);
                return Boolean.FALSE;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void migrationTaskFailed() {
            InternalPartitionServiceImpl.this.lock.lock();
            try {
                InternalPartitionServiceImpl.this.addCompletedMigration(this.migrationInfo);
                InternalPartitionServiceImpl.this.finalizeActiveMigration(this.migrationInfo);
                InternalPartitionServiceImpl.this.syncPartitionRuntimeState();
            }
            finally {
                InternalPartitionServiceImpl.this.lock.unlock();
            }
            InternalPartitionServiceImpl.this.sendMigrationEvent(this.migrationInfo, MigrationEvent.MigrationStatus.FAILED);
            InternalPartitionServiceImpl.this.migrationQueue.clear();
            InternalPartitionServiceImpl.this.migrationQueue.add(new RepartitioningTask());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void processMigrationResult() {
            InternalPartitionServiceImpl.this.lock.lock();
            try {
                int partitionId = this.migrationInfo.getPartitionId();
                Address newOwner = this.migrationInfo.getDestination();
                InternalPartitionImpl partition = InternalPartitionServiceImpl.this.partitions[partitionId];
                partition.setOwner(newOwner);
                InternalPartitionServiceImpl.this.addCompletedMigration(this.migrationInfo);
                InternalPartitionServiceImpl.this.finalizeActiveMigration(this.migrationInfo);
                if (this.backupTask != null) {
                    this.backupTask.run();
                }
                InternalPartitionServiceImpl.this.syncPartitionRuntimeState();
            }
            finally {
                InternalPartitionServiceImpl.this.lock.unlock();
            }
            InternalPartitionServiceImpl.this.sendMigrationEvent(this.migrationInfo, MigrationEvent.MigrationStatus.COMPLETED);
        }

        public String toString() {
            StringBuilder sb = new StringBuilder("MigrateTask{");
            sb.append("migrationInfo=").append(this.migrationInfo);
            sb.append('}');
            return sb.toString();
        }
    }

    private class BackupMigrationTask
    implements Runnable {
        final int partitionId;
        final Address[] replicas;

        BackupMigrationTask(int partitionId, Address[] replicas) {
            this.partitionId = partitionId;
            this.replicas = replicas;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            InternalPartitionServiceImpl.this.lock.lock();
            try {
                InternalPartitionImpl currentPartition = InternalPartitionServiceImpl.this.partitions[this.partitionId];
                for (int index = 1; index < 7; ++index) {
                    currentPartition.setReplicaAddress(index, this.replicas[index]);
                }
            }
            finally {
                InternalPartitionServiceImpl.this.lock.unlock();
            }
        }

        public String toString() {
            StringBuilder sb = new StringBuilder("BackupMigrationTask{");
            sb.append("partitionId=").append(this.partitionId);
            sb.append("replicas=").append(Arrays.toString(this.replicas));
            sb.append('}');
            return sb.toString();
        }
    }

    private class RepartitioningTask
    implements Runnable {
        private RepartitioningTask() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            if (InternalPartitionServiceImpl.this.node.isMaster() && InternalPartitionServiceImpl.this.node.isActive()) {
                InternalPartitionServiceImpl.this.lock.lock();
                try {
                    if (!InternalPartitionServiceImpl.this.initialized) {
                        return;
                    }
                    if (!this.isMigrationAllowed()) {
                        return;
                    }
                    InternalPartitionServiceImpl.this.migrationQueue.clear();
                    PartitionStateGenerator psg = InternalPartitionServiceImpl.this.partitionStateGenerator;
                    Collection<MemberImpl> members = InternalPartitionServiceImpl.this.node.getClusterService().getMemberList();
                    Collection<MemberGroup> memberGroups = InternalPartitionServiceImpl.this.memberGroupFactory.createMemberGroups(members);
                    Address[][] newState = psg.reArrange(memberGroups, InternalPartitionServiceImpl.this.partitions);
                    if (!this.isMigrationAllowed()) {
                        return;
                    }
                    int migrationCount = 0;
                    int lostCount = 0;
                    InternalPartitionServiceImpl.this.lastRepartitionTime.set(Clock.currentTimeMillis());
                    for (int partitionId = 0; partitionId < InternalPartitionServiceImpl.this.partitionCount; ++partitionId) {
                        Address[] replicas = newState[partitionId];
                        InternalPartitionImpl currentPartition = InternalPartitionServiceImpl.this.partitions[partitionId];
                        Address currentOwner = currentPartition.getOwnerOrNull();
                        Address newOwner = replicas[0];
                        if (currentOwner == null) {
                            ++lostCount;
                            this.assignNewPartitionOwner(partitionId, replicas, currentPartition, newOwner);
                            continue;
                        }
                        if (newOwner != null && !currentOwner.equals(newOwner)) {
                            ++migrationCount;
                            this.migratePartitionToNewOwner(partitionId, replicas, currentOwner, newOwner);
                            continue;
                        }
                        currentPartition.setPartitionInfo(replicas);
                    }
                    InternalPartitionServiceImpl.this.syncPartitionRuntimeState(members);
                    this.logMigrationStatistics(migrationCount, lostCount);
                }
                finally {
                    InternalPartitionServiceImpl.this.lock.unlock();
                }
            }
        }

        private void logMigrationStatistics(int migrationCount, int lostCount) {
            if (lostCount > 0) {
                InternalPartitionServiceImpl.this.logger.warning("Assigning new owners for " + lostCount + " LOST partitions!");
            }
            if (migrationCount > 0) {
                InternalPartitionServiceImpl.this.logger.info("Re-partitioning cluster data... Migration queue size: " + migrationCount);
            } else {
                InternalPartitionServiceImpl.this.logger.info("Partition balance is ok, no need to re-partition cluster data... ");
            }
        }

        private void migratePartitionToNewOwner(int partitionId, Address[] replicas, Address currentOwner, Address newOwner) {
            MigrationInfo info = new MigrationInfo(partitionId, currentOwner, newOwner);
            MigrateTask migrateTask = new MigrateTask(info, new BackupMigrationTask(partitionId, replicas));
            boolean offered = InternalPartitionServiceImpl.this.migrationQueue.offer(migrateTask);
            if (!offered) {
                InternalPartitionServiceImpl.this.logger.severe("Failed to offer: " + migrateTask);
            }
        }

        private void assignNewPartitionOwner(int partitionId, Address[] replicas, InternalPartitionImpl currentPartition, Address newOwner) {
            currentPartition.setPartitionInfo(replicas);
            MigrationInfo migrationInfo = new MigrationInfo(partitionId, null, newOwner);
            InternalPartitionServiceImpl.this.sendMigrationEvent(migrationInfo, MigrationEvent.MigrationStatus.STARTED);
            InternalPartitionServiceImpl.this.sendMigrationEvent(migrationInfo, MigrationEvent.MigrationStatus.COMPLETED);
        }

        private boolean isMigrationAllowed() {
            if (InternalPartitionServiceImpl.this.migrationActive.get()) {
                return true;
            }
            InternalPartitionServiceImpl.this.migrationQueue.add(this);
            return false;
        }
    }

    private class SyncReplicaVersionTask
    implements Runnable {
        private SyncReplicaVersionTask() {
        }

        @Override
        public void run() {
            if (InternalPartitionServiceImpl.this.node.isActive() && InternalPartitionServiceImpl.this.migrationActive.get()) {
                Address thisAddress = InternalPartitionServiceImpl.this.node.getThisAddress();
                for (InternalPartitionImpl partition : InternalPartitionServiceImpl.this.partitions) {
                    if (!thisAddress.equals(partition.getOwnerOrNull())) continue;
                    for (int index = 1; index < 7; ++index) {
                        if (partition.getReplicaAddress(index) == null) continue;
                        SyncReplicaVersion op = new SyncReplicaVersion(index, null);
                        op.setService(InternalPartitionServiceImpl.this);
                        op.setNodeEngine(InternalPartitionServiceImpl.this.nodeEngine);
                        op.setResponseHandler(ResponseHandlerFactory.createErrorLoggingResponseHandler(InternalPartitionServiceImpl.this.node.getLogger(SyncReplicaVersion.class)));
                        op.setPartitionId(partition.getPartitionId());
                        InternalPartitionServiceImpl.this.nodeEngine.getOperationService().executeOperation(op);
                    }
                }
            }
        }
    }

    private class SendClusterStateTask
    implements Runnable {
        private SendClusterStateTask() {
        }

        @Override
        public void run() {
            if (InternalPartitionServiceImpl.this.node.isMaster() && InternalPartitionServiceImpl.this.node.isActive()) {
                if (!InternalPartitionServiceImpl.this.migrationQueue.isEmpty() && InternalPartitionServiceImpl.this.migrationActive.get()) {
                    InternalPartitionServiceImpl.this.logger.info("Remaining migration tasks in queue => " + InternalPartitionServiceImpl.this.migrationQueue.size());
                }
                InternalPartitionServiceImpl.this.publishPartitionRuntimeState();
            }
        }
    }
}

