/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.mapreduce.impl.task;

import com.hazelcast.mapreduce.Combiner;
import com.hazelcast.mapreduce.CombinerFactory;
import com.hazelcast.mapreduce.Context;
import com.hazelcast.mapreduce.impl.CombinerResultList;
import com.hazelcast.mapreduce.impl.HashMapAdapter;
import com.hazelcast.mapreduce.impl.MapReduceUtil;
import com.hazelcast.mapreduce.impl.task.MapCombineTask;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class DefaultContext<KeyIn, ValueIn>
implements Context<KeyIn, ValueIn> {
    private static final AtomicIntegerFieldUpdater<DefaultContext> COLLECTED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(DefaultContext.class, "collected");
    private final ConcurrentMap<KeyIn, Combiner<ValueIn, ?>> combiners = new ConcurrentHashMap();
    private final CombinerFactory<KeyIn, ValueIn, ?> combinerFactory;
    private final MapCombineTask mapCombineTask;
    private volatile int collected;
    private volatile int partitionId;

    protected DefaultContext(CombinerFactory<KeyIn, ValueIn, ?> combinerFactory, MapCombineTask mapCombineTask) {
        this.mapCombineTask = mapCombineTask;
        this.combinerFactory = combinerFactory != null ? combinerFactory : new CollectingCombinerFactory();
    }

    public void setPartitionId(int partitionId) {
        this.partitionId = partitionId;
    }

    @Override
    public void emit(KeyIn key, ValueIn value) {
        Combiner<ValueIn, ?> combiner = this.getOrCreateCombiner(key);
        combiner.combine(value);
        COLLECTED_UPDATER.incrementAndGet(this);
        this.mapCombineTask.onEmit(this, this.partitionId);
    }

    public <Chunk> Map<KeyIn, Chunk> requestChunk() {
        int mapSize = MapReduceUtil.mapSize(this.combiners.size());
        HashMapAdapter chunkMap = new HashMapAdapter(mapSize);
        for (Map.Entry entry : this.combiners.entrySet()) {
            Combiner combiner = (Combiner)entry.getValue();
            Object chunk = combiner.finalizeChunk();
            combiner.reset();
            if (chunk == null) continue;
            chunkMap.put(entry.getKey(), chunk);
        }
        COLLECTED_UPDATER.set(this, 0);
        return chunkMap;
    }

    public int getCollected() {
        return this.collected;
    }

    public <Chunk> Map<KeyIn, Chunk> finish() {
        for (Combiner combiner : this.combiners.values()) {
            combiner.finalizeCombine();
        }
        return this.requestChunk();
    }

    public Combiner<ValueIn, ?> getOrCreateCombiner(KeyIn key) {
        Combiner<ValueIn, ?> combiner = (Combiner<ValueIn, ?>)this.combiners.get(key);
        if (combiner == null) {
            combiner = this.combinerFactory.newCombiner(key);
            combiner.beginCombine();
            Combiner<ValueIn, ?> temp = this.combiners.putIfAbsent(key, combiner);
            if (temp != null) {
                combiner = temp;
            }
        }
        return combiner;
    }

    private static class CollectingCombinerFactory<KeyIn, ValueIn>
    implements CombinerFactory<KeyIn, ValueIn, List<ValueIn>> {
        private CollectingCombinerFactory() {
        }

        @Override
        public Combiner<ValueIn, List<ValueIn>> newCombiner(KeyIn key) {
            return new Combiner<ValueIn, List<ValueIn>>(){
                private final List<ValueIn> values = new ArrayList();

                @Override
                public void combine(ValueIn value) {
                    this.values.add(value);
                }

                @Override
                public List<ValueIn> finalizeChunk() {
                    return new CombinerResultList(this.values);
                }

                @Override
                public void reset() {
                    this.values.clear();
                }
            };
        }
    }
}

