/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.cache.impl;

import com.hazelcast.cache.CacheMergePolicy;
import com.hazelcast.cache.impl.CachePartitionSegment;
import com.hazelcast.cache.impl.CacheService;
import com.hazelcast.cache.impl.ICacheRecordStore;
import com.hazelcast.cache.impl.SplitBrainAwareCacheRecordStore;
import com.hazelcast.cache.impl.merge.entry.DefaultCacheEntryView;
import com.hazelcast.cache.impl.merge.policy.CacheMergePolicyProvider;
import com.hazelcast.cache.impl.operation.CacheMergeOperation;
import com.hazelcast.cache.impl.record.CacheRecord;
import com.hazelcast.config.CacheConfig;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.InternalCompletableFuture;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.partition.IPartitionService;
import com.hazelcast.spi.serialization.SerializationService;
import com.hazelcast.util.ExceptionUtil;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class CacheSplitBrainHandler {
    private final NodeEngine nodeEngine;
    private final Map<String, CacheConfig> configs;
    private final CachePartitionSegment[] segments;
    private final CacheMergePolicyProvider mergePolicyProvider;

    CacheSplitBrainHandler(NodeEngine nodeEngine, Map<String, CacheConfig> configs, CachePartitionSegment[] segments) {
        this.nodeEngine = nodeEngine;
        this.configs = configs;
        this.segments = segments;
        this.mergePolicyProvider = new CacheMergePolicyProvider(nodeEngine);
    }

    Runnable prepareMergeRunnable() {
        HashMap<String, Map<Data, CacheRecord>> recordMap = new HashMap<String, Map<Data, CacheRecord>>(this.configs.size());
        IPartitionService partitionService = this.nodeEngine.getPartitionService();
        int partitionCount = partitionService.getPartitionCount();
        Address thisAddress = this.nodeEngine.getClusterService().getThisAddress();
        for (int i = 0; i < partitionCount; ++i) {
            if (!thisAddress.equals(partitionService.getPartitionOwner(i))) continue;
            CachePartitionSegment segment = this.segments[i];
            Iterator<ICacheRecordStore> iter = segment.recordStoreIterator();
            while (iter.hasNext()) {
                ICacheRecordStore cacheRecordStore = iter.next();
                if (!(cacheRecordStore instanceof SplitBrainAwareCacheRecordStore)) continue;
                String cacheName = cacheRecordStore.getName();
                HashMap<Data, CacheRecord> records = (HashMap<Data, CacheRecord>)recordMap.get(cacheName);
                if (records == null) {
                    records = new HashMap<Data, CacheRecord>(cacheRecordStore.size());
                    recordMap.put(cacheName, records);
                }
                for (Map.Entry<Data, CacheRecord> cacheRecordEntry : cacheRecordStore.getReadOnlyRecords().entrySet()) {
                    Data key = cacheRecordEntry.getKey();
                    CacheRecord cacheRecord = cacheRecordEntry.getValue();
                    records.put(key, cacheRecord);
                }
                cacheRecordStore.clear();
                CacheService cacheService = (CacheService)this.nodeEngine.getService("hz:impl:cacheService");
                cacheService.sendInvalidationEvent(cacheName, null, "<NA>");
            }
        }
        return new CacheMerger(this.nodeEngine, this.configs, recordMap, this.mergePolicyProvider);
    }

    private static class CacheMerger
    implements Runnable {
        private static final int TIMEOUT_FACTOR = 500;
        private final NodeEngine nodeEngine;
        private final Map<String, CacheConfig> configs;
        private final Map<String, Map<Data, CacheRecord>> recordMap;
        private final CacheMergePolicyProvider mergePolicyProvider;
        private final ILogger logger;

        public CacheMerger(NodeEngine nodeEngine, Map<String, CacheConfig> configs, Map<String, Map<Data, CacheRecord>> recordMap, CacheMergePolicyProvider mergePolicyProvider) {
            this.nodeEngine = nodeEngine;
            this.configs = configs;
            this.recordMap = recordMap;
            this.mergePolicyProvider = mergePolicyProvider;
            this.logger = nodeEngine.getLogger(CacheService.class);
        }

        @Override
        public void run() {
            final Semaphore semaphore = new Semaphore(0);
            int recordCount = 0;
            ExecutionCallback mergeCallback = new ExecutionCallback(){

                public void onResponse(Object response) {
                    semaphore.release(1);
                }

                @Override
                public void onFailure(Throwable t) {
                    CacheMerger.this.logger.warning("Error while running merge operation: " + t.getMessage());
                    semaphore.release(1);
                }
            };
            SerializationService serializationService = this.nodeEngine.getSerializationService();
            for (Map.Entry<String, Map<Data, CacheRecord>> recordMapEntry : this.recordMap.entrySet()) {
                String cacheName = recordMapEntry.getKey();
                CacheConfig cacheConfig = this.configs.get(cacheName);
                Map<Data, CacheRecord> records = recordMapEntry.getValue();
                String mergePolicyName = cacheConfig.getMergePolicy();
                CacheMergePolicy cacheMergePolicy = this.mergePolicyProvider.getMergePolicy(mergePolicyName);
                for (Map.Entry<Data, CacheRecord> recordEntry : records.entrySet()) {
                    Data key = recordEntry.getKey();
                    CacheRecord record = recordEntry.getValue();
                    ++recordCount;
                    DefaultCacheEntryView entryView = new DefaultCacheEntryView(key, (Data)serializationService.toData(record.getValue()), record.getCreationTime(), record.getExpirationTime(), record.getLastAccessTime(), record.getAccessHit());
                    CacheMergeOperation operation = new CacheMergeOperation(cacheName, key, entryView, cacheMergePolicy);
                    try {
                        int partitionId = this.nodeEngine.getPartitionService().getPartitionId(key);
                        InternalCompletableFuture f = this.nodeEngine.getOperationService().invokeOnPartition("hz:impl:cacheService", operation, partitionId);
                        f.andThen(mergeCallback);
                    }
                    catch (Throwable t) {
                        throw ExceptionUtil.rethrow(t);
                    }
                }
            }
            try {
                semaphore.tryAcquire(recordCount, recordCount * 500, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                this.logger.finest("Interrupted while waiting merge operation...");
            }
        }
    }
}

