/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution;

import com.hazelcast.cluster.Address;
import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsCollectionContext;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.metrics.ProbeUnit;
import com.hazelcast.internal.nio.BufferObjectDataInput;
import com.hazelcast.internal.nio.Connection;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.internal.util.counters.Counter;
import com.hazelcast.internal.util.counters.SwCounter;
import com.hazelcast.jet.RestartableException;
import com.hazelcast.jet.impl.execution.BroadcastItem;
import com.hazelcast.jet.impl.execution.DoneItem;
import com.hazelcast.jet.impl.execution.OutboundCollector;
import com.hazelcast.jet.impl.execution.Tasklet;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.ObjectWithPartitionId;
import com.hazelcast.jet.impl.util.PrefixedLogger;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.LoggingService;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;

/**
 * Tasklet that polls serialized batches from an incoming queue, deserializes the
 * items into a local inbox and forwards them to an {@link OutboundCollector}.
 *
 * <p>While forwarding, it keeps a running "acked sequence" (the sum of the estimated
 * memory footprints of all items the collector has accepted). From that value
 * {@link #updateAndGetSendSeqLimitCompressed(Connection)} derives a send-sequence limit
 * and adapts a receive window based on the observed acknowledgement rate.
 *
 * <p>NOTE(review): {@code ackedSeq}, {@code numWaitingInInbox} and
 * {@code connectionChanged} are {@code volatile}, which suggests that {@link #call()}
 * and {@code updateAndGetSendSeqLimitCompressed} run on different threads — confirm
 * against the callers.
 */
public class ReceiverTasklet implements Tasklet {
    /** Number of low-order bits dropped by {@link #compressSeq(long)}. */
    static final int COMPRESSED_SEQ_UNIT_LOG2 = 16;

    /** Initial value of the receive window, expressed in compressed sequence units. */
    static final int INITIAL_RECEIVE_WINDOW_COMPRESSED = 800;

    /**
     * Offset into each incoming payload at which deserialization starts.
     * NOTE(review): presumably the size of a fixed header preceding the data — confirm.
     */
    private static final int INPUT_START_OFFSET = 16;

    private final int rwinMultiplier;
    private final double flowControlPeriodNs;
    private final ILogger logger;
    private final String sourceAddressString;
    private final String ordinalString;
    private final String destinationVertexName;
    private final Connection memberConnection;
    /** Source of serialized batches; set via {@link #initIncomingQueue(Queue)}. */
    private Queue<byte[]> incoming;
    private final ProgressTracker tracker = new ProgressTracker();
    /** Deserialized items that have not yet been accepted by the collector. */
    private final ArrayDeque<ObjWithPtionIdAndSize> inbox = new ArrayDeque<>();
    private final OutboundCollector collector;
    private final InternalSerializationService serializationService;
    /** Set once {@code DONE_ITEM} has been taken from the inbox. */
    private boolean receptionDone;
    @Probe(name = "distributedItemsIn")
    private final Counter itemsInCounter = SwCounter.newSwCounter();
    @Probe(name = "distributedBytesIn", unit = ProbeUnit.BYTES)
    private final Counter bytesInCounter = SwCounter.newSwCounter();
    /** Running sum of the estimated footprints of all items accepted by the collector. */
    private volatile long ackedSeq;
    /** Size of the inbox at the end of the most recent {@link #call()}. */
    private volatile int numWaitingInInbox;
    /** Set when a flow-control update arrives with a connection other than {@code memberConnection}. */
    private volatile boolean connectionChanged;
    private int receiveWindowCompressed;
    private int prevAckedSeqCompressed;
    private long prevTimestamp;

    /**
     * Creates the tasklet.
     *
     * @param collector             destination for the received items
     * @param serializationService  used to deserialize the incoming payloads
     * @param rwinMultiplier        factor applied to the measured ack rate to obtain the target receive window
     * @param flowControlPeriodMs   flow-control period in milliseconds (stored internally in nanoseconds)
     * @param loggingService        source of the logger for this tasklet
     * @param sourceAddress         address used for the "sourceAddress" metric tag and in the reconnect error message
     * @param ordinal               ordinal used for the "ordinal" metric tag and the log prefix
     * @param destinationVertexName vertex name used for the "vertex" metric tag and the log prefix
     * @param memberConnection      connection that flow-control updates are expected to arrive with
     * @param jobPrefix             leading part of the log prefix
     */
    public ReceiverTasklet(OutboundCollector collector, InternalSerializationService serializationService, int rwinMultiplier, int flowControlPeriodMs, LoggingService loggingService, Address sourceAddress, int ordinal, String destinationVertexName, Connection memberConnection, String jobPrefix) {
        this.collector = collector;
        this.serializationService = serializationService;
        this.rwinMultiplier = rwinMultiplier;
        this.flowControlPeriodNs = TimeUnit.MILLISECONDS.toNanos(flowControlPeriodMs);
        this.sourceAddressString = sourceAddress.toString();
        this.ordinalString = Integer.toString(ordinal);
        this.destinationVertexName = destinationVertexName;
        this.memberConnection = memberConnection;
        String prefix = String.format("%s/receiverFor:%s#%d", jobPrefix, destinationVertexName, ordinal);
        this.logger = PrefixedLogger.prefixedLogger(loggingService.getLogger(getClass()), prefix);
        this.receiveWindowCompressed = INITIAL_RECEIVE_WINDOW_COMPRESSED;
    }

