/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.client.spi.impl;

import com.hazelcast.client.HazelcastClientNotActiveException;
import com.hazelcast.client.connection.ClientConnectionManager;
import com.hazelcast.client.connection.nio.ClientConnection;
import com.hazelcast.client.impl.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.client.ClientRequest;
import com.hazelcast.client.impl.client.ClientResponse;
import com.hazelcast.client.impl.client.RemoveAllListeners;
import com.hazelcast.client.spi.ClientExecutionService;
import com.hazelcast.client.spi.ClientInvocationService;
import com.hazelcast.client.spi.ClientPartitionService;
import com.hazelcast.client.spi.EventHandler;
import com.hazelcast.client.spi.impl.ClientExecutionServiceImpl;
import com.hazelcast.client.spi.impl.ClientInvocation;
import com.hazelcast.client.spi.impl.ConnectionHeartbeatListener;
import com.hazelcast.cluster.client.ClientPingRequest;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.instance.OutOfMemoryErrorDispatcher;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.Connection;
import com.hazelcast.nio.ConnectionListener;
import com.hazelcast.nio.Packet;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.nio.serialization.SerializationService;
import com.hazelcast.spi.exception.TargetDisconnectedException;
import com.hazelcast.util.ConstructorFunction;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

abstract class ClientInvocationServiceSupport
implements ClientInvocationService,
ConnectionHeartbeatListener,
ConnectionListener {
    private static final int WAIT_TIME_FOR_PACKETS_TO_BE_CONSUMED = 10;
    private static final int WAIT_TIME_FOR_PACKETS_TO_BE_CONSUMED_THRESHOLD = 5000;
    protected final HazelcastClientInstanceImpl client;
    protected final ClientConnectionManager connectionManager;
    protected final ClientPartitionService partitionService;
    protected final ClientExecutionService executionService;
    private final ILogger logger = Logger.getLogger(ClientInvocationService.class);
    private final ResponseThread responseThread;
    private final ConcurrentMap<Integer, ClientInvocation> callIdMap = new ConcurrentHashMap<Integer, ClientInvocation>();
    private final ConcurrentMap<Integer, ClientInvocation> eventHandlerMap = new ConcurrentHashMap<Integer, ClientInvocation>();
    private final AtomicInteger callIdIncrementer = new AtomicInteger();
    private volatile boolean isShutdown;

    public ClientInvocationServiceSupport(HazelcastClientInstanceImpl client) {
        this.client = client;
        this.connectionManager = client.getConnectionManager();
        this.executionService = client.getClientExecutionService();
        this.connectionManager.addConnectionListener(this);
        this.connectionManager.addConnectionHeartbeatListener(this);
        this.partitionService = client.getClientPartitionService();
        this.responseThread = new ResponseThread(client.getThreadGroup(), client.getName() + ".response-", client.getClientConfig().getClassLoader());
        this.responseThread.start();
    }

    @Override
    public <T> ICompletableFuture<T> invokeOnTarget(ClientRequest request, Address target) throws Exception {
        return new ClientInvocation(this.client, request, target).invoke();
    }

    @Override
    public boolean isRedoOperation() {
        return this.client.getClientConfig().getNetworkConfig().isRedoOperation();
    }

    protected void send(ClientInvocation invocation, ClientConnection connection) throws IOException {
        if (this.isShutdown) {
            throw new HazelcastClientNotActiveException("Client is shut down");
        }
        this.registerInvocation(invocation);
        SerializationService ss = this.client.getSerializationService();
        Object data = ss.toData(invocation.getRequest());
        Packet packet = new Packet((Data)data, invocation.getPartitionId());
        if (!this.isAllowedToSendRequest(connection, invocation.getRequest()) || !connection.write(packet)) {
            int callId = invocation.getRequest().getCallId();
            this.deRegisterCallId(callId);
            this.deRegisterEventHandler(callId);
            throw new IOException("Packet not send to " + connection.getRemoteEndpoint());
        }
        invocation.setSendConnection(connection);
    }

    private boolean isAllowedToSendRequest(ClientConnection connection, ClientRequest request) {
        if (!connection.isHeartBeating()) {
            if (request instanceof ClientPingRequest || request instanceof RemoveAllListeners) {
                return true;
            }
            if (this.logger.isFinestEnabled()) {
                this.logger.warning("Connection is not heart-beating, won't write request -> " + request);
            }
            return false;
        }
        return true;
    }

    private void registerInvocation(ClientInvocation clientInvocation) {
        int callId = this.newCallId();
        clientInvocation.getRequest().setCallId(callId);
        this.callIdMap.put(callId, clientInvocation);
        if (clientInvocation.getHandler() != null) {
            this.eventHandlerMap.put(callId, clientInvocation);
        }
    }

    private ClientInvocation deRegisterCallId(int callId) {
        return (ClientInvocation)this.callIdMap.remove(callId);
    }

    private ClientInvocation deRegisterEventHandler(int callId) {
        return (ClientInvocation)this.eventHandlerMap.remove(callId);
    }

    @Override
    public EventHandler getEventHandler(int callId) {
        ClientInvocation clientInvocation = (ClientInvocation)this.eventHandlerMap.get(callId);
        if (clientInvocation == null) {
            return null;
        }
        return clientInvocation.getHandler();
    }

    @Override
    public boolean removeEventHandler(Integer callId) {
        if (callId != null) {
            return this.eventHandlerMap.remove(callId) != null;
        }
        return false;
    }

    public void cleanResources(ConstructorFunction<Object, Throwable> responseCtor, ClientConnection connection) {
        ClientInvocation invocation;
        Iterator iter = this.callIdMap.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry entry = iter.next();
            invocation = (ClientInvocation)entry.getValue();
            if (!connection.equals(invocation.getSendConnection())) continue;
            iter.remove();
            invocation.notify(responseCtor.createNew(null));
            this.eventHandlerMap.remove(entry.getKey());
        }
        Iterator iterator = this.eventHandlerMap.values().iterator();
        while (iterator.hasNext()) {
            invocation = (ClientInvocation)iterator.next();
            if (!connection.equals(invocation.getSendConnection())) continue;
            iterator.remove();
            invocation.notify(responseCtor.createNew(null));
        }
    }

    @Override
    public void heartBeatStarted(Connection connection) {
    }

    @Override
    public void heartBeatStopped(Connection connection) {
        RemoveAllListeners request = new RemoveAllListeners();
        new ClientInvocation(this.client, (ClientRequest)request, connection).invoke();
        Address remoteEndpoint = connection.getEndPoint();
        Iterator iterator = this.eventHandlerMap.values().iterator();
        TargetDisconnectedException response = new TargetDisconnectedException(remoteEndpoint);
        while (iterator.hasNext()) {
            ClientInvocation clientInvocation = (ClientInvocation)iterator.next();
            if (!clientInvocation.getSendConnection().equals(connection)) continue;
            iterator.remove();
            clientInvocation.notify(response);
        }
    }

    @Override
    public void connectionAdded(Connection connection) {
    }

    @Override
    public void connectionRemoved(Connection connection) {
        this.cleanConnectionResources((ClientConnection)connection);
    }

    @Override
    public void cleanConnectionResources(ClientConnection connection) {
        if (this.connectionManager.isAlive()) {
            try {
                ((ClientExecutionServiceImpl)this.executionService).executeInternal(new CleanResourcesTask(connection));
            }
            catch (RejectedExecutionException e) {
                this.logger.warning("Execution rejected ", e);
            }
        } else {
            this.cleanResources(new ConstructorFunction<Object, Throwable>(){

                @Override
                public Throwable createNew(Object arg) {
                    return new HazelcastClientNotActiveException("Client is shutting down!");
                }
            }, connection);
        }
    }

    public boolean isShutdown() {
        return this.isShutdown;
    }

    @Override
    public void shutdown() {
        this.isShutdown = true;
        this.responseThread.interrupt();
    }

    @Override
    public void handlePacket(Packet packet) {
        this.responseThread.workQueue.add(packet);
    }

    private int newCallId() {
        return this.callIdIncrementer.incrementAndGet();
    }

    private class ResponseThread
    extends Thread {
        private final BlockingQueue<Packet> workQueue;

        public ResponseThread(ThreadGroup threadGroup, String name, ClassLoader classLoader) {
            super(threadGroup, name);
            this.workQueue = new LinkedBlockingQueue<Packet>();
            this.setContextClassLoader(classLoader);
        }

        @Override
        public void run() {
            try {
                this.doRun();
            }
            catch (OutOfMemoryError e) {
                OutOfMemoryErrorDispatcher.onOutOfMemory(e);
            }
            catch (Throwable t) {
                ClientInvocationServiceSupport.this.logger.severe(t);
            }
        }

        private void doRun() {
            while (true) {
                Packet task;
                try {
                    task = this.workQueue.take();
                }
                catch (InterruptedException e) {
                    if (!ClientInvocationServiceSupport.this.isShutdown) continue;
                    return;
                }
                if (ClientInvocationServiceSupport.this.isShutdown) {
                    return;
                }
                this.process(task);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void process(Packet packet) {
            ClientConnection conn = (ClientConnection)packet.getConn();
            try {
                ClientResponse clientResponse = (ClientResponse)ClientInvocationServiceSupport.this.client.getSerializationService().toObject(packet.getData());
                int callId = clientResponse.getCallId();
                Data response = clientResponse.getResponse();
                this.handlePacket(response, clientResponse.isError(), callId);
            }
            catch (Exception e) {
                ClientInvocationServiceSupport.this.logger.severe("Failed to process task: " + packet + " on responseThread :" + this.getName());
            }
            finally {
                conn.decrementPendingPacketCount();
            }
        }

        private void handlePacket(Object response, boolean isError, int callId) {
            ClientInvocation future = ClientInvocationServiceSupport.this.deRegisterCallId(callId);
            if (future == null) {
                ClientInvocationServiceSupport.this.logger.warning("No call for callId: " + callId + ", response: " + response);
                return;
            }
            if (isError) {
                response = ClientInvocationServiceSupport.this.client.getSerializationService().toObject(response);
            }
            future.notify(response);
        }
    }

    private class CleanResourcesTask
    implements Runnable {
        private final ClientConnection connection;

        CleanResourcesTask(ClientConnection connection) {
            this.connection = connection;
        }

        @Override
        public void run() {
            this.waitForPacketsProcessed();
            ClientInvocationServiceSupport.this.cleanResources(new ConstructorFunction<Object, Throwable>(){

                @Override
                public Throwable createNew(Object arg) {
                    return new TargetDisconnectedException(CleanResourcesTask.this.connection.getRemoteEndpoint());
                }
            }, this.connection);
        }

        private void waitForPacketsProcessed() {
            long begin = System.currentTimeMillis();
            int count = this.connection.getPendingPacketCount();
            while (count != 0) {
                try {
                    Thread.sleep(10L);
                }
                catch (InterruptedException e) {
                    ClientInvocationServiceSupport.this.logger.warning(e);
                    break;
                }
                long elapsed = System.currentTimeMillis() - begin;
                if (elapsed > 5000L) {
                    ClientInvocationServiceSupport.this.logger.warning("There are packets which are not processed " + count);
                    break;
                }
                count = this.connection.getPendingPacketCount();
            }
        }
    }
}

