/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.spi.impl.eventservice.impl;

import com.hazelcast.instance.GroupProperties;
import com.hazelcast.instance.HazelcastThreadGroup;
import com.hazelcast.instance.MemberImpl;
import com.hazelcast.instance.Node;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.Connection;
import com.hazelcast.nio.Packet;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.EventFilter;
import com.hazelcast.spi.EventRegistration;
import com.hazelcast.spi.EventService;
import com.hazelcast.spi.InternalCompletableFuture;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.eventservice.InternalEventService;
import com.hazelcast.spi.impl.eventservice.impl.EmptyFilter;
import com.hazelcast.spi.impl.eventservice.impl.EventPacket;
import com.hazelcast.spi.impl.eventservice.impl.EventServiceSegment;
import com.hazelcast.spi.impl.eventservice.impl.FutureUtilExceptionHandler;
import com.hazelcast.spi.impl.eventservice.impl.LocalEventDispatcher;
import com.hazelcast.spi.impl.eventservice.impl.Registration;
import com.hazelcast.spi.impl.eventservice.impl.RemoteEventPacketProcessor;
import com.hazelcast.spi.impl.eventservice.impl.operations.DeregistrationOperation;
import com.hazelcast.spi.impl.eventservice.impl.operations.PostJoinRegistrationOperation;
import com.hazelcast.spi.impl.eventservice.impl.operations.RegistrationOperation;
import com.hazelcast.spi.impl.eventservice.impl.operations.SendEventOperation;
import com.hazelcast.spi.impl.operationservice.InternalOperationService;
import com.hazelcast.util.ConcurrencyUtil;
import com.hazelcast.util.ConstructorFunction;
import com.hazelcast.util.EmptyStatement;
import com.hazelcast.util.FutureUtil;
import com.hazelcast.util.executor.StripedExecutor;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;

/**
 * Event service that keeps listener registrations grouped per service name in
 * {@link EventServiceSegment}s, hands events for local subscribers to a
 * {@link StripedExecutor}, and ships events for remote subscribers either as
 * plain packets or, periodically, as a {@link SendEventOperation} that is waited on.
 */
