/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.cluster.Address;
import com.hazelcast.internal.nio.BufferObjectDataInput;
import com.hazelcast.internal.nio.BufferObjectDataOutput;
import com.hazelcast.internal.nio.Connection;
import com.hazelcast.internal.nio.Packet;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.jet.impl.JobExecutionService;
import com.hazelcast.jet.impl.execution.ExecutionContext;
import com.hazelcast.jet.impl.execution.ReceiverTasklet;
import com.hazelcast.jet.impl.execution.SenderTasklet;
import com.hazelcast.jet.impl.serialization.MemoryReader;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.ImdgUtil;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class Networking {
    public static final int PACKET_HEADER_SIZE = 16;
    private static final int FLOW_PACKET_INITIAL_SIZE = 128;
    private static final int TERMINAL_VERTEX_ID = -1;
    private static final long TERMINAL_EXECUTION_ID = Long.MIN_VALUE;
    private final NodeEngineImpl nodeEngine;
    private final ILogger logger;
    private final JobExecutionService jobExecutionService;
    private final ScheduledFuture<?> flowControlSender;
    private final MemoryReader memoryReader;
    private int lastFlowPacketSize;

    Networking(NodeEngine nodeEngine, JobExecutionService jobExecutionService, int flowControlPeriodMs) {
        this.nodeEngine = (NodeEngineImpl)nodeEngine;
        this.logger = nodeEngine.getLogger(this.getClass());
        this.jobExecutionService = jobExecutionService;
        this.flowControlSender = nodeEngine.getExecutionService().scheduleWithRepetition(this::broadcastFlowControlPacket, 0L, flowControlPeriodMs, TimeUnit.MILLISECONDS);
        this.memoryReader = MemoryReader.create(((InternalSerializationService)nodeEngine.getSerializationService()).getByteOrder());
        this.lastFlowPacketSize = 128;
    }

    void shutdown() {
        this.flowControlSender.cancel(false);
    }

    void handle(Packet packet) throws IOException {
        if (packet.isFlagRaised(2)) {
            this.handleFlowControlPacket(packet.getConn().getRemoteAddress(), packet.toByteArray());
        } else {
            this.handleStreamPacket(packet);
        }
    }

    private void handleStreamPacket(Packet packet) {
        byte[] payload = packet.toByteArray();
        long executionId = this.memoryReader.readLong(payload, 0);
        int vertexId = this.memoryReader.readInt(payload, 8);
        int ordinal = this.memoryReader.readInt(payload, 12);
        ExecutionContext executionContext = this.jobExecutionService.getOrCreateExecutionContext(executionId);
        executionContext.handlePacket(vertexId, ordinal, packet.getConn().getRemoteAddress(), payload);
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public static byte[] createStreamPacketHeader(NodeEngine nodeEngine, long executionId, int destinationVertexId, int ordinal) {
        try (BufferObjectDataOutput output = ImdgUtil.createObjectDataOutput(nodeEngine, 16);){
            output.writeLong(executionId);
            output.writeInt(destinationVertexId);
            output.writeInt(ordinal);
            byte[] byArray = output.toByteArray();
            return byArray;
        }
        catch (IOException e) {
            throw ExceptionUtil.sneakyThrow(e);
        }
    }

    private void broadcastFlowControlPacket() {
        try {
            Map<Address, byte[]> packets = this.createFlowControlPacket();
            for (Map.Entry<Address, byte[]> en : packets.entrySet()) {
                Connection conn = ImdgUtil.getMemberConnection(this.nodeEngine, en.getKey());
                if (conn == null) continue;
                conn.write(new Packet(en.getValue()).setPacketType(Packet.Type.JET).raiseFlags(18));
            }
        }
        catch (Throwable t) {
            this.logger.severe("Flow-control packet broadcast failed", t);
        }
    }

    private Map<Address, byte[]> createFlowControlPacket() throws IOException {
        class MemberData {
            final BufferObjectDataOutput output;
            final Connection memberConnection;
            Long startedExecutionId;

            MemberData(Address address) {
                this.output = ImdgUtil.createObjectDataOutput(Networking.this.nodeEngine, Networking.this.lastFlowPacketSize);
                this.memberConnection = ImdgUtil.getMemberConnection(Networking.this.nodeEngine, address);
            }
        }
        HashMap<Address, byte[]> res = new HashMap<Address, byte[]>();
        for (ExecutionContext execCtx : this.jobExecutionService.getExecutionContexts()) {
            Map<ExecutionContext.SenderReceiverKey, ReceiverTasklet> map = execCtx.receiverMap();
            if (map == null) continue;
            for (Map.Entry<ExecutionContext.SenderReceiverKey, ReceiverTasklet> en : map.entrySet()) {
                assert (!en.getKey().address.equals(this.nodeEngine.getThisAddress()));
                MemberData md = res.computeIfAbsent(en.getKey().address, address -> new MemberData((Address)address));
                if (md.startedExecutionId == null) {
                    md.startedExecutionId = execCtx.executionId();
                    md.output.writeLong(md.startedExecutionId);
                }
                assert (en.getKey().vertexId != -1);
                md.output.writeInt(en.getKey().vertexId);
                md.output.writeInt(en.getKey().ordinal);
                md.output.writeInt(en.getValue().updateAndGetSendSeqLimitCompressed(md.memberConnection));
            }
            for (MemberData md : res.values()) {
                if (md.startedExecutionId == null) continue;
                md.output.writeInt(-1);
                md.startedExecutionId = null;
            }
        }
        for (MemberData md : res.values()) {
            assert (md.output.position() > 0);
            md.output.writeLong(Long.MIN_VALUE);
        }
        int maxSize = 0;
        for (Map.Entry entry : res.entrySet()) {
            byte[] data = ((MemberData)entry.getValue()).output.toByteArray();
            Map.Entry entry1 = entry;
            entry1.setValue(data);
            if (data.length <= maxSize) continue;
            maxSize = data.length;
        }
        this.lastFlowPacketSize = maxSize;
        return res;
    }

    private void handleFlowControlPacket(Address fromAddr, byte[] packet) throws IOException {
        try (BufferObjectDataInput input = ImdgUtil.createObjectDataInput(this.nodeEngine, packet);){
            long executionId;
            block9: while ((executionId = input.readLong()) != Long.MIN_VALUE) {
                Map<ExecutionContext.SenderReceiverKey, SenderTasklet> senderMap = this.jobExecutionService.getSenderMap(executionId);
                while (true) {
                    SenderTasklet t;
                    int vertexId;
                    if ((vertexId = input.readInt()) == -1) continue block9;
                    int ordinal = input.readInt();
                    int sendSeqLimitCompressed = input.readInt();
                    if (senderMap == null || (t = senderMap.get(new ExecutionContext.SenderReceiverKey(vertexId, ordinal, fromAddr))) == null) continue;
                    t.setSendSeqLimitCompressed(sendSeqLimitCompressed);
                }
                break;
            }
        }
    }
}

