/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.internal.server.tcp;

import com.hazelcast.cluster.Address;
import com.hazelcast.config.EndpointConfig;
import com.hazelcast.instance.EndpointQualifier;
import com.hazelcast.instance.ProtocolType;
import com.hazelcast.internal.metrics.DynamicMetricsProvider;
import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsCollectionContext;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.metrics.ProbeLevel;
import com.hazelcast.internal.metrics.ProbeUnit;
import com.hazelcast.internal.networking.Channel;
import com.hazelcast.internal.networking.ChannelInitializer;
import com.hazelcast.internal.networking.Networking;
import com.hazelcast.internal.nio.ConnectionListener;
import com.hazelcast.internal.nio.IOUtil;
import com.hazelcast.internal.nio.Packet;
import com.hazelcast.internal.server.NetworkStats;
import com.hazelcast.internal.server.ServerConnection;
import com.hazelcast.internal.server.ServerContext;
import com.hazelcast.internal.server.tcp.TcpServer;
import com.hazelcast.internal.server.tcp.TcpServerConnection;
import com.hazelcast.internal.server.tcp.TcpServerConnectionManagerBase;
import com.hazelcast.internal.server.tcp.TcpServerConnector;
import com.hazelcast.internal.server.tcp.TcpServerControl;
import com.hazelcast.internal.util.Preconditions;
import com.hazelcast.internal.util.executor.StripedRunnable;
import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Manages the TCP connections of a {@link TcpServer} for a single endpoint.
 *
 * <p>Connections are tracked in two places inherited from
 * {@link TcpServerConnectionManagerBase}: the flat {@code connections} set, which holds every
 * connection created by {@link #newConnection(Channel, Address)}, and the per-plane maps, which
 * hold connections that have been bound to a remote {@link Address} through
 * {@link #register(Address, ServerConnection, int)}.
 *
 * <p>Incoming control packets are handed to the {@link TcpServerControl}; outgoing connection
 * attempts are delegated to the {@link TcpServerConnector}.
 */
public class TcpServerConnectionManager
        extends TcpServerConnectionManagerBase
        implements Consumer<Packet>, DynamicMetricsProvider {

    private final Function<EndpointQualifier, ChannelInitializer> channelInitializerFn;
    private final TcpServerConnector connector;
    private final TcpServerControl serverControl;
    /** Source of the ids handed to newly created connections. */
    private final AtomicInteger connectionIdGen = new AtomicInteger();

    TcpServerConnectionManager(TcpServer server,
                               EndpointConfig endpointConfig,
                               Function<EndpointQualifier, ChannelInitializer> channelInitializerFn,
                               ServerContext serverContext,
                               Set<ProtocolType> supportedProtocolTypes) {
        super(server, endpointConfig);
        this.channelInitializerFn = channelInitializerFn;
        this.connector = new TcpServerConnector(this);
        this.serverControl = new TcpServerControl(this, serverContext, this.logger, supportedProtocolTypes);
    }

    /** Returns the server this manager belongs to. */
    @Override
    public TcpServer getServer() {
        return this.server;
    }

    /** Returns a read-only view of every connection currently tracked by this manager. */
    @Override
    public Collection<ServerConnection> getConnections() {
        return Collections.unmodifiableSet(this.connections);
    }

    /**
     * Adds a listener that is notified about connection events.
     *
     * @param listener the listener to add; must not be {@code null}
     */
    @Override
    public void addConnectionListener(ConnectionListener listener) {
        Preconditions.checkNotNull(listener, "listener can't be null");
        this.connectionListeners.add(listener);
    }

    /**
     * Hands the given packet to the {@link TcpServerControl} for processing.
     *
     * @param packet the packet to process
     */
    @Override
    public synchronized void accept(Packet packet) {
        this.serverControl.process(packet);
    }

    /**
     * Looks up the connection registered for {@code address} on the plane selected by
     * {@code streamId}, without trying to connect.
     *
     * @return the registered connection, or {@code null} if there is none
     */
    @Override
    public ServerConnection get(Address address, int streamId) {
        return this.getPlane(streamId).getConnection(address);
    }

    /**
     * Same as {@link #getOrConnect(Address, boolean, int)} with {@code silent} set to
     * {@code false}.
     */
    @Override
    public ServerConnection getOrConnect(Address address, int streamId) {
        return this.getOrConnect(address, false, streamId);
    }

    /**
     * Returns the connection registered for {@code address} on the plane selected by
     * {@code streamId}. If there is none and the server is live, an asynchronous connection
     * attempt is started unless one is already in progress for that address on that plane.
     *
     * @param address  the remote address
     * @param silent   passed through to the connector for the connection attempt
     * @param streamId used to select the plane, see {@link #getPlane(int)}
     * @return the registered connection, or {@code null} if none is registered yet (even when a
     *         connection attempt has just been started)
     */
    @Override
    public ServerConnection getOrConnect(Address address, boolean silent, int streamId) {
        TcpServerConnectionManagerBase.Plane plane = this.getPlane(streamId);
        TcpServerConnection connection = plane.getConnection(address);
        if (connection == null && this.server.isLive()) {
            // addConnectionInProgress returns true only for the caller that newly marked the
            // address, so only that caller starts the connect.
            if (plane.addConnectionInProgress(address)) {
                if (this.logger.isFineEnabled()) {
                    this.logger.fine("Connection to: " + address + " streamId:" + streamId + " is not yet in progress");
                }
                this.connector.asyncConnect(address, silent, plane.index);
            } else if (this.logger.isFineEnabled()) {
                this.logger.fine("Connection to: " + address + " streamId:" + streamId + " is already in progress");
            }
        }
        return connection;
    }

    /**
     * Binds the connection to {@code remoteAddress} on the given plane and asynchronously
     * notifies the connection listeners.
     *
     * <p>Whatever the outcome, the in-progress marker for {@code remoteAddress} is removed from
     * the plane before this method returns or throws.
     *
     * @param remoteAddress the address to register the connection under
     * @param c             the connection; must be a {@link TcpServerConnection}
     * @param planeIndex    index into the planes array
     * @return {@code true} if the connection was registered; {@code false} if
     *         {@code remoteAddress} is this member's own address or the connection is not alive
     * @throws IllegalArgumentException if the connection already has a remote address that
     *                                  differs from {@code remoteAddress}
     */
    @Override
    public synchronized boolean register(final Address remoteAddress, ServerConnection c, int planeIndex) {
        TcpServerConnectionManagerBase.Plane plane = this.planes[planeIndex];
        final TcpServerConnection connection = (TcpServerConnection) c;
        try {
            if (remoteAddress.equals(this.serverContext.getThisAddress())) {
                return false;
            }
            if (!connection.isAlive()) {
                if (this.logger.isFinestEnabled()) {
                    this.logger.finest(connection + " to " + remoteAddress + " is not registered since connection is not active.");
                }
                return false;
            }
            Address currentRemoteAddress = connection.getRemoteAddress();
            if (currentRemoteAddress != null && !currentRemoteAddress.equals(remoteAddress)) {
                throw new IllegalArgumentException(connection + " has already a different remoteAddress than: " + remoteAddress);
            }
            connection.setRemoteAddress(remoteAddress);
            if (!connection.isClient()) {
                connection.setErrorHandler(this.getErrorHandler(remoteAddress, plane.index).reset());
            }
            plane.putConnection(remoteAddress, connection);
            this.serverContext.getEventService().executeEventCallback(new StripedRunnable() {

                @Override
                public void run() {
                    TcpServerConnectionManager.this.connectionListeners.forEach(listener -> listener.connectionAdded(connection));
                }

                @Override
                public int getKey() {
                    // The callback is keyed by the remote address' hash code.
                    return remoteAddress.hashCode();
                }
            });
            return true;
        } finally {
            plane.removeConnectionInProgress(remoteAddress);
        }
    }

    /**
     * Selects the plane for a stream id.
     *
     * <p>{@code -1} and {@link Integer#MIN_VALUE} map to plane 0; every other id maps to
     * {@code Math.abs(streamId) % planeCount}. {@code Integer.MIN_VALUE} needs the special case
     * because {@code Math.abs(Integer.MIN_VALUE)} is negative and would produce a negative index.
     */
    public TcpServerConnectionManagerBase.Plane getPlane(int streamId) {
        int planeIndex = streamId == -1 || streamId == Integer.MIN_VALUE
                ? 0
                : Math.abs(streamId) % this.planeCount;
        return this.planes[planeIndex];
    }

    /**
     * Closes all accepted channels and all connections and clears the bookkeeping of every plane
     * (registered connections, in-progress markers and error handlers).
     *
     * @param cleanListeners if {@code true}, the registered connection listeners are removed too
     */
    public synchronized void reset(boolean cleanListeners) {
        this.acceptedChannels.forEach(IOUtil::closeResource);
        for (TcpServerConnectionManagerBase.Plane plane : this.planes) {
            plane.forEachConnection(conn -> IOUtil.close(conn, "TcpServer is stopping"));
            plane.clearConnections();
        }
        // Also close connections that are tracked but were not found through a plane.
        this.connections.forEach(conn -> IOUtil.close(conn, "TcpServer is stopping"));
        this.acceptedChannels.clear();
        Arrays.stream(this.planes).forEach(plane -> plane.clearConnectionsInProgress());
        Arrays.stream(this.planes).forEach(plane -> plane.errorHandlers.clear());
        this.connections.clear();
        if (cleanListeners) {
            this.connectionListeners.clear();
        }
    }

    /**
     * Sends the packet to the target address using the plane selected by {@code streamId}.
     *
     * @param packet   the packet to send; must not be {@code null}
     * @param target   the destination address; must not be {@code null}
     * @param streamId used to select the plane
     * @return the result of the inherited {@code send} call
     */
    @Override
    public boolean transmit(Packet packet, Address target, int streamId) {
        Preconditions.checkNotNull(packet, "packet can't be null");
        Preconditions.checkNotNull(target, "target can't be null");
        return this.send(packet, target, null, streamId);
    }

    /** Returns the network statistics object held by this manager. */
    @Override
    public NetworkStats getNetworkStats() {
        return this.networkStats;
    }

    /**
     * Registers the socket channel with the server's {@link Networking}, applies the endpoint's
     * channel options when an endpoint config is present, and records the resulting channel in
     * {@code acceptedChannels}.
     *
     * @param socketChannel the underlying socket channel
     * @param clientMode    passed through to {@link Networking#register}
     * @return the newly registered channel
     * @throws IOException declared for failures while registering or configuring the channel
     */
    Channel newChannel(SocketChannel socketChannel, boolean clientMode) throws IOException {
        Networking networking = this.server.getNetworking();
        ChannelInitializer channelInitializer = this.channelInitializerFn.apply(this.endpointQualifier);
        assert (channelInitializer != null) : "Found NULL channel initializer for endpoint-qualifier " + this.endpointQualifier;
        Channel channel = networking.register(channelInitializer, socketChannel, clientMode);
        if (this.endpointConfig != null) {
            IOUtil.setChannelOptions(channel, this.endpointConfig);
        }
        this.acceptedChannels.add(channel);
        return channel;
    }

    /** Stops tracking the given channel as an accepted channel. */
    void removeAcceptedChannel(Channel channel) {
        this.acceptedChannels.remove(channel);
    }

    /**
     * Handles a failed connection attempt: removes the in-progress marker, notifies the server
     * context and, unless {@code silent}, reports the error to the address' error handler.
     *
     * @param address    the address the connection attempt targeted
     * @param planeIndex index of the plane the attempt was made on
     * @param t          the failure
     * @param silent     if {@code true}, the error handler is not invoked
     */
    void failedConnection(Address address, int planeIndex, Throwable t, boolean silent) {
        this.planes[planeIndex].removeConnectionInProgress(address);
        this.serverContext.onFailedConnection(address);
        if (!silent) {
            this.getErrorHandler(address, planeIndex).onError(t);
        }
    }

    /**
     * Creates a connection on top of the given channel, adds it to the tracked connections and
     * starts the channel.
     *
     * <p>Whatever the outcome, the channel is removed from {@code acceptedChannels} before this
     * method returns or throws.
     *
     * @param channel       the channel the connection is built on
     * @param remoteAddress the remote address to set on the connection
     * @return the newly created connection
     * @throws IllegalStateException if the server is not live
     */
    synchronized TcpServerConnection newConnection(Channel channel, Address remoteAddress) {
        try {
            if (!this.server.isLive()) {
                throw new IllegalStateException("connection manager is not live!");
            }
            TcpServerConnection connection = new TcpServerConnection(
                    this, this.connectionLifecycleListener, this.connectionIdGen.incrementAndGet(), channel);
            connection.setRemoteAddress(remoteAddress);
            this.connections.add(connection);
            if (this.logger.isFineEnabled()) {
                this.logger.fine("Established socket connection between " + channel.localSocketAddress() + " and " + channel.remoteSocketAddress());
            }
            this.openedCount.inc();
            channel.start();
            return connection;
        } finally {
            this.acceptedChannels.remove(channel);
        }
    }

    /**
     * Returns a short description containing the endpoint qualifier and the number of
     * connections currently registered on the planes.
     */
    @Override
    public String toString() {
        return "TcpServerConnectionManager{endpointQualifier=" + this.endpointQualifier
                + ", connectionCount=" + this.connectionCount() + '}';
    }

    /** Returns the number of connection attempts in progress, summed over all planes. */
    @Probe(name="inProgressCount")
    private int connectionsInProgress() {
        int total = 0;
        for (TcpServerConnectionManagerBase.Plane plane : this.planes) {
            total += plane.connectionsInProgressCount();
        }
        return total;
    }

    /** Returns the number of registered connections, summed over all planes. */
    @Override
    @Probe(name="count", level=ProbeLevel.MANDATORY)
    public int connectionCount() {
        int total = 0;
        for (TcpServerConnectionManagerBase.Plane plane : this.planes) {
            total += plane.connectionCount();
        }
        return total;
    }

    /**
     * Collects metrics under the {@code tcp.connection} prefix: the probes of this manager, the
     * probes of each tracked connection that has a remote address, the probes of each
     * plane-registered connection (discriminated by the address it is registered under), and,
     * when no endpoint config is set, the {@code clientCount} and {@code textCount} gauges.
     *
     * @param descriptor the descriptor to derive the metric descriptors from
     * @param context    the context to collect the metrics into
     */
    @Override
    public void provideDynamicMetrics(MetricDescriptor descriptor, MetricsCollectionContext context) {
        MetricDescriptor rootDescriptor = descriptor.withPrefix("tcp.connection");
        if (this.endpointQualifier == null) {
            context.collect(rootDescriptor.copy(), this);
        } else {
            context.collect(rootDescriptor.copy().withDiscriminator("endpoint", this.endpointQualifier.toMetricsPrefixString()), this);
        }

        for (TcpServerConnection connection : this.connections) {
            // Read the remote address once so the null check and the use see the same value.
            Address remoteAddress = connection.getRemoteAddress();
            if (remoteAddress == null) {
                continue;
            }
            context.collect(rootDescriptor.copy().withDiscriminator("endpoint", remoteAddress.toString()), connection);
        }

        int clientCount = 0;
        int textCount = 0;
        for (TcpServerConnectionManagerBase.Plane plane : this.planes) {
            for (Map.Entry<Address, TcpServerConnection> entry : plane.connections()) {
                Address bindAddress = entry.getKey();
                TcpServerConnection connection = entry.getValue();
                if (connection.isClient()) {
                    ++clientCount;
                    // REST and MEMCACHE client connections are additionally counted as text connections.
                    String connectionType = connection.getConnectionType();
                    if ("REST".equals(connectionType) || "MEMCACHE".equals(connectionType)) {
                        ++textCount;
                    }
                }
                Address remoteAddress = connection.getRemoteAddress();
                if (remoteAddress == null) {
                    continue;
                }
                context.collect(rootDescriptor.copy().withDiscriminator("bindAddress", bindAddress.toString()).withTag("endpoint", remoteAddress.toString()), connection);
            }
        }

        if (this.endpointConfig == null) {
            context.collect(rootDescriptor.copy(), "clientCount", ProbeLevel.MANDATORY, ProbeUnit.COUNT, clientCount);
            context.collect(rootDescriptor.copy(), "textCount", ProbeLevel.MANDATORY, ProbeUnit.COUNT, textCount);
        }
    }
}