    /**
     * Performs one round of work: drains the incoming queue into the inbox, then offers
     * inbox items to the collector until the inbox is empty, the collector refuses an
     * item, or {@code DONE_ITEM} is reached.
     *
     * <p>Once {@code DONE_ITEM} has been taken from the inbox, every later invocation
     * only offers {@code DONE_ITEM} to the collector and returns that outcome.
     *
     * @return the progress state accumulated by the tracker, or the collector's outcome
     *         for {@code DONE_ITEM} once reception is done
     * @throws RestartableException if a connection change was flagged by a flow-control update
     */
    @Override
    @Nonnull
    public ProgressState call() {
        if (receptionDone) {
            return collector.offerBroadcast(DoneItem.DONE_ITEM);
        }
        if (connectionChanged) {
            throw new RestartableException("The member was reconnected: " + sourceAddressString);
        }
        tracker.reset();
        tracker.notDone();
        tryFillInbox();
        // Accumulated as long: footprints are long values and ackItem() accepts a long,
        // so narrowing to int here would silently truncate a large sum.
        long ackedWeight = 0L;
        for (ObjWithPtionIdAndSize head; (head = inbox.peek()) != null; ) {
            Object item = head.getItem();
            if (item == DoneItem.DONE_ITEM) {
                receptionDone = true;
                inbox.remove();
                assert inbox.peek() == null
                        : "Found something in the queue beyond the DONE_ITEM: " + inbox.remove();
                break;
            }
            ProgressState outcome = item instanceof BroadcastItem
                    ? collector.offerBroadcast((BroadcastItem) item)
                    : collector.offer(item, head.getPartitionId());
            if (!outcome.isDone()) {
                // The item stays at the head of the inbox and is offered again on the next call.
                tracker.madeProgress(outcome.isMadeProgress());
                break;
            }
            tracker.madeProgress();
            inbox.remove();
            ackedWeight += head.estimatedMemoryFootprint;
        }
        ackItem(ackedWeight);
        numWaitingInInbox = inbox.size();
        return tracker.toProgressState();
    }

    /**
     * Same as {@link #updateAndGetSendSeqLimitCompressed(long, Connection)} using
     * {@link System#nanoTime()} as the current timestamp.
     *
     * @param expectedConnection connection compared against the one given at construction
     * @return the compressed send-sequence limit
     */
    public int updateAndGetSendSeqLimitCompressed(Connection expectedConnection) {
        return updateAndGetSendSeqLimitCompressed(System.nanoTime(), expectedConnection);
    }

    /**
     * Updates the receive window from the acknowledgement rate observed since the
     * previous invocation and returns the new send-sequence limit.
     *
     * <p>On every invocation after the first one with recorded statistics, the target
     * window is {@code rwinMultiplier * ceil(ackedDelta * flowControlPeriodNs / timeDelta)}
     * and the current window moves half of the way towards it (integer division). The
     * window is not shrunk while the inbox was empty at the end of the last {@link #call()}.
     *
     * <p>If {@code expectedConnection} does not equal the connection given at
     * construction, a flag is set that makes the next {@link #call()} throw.
     *
     * @param timestampNow       current time in nanoseconds
     * @param expectedConnection connection compared against the one given at construction
     * @return compressed acked sequence plus the current receive window
     */
    int updateAndGetSendSeqLimitCompressed(long timestampNow, Connection expectedConnection) {
        if (!Objects.equals(expectedConnection, memberConnection)) {
            connectionChanged = true;
        }
        boolean hadPrevStats = prevTimestamp != 0L || prevAckedSeqCompressed != 0;
        long ackTimeDelta = timestampNow - prevTimestamp;
        prevTimestamp = timestampNow;
        int ackedSeqCompressed = compressSeq(ackedSeq);
        int ackedSeqCompressedDelta = ackedSeqCompressed - prevAckedSeqCompressed;
        prevAckedSeqCompressed = ackedSeqCompressed;
        if (hadPrevStats) {
            // NOTE(review): ackTimeDelta == 0 makes this division yield Infinity or NaN;
            // the original code does not guard against it and neither does this version.
            double ackedSeqsPerAckPeriod = flowControlPeriodNs * (double) ackedSeqCompressedDelta / (double) ackTimeDelta;
            int targetRwin = rwinMultiplier * (int) Math.ceil(ackedSeqsPerAckPeriod);
            int rwinDiff = targetRwin - receiveWindowCompressed;
            if (numWaitingInInbox == 0 && rwinDiff < 0) {
                // Nothing was waiting in the inbox, so do not reduce the window.
                rwinDiff = 0;
            }
            rwinDiff /= 2;
            receiveWindowCompressed += rwinDiff;
            if (rwinDiff != 0) {
                LoggingUtil.logFinest(logger, "receiveWindowCompressed changed by %d to %d", rwinDiff, receiveWindowCompressed);
            }
        }
        return ackedSeqCompressed + receiveWindowCompressed;
    }