public class EventServiceImpl
implements InternalEventService {
    private static final EventRegistration[] EMPTY_REGISTRATIONS = new EventRegistration[0];
    /** Every n-th publish of a segment is sent as an operation and waited on instead of a fire-and-forget packet. */
    private static final int EVENT_SYNC_FREQUENCY = 100000;
    /** Try count configured on the invocation used for a synchronous event send. */
    private static final int SEND_RETRY_COUNT = 50;
    /** Maximum time to wait for a synchronous event send to complete. */
    private static final int SEND_EVENT_TIMEOUT_SECONDS = 5;
    /** Deadline for the registration calls made on the other members. */
    private static final int REGISTRATION_TIMEOUT_SECONDS = 5;
    /** Deadline for the de-registration calls made on the other members. */
    private static final int DEREGISTER_TIMEOUT_SECONDS = 5;
    /** Only every n-th failure is logged at WARNING; the rest are logged at FINEST. */
    private static final int WARNING_LOG_FREQUENCY = 1000;
    final ILogger logger;
    final NodeEngineImpl nodeEngine;
    private final FutureUtil.ExceptionHandler registrationExceptionHandler;
    private final FutureUtil.ExceptionHandler deregistrationExceptionHandler;
    private final ConcurrentMap<String, EventServiceSegment> segments;
    private final StripedExecutor eventExecutor;
    private final int eventQueueTimeoutMs;
    private final int eventThreadCount;
    private final int eventQueueCapacity;
    private final AtomicLong totalFailures = new AtomicLong();

    /**
     * Reads the event thread count, queue capacity and queue timeout from the node's
     * group properties and creates the striped event executor with them.
     *
     * @param nodeEngine the node engine this service belongs to
     */
    public EventServiceImpl(NodeEngineImpl nodeEngine) {
        this.nodeEngine = nodeEngine;
        this.logger = nodeEngine.getLogger(EventService.class.getName());
        Node node = nodeEngine.getNode();
        GroupProperties groupProperties = node.getGroupProperties();
        this.eventThreadCount = groupProperties.EVENT_THREAD_COUNT.getInteger();
        this.eventQueueCapacity = groupProperties.EVENT_QUEUE_CAPACITY.getInteger();
        this.eventQueueTimeoutMs = groupProperties.EVENT_QUEUE_TIMEOUT_MILLIS.getInteger();
        HazelcastThreadGroup threadGroup = node.getHazelcastThreadGroup();
        this.eventExecutor = new StripedExecutor(
                node.getLogger(EventServiceImpl.class),
                threadGroup.getThreadNamePrefix("event"),
                threadGroup.getInternalThreadGroup(),
                this.eventThreadCount,
                this.eventQueueCapacity);
        this.registrationExceptionHandler =
                new FutureUtilExceptionHandler(this.logger, "Member left while registering listener...");
        this.deregistrationExceptionHandler =
                new FutureUtilExceptionHandler(this.logger, "Member left while de-registering listener...");
        this.segments = new ConcurrentHashMap<String, EventServiceSegment>();
    }

    /**
     * Closes the listener behind the given registration if that listener is {@link Closeable}.
     * An {@link IOException} raised by the listener's {@code close()} is deliberately ignored.
     *
     * @param eventRegistration the registration whose listener should be closed; must be a {@link Registration}
     */
    @Override
    public void close(EventRegistration eventRegistration) {
        Registration registration = (Registration) eventRegistration;
        Object listener = registration.getListener();
        if (!(listener instanceof Closeable)) {
            return;
        }
        try {
            ((Closeable) listener).close();
        } catch (IOException e) {
            // Best effort: a listener failing to close must not affect the caller.
            EmptyStatement.ignore(e);
        }
    }

    /** @return the configured number of event threads */
    @Override
    public int getEventThreadCount() {
        return this.eventThreadCount;
    }

    /** @return the configured capacity of the event queue */
    @Override
    public int getEventQueueCapacity() {
        return this.eventQueueCapacity;
    }

    /** @return the current work queue size reported by the event executor */
    @Override
    public int getEventQueueSize() {
        return this.eventExecutor.getWorkQueueSize();
    }

    /** Registers a local-only listener with an {@link EmptyFilter}. */
    @Override
    public EventRegistration registerLocalListener(String serviceName, String topic, Object listener) {
        return this.registerListenerInternal(serviceName, topic, new EmptyFilter(), listener, true);
    }

    /** Registers a local-only listener with the given filter. */
    @Override
    public EventRegistration registerLocalListener(String serviceName, String topic, EventFilter filter, Object listener) {
        return this.registerListenerInternal(serviceName, topic, filter, listener, true);
    }

    /** Registers a listener with an {@link EmptyFilter} and announces it to the other members. */
    @Override
    public EventRegistration registerListener(String serviceName, String topic, Object listener) {
        return this.registerListenerInternal(serviceName, topic, new EmptyFilter(), listener, false);
    }

    /** Registers a listener with the given filter and announces it to the other members. */
    @Override
    public EventRegistration registerListener(String serviceName, String topic, EventFilter filter, Object listener) {
        return this.registerListenerInternal(serviceName, topic, filter, listener, false);
    }

    /**
     * Creates a registration with a random UUID as id, adds it to the service's segment and,
     * unless {@code localOnly}, invokes the registration on the other members.
     *
     * @return the new registration, or {@code null} if the segment refused to add it
     * @throws IllegalArgumentException if {@code listener} or {@code filter} is {@code null}
     */
    private EventRegistration registerListenerInternal(String serviceName, String topic, EventFilter filter,
                                                       Object listener, boolean localOnly) {
        if (listener == null) {
            throw new IllegalArgumentException("Listener required!");
        }
        if (filter == null) {
            throw new IllegalArgumentException("EventFilter required!");
        }
        EventServiceSegment segment = this.getSegment(serviceName, true);
        Registration reg = new Registration(UUID.randomUUID().toString(), serviceName, topic, filter,
                this.nodeEngine.getThisAddress(), listener, localOnly);
        if (!segment.addRegistration(topic, reg)) {
            return null;
        }
        if (!localOnly) {
            this.invokeRegistrationOnOtherNodes(serviceName, reg);
        }
        return reg;
    }

    /**
     * Adds a registration received from elsewhere to the matching segment.
     *
     * @return {@code false} if the registration's subscriber is this node's own address,
     *         otherwise the result of adding it to the segment
     */
    public boolean handleRegistration(Registration reg) {
        if (this.nodeEngine.getThisAddress().equals(reg.getSubscriber())) {
            return false;
        }
        EventServiceSegment segment = this.getSegment(reg.getServiceName(), true);
        return segment.addRegistration(reg.getTopic(), reg);
    }

    /**
     * Removes the registration with the given id and, if it was not local-only,
     * invokes the de-registration on the other members.
     *
     * @return {@code true} if a registration was removed
     */
    @Override
    public boolean deregisterListener(String serviceName, String topic, Object id) {
        EventServiceSegment segment = this.getSegment(serviceName, false);
        if (segment == null) {
            return false;
        }
        String registrationId = String.valueOf(id);
        Registration reg = segment.removeRegistration(topic, registrationId);
        if (reg == null) {
            return false;
        }
        if (!reg.isLocalOnly()) {
            this.invokeDeregistrationOnOtherNodes(serviceName, topic, registrationId);
        }
        return true;
    }

    /** Removes all registrations of the topic from the service's segment, if that segment exists. */
    @Override
    public void deregisterAllListeners(String serviceName, String topic) {
        EventServiceSegment segment = this.getSegment(serviceName, false);
        if (segment != null) {
            segment.removeRegistrations(topic);
        }
    }

    /**
     * Invokes a {@link RegistrationOperation} on every non-local member and waits for the
     * calls up to {@link #REGISTRATION_TIMEOUT_SECONDS}.
     */
    private void invokeRegistrationOnOtherNodes(String serviceName, Registration reg) {
        InternalOperationService operationService = this.nodeEngine.getOperationService();
        Collection<MemberImpl> members = this.nodeEngine.getClusterService().getMemberList();
        Collection<Future> calls = new ArrayList<Future>(members.size());
        for (MemberImpl member : members) {
            if (member.localMember()) {
                continue;
            }
            RegistrationOperation operation = new RegistrationOperation(reg);
            InternalCompletableFuture f = operationService.invokeOnTarget(serviceName, operation, member.getAddress());
            calls.add(f);
        }
        FutureUtil.waitWithDeadline(calls, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS, this.registrationExceptionHandler);
    }

    /**
     * Invokes a {@link DeregistrationOperation} on every non-local member and waits for the
     * calls up to {@link #DEREGISTER_TIMEOUT_SECONDS}.
     */
    private void invokeDeregistrationOnOtherNodes(String serviceName, String topic, String id) {
        InternalOperationService operationService = this.nodeEngine.getOperationService();
        Collection<MemberImpl> members = this.nodeEngine.getClusterService().getMemberList();
        Collection<Future> calls = new ArrayList<Future>(members.size());
        for (MemberImpl member : members) {
            if (member.localMember()) {
                continue;
            }
            DeregistrationOperation operation = new DeregistrationOperation(topic, id);
            InternalCompletableFuture f = operationService.invokeOnTarget(serviceName, operation, member.getAddress());
            calls.add(f);
        }
        FutureUtil.waitWithDeadline(calls, DEREGISTER_TIMEOUT_SECONDS, TimeUnit.SECONDS, this.deregistrationExceptionHandler);
    }

    /**
     * @return the registrations of the topic copied into a new array, or a shared empty array
     *         when the segment does not exist or has no registrations for the topic
     */
    @Override
    public EventRegistration[] getRegistrationsAsArray(String serviceName, String topic) {
        EventServiceSegment segment = this.getSegment(serviceName, false);
        if (segment == null) {
            return EMPTY_REGISTRATIONS;
        }
        Collection<Registration> registrations = segment.getRegistrations(topic, false);
        if (registrations == null || registrations.isEmpty()) {
            return EMPTY_REGISTRATIONS;
        }
        return registrations.toArray(new Registration[registrations.size()]);
    }

    /**
     * @return an unmodifiable view of the topic's registrations, or an empty set when the
     *         segment does not exist or has no registrations for the topic
     */
    @Override
    public Collection<EventRegistration> getRegistrations(String serviceName, String topic) {
        EventServiceSegment segment = this.getSegment(serviceName, false);
        if (segment == null) {
            return Collections.<EventRegistration>emptySet();
        }
        Collection<Registration> registrations = segment.getRegistrations(topic, false);
        if (registrations == null || registrations.isEmpty()) {
            return Collections.<EventRegistration>emptySet();
        }
        // Explicit type witness: widens Collection<Registration> to Collection<EventRegistration>
        // without relying on target-typed inference.
        return Collections.<EventRegistration>unmodifiableCollection(registrations);
    }

    /** @return whether the service's segment exists and reports a registration for the topic */
    @Override
    public boolean hasEventRegistration(String serviceName, String topic) {
        EventServiceSegment segment = this.getSegment(serviceName, false);
        return segment != null && segment.hasRegistration(topic);
    }

    /** Publishes the event to every registration currently known for the topic. */
    @Override
    public void publishEvent(String serviceName, String topic, Object event, int orderKey) {
        Collection<EventRegistration> registrations = this.getRegistrations(serviceName, topic);
        this.publishEvent(serviceName, registrations, event, orderKey);
    }

    /**
     * Publishes the event to a single registration: executed locally when the subscriber is
     * this node, otherwise sent to the subscriber as an {@link EventPacket}.
     *
     * @throws IllegalArgumentException if the registration is not a {@link Registration}
     */
    @Override
    public void publishEvent(String serviceName, EventRegistration registration, Object event, int orderKey) {
        if (!(registration instanceof Registration)) {
            throw new IllegalArgumentException();
        }
        if (this.isLocal(registration)) {
            this.executeLocal(serviceName, event, registration, orderKey);
            return;
        }
        Address subscriber = registration.getSubscriber();
        this.sendEventPacket(subscriber, new EventPacket(registration.getId(), serviceName, event), orderKey);
    }

    /**
     * Publishes the event to each registration. The event is converted to {@link Data} at most
     * once, and only when the first remote subscriber is encountered.
     *
     * @throws IllegalArgumentException if any registration is not a {@link Registration}
     */
    @Override
    public void publishEvent(String serviceName, Collection<EventRegistration> registrations, Object event, int orderKey) {
        Data eventData = null;
        for (EventRegistration registration : registrations) {
            if (!(registration instanceof Registration)) {
                throw new IllegalArgumentException();
            }
            if (this.isLocal(registration)) {
                this.executeLocal(serviceName, event, registration, orderKey);
                continue;
            }
            if (eventData == null) {
                eventData = this.nodeEngine.toData(event);
            }
            EventPacket eventPacket = new EventPacket(registration.getId(), serviceName, eventData);
            this.sendEventPacket(registration.getSubscriber(), eventPacket, orderKey);
        }
    }

    /**
     * Publishes the event to the remote registrations only; registrations whose subscriber
     * is this node are skipped.
     *
     * @throws IllegalArgumentException if any registration is not a {@link Registration}
     */
    @Override
    public void publishRemoteEvent(String serviceName, Collection<EventRegistration> registrations, Object event, int orderKey) {
        if (registrations.isEmpty()) {
            return;
        }
        Data eventData = this.nodeEngine.toData(event);
        for (EventRegistration registration : registrations) {
            if (!(registration instanceof Registration)) {
                throw new IllegalArgumentException();
            }
            if (this.isLocal(registration)) {
                continue;
            }
            EventPacket eventPacket = new EventPacket(registration.getId(), serviceName, eventData);
            this.sendEventPacket(registration.getSubscriber(), eventPacket, orderKey);
        }
    }

    /**
     * Submits a {@link LocalEventDispatcher} for the registration's listener to the event
     * executor. Does nothing when the node engine is not active. A rejected submission is
     * logged only while the executor is still live.
     */
    private void executeLocal(String serviceName, Object event, EventRegistration registration, int orderKey) {
        if (!this.nodeEngine.isActive()) {
            return;
        }
        Registration reg = (Registration) registration;
        try {
            Object listener = reg.getListener();
            if (listener != null) {
                this.eventExecutor.execute(new LocalEventDispatcher(this, serviceName, event, listener, orderKey,
                        this.eventQueueTimeoutMs));
            } else {
                this.logger.warning("Something seems wrong! Listener instance is null! -> " + reg);
            }
        } catch (RejectedExecutionException e) {
            // A rejection from an executor that is no longer live is not reported.
            if (this.eventExecutor.isLive()) {
                this.logFailure("EventQueue overloaded! %s failed to publish to %s:%s", event, reg.getServiceName(),
                        reg.getTopic());
            }
        }
    }

    /**
     * Sends the event packet to the subscriber. Every {@link #EVENT_SYNC_FREQUENCY}-th publish
     * of the segment is sent as a {@link SendEventOperation} and waited on for up to
     * {@link #SEND_EVENT_TIMEOUT_SECONDS}; all other publishes are transmitted as a packet
     * without waiting.
     */
    private void sendEventPacket(Address subscriber, EventPacket eventPacket, int orderKey) {
        String serviceName = eventPacket.getServiceName();
        EventServiceSegment segment = this.getSegment(serviceName, true);
        boolean sync = segment.incrementPublish() % EVENT_SYNC_FREQUENCY == 0L;
        if (sync) {
            SendEventOperation op = new SendEventOperation(eventPacket, orderKey);
            InternalCompletableFuture f = this.nodeEngine.getOperationService()
                    .createInvocationBuilder(serviceName, (Operation) op, subscriber)
                    .setTryCount(SEND_RETRY_COUNT)
                    .invoke();
            try {
                f.get(SEND_EVENT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller instead of silently consuming it.
                Thread.currentThread().interrupt();
            } catch (Exception ignored) {
                // Best effort: a failed or timed-out synchronous send is not propagated.
                EmptyStatement.ignore(ignored);
            }
        } else {
            Packet packet = new Packet(this.nodeEngine.toData(eventPacket), orderKey);
            // NOTE(review): header value 2 presumably marks the packet as an event packet - confirm against Packet.
            packet.setHeader(2);
            boolean transmitted = this.nodeEngine.getPacketTransceiver().transmit(packet, subscriber);
            if (!transmitted && this.nodeEngine.isActive()) {
                this.logFailure("IO Queue overloaded! Failed to send event packet to: %s", subscriber);
            }
        }
    }

    /**
     * Looks up the segment of the given service.
     *
     * @param service     the service name
     * @param forceCreate whether to create and register the segment when it does not exist yet
     * @return the segment, or {@code null} if it does not exist and {@code forceCreate} is {@code false}
     */
    public EventServiceSegment getSegment(String service, boolean forceCreate) {
        EventServiceSegment segment = this.segments.get(service);
        if (segment != null || !forceCreate) {
            return segment;
        }
        ConstructorFunction<String, EventServiceSegment> func = new ConstructorFunction<String, EventServiceSegment>() {

            @Override
            public EventServiceSegment createNew(String key) {
                return new EventServiceSegment(key, EventServiceImpl.this.nodeEngine.getService(key));
            }
        };
        return ConcurrencyUtil.getOrPutIfAbsent(this.segments, service, func);
    }

    /** @return whether the registration's subscriber address equals this node's address */
    boolean isLocal(EventRegistration reg) {
        return this.nodeEngine.getThisAddress().equals(reg.getSubscriber());
    }

    /**
     * Submits the callback to the event executor. Does nothing when the node engine is not
     * active. A rejected submission is logged only while the executor is still live.
     */
    @Override
    public void executeEventCallback(Runnable callback) {
        if (!this.nodeEngine.isActive()) {
            return;
        }
        try {
            this.eventExecutor.execute(callback);
        } catch (RejectedExecutionException e) {
            if (this.eventExecutor.isLive()) {
                this.logFailure("EventQueue overloaded! Failed to execute event callback: %s", callback);
            }
        }
    }

    /**
     * Submits a {@link RemoteEventPacketProcessor} for the packet to the event executor.
     * A rejected submission is logged, naming the sending endpoint, only while the executor
     * is still live.
     */
    @Override
    public void handleEvent(Packet packet) {
        try {
            this.eventExecutor.execute(new RemoteEventPacketProcessor(this, packet));
        } catch (RejectedExecutionException e) {
            if (this.eventExecutor.isLive()) {
                Connection conn = packet.getConn();
                String endpoint = conn.getEndPoint() != null ? conn.getEndPoint().toString() : conn.toString();
                this.logFailure("EventQueue overloaded! Failed to process event packet sent from: %s", endpoint);
            }
        }
    }

    /**
     * Collects every registration that is not local-only from all segments.
     *
     * @return a {@link PostJoinRegistrationOperation} carrying those registrations,
     *         or {@code null} if there are none
     */
    public PostJoinRegistrationOperation getPostJoinOperation() {
        LinkedList<Registration> registrations = new LinkedList<Registration>();
        for (EventServiceSegment segment : this.segments.values()) {
            for (Registration reg : segment.getRegistrationIdMap().values()) {
                if (!reg.isLocalOnly()) {
                    registrations.add(reg);
                }
            }
        }
        return registrations.isEmpty() ? null : new PostJoinRegistrationOperation(registrations);
    }

    /** Shuts down the event executor, then clears every segment and the segment map. */
    public void shutdown() {
        this.logger.finest("Stopping event executor...");
        this.eventExecutor.shutdown();
        for (EventServiceSegment segment : this.segments.values()) {
            segment.clear();
        }
        this.segments.clear();
    }

    /** Notifies every segment that the member at the given member's address has left. */
    public void onMemberLeft(MemberImpl member) {
        Address address = member.getAddress();
        for (EventServiceSegment segment : this.segments.values()) {
            segment.onMemberLeft(address);
        }
    }

    /**
     * Logs a failure, formatting {@code message} with {@code args}. The first failure and every
     * {@link #WARNING_LOG_FREQUENCY}-th failure after it is logged at WARNING; all others are
     * logged at FINEST. The message is only formatted when the chosen level is loggable.
     */
    private void logFailure(String message, Object... args) {
        boolean warn = this.totalFailures.getAndIncrement() % WARNING_LOG_FREQUENCY == 0L;
        Level level = warn ? Level.WARNING : Level.FINEST;
        if (this.logger.isLoggable(level)) {
            this.logger.log(level, String.format(message, args));
        }
    }
}

