/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.sink.writer;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.ListIterator;
import java.util.function.Consumer;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper;
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.connector.base.sink.writer.strategy.BasicRequestInfo;
import org.apache.flink.connector.base.sink.writer.strategy.BasicResultInfo;
import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
import org.apache.flink.connector.base.sink.writer.strategy.RequestInfo;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.Preconditions;

@PublicEvolving
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
    /** Mailbox executor taken from the init context; used to yield while waiting and to enqueue completion / fatal-error actions. */
    private final MailboxExecutor mailboxExecutor;
    /** Processing-time service taken from the init context; used to register the time-based flush timer. */
    private final ProcessingTimeService timeService;
    /** Start timestamp (ms) of the most recently completed request; written in completeRequest. */
    private long lastSendTimestamp = 0L;
    /** Completion timestamp (ms) of the most recently completed request; remains Long.MAX_VALUE until the first completion. */
    private long ackTime = Long.MAX_VALUE;
    /** Metric group of this writer; hosts the send-time gauge (ackTime - lastSendTimestamp). */
    private final SinkWriterMetricGroup metrics;
    /** Incremented with the byte size of every batch taken out of the buffer. */
    private final Counter numBytesOutCounter;
    /** Incremented with the entry count of every batch taken out of the buffer. */
    private final Counter numRecordsOutCounter;
    /** Decides whether a request must wait and caps the size of the next batch. */
    private final RateLimitingStrategy rateLimitingStrategy;
    /** Configured upper bound on the number of entries per batch. */
    private final int maxBatchSize;
    /** Buffer size at which write() flushes before accepting another element. */
    private final int maxBufferedRequests;
    /** Upper bound on the summed entry sizes of one batch; reaching it also triggers a non-blocking flush. */
    private final long maxBatchSizeInBytes;
    /** Delay (ms) between registering the flush timer and its firing. */
    private final long maxTimeInBufferMS;
    /** Largest size a single entry may have; bigger entries are rejected. */
    private final long maxRecordSizeInBytes;
    /** Turns incoming elements into request entries. */
    private final ElementConverter<InputT, RequestEntryT> elementConverter;
    /** Entries waiting to be sent; new entries go to the tail, entries handed back for retry go to the head. */
    private final Deque<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries = new ArrayDeque<RequestEntryWrapper<RequestEntryT>>();
    /** Requests submitted but not yet completed; incremented in flush(), decremented in completeRequest. */
    private int inFlightRequestsCount;
    /** Running sum of the sizes of all buffered entries. NOTE(review): kept as a double although sizes are longs — confirm this is intended. */
    private double bufferedRequestEntriesTotalSizeInBytes;
    /** True between registering the flush timer and the moment its callback starts running. */
    private boolean existsActiveTimerCallback = false;
    /** Hands an exception to the mailbox executor, where it is rethrown. */
    private final Consumer<Exception> fatalExceptionCons;

    /**
     * Sends one batch of request entries to the destination.
     *
     * <p>Implementations must invoke {@code requestToRetry} exactly once for every call of this
     * method: the writer only decrements its in-flight request count when that callback runs, and
     * {@link #flush(boolean)} keeps waiting while the count is above zero. The entries passed to
     * the callback are put back at the head of the buffer to be sent again; pass an empty list
     * when nothing needs to be retried. The writer-supplied callback hands its work to the mailbox
     * executor rather than touching writer state directly.
     *
     * @param requestEntries the batch to send, in buffer order
     * @param requestToRetry callback that completes the request and receives the entries to re-queue
     */
    protected abstract void submitRequestEntries(List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestToRetry);

    /**
     * Returns the size of a single request entry.
     *
     * <p>The writer compares this value against {@code maxRecordSizeInBytes} when the entry is
     * buffered and sums it against {@code maxBatchSizeInBytes} when batches are formed.
     *
     * @param requestEntry the entry to measure
     * @return the size of {@code requestEntry} in bytes
     */
    protected abstract long getSizeInBytes(RequestEntryT requestEntry);

    /**
     * Creates a writer with no restored state from individual limit values.
     *
     * <p>Delegates to the state-accepting overload with an empty state collection.
     *
     * @deprecated use the constructor that takes an {@link AsyncSinkWriterConfiguration}.
     */
    @Deprecated
    public AsyncSinkWriter(
            ElementConverter<InputT, RequestEntryT> elementConverter,
            Sink.InitContext context,
            int maxBatchSize,
            int maxInFlightRequests,
            int maxBufferedRequests,
            long maxBatchSizeInBytes,
            long maxTimeInBufferMS,
            long maxRecordSizeInBytes) {
        this(
                elementConverter,
                context,
                maxBatchSize,
                maxInFlightRequests,
                maxBufferedRequests,
                maxBatchSizeInBytes,
                maxTimeInBufferMS,
                maxRecordSizeInBytes,
                Collections.emptyList());
    }

    /**
     * Creates a writer from individual limit values and restores the given buffered state.
     *
     * <p>The values are packed into an {@link AsyncSinkWriterConfiguration} through its builder
     * and passed to the configuration-based constructor.
     *
     * @deprecated use the constructor that takes an {@link AsyncSinkWriterConfiguration}.
     */
    @Deprecated
    public AsyncSinkWriter(
            ElementConverter<InputT, RequestEntryT> elementConverter,
            Sink.InitContext context,
            int maxBatchSize,
            int maxInFlightRequests,
            int maxBufferedRequests,
            long maxBatchSizeInBytes,
            long maxTimeInBufferMS,
            long maxRecordSizeInBytes,
            Collection<BufferedRequestState<RequestEntryT>> states) {
        this(
                elementConverter,
                context,
                AsyncSinkWriterConfiguration.builder()
                        .setMaxBatchSize(maxBatchSize)
                        .setMaxBatchSizeInBytes(maxBatchSizeInBytes)
                        .setMaxInFlightRequests(maxInFlightRequests)
                        .setMaxBufferedRequests(maxBufferedRequests)
                        .setMaxTimeInBufferMS(maxTimeInBufferMS)
                        .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
                        .build(),
                states);
    }

    /**
     * Creates a writer from a configuration object and restores previously buffered entries.
     *
     * <p>All arguments are validated before any of them is used, so a bad argument is reported
     * with a descriptive message instead of a bare failure further down.
     *
     * @param elementConverter converts each incoming element into a request entry
     * @param context supplies the mailbox executor, processing-time service and metric group
     * @param configuration batching, buffering and rate-limiting settings
     * @param states buffered state to restore; may be empty
     * @throws NullPointerException if an argument or the configured rate limiting strategy is null
     * @throws IllegalArgumentException if a limit is not positive or the limits contradict each other
     * @throws IllegalStateException if restored state holds an entry larger than the maximum record size
     */
    public AsyncSinkWriter(ElementConverter<InputT, RequestEntryT> elementConverter, Sink.InitContext context, AsyncSinkWriterConfiguration configuration, Collection<BufferedRequestState<RequestEntryT>> states) {
        // Validate first: nothing below should run against a null or inconsistent argument.
        Preconditions.checkNotNull(elementConverter, "elementConverter must not be null.");
        Preconditions.checkNotNull(context, "context must not be null.");
        Preconditions.checkNotNull(configuration, "configuration must not be null.");
        Preconditions.checkNotNull(states, "states must not be null.");
        Preconditions.checkArgument(configuration.getMaxBatchSize() > 0, "maxBatchSize must be greater than 0.");
        Preconditions.checkArgument(configuration.getMaxBufferedRequests() > 0, "maxBufferedRequests must be greater than 0.");
        Preconditions.checkArgument(configuration.getMaxBatchSizeInBytes() > 0L, "maxBatchSizeInBytes must be greater than 0.");
        Preconditions.checkArgument(configuration.getMaxTimeInBufferMS() > 0L, "maxTimeInBufferMS must be greater than 0.");
        Preconditions.checkArgument(configuration.getMaxRecordSizeInBytes() > 0L, "maxRecordSizeInBytes must be greater than 0.");
        Preconditions.checkArgument(configuration.getMaxBufferedRequests() > configuration.getMaxBatchSize(), "The maximum number of requests that may be buffered should be strictly greater than the maximum number of requests per batch.");
        Preconditions.checkArgument(configuration.getMaxBatchSizeInBytes() >= configuration.getMaxRecordSizeInBytes(), "The maximum allowed size in bytes per flush must be greater than or equal to the maximum allowed size in bytes of a single record.");
        Preconditions.checkNotNull(configuration.getRateLimitingStrategy(), "rateLimitingStrategy must not be null.");

        this.elementConverter = elementConverter;
        this.mailboxExecutor = context.getMailboxExecutor();
        this.timeService = context.getProcessingTimeService();

        this.maxBatchSize = configuration.getMaxBatchSize();
        this.maxBufferedRequests = configuration.getMaxBufferedRequests();
        this.maxBatchSizeInBytes = configuration.getMaxBatchSizeInBytes();
        this.maxTimeInBufferMS = configuration.getMaxTimeInBufferMS();
        this.maxRecordSizeInBytes = configuration.getMaxRecordSizeInBytes();
        this.rateLimitingStrategy = configuration.getRateLimitingStrategy();

        this.inFlightRequestsCount = 0;
        this.bufferedRequestEntriesTotalSizeInBytes = 0.0;

        this.metrics = context.metricGroup();
        // Gauge value is the duration of the most recently completed request.
        this.metrics.setCurrentSendTimeGauge(() -> this.ackTime - this.lastSendTimestamp);
        this.numBytesOutCounter = this.metrics.getIOMetricGroup().getNumBytesOutCounter();
        this.numRecordsOutCounter = this.metrics.getIOMetricGroup().getNumRecordsOutCounter();

        // Fatal errors are rethrown from inside the mailbox rather than on the reporting thread.
        this.fatalExceptionCons = exception -> this.mailboxExecutor.execute(() -> {
            throw exception;
        }, "A fatal exception occurred in the sink that cannot be recovered from or should not be retried.");

        // Restore last: the size limit used to validate restored entries must already be set.
        this.initializeState(states);
    }

    /**
     * Registers a processing-time timer {@code maxTimeInBufferMS} after the current processing
     * time. When it fires, the active-timer flag is cleared and the buffer is flushed until empty.
     */
    private void registerCallback() {
        final long fireAt = timeService.getCurrentProcessingTime() + maxTimeInBufferMS;
        timeService.registerTimer(
                fireAt,
                firedAt -> {
                    existsActiveTimerCallback = false;
                    while (!bufferedRequestEntries.isEmpty()) {
                        flush();
                    }
                });
        existsActiveTimerCallback = true;
    }

    /**
     * Converts {@code element} into a request entry and appends it to the buffer.
     *
     * <p>While the buffer already holds {@code maxBufferedRequests} entries or more, batches are
     * flushed first to make room. After buffering, a non-blocking flush attempt is made.
     *
     * @param element the incoming element
     * @param context writer context passed through to the element converter
     * @throws IOException declared by the writer interface
     * @throws InterruptedException if interrupted while flushing
     * @throws IllegalArgumentException if the converted entry is larger than the maximum record size
     */
    @Override
    public void write(InputT element, SinkWriter.Context context) throws IOException, InterruptedException {
        while (bufferedRequestEntries.size() >= maxBufferedRequests) {
            flush();
        }
        // The converter already returns RequestEntryT; no cast is needed (a cast to Serializable
        // would not be accepted by addEntryToBuffer's RequestEntryT parameter).
        addEntryToBuffer(elementConverter.apply(element, context), false);
        nonBlockingFlush();
    }

    /**
     * Flushes batches for as long as the rate limiting strategy does not ask to block and the
     * buffer holds a full batch, measured either by entry count or by total bytes.
     *
     * @throws InterruptedException if interrupted while flushing
     */
    private void nonBlockingFlush() throws InterruptedException {
        while (true) {
            if (rateLimitingStrategy.shouldBlock(createRequestInfo())) {
                return;
            }
            final boolean fullByCount = bufferedRequestEntries.size() >= getNextBatchSizeLimit();
            if (!fullByCount && !(bufferedRequestEntriesTotalSizeInBytes >= (double) maxBatchSizeInBytes)) {
                return;
            }
            flush();
        }
    }

    /** Describes the next request by the number of entries it would currently contain. */
    private BasicRequestInfo createRequestInfo() {
        return new BasicRequestInfo(getNextBatchSize());
    }

    /**
     * Sends one batch from the head of the buffer.
     *
     * <p>Yields to the mailbox while the rate limiting strategy asks to block, recomputing the
     * request info after every yield. Returns without submitting when no entry could be batched.
     *
     * @throws InterruptedException if interrupted while yielding
     */
    private void flush() throws InterruptedException {
        BasicRequestInfo info = createRequestInfo();
        while (rateLimitingStrategy.shouldBlock(info)) {
            mailboxExecutor.yield();
            info = createRequestInfo();
        }

        final List<RequestEntryT> entriesToSend = createNextAvailableBatch(info);
        if (entriesToSend.isEmpty()) {
            return;
        }

        // The size reported on completion is the size announced in the request info, which is
        // also what gets registered with the rate limiting strategy below.
        final int announcedBatchSize = info.getBatchSize();
        final long sendStartedAt = System.currentTimeMillis();

        rateLimitingStrategy.registerInFlightRequest(info);
        inFlightRequestsCount++;
        submitRequestEntries(
                entriesToSend,
                toRetry -> mailboxExecutor.execute(
                        () -> completeRequest(toRetry, announcedBatchSize, sendStartedAt),
                        "Mark in-flight request as completed and requeue %d request entries",
                        toRetry.size()));
    }

    /** Returns the smaller of the current batch size limit and the number of buffered entries. */
    private int getNextBatchSize() {
        final int limit = getNextBatchSizeLimit();
        final int buffered = bufferedRequestEntries.size();
        return Math.min(limit, buffered);
    }

    /**
     * Removes entries from the head of the buffer until either the requested number of entries
     * has been taken or the next entry would push the batch over {@code maxBatchSizeInBytes}.
     *
     * <p>Updates the buffered byte total and increments the records-out and bytes-out counters
     * by what was taken.
     *
     * @param requestInfo carries the number of entries requested for this batch
     * @return the removed entries in buffer order; possibly fewer than requested
     */
    private List<RequestEntryT> createNextAvailableBatch(RequestInfo requestInfo) {
        final int wanted = requestInfo.getBatchSize();
        final List<RequestEntryT> taken = new ArrayList<>(wanted);
        long takenBytes = 0L;

        while (taken.size() < wanted) {
            final long nextSize = bufferedRequestEntries.peek().getSize();
            if (takenBytes + nextSize > maxBatchSizeInBytes) {
                // Byte budget exhausted; the entry stays at the head for the next batch.
                break;
            }
            taken.add(bufferedRequestEntries.remove().getRequestEntry());
            bufferedRequestEntriesTotalSizeInBytes -= (double) nextSize;
            takenBytes += nextSize;
        }

        numRecordsOutCounter.inc((long) taken.size());
        numBytesOutCounter.inc(takenBytes);
        return taken;
    }

    /**
     * Marks one in-flight request as finished and re-queues the entries that must be retried.
     *
     * <p>Records the request's start and completion times for the send-time gauge, decrements the
     * in-flight count, reports the outcome to the rate limiting strategy, and then puts the
     * failed entries back at the head of the buffer in their original order before attempting a
     * non-blocking flush.
     *
     * @param failedRequestEntries entries to send again; empty when nothing failed
     * @param batchSize the batch size that was announced when the request was submitted
     * @param requestStartTime timestamp (ms) taken just before the request was submitted
     * @throws InterruptedException if interrupted while flushing
     */
    private void completeRequest(List<RequestEntryT> failedRequestEntries, int batchSize, long requestStartTime) throws InterruptedException {
        lastSendTimestamp = requestStartTime;
        ackTime = System.currentTimeMillis();
        inFlightRequestsCount--;
        rateLimitingStrategy.registerCompletedRequest(new BasicResultInfo(failedRequestEntries.size(), batchSize));

        // Walk backwards and insert at the head so the entries end up in their original order.
        ListIterator<RequestEntryT> iterator = failedRequestEntries.listIterator(failedRequestEntries.size());
        while (iterator.hasPrevious()) {
            // previous() already yields RequestEntryT; a cast to Serializable would not type-check.
            addEntryToBuffer(iterator.previous(), true);
        }
        nonBlockingFlush();
    }

    /**
     * Wraps {@code entry} with its size and adds it to the buffer.
     *
     * <p>The size is validated before anything else happens, so a rejected entry leaves no flush
     * timer armed and no buffer state changed. If the buffer is empty and no timer is pending, a
     * flush timer is registered before the entry is inserted.
     *
     * @param entry the request entry to buffer
     * @param insertAtHead {@code true} to insert at the head (used for retried entries),
     *     {@code false} to append at the tail
     * @throws IllegalArgumentException if the entry is larger than {@code maxRecordSizeInBytes}
     */
    private void addEntryToBuffer(RequestEntryT entry, boolean insertAtHead) {
        final RequestEntryWrapper<RequestEntryT> wrappedEntry = new RequestEntryWrapper<RequestEntryT>(entry, getSizeInBytes(entry));
        if (wrappedEntry.getSize() > maxRecordSizeInBytes) {
            throw new IllegalArgumentException(String.format("The request entry sent to the buffer was of size [%s], when the maxRecordSizeInBytes was set to [%s].", wrappedEntry.getSize(), maxRecordSizeInBytes));
        }

        // Only arm the timer once the entry is known to be acceptable.
        if (bufferedRequestEntries.isEmpty() && !existsActiveTimerCallback) {
            registerCallback();
        }

        if (insertAtHead) {
            bufferedRequestEntries.addFirst(wrappedEntry);
        } else {
            bufferedRequestEntries.add(wrappedEntry);
        }
        bufferedRequestEntriesTotalSizeInBytes += (double) wrappedEntry.getSize();
    }

    /**
     * Waits until no request is in flight and, when {@code flush} is {@code true}, also until the
     * buffer has been emptied.
     *
     * @param flush {@code true} to send buffered entries as well; {@code false} to only wait for
     *     requests that are already in flight
     * @throws InterruptedException if interrupted while yielding or flushing
     */
    @Override
    public void flush(boolean flush) throws InterruptedException {
        while (inFlightRequestsCount > 0 || (flush && !bufferedRequestEntries.isEmpty())) {
            yieldIfThereExistsInFlightRequests();
            if (flush) {
                flush();
            }
        }
    }

    /**
     * Yields to the mailbox executor once if at least one request is in flight.
     *
     * @throws InterruptedException if interrupted while yielding
     */
    private void yieldIfThereExistsInFlightRequests() throws InterruptedException {
        if (inFlightRequestsCount <= 0) {
            return;
        }
        mailboxExecutor.yield();
    }

    /**
     * Captures the currently buffered entries as checkpoint state.
     *
     * @param checkpointId id of the checkpoint; not used here
     * @return a single-element list holding the state built from the buffer
     */
    @Override
    public List<BufferedRequestState<RequestEntryT>> snapshotState(long checkpointId) {
        final BufferedRequestState<RequestEntryT> snapshot = new BufferedRequestState<>(bufferedRequestEntries);
        return Collections.singletonList(snapshot);
    }

    /**
     * Restores every state in {@code states} into the buffer, in iteration order.
     *
     * @param states the states to restore
     */
    private void initializeState(Collection<BufferedRequestState<RequestEntryT>> states) {
        states.forEach(restored -> initializeState(restored));
    }

    /**
     * Appends the entries of one restored state to the buffer and adds its size to the running
     * byte total.
     *
     * <p>Only the incoming entries are checked against {@code maxRecordSizeInBytes}, and the check
     * runs before anything is appended, so a rejected state leaves the buffer untouched and
     * entries restored earlier are not scanned again.
     *
     * @param state the state to restore
     * @throws IllegalStateException if the state holds an entry larger than the maximum record size
     */
    private void initializeState(BufferedRequestState<RequestEntryT> state) {
        final Collection<? extends RequestEntryWrapper<RequestEntryT>> restoredEntries = state.getBufferedRequestEntries();
        for (RequestEntryWrapper<RequestEntryT> wrapper : restoredEntries) {
            if (wrapper.getSize() > maxRecordSizeInBytes) {
                throw new IllegalStateException(String.format("State contains record of size %d which exceeds sink maximum record size %d.", wrapper.getSize(), maxRecordSizeInBytes));
            }
        }
        bufferedRequestEntries.addAll(restoredEntries);
        bufferedRequestEntriesTotalSizeInBytes += (double) state.getStateSize();
    }

    /** Intentionally empty in this base class: the body performs no cleanup. */
    @Override
    public void close() {
    }

    /** Returns the smaller of the configured batch size and the rate limiting strategy's current maximum. */
    private int getNextBatchSizeLimit() {
        final int configuredLimit = maxBatchSize;
        final int strategyLimit = rateLimitingStrategy.getMaxBatchSize();
        return Math.min(configuredLimit, strategyLimit);
    }

    /**
     * Returns the consumer for unrecoverable errors. An exception passed to it is enqueued on the
     * mailbox executor, where it is rethrown.
     *
     * @return the fatal-exception consumer created in the constructor
     */
    protected Consumer<Exception> getFatalExceptionCons() {
        return fatalExceptionCons;
    }
}