    /**
     * Adds {@code itemWeight} to the acked sequence.
     *
     * <p>NOTE(review): this is a non-atomic read-modify-write of a volatile field; it is
     * only correct if a single thread ever writes {@code ackedSeq} — confirm.
     *
     * @param itemWeight amount to add
     * @return the acked sequence after the addition
     */
    long ackItem(long itemWeight) {
        return ackedSeq += itemWeight;
    }

    /** Overrides the recorded number of items waiting in the inbox. */
    void setNumWaitingInInbox(int value) {
        numWaitingInInbox = value;
    }

    @Override
    public String toString() {
        return "ReceiverTasklet";
    }

    /**
     * Converts a sequence value to compressed units by dropping its
     * {@link #COMPRESSED_SEQ_UNIT_LOG2} low-order bits and narrowing to {@code int}.
     */
    static int compressSeq(long seq) {
        return (int) (seq >> COMPRESSED_SEQ_UNIT_LOG2);
    }

    /**
     * Estimates the footprint of one inbox entry as a fixed overhead plus the size of
     * the item's serialized form.
     *
     * @param itemBlobSize size of the serialized item
     * @return {@code itemBlobSize} plus a constant overhead of 56
     */
    static long estimatedMemoryFootprint(long itemBlobSize) {
        // Components of the fixed per-entry overhead; the names say what each one stands for.
        final int inboxSlot = 4;
        final int objPtionAndSenderIdHeader = 16;
        final int itemField = 4;
        final int itemObjHeader = 16;
        final int partitionIdField = 4;
        final int senderIdField = 4;
        final int estimatedMemoryFootprintField = 8;
        final int overhead = inboxSlot + objPtionAndSenderIdHeader + itemField + itemObjHeader
                + partitionIdField + senderIdField + estimatedMemoryFootprintField;
        return overhead + itemBlobSize;
    }

    /**
     * Polls every payload currently available in the incoming queue, deserializes its
     * items into the inbox and updates the byte/item counters.
     *
     * <p>Each payload is read as an {@code int} item count followed by that many
     * (object, {@code int} partition id) pairs. The counters are only incremented when
     * all polled payloads were read without an exception.
     */
    private void tryFillInbox() {
        try {
            long totalBytes = 0L;
            long totalItems = 0L;
            for (byte[] payload; (payload = incoming.poll()) != null; ) {
                // try-with-resources closes the input even when deserialization fails.
                try (BufferObjectDataInput input =
                             serializationService.createObjectDataInput(payload, INPUT_START_OFFSET)) {
                    int itemCount = input.readInt();
                    for (int i = 0; i < itemCount; i++) {
                        int mark = input.position();
                        Object item = input.readObject();
                        // Serialized size of the item alone; the partition id is read afterwards.
                        int itemSize = input.position() - mark;
                        inbox.add(new ObjWithPtionIdAndSize(item, input.readInt(), itemSize));
                    }
                    totalItems += itemCount;
                    totalBytes += input.position();
                }
                tracker.madeProgress();
            }
            bytesInCounter.inc(totalBytes);
            itemsInCounter.inc(totalItems);
        } catch (IOException e) {
            throw ExceptionUtil.rethrow(e);
        }
    }

    /**
     * Sets the queue from which {@link #call()} polls serialized batches.
     *
     * @param incomingQueue queue of serialized payloads
     */
    public void initIncomingQueue(Queue<byte[]> incomingQueue) {
        this.incoming = incomingQueue;
    }

    /**
     * Tags the descriptor with this tasklet's vertex name, source address and ordinal,
     * then lets the context collect the {@code @Probe}-annotated members of this object.
     */
    @Override
    public void provideDynamicMetrics(MetricDescriptor descriptor, MetricsCollectionContext context) {
        MetricDescriptor tagged = descriptor
                .withTag("vertex", destinationVertexName)
                .withTag("sourceAddress", sourceAddressString)
                .withTag("ordinal", ordinalString);
        context.collect(tagged, this);
    }

    /** Inbox entry: an item with its partition id and its estimated memory footprint. */
    private static class ObjWithPtionIdAndSize extends ObjectWithPartitionId {
        final long estimatedMemoryFootprint;

        ObjWithPtionIdAndSize(Object item, int partitionId, int itemBlobSize) {
            super(item, partitionId);
            this.estimatedMemoryFootprint = ReceiverTasklet.estimatedMemoryFootprint(itemBlobSize);
        }
    }
}

