/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.topic.impl.reliable;

import com.hazelcast.cluster.ClusterService;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.core.Message;
import com.hazelcast.instance.MemberImpl;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.serialization.SerializationService;
import com.hazelcast.ringbuffer.ReadResultSet;
import com.hazelcast.ringbuffer.Ringbuffer;
import com.hazelcast.ringbuffer.StaleSequenceException;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.exception.DistributedObjectDestroyedException;
import com.hazelcast.topic.ReliableMessageListener;
import com.hazelcast.topic.impl.reliable.ReliableTopicMessage;
import com.hazelcast.topic.impl.reliable.ReliableTopicProxy;
import java.util.Iterator;

/**
 * Drives a single {@link ReliableMessageListener} by repeatedly reading batches of
 * {@link ReliableTopicMessage}s from the topic's ringbuffer and handing them to the listener.
 *
 * <p>The runner is its own {@link ExecutionCallback}: every call to {@link #next()} issues one
 * asynchronous batch read and registers this instance (together with the proxy's executor) as the
 * callback. {@link #onResponse(ReadResultSet)} processes the batch and schedules the next read;
 * {@link #onFailure(Throwable)} either recovers (loss-tolerant listener on a stale sequence) or
 * cancels the runner.
 *
 * @param <E> type of the message payload delivered to the listener
 */
