/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution;

import com.hazelcast.core.ManagedContext;
import com.hazelcast.internal.metrics.DynamicMetricsProvider;
import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsCollectionContext;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.metrics.ProbeLevel;
import com.hazelcast.internal.metrics.ProbeUnit;
import com.hazelcast.internal.serialization.SerializationService;
import com.hazelcast.internal.util.Preconditions;
import com.hazelcast.internal.util.counters.Counter;
import com.hazelcast.internal.util.counters.SwCounter;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.impl.execution.BroadcastItem;
import com.hazelcast.jet.impl.execution.DoneItem;
import com.hazelcast.jet.impl.execution.InboundEdgeStream;
import com.hazelcast.jet.impl.execution.OutboundCollector;
import com.hazelcast.jet.impl.execution.OutboundEdgeStream;
import com.hazelcast.jet.impl.execution.OutboxImpl;
import com.hazelcast.jet.impl.execution.ProcessorState;
import com.hazelcast.jet.impl.execution.SnapshotBarrier;
import com.hazelcast.jet.impl.execution.SnapshotContext;
import com.hazelcast.jet.impl.execution.Tasklet;
import com.hazelcast.jet.impl.execution.WatermarkCoalescer;
import com.hazelcast.jet.impl.metrics.MetricsContext;
import com.hazelcast.jet.impl.processor.ProcessorWrapper;
import com.hazelcast.jet.impl.util.ArrayDequeInbox;
import com.hazelcast.jet.impl.util.CircularListCursor;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.PrefixedLogger;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class ProcessorTasklet
implements Tasklet {
    private static final int OUTBOX_BATCH_SIZE = 2048;
    private final ProgressTracker progTracker = new ProgressTracker();
    private final OutboundEdgeStream[] outstreams;
    private final OutboxImpl outbox;
    private final Processor.Context context;
    private final SnapshotContext ssContext;
    private final BitSet receivedBarriers;
    private final ArrayDequeInbox inbox = new ArrayDequeInbox(this.progTracker);
    private final Queue<ArrayList<InboundEdgeStream>> instreamGroupQueue;
    private final WatermarkCoalescer watermarkCoalescer;
    private final ILogger logger;
    private final SerializationService serializationService;
    private final List<? extends InboundEdgeStream> instreams;
    private final ExecutorService executionService;
    private final boolean isSource;
    private Processor processor;
    private int numActiveOrdinals;
    private CircularListCursor<InboundEdgeStream> instreamCursor;
    private InboundEdgeStream currInstream;
    private ProcessorState state;
    private long pendingSnapshotId1;
    private long pendingSnapshotId2;
    private SnapshotBarrier currentBarrier;
    private Watermark pendingWatermark;
    private boolean waitForAllBarriers;
    private final AtomicLongArray receivedCounts;
    private final AtomicLongArray receivedBatches;
    private final AtomicLongArray emittedCounts;
    @Probe(name="queuesSize")
    private final Counter queuesSize = SwCounter.newSwCounter();
    @Probe(name="queuesCapacity")
    private final Counter queuesCapacity = SwCounter.newSwCounter();
    private final Predicate<Object> addToInboxFunction = this.inbox.queue()::add;
    private final MetricsContext metricsContext = new MetricsContext();
    private Future<?> closeFuture;

    public ProcessorTasklet(@Nonnull Processor.Context context, @Nonnull ExecutorService executionService, @Nonnull SerializationService serializationService, @Nonnull Processor processor, @Nonnull List<? extends InboundEdgeStream> instreams, @Nonnull List<? extends OutboundEdgeStream> outstreams, @Nonnull SnapshotContext ssContext, @Nullable OutboundCollector ssCollector, boolean isSource) {
        Preconditions.checkNotNull(processor, "processor");
        this.context = context;
        this.executionService = executionService;
        this.serializationService = serializationService;
        this.processor = processor;
        this.numActiveOrdinals = instreams.size();
        this.instreams = instreams;
        this.instreamGroupQueue = new ArrayDeque(instreams.stream().collect(Collectors.groupingBy(InboundEdgeStream::priority, TreeMap::new, Collectors.toCollection(ArrayList::new))).values());
        this.outstreams = (OutboundEdgeStream[])outstreams.stream().sorted(Comparator.comparing(OutboundEdgeStream::ordinal)).toArray(OutboundEdgeStream[]::new);
        this.ssContext = ssContext;
        String prefix = PrefixedLogger.prefix(context.jobConfig().getName(), context.jobId(), context.vertexName(), context.globalProcessorIndex());
        this.logger = PrefixedLogger.prefixedLogger(this.getLogger(context), prefix);
        this.isSource = isSource;
        this.instreamCursor = this.popInstreamGroup();
        this.receivedCounts = new AtomicLongArray(instreams.size());
        this.receivedBatches = new AtomicLongArray(instreams.size());
        this.emittedCounts = new AtomicLongArray(outstreams.size() + 1);
        this.outbox = this.createOutbox(ssCollector);
        this.receivedBarriers = new BitSet(instreams.size());
        this.state = this.processingState();
        this.pendingSnapshotId1 = this.pendingSnapshotId2 = ssContext.activeSnapshotIdPhase1() + 1L;
        this.waitForAllBarriers = ssContext.processingGuarantee() == ProcessingGuarantee.EXACTLY_ONCE;
        this.watermarkCoalescer = WatermarkCoalescer.create(instreams.size());
    }

    @SuppressFBWarnings(value={"RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE"}, justification="hazelcastInstance() can be null in TestProcessorContext")
    private ILogger getLogger(@Nonnull Processor.Context context) {
        return context.hazelcastInstance() != null ? context.hazelcastInstance().getLoggingService().getLogger(this.getClass()) : Logger.getLogger(this.getClass());
    }

    private OutboxImpl createOutbox(@Nullable OutboundCollector ssCollector) {
        OutboundCollector[] collectors;
        if (ssCollector != null) {
            collectors = new OutboundCollector[this.outstreams.length + 1];
            collectors[this.outstreams.length] = ssCollector;
        } else {
            collectors = new OutboundCollector[this.outstreams.length];
        }
        for (int i = 0; i < this.outstreams.length; ++i) {
            collectors[i] = this.outstreams[i].getCollector();
        }
        return new OutboxImpl(collectors, ssCollector != null, this.progTracker, this.serializationService, 2048, this.emittedCounts);
    }

    @Override
    public void init() {
        ManagedContext managedContext = this.serializationService.getManagedContext();
        if (managedContext != null) {
            Processor toInit = this.processor instanceof ProcessorWrapper ? ((ProcessorWrapper)this.processor).getWrapped() : this.processor;
            Object initialized = null;
            try {
                initialized = managedContext.initialize(toInit);
                toInit = (Processor)initialized;
            }
            catch (ClassCastException e) {
                throw new IllegalArgumentException(String.format("The initialized object(%s) should be an instance of %s", initialized, Processor.class), e);
            }
            if (this.processor instanceof ProcessorWrapper) {
                ((ProcessorWrapper)this.processor).setWrapped(toInit);
            } else {
                this.processor = toInit;
            }
        }
        try {
            Util.doWithClassLoader(this.context.classLoader(), () -> this.processor.init(this.outbox, this.context));
        }
        catch (Exception e) {
            throw ExceptionUtil.sneakyThrow(e);
        }
    }

    @Override
    @Nonnull
    public ProgressState call() {
        assert (this.state != ProcessorState.END) : "already in terminal state";
        this.progTracker.reset();
        this.progTracker.notDone();
        this.outbox.reset();
        this.stateMachineStep();
        return this.progTracker.toProgressState();
    }

    private void closeProcessor() {
        try {
            Util.doWithClassLoader(this.context.classLoader(), () -> this.processor.close());
        }
        catch (Throwable e) {
            this.logger.severe(Util.jobNameAndExecutionId(this.context.jobConfig().getName(), this.context.executionId()) + " encountered an exception in Processor.close(), ignoring it", e);
        }
    }

    @Override
    public MetricsContext getMetricsContext() {
        return this.metricsContext;
    }

    private void stateMachineStep() {
        switch (this.state) {
            case PROCESS_WATERMARK: {
                if (this.pendingWatermark == null) {
                    long wm = this.watermarkCoalescer.checkWmHistory();
                    if (wm == Long.MIN_VALUE) {
                        this.state = ProcessorState.NULLARY_PROCESS;
                        this.stateMachineStep();
                        break;
                    }
                    this.pendingWatermark = new Watermark(wm);
                }
                if (!(this.pendingWatermark.equals(WatermarkCoalescer.IDLE_MESSAGE) ? this.outbox.offer(WatermarkCoalescer.IDLE_MESSAGE) : Util.doWithClassLoader(this.context.classLoader(), () -> this.processor.tryProcessWatermark(this.pendingWatermark)) != false)) break;
                this.state = ProcessorState.NULLARY_PROCESS;
                this.pendingWatermark = null;
                break;
            }
            case NULLARY_PROCESS: {
                if (this.currInstream != null && !this.isSnapshotInbox() && !Util.doWithClassLoader(this.context.classLoader(), () -> this.processor.tryProcess()).booleanValue()) break;
                this.state = ProcessorState.PROCESS_INBOX;
                this.outbox.reset();
                this.stateMachineStep();
                break;
            }
            case PROCESS_INBOX: {
                this.processInbox();
                return;
            }
            case COMPLETE_EDGE: {
                if (this.isSnapshotInbox() ? Util.doWithClassLoader(this.context.classLoader(), () -> this.processor.finishSnapshotRestore()) != false : Util.doWithClassLoader(this.context.classLoader(), () -> this.processor.completeEdge(this.currInstream.ordinal())) != false) {
                    assert (!this.outbox.hasUnfinishedItem() || !this.isSnapshotInbox()) : "outbox has an unfinished item after successful finishSnapshotRestore()";
                    this.progTracker.madeProgress();
                    this.state = this.processingState();
                }
                return;
            }
            case SAVE_SNAPSHOT: {
                if (Util.doWithClassLoader(this.context.classLoader(), () -> this.processor.saveToSnapshot()).booleanValue()) {
                    this.progTracker.madeProgress();
                    this.state = this.ssContext.isExportOnly() ? ProcessorState.EMIT_BARRIER : ProcessorState.SNAPSHOT_COMMIT_PREPARE;
                    this.stateMachineStep();
                }
                return;
            }
            case SNAPSHOT_COMMIT_PREPARE: {
                if (Util.doWithClassLoader(this.context.classLoader(), () -> this.processor.snapshotCommitPrepare()).booleanValue()) {
                    this.progTracker.madeProgress();
                    this.state = ProcessorState.EMIT_BARRIER;
                    this.stateMachineStep();
                }
                return;
            }
            case EMIT_BARRIER: {
                assert (this.currentBarrier != null) : "currentBarrier == null";
                if (this.outbox.offerToEdgesAndSnapshot(this.currentBarrier)) {
                    this.progTracker.madeProgress();
                    if (this.currentBarrier.isTerminal()) {
                        this.state = ProcessorState.WAITING_FOR_SNAPSHOT_COMPLETED;
                    } else {
                        this.currentBarrier = null;
                        this.receivedBarriers.clear();
                        ++this.pendingSnapshotId1;
                        this.state = this.processingState();
                    }
                }
                return;
            }
            case SNAPSHOT_COMMIT_FINISH__PROCESS: 
            case SNAPSHOT_COMMIT_FINISH__COMPLETE: 
            case SNAPSHOT_COMMIT_FINISH__FINAL: {
                if (this.ssContext.isExportOnly() || Util.doWithClassLoader(this.context.classLoader(), () -> this.processor.snapshotCommitFinish(this.ssContext.isLastPhase1Successful())).booleanValue()) {
                    ++this.pendingSnapshotId2;
                    this.ssContext.phase2DoneForTasklet();
                    this.progTracker.madeProgress();
                    switch (this.state) {
                        case SNAPSHOT_COMMIT_FINISH__PROCESS: {
                            this.state = ProcessorState.PROCESS_INBOX;
                            break;
                        }
                        case SNAPSHOT_COMMIT_FINISH__COMPLETE: {
                            this.state = ProcessorState.COMPLETE;
                            break;
                        }
                        case SNAPSHOT_COMMIT_FINISH__FINAL: {
                            this.state = ProcessorState.PRE_EMIT_DONE_ITEM;
                            break;
                        }
                        default: {
                            throw new RuntimeException("unexpected state: " + (Object)((Object)this.state));
                        }
                    }
                }
                return;
            }
            case WAITING_FOR_SNAPSHOT_COMPLETED: {
                long currSnapshotId2 = this.ssContext.activeSnapshotIdPhase2();
                if (currSnapshotId2 >= this.pendingSnapshotId2) {
                    this.state = ProcessorState.SNAPSHOT_COMMIT_FINISH__FINAL;
                    this.stateMachineStep();
                }
                return;
            }
            case COMPLETE: {
                this.complete();
                return;
            }
            case PRE_EMIT_DONE_ITEM: {
                this.ssContext.processorTaskletDone(this.pendingSnapshotId2 - 1L);
                this.state = ProcessorState.EMIT_DONE_ITEM;
                this.stateMachineStep();
                return;
            }
            case EMIT_DONE_ITEM: {
                if (this.outbox.offerToEdgesAndSnapshot(DoneItem.DONE_ITEM)) {
                    this.progTracker.madeProgress();
                    this.state = ProcessorState.CLOSE;
                    this.stateMachineStep();
                }
                return;
            }
            case CLOSE: {
                if (this.isCooperative() && !this.processor.closeIsCooperative()) {
                    if (this.closeFuture == null) {
                        ClassLoader contextCl = Thread.currentThread().getContextClassLoader();
                        this.closeFuture = this.executionService.submit(() -> Util.doWithClassLoader(contextCl, this::closeProcessor));
                        this.progTracker.madeProgress();
                    }
                    if (!this.closeFuture.isDone()) {
                        return;
                    }
                    this.progTracker.madeProgress();
                } else {
                    this.closeProcessor();
                }
                this.state = ProcessorState.END;
                this.progTracker.done();
                return;
            }
            default: {
                throw new JetException("Unexpected state: " + (Object)((Object)this.state));
            }
        }
    }

    private void processInbox() {
        if (this.ssContext.activeSnapshotIdPhase2() == this.pendingSnapshotId2) {
            this.state = ProcessorState.SNAPSHOT_COMMIT_FINISH__PROCESS;
            this.progTracker.madeProgress();
            return;
        }
        if (this.inbox.isEmpty()) {
            this.fillInbox();
        }
        if (!this.inbox.isEmpty()) {
            if (this.isSnapshotInbox()) {
                Util.doWithClassLoader(this.context.classLoader(), () -> this.processor.restoreFromSnapshot(this.inbox));
            } else {
                Util.doWithClassLoader(this.context.classLoader(), () -> this.processor.process(this.currInstream.ordinal(), this.inbox));
            }
        }
        if (this.inbox.isEmpty()) {
            if (this.currInstream != null && this.currInstream.isDone()) {
                this.state = ProcessorState.COMPLETE_EDGE;
                this.progTracker.madeProgress();
            } else if (this.numActiveOrdinals > 0 && this.receivedBarriers.cardinality() == this.numActiveOrdinals) {
                this.state = ProcessorState.SAVE_SNAPSHOT;
            } else if (this.numActiveOrdinals == 0) {
                this.progTracker.madeProgress();
                this.state = ProcessorState.COMPLETE;
            } else {
                this.state = ProcessorState.PROCESS_WATERMARK;
            }
        }
    }

    /*
     * Enabled aggressive block sorting
     */
    private void complete() {
        if (this.pendingSnapshotId1 == this.pendingSnapshotId2) {
            long currSnapshotId1 = this.ssContext.activeSnapshotIdPhase1();
            assert (currSnapshotId1 + 1L == this.pendingSnapshotId1 || currSnapshotId1 == this.pendingSnapshotId1) : "Unexpected new phase 1 snapshot id: " + currSnapshotId1 + ", expected was " + (this.pendingSnapshotId1 - 1L) + " or " + this.pendingSnapshotId1;
            if (currSnapshotId1 == this.pendingSnapshotId1) {
                if (!this.outbox.hasUnfinishedItem()) {
                    this.outbox.unblock();
                    this.state = ProcessorState.SAVE_SNAPSHOT;
                    this.currentBarrier = new SnapshotBarrier(currSnapshotId1, this.ssContext.isTerminalSnapshot());
                    this.progTracker.madeProgress();
                    return;
                }
                this.outbox.block();
            }
        } else {
            long currSnapshotId2 = this.ssContext.activeSnapshotIdPhase2();
            assert (currSnapshotId2 + 1L == this.pendingSnapshotId2 || currSnapshotId2 == this.pendingSnapshotId2) : "Unexpected new phase 2 snapshot id: " + currSnapshotId2 + ", expected was " + (this.pendingSnapshotId2 - 1L) + " or " + this.pendingSnapshotId2;
            if (currSnapshotId2 == this.pendingSnapshotId2) {
                this.state = ProcessorState.SNAPSHOT_COMMIT_FINISH__COMPLETE;
                this.progTracker.madeProgress();
                return;
            }
        }
        if (this.processor.complete()) {
            this.progTracker.madeProgress();
            this.state = this.pendingSnapshotId2 < this.pendingSnapshotId1 ? ProcessorState.WAITING_FOR_SNAPSHOT_COMPLETED : ProcessorState.PRE_EMIT_DONE_ITEM;
        }
    }

    private void fillInbox() {
        ProgressState result;
        assert (this.inbox.isEmpty()) : "inbox is not empty";
        assert (this.pendingWatermark == null) : "null wm expected, but was " + this.pendingWatermark;
        if (this.instreamCursor == null) {
            return;
        }
        InboundEdgeStream first = this.instreamCursor.value();
        do {
            this.currInstream = this.instreamCursor.value();
            result = ProgressState.NO_PROGRESS;
            if (this.waitForAllBarriers && this.receivedBarriers.get(this.currInstream.ordinal())) {
                this.instreamCursor.advance();
                continue;
            }
            result = this.currInstream.drainTo(this.addToInboxFunction);
            this.progTracker.madeProgress(result.isMadeProgress());
            Object lastItem = this.inbox.queue().peekLast();
            if (lastItem instanceof Watermark) {
                long newWmValue = ((Watermark)this.inbox.queue().removeLast()).timestamp();
                long wm = this.watermarkCoalescer.observeWm(this.currInstream.ordinal(), newWmValue);
                if (wm != Long.MIN_VALUE) {
                    this.pendingWatermark = new Watermark(wm);
                }
            } else if (lastItem instanceof SnapshotBarrier) {
                SnapshotBarrier barrier = (SnapshotBarrier)this.inbox.queue().removeLast();
                this.observeBarrier(this.currInstream.ordinal(), barrier);
            } else if (lastItem != null && !(lastItem instanceof BroadcastItem)) {
                this.watermarkCoalescer.observeEvent(this.currInstream.ordinal());
            }
            if (result.isDone()) {
                this.receivedBarriers.clear(this.currInstream.ordinal());
                long wm = this.watermarkCoalescer.queueDone(this.currInstream.ordinal());
                if (wm != Long.MIN_VALUE) {
                    assert (this.pendingWatermark == null || this.pendingWatermark.timestamp() < wm) : "trying to assign lower WM. Old=" + this.pendingWatermark.timestamp() + ", new=" + wm;
                    this.pendingWatermark = new Watermark(wm);
                }
                this.instreamCursor.remove();
                --this.numActiveOrdinals;
            }
            if (this.instreamCursor.advance()) continue;
            this.instreamCursor = this.popInstreamGroup();
            break;
        } while (!result.isMadeProgress() && this.instreamCursor.value() != first);
        Util.lazyAdd(this.receivedCounts, this.currInstream.ordinal(), this.inbox.size());
        if (!this.inbox.isEmpty()) {
            Util.lazyIncrement(this.receivedBatches, this.currInstream.ordinal());
        }
        this.queuesCapacity.set(this.instreamCursor == null ? 0L : (long)Util.sum(this.instreamCursor.getList(), InboundEdgeStream::capacities));
        this.queuesSize.set(this.instreamCursor == null ? 0L : (long)Util.sum(this.instreamCursor.getList(), InboundEdgeStream::sizes));
    }

    private CircularListCursor<InboundEdgeStream> popInstreamGroup() {
        return Optional.ofNullable(this.instreamGroupQueue.poll()).map(CircularListCursor::new).orElse(null);
    }

    public String toString() {
        String prefix = PrefixedLogger.prefix(this.context.jobConfig().getName(), this.context.jobId(), this.context.vertexName(), this.context.globalProcessorIndex());
        return "ProcessorTasklet{" + prefix + '}';
    }

    private void observeBarrier(int ordinal, SnapshotBarrier barrier) {
        if (barrier.snapshotId() != this.pendingSnapshotId1) {
            throw new JetException("Unexpected snapshot barrier ID " + barrier.snapshotId() + " from ordinal " + ordinal + ", expected " + this.pendingSnapshotId1);
        }
        this.currentBarrier = barrier;
        if (barrier.isTerminal()) {
            this.waitForAllBarriers = true;
        }
        this.receivedBarriers.set(ordinal);
    }

    private ProcessorState processingState() {
        return this.instreamCursor == null ? ProcessorState.COMPLETE : ProcessorState.PROCESS_WATERMARK;
    }

    private boolean isSnapshotInbox() {
        return this.currInstream != null && this.currInstream.priority() == Integer.MIN_VALUE;
    }

    private long lastForwardedWmLatency() {
        long wm = this.outbox.lastForwardedWm();
        if (wm == WatermarkCoalescer.IDLE_MESSAGE.timestamp()) {
            return Long.MIN_VALUE;
        }
        if (wm == Long.MIN_VALUE) {
            return Long.MAX_VALUE;
        }
        return System.currentTimeMillis() - wm;
    }

    @Override
    public boolean isCooperative() {
        return Util.doWithClassLoader(this.context.classLoader(), () -> this.processor.isCooperative());
    }

    @Override
    public void close() {
        if (this.state == ProcessorState.CLOSE) {
            try {
                this.closeFuture.get();
            }
            catch (Exception e) {
                throw ExceptionUtil.sneakyThrow(e);
            }
        } else if (this.state != ProcessorState.END) {
            this.closeProcessor();
        }
    }

    @Override
    public void provideDynamicMetrics(MetricDescriptor descriptor, MetricsCollectionContext context) {
        int i;
        descriptor = descriptor.withTag("vertex", this.context.vertexName()).withTag("procType", this.processor.getClass().getSimpleName()).withTag("proc", Integer.toString(this.context.globalProcessorIndex()));
        if (this.isSource) {
            descriptor = descriptor.withTag("source", "true");
        }
        if (this.outstreams.length == 0) {
            descriptor = descriptor.withTag("sink", "true");
        }
        for (i = 0; i < this.instreams.size(); ++i) {
            MetricDescriptor descWithOrdinal = descriptor.copy().withTag("ordinal", String.valueOf(i));
            context.collect(descWithOrdinal, "receivedCount", ProbeLevel.INFO, ProbeUnit.COUNT, this.receivedCounts.get(i));
            context.collect(descWithOrdinal, "receivedBatches", ProbeLevel.INFO, ProbeUnit.COUNT, this.receivedBatches.get(i));
        }
        for (i = 0; i < this.emittedCounts.length() - (this.context.snapshottingEnabled() ? 0 : 1); ++i) {
            String ordinal = i == this.emittedCounts.length() - 1 ? "snapshot" : String.valueOf(i);
            MetricDescriptor descriptorWithOrdinal = descriptor.copy().withTag("ordinal", ordinal);
            context.collect(descriptorWithOrdinal, "emittedCount", ProbeLevel.INFO, ProbeUnit.COUNT, this.emittedCounts.get(i));
        }
        context.collect(descriptor, "topObservedWm", ProbeLevel.INFO, ProbeUnit.MS, this.watermarkCoalescer.topObservedWm());
        context.collect(descriptor, "coalescedWm", ProbeLevel.INFO, ProbeUnit.MS, this.watermarkCoalescer.coalescedWm());
        context.collect(descriptor, "lastForwardedWm", ProbeLevel.INFO, ProbeUnit.MS, this.outbox.lastForwardedWm());
        context.collect(descriptor, "lastForwardedWmLatency", ProbeLevel.INFO, ProbeUnit.MS, this.lastForwardedWmLatency());
        context.collect(descriptor, this);
        context.collect(descriptor, this.processor);
        if (this.processor instanceof DynamicMetricsProvider) {
            ((DynamicMetricsProvider)((Object)this.processor)).provideDynamicMetrics(descriptor.copy(), context);
        }
        this.metricsContext.provideDynamicMetrics(descriptor, context);
    }
}