class ReliableMessageListenerRunner<E>
implements ExecutionCallback<ReadResultSet<ReliableTopicMessage>> {
    final ReliableMessageListener<E> listener;
    private final Ringbuffer<ReliableTopicMessage> ringbuffer;
    private final String topicName;
    private final SerializationService serializationService;
    private final ClusterService clusterService;
    private final ILogger logger;
    private final String id;
    private final ReliableTopicProxy<E> proxy;
    private final int batchSize;
    /** Sequence of the next ringbuffer item to read/process. */
    private long sequence;
    /** Set once by {@link #cancel()}; checked before every read and every message. */
    private volatile boolean cancelled;

    /**
     * Creates a runner bound to the given listener and topic proxy. No read is started here;
     * the caller has to invoke {@link #next()}.
     *
     * @param id       key under which this runner is registered in the proxy's runners map
     *                 (used by {@link #cancel()} to deregister itself)
     * @param listener the listener that receives the messages
     * @param proxy    the topic proxy supplying the ringbuffer, configuration, executor and stats
     */
    public ReliableMessageListenerRunner(String id, ReliableMessageListener<E> listener, ReliableTopicProxy<E> proxy) {
        this.id = id;
        this.listener = listener;
        this.proxy = proxy;
        this.ringbuffer = proxy.ringbuffer;
        this.topicName = proxy.getName();
        NodeEngine nodeEngine = proxy.getNodeEngine();
        this.serializationService = nodeEngine.getSerializationService();
        this.clusterService = nodeEngine.getClusterService();
        this.logger = nodeEngine.getLogger(ReliableMessageListenerRunner.class);
        this.batchSize = proxy.topicConfig.getReadBatchSize();
        long initialSequence = listener.retrieveInitialSequence();
        if (initialSequence == -1L) {
            // -1 is treated as "no initial sequence": start right after the current tail.
            initialSequence = this.ringbuffer.tailSequence() + 1L;
        }
        this.sequence = initialSequence;
    }

    /**
     * Issues the next asynchronous batch read starting at the current sequence, unless the runner
     * has been cancelled. The read is requested with arguments {@code (sequence, 1, batchSize, null)}
     * and this runner is registered as the callback together with the proxy's executor.
     */
    void next() {
        if (this.cancelled) {
            return;
        }
        ICompletableFuture<ReadResultSet<ReliableTopicMessage>> future =
                this.ringbuffer.readManyAsync(this.sequence, 1, this.batchSize, null);
        future.andThen(this, this.proxy.executor);
    }

    /**
     * Processes every message of a completed batch in order and then schedules the next read.
     *
     * <p>For each message the current sequence is first stored on the listener and the message is
     * then delivered. If either step throws, {@link #terminate(Throwable)} decides whether the
     * runner is cancelled; otherwise the failure is tolerated and the sequence still advances,
     * i.e. the failing message is skipped rather than retried.
     *
     * @param result the batch that was read from the ringbuffer
     */
    @Override
    public void onResponse(ReadResultSet<ReliableTopicMessage> result) {
        for (Object item : result) {
            ReliableTopicMessage message = (ReliableTopicMessage) item;
            if (this.cancelled) {
                // Cancelled while working through the batch: drop the remaining messages.
                return;
            }
            try {
                this.listener.storeSequence(this.sequence);
                this.process(message);
            } catch (Throwable t) {
                if (this.terminate(t)) {
                    this.cancel();
                    return;
                }
                // Non-terminal failure: fall through so the sequence moves past this message.
            }
            ++this.sequence;
        }
        this.next();
    }

    /**
     * Records the receive in the local topic statistics and delivers the message to the listener.
     *
     * @param message the raw ringbuffer item
     * @throws Throwable whatever the conversion or the listener throws
     */
    private void process(ReliableTopicMessage message) throws Throwable {
        this.proxy.localTopicStats.incrementReceives();
        this.listener.onMessage(this.toMessage(message));
    }

    /**
     * Converts a raw ringbuffer item into the {@link Message} handed to the listener: the publisher
     * address is looked up in the cluster service and the payload is passed through the
     * serialization service.
     *
     * @param m the raw ringbuffer item
     * @return the message for the listener
     */
    private Message<E> toMessage(ReliableTopicMessage m) {
        MemberImpl member = this.clusterService.getMember(m.getPublisherAddress());
        // The payload type is not known statically; it is whatever was published on this topic.
        @SuppressWarnings("unchecked")
        E payload = (E) this.serializationService.toObject(m.getPayload());
        return new Message<E>(this.topicName, payload, m.getPublishTime(), member);
    }

    /**
     * Handles a failed batch read.
     *
     * <ul>
     *   <li>{@link StaleSequenceException} with a loss-tolerant listener: jump to the head sequence
     *       reported by the exception and keep reading.</li>
     *   <li>Any other case (stale sequence without loss tolerance, instance not active, topic
     *       destroyed, unexpected exception): log and cancel this runner.</li>
     * </ul>
     *
     * Does nothing if the runner is already cancelled.
     *
     * @param t the failure reported for the read
     */
    @Override
    public void onFailure(Throwable t) {
        if (this.cancelled) {
            return;
        }
        if (t instanceof StaleSequenceException) {
            long headSeq = ((StaleSequenceException) t).getHeadSeq();
            if (this.listener.isLossTolerant()) {
                if (this.logger.isFinestEnabled()) {
                    this.logger.finest("MessageListener " + this.listener + " on topic: " + this.topicName
                            + " ran into a stale sequence. " + "Jumping from oldSequence: " + this.sequence
                            + " to sequence: " + headSeq);
                }
                this.sequence = headSeq;
                this.next();
                return;
            }
            this.logger.warning("Terminating MessageListener:" + this.listener + " on topic: " + this.topicName + ". "
                    + "Reason: The listener was too slow or the retention period of the message has been violated. "
                    + "head: " + headSeq + " sequence:" + this.sequence);
        } else if (t instanceof HazelcastInstanceNotActiveException) {
            if (this.logger.isFinestEnabled()) {
                this.logger.finest("Terminating MessageListener " + this.listener + " on topic: " + this.topicName + ". "
                        + " Reason: HazelcastInstance is shutting down");
            }
        } else if (t instanceof DistributedObjectDestroyedException) {
            if (this.logger.isFinestEnabled()) {
                this.logger.finest("Terminating MessageListener " + this.listener + " on topic: " + this.topicName + ". "
                        + "Reason: Topic is destroyed");
            }
        } else {
            this.logger.warning("Terminating MessageListener " + this.listener + " on topic: " + this.topicName + ". "
                    + "Reason: Unhandled exception, message: " + t.getMessage(), t);
        }
        this.cancel();
    }

    /**
     * Marks the runner as cancelled and removes it from the proxy's runners map. After this call
     * {@link #next()} no longer issues reads and {@link #onResponse(ReadResultSet)} stops
     * delivering messages.
     */
    void cancel() {
        this.cancelled = true;
        this.proxy.runnersMap.remove(this.id);
    }

    /**
     * Decides whether a failure raised while storing the sequence or delivering a message should
     * end this runner.
     *
     * @param failure the exception thrown during message processing
     * @return {@code true} if the runner is already cancelled, if the listener reports the failure
     *         as terminal, or if asking the listener itself throws; {@code false} otherwise
     */
    private boolean terminate(Throwable failure) {
        if (this.cancelled) {
            return true;
        }
        try {
            boolean terminal = this.listener.isTerminal(failure);
            if (terminal) {
                this.logger.warning("Terminating MessageListener " + this.listener + " on topic: " + this.topicName + ". "
                        + "Reason: Unhandled exception, message: " + failure.getMessage(), failure);
            } else if (this.logger.isFinestEnabled()) {
                this.logger.finest("MessageListener " + this.listener + " on topic: " + this.topicName
                        + " ran into an exception:" + " message:" + failure.getMessage(), failure);
            }
            return terminal;
        } catch (Throwable t) {
            // The listener could not even classify the failure: treat that as terminal.
            this.logger.warning("Terminating messageListener:" + this.listener + " on topic: " + this.topicName + ". "
                    + "Reason: Unhandled exception while calling ReliableMessageListener.isTerminal() method", t);
            return true;
        }
    }
}

