/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.server.transaction;

import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.Meter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData;
import org.apache.kafka.common.message.AddPartitionsToTxnResponseData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.server.config.AbstractKafkaConfig;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.util.InterBrokerSendThread;
import org.apache.kafka.server.util.RequestAndCompletionHandler;

/**
 * Sender thread that batches {@code AddPartitionsToTxn} requests per transaction coordinator
 * node and reports the per-partition outcome to the caller through an {@link AppendCallback}.
 *
 * <p>Callers enqueue work with {@link #addOrVerifyTransaction}. Pending work is kept in a map
 * keyed by coordinator node; {@link #generateRequests()} drains that map into one request per
 * node, skipping nodes that already have a request in flight. All access to the pending map is
 * guarded by synchronizing on the map itself.
 *
 * <p>Two metrics are maintained: a meter counting partitions that failed verification and a
 * histogram of the time between enqueueing a transaction and completing its callback.
 */
public class AddPartitionsToTxnManager
extends InterBrokerSendThread {
    public static final String VERIFICATION_FAILURE_RATE_METRIC_NAME = "VerificationFailureRate";
    public static final String VERIFICATION_TIME_MS_METRIC_NAME = "VerificationTimeMs";

    /** Name of the topic whose partition leaders are used as transaction coordinators. */
    private static final String TRANSACTION_STATE_TOPIC_NAME = "__transaction_state";

    private final MetadataCache metadataCache;
    /** Maps a transactional ID to the partition of the transaction state topic that owns it. */
    private final Function<String, Integer> partitionFor;
    private final Time time;
    private final ListenerName interBrokerListenerName;

    /**
     * Nodes that currently have an outstanding request. Mutated in {@link #generateRequests()}
     * (inside the lock on the pending map) and in the completion handler (without it).
     * NOTE(review): presumably both run on this sender thread only -- confirm.
     */
    private final Set<Node> inflightNodes = new HashSet<>();

    /** Pending, not-yet-sent transaction data per coordinator node. Guarded by itself. */
    private final Map<Node, TransactionDataAndCallbacks> nodesToTransactions = new HashMap<>();

    private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "AddPartitionsToTxnManager");
    // Registered under the same public constants that shutdown() uses to remove them.
    private final Meter verificationFailureRate =
        this.metricsGroup.newMeter(VERIFICATION_FAILURE_RATE_METRIC_NAME, "failures", TimeUnit.SECONDS);
    private final Histogram verificationTimeMs =
        this.metricsGroup.newHistogram(VERIFICATION_TIME_MS_METRIC_NAME);

    /**
     * Maps a produce request version to the operation the requesting client supports.
     *
     * @param version the produce request version
     * @return {@code ADD_PARTITION} for versions above 11, {@code GENERIC_ERROR_SUPPORTED} for
     *         version 11, otherwise {@code DEFAULT_ERROR}
     */
    public static TransactionSupportedOperation produceRequestVersionToTransactionSupportedOperation(short version) {
        if (version > 11) {
            return TransactionSupportedOperation.ADD_PARTITION;
        }
        if (version > 10) {
            return TransactionSupportedOperation.GENERIC_ERROR_SUPPORTED;
        }
        return TransactionSupportedOperation.DEFAULT_ERROR;
    }

    /**
     * Maps a transactional offset commit request version to the operation the requesting client
     * supports.
     *
     * @param version the TxnOffsetCommit request version
     * @return {@code ADD_PARTITION} for versions above 4, {@code GENERIC_ERROR_SUPPORTED} for
     *         version 4, otherwise {@code DEFAULT_ERROR}
     */
    public static TransactionSupportedOperation txnOffsetCommitRequestVersionToTransactionSupportedOperation(int version) {
        if (version > 4) {
            return TransactionSupportedOperation.ADD_PARTITION;
        }
        if (version > 3) {
            return TransactionSupportedOperation.GENERIC_ERROR_SUPPORTED;
        }
        return TransactionSupportedOperation.DEFAULT_ERROR;
    }

    /**
     * Creates the manager; the thread is named {@code AddPartitionsToTxnSenderThread-<brokerId>}.
     *
     * @param config        supplies the broker ID, request timeout and inter-broker listener name
     * @param client        network client used by the sender thread
     * @param metadataCache used to look up the leader of a transaction state partition
     * @param partitionFor  maps a transactional ID to its transaction state partition
     * @param time          clock used for request creation times and the latency histogram
     */
    public AddPartitionsToTxnManager(AbstractKafkaConfig config, NetworkClient client, MetadataCache metadataCache, Function<String, Integer> partitionFor, Time time) {
        super("AddPartitionsToTxnSenderThread-" + config.brokerId(), client, config.requestTimeoutMs(), time);
        this.interBrokerListenerName = config.interBrokerListenerName();
        this.metadataCache = metadataCache;
        this.partitionFor = partitionFor;
        this.time = time;
    }

    /**
     * Enqueues a request to add (or only verify) the given partitions for a transaction.
     *
     * <p>If no coordinator node can be resolved for the transactional ID, the callback is
     * completed immediately with {@code COORDINATOR_NOT_AVAILABLE} for every partition.
     * Otherwise the request is queued for the coordinator and the sender thread is woken up.
     * The request is sent as verify-only unless the operation supports an epoch bump.
     *
     * @param transactionalId               the transactional ID
     * @param producerId                    the producer ID
     * @param producerEpoch                 the producer epoch
     * @param topicPartitions               partitions to add or verify; duplicates are tolerated
     * @param callback                      receives a map of the partitions that failed, with their errors
     * @param transactionSupportedOperation what the requesting client supports
     */
    public void addOrVerifyTransaction(String transactionalId, long producerId, short producerEpoch, Collection<TopicPartition> topicPartitions, AppendCallback callback, TransactionSupportedOperation transactionSupportedOperation) {
        Optional<Node> coordinator = getTransactionCoordinator(this.partitionFor.apply(transactionalId));
        if (coordinator.isEmpty()) {
            // Build the map with put() so that a duplicated partition in the input does not throw,
            // matching the coordinator-present path which also tolerates duplicates.
            Map<TopicPartition, Errors> unavailable = new HashMap<>();
            for (TopicPartition topicPartition : topicPartitions) {
                unavailable.put(topicPartition, Errors.COORDINATOR_NOT_AVAILABLE);
            }
            callback.complete(unavailable);
            return;
        }

        AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection topicCollection =
            new AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection();
        topicPartitions.stream()
            .collect(Collectors.groupingBy(TopicPartition::topic))
            .forEach((topic, partitions) -> topicCollection.add(
                new AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic()
                    .setName(topic)
                    .setPartitions(partitions.stream().map(TopicPartition::partition).collect(Collectors.toList()))));

        AddPartitionsToTxnRequestData.AddPartitionsToTxnTransaction transactionData =
            new AddPartitionsToTxnRequestData.AddPartitionsToTxnTransaction()
                .setTransactionalId(transactionalId)
                .setProducerId(producerId)
                .setProducerEpoch(producerEpoch)
                .setVerifyOnly(!transactionSupportedOperation.supportsEpochBump)
                .setTopics(topicCollection);
        addTxnData(coordinator.get(), transactionData, callback, transactionSupportedOperation);
    }

    /**
     * Queues transaction data for a node, resolving a clash with data already pending for the
     * same transactional ID:
     * <ul>
     *   <li>incoming epoch is higher: the pending entry is failed with
     *       {@code INVALID_PRODUCER_EPOCH} and replaced;</li>
     *   <li>incoming epoch is equal: the pending entry is failed with
     *       {@code NETWORK_EXCEPTION} and replaced;</li>
     *   <li>incoming epoch is lower: the incoming request is failed with
     *       {@code INVALID_PRODUCER_EPOCH} and nothing is queued.</li>
     * </ul>
     * Callbacks are invoked while the lock on the pending map is held.
     */
    private void addTxnData(Node node, AddPartitionsToTxnRequestData.AddPartitionsToTxnTransaction transactionData, AppendCallback callback, TransactionSupportedOperation transactionSupportedOperation) {
        synchronized (this.nodesToTransactions) {
            long curTime = this.time.milliseconds();
            String transactionalId = transactionData.transactionalId();

            // Create the node's entry on first use; the supported operation is fixed at that point.
            TransactionDataAndCallbacks pending = this.nodesToTransactions.computeIfAbsent(node,
                ignored -> new TransactionDataAndCallbacks(
                    new AddPartitionsToTxnRequestData.AddPartitionsToTxnTransactionCollection(1),
                    new HashMap<>(),
                    new HashMap<>(),
                    transactionSupportedOperation));

            AddPartitionsToTxnRequestData.AddPartitionsToTxnTransaction existing =
                pending.transactionData().find(transactionalId);
            if (existing != null) {
                if (existing.producerEpoch() > transactionData.producerEpoch()) {
                    // Incoming request carries an older epoch: reject it and keep what is pending.
                    sendCallback(callback, topicPartitionsToError(transactionData, Errors.INVALID_PRODUCER_EPOCH), curTime);
                    return;
                }
                Errors error = existing.producerEpoch() < transactionData.producerEpoch()
                    ? Errors.INVALID_PRODUCER_EPOCH
                    : Errors.NETWORK_EXCEPTION;
                AppendCallback oldCallback = pending.callbacks().get(transactionalId);
                pending.transactionData().remove(transactionData);
                sendCallback(oldCallback, topicPartitionsToError(existing, error), pending.startTimeMs().get(transactionalId));
            }

            pending.transactionData().add(transactionData);
            pending.callbacks().put(transactionalId, callback);
            pending.startTimeMs().put(transactionalId, curTime);
            wakeup();
        }
    }

    /**
     * Resolves the coordinator node for a transaction state partition: the partition's leader,
     * provided a leader is set (not -1) and the broker is alive on the inter-broker listener.
     */
    private Optional<Node> getTransactionCoordinator(int partition) {
        return this.metadataCache.getLeaderAndIsr(TRANSACTION_STATE_TOPIC_NAME, partition)
            .filter(leaderAndIsr -> leaderAndIsr.leader() != -1)
            .flatMap(leaderAndIsr -> this.metadataCache.getAliveBrokerNode(leaderAndIsr.leader(), this.interBrokerListenerName));
    }

    /**
     * Builds a map assigning {@code error} to every partition in {@code txnData} and marks the
     * verification failure meter once per partition.
     */
    private Map<TopicPartition, Errors> topicPartitionsToError(AddPartitionsToTxnRequestData.AddPartitionsToTxnTransaction txnData, Errors error) {
        Map<TopicPartition, Errors> partitionErrors = new HashMap<>();
        for (AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic topic : txnData.topics()) {
            for (Integer partition : topic.partitions()) {
                partitionErrors.put(new TopicPartition(topic.name(), partition), error);
            }
        }
        this.verificationFailureRate.mark(partitionErrors.size());
        return partitionErrors;
    }

    /** Records the elapsed time since {@code startTimeMs} in the histogram, then completes the callback. */
    private void sendCallback(AppendCallback callback, Map<TopicPartition, Errors> errors, long startTimeMs) {
        this.verificationTimeMs.update(this.time.milliseconds() - startTimeMs);
        callback.complete(errors);
    }

    /**
     * Drains pending transaction data into one request per node. Nodes that already have a
     * request in flight are skipped and keep accumulating data until their request completes.
     *
     * @return the requests to send, each paired with its completion handler
     */
    @Override
    public Collection<RequestAndCompletionHandler> generateRequests() {
        ArrayList<RequestAndCompletionHandler> requests = new ArrayList<>();
        long currentTimeMs = this.time.milliseconds();
        synchronized (this.nodesToTransactions) {
            Iterator<Map.Entry<Node, TransactionDataAndCallbacks>> iter = this.nodesToTransactions.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<Node, TransactionDataAndCallbacks> entry = iter.next();
                Node node = entry.getKey();
                if (this.inflightNodes.contains(node)) {
                    continue;
                }
                TransactionDataAndCallbacks transactionDataAndCallbacks = entry.getValue();
                requests.add(new RequestAndCompletionHandler(
                    currentTimeMs,
                    node,
                    AddPartitionsToTxnRequest.Builder.forBroker(transactionDataAndCallbacks.transactionData()),
                    new AddPartitionsToTxnHandler(node, transactionDataAndCallbacks)));
                this.inflightNodes.add(node);
                // The handler now owns this batch; new data for the node starts a fresh entry.
                iter.remove();
            }
        }
        return requests;
    }

    /**
     * Shuts the sender thread down and removes this manager's metrics. The metrics are removed
     * even if the shutdown is interrupted, so they cannot be left registered.
     *
     * @throws InterruptedException if propagated from the superclass shutdown
     */
    @Override
    public void shutdown() throws InterruptedException {
        try {
            super.shutdown();
        } finally {
            this.metricsGroup.removeMetric(VERIFICATION_FAILURE_RATE_METRIC_NAME);
            this.metricsGroup.removeMetric(VERIFICATION_TIME_MS_METRIC_NAME);
        }
    }

    /**
     * What the requesting client supports, derived from its request version. It decides whether
     * the request is verify-only and how certain partition errors are reported back.
     */
    public static enum TransactionSupportedOperation {
        /** Verify-only; {@code TRANSACTION_ABORTABLE} is reported as {@code INVALID_TXN_STATE}. */
        DEFAULT_ERROR(false),
        /** Verify-only; partition errors other than {@code PRODUCER_FENCED} are passed through. */
        GENERIC_ERROR_SUPPORTED(false),
        /** The partitions are added to the transaction rather than only verified. */
        ADD_PARTITION(true);

        /** When {@code false}, requests for this operation are sent with verify-only set. */
        public final boolean supportsEpochBump;

        private TransactionSupportedOperation(boolean supportsEpochBump) {
            this.supportsEpochBump = supportsEpochBump;
        }
    }

    /** Receives the outcome of an add/verify request. */
    @FunctionalInterface
    public static interface AppendCallback {
        /**
         * @param partitionErrors the partitions that failed with their errors; empty when every
         *                        partition succeeded or verification was skipped
         */
        public void complete(Map<TopicPartition, Errors> partitionErrors);
    }

    /**
     * Everything queued for one coordinator node. The collection holds at most one entry per
     * transactional ID, and the two maps are keyed by that same ID.
     *
     * @param transactionData               the transactions to send
     * @param callbacks                     callback to complete for each transactional ID
     * @param startTimeMs                   time each transactional ID was enqueued, in milliseconds
     * @param transactionSupportedOperation operation captured when the node's entry was created;
     *                                      it is not updated by later additions for the same node
     */
    public record TransactionDataAndCallbacks(AddPartitionsToTxnRequestData.AddPartitionsToTxnTransactionCollection transactionData, Map<String, AppendCallback> callbacks, Map<String, Long> startTimeMs, TransactionSupportedOperation transactionSupportedOperation) {
    }

    /** Completion handler for one node's request; completes every callback in its batch. */
    private class AddPartitionsToTxnHandler
    implements RequestCompletionHandler {
        private final Node node;
        private final TransactionDataAndCallbacks transactionDataAndCallbacks;

        public AddPartitionsToTxnHandler(Node node, TransactionDataAndCallbacks transactionDataAndCallbacks) {
            this.node = node;
            this.transactionDataAndCallbacks = transactionDataAndCallbacks;
        }

        /**
         * Clears the node's in-flight marker, translates the response into per-partition errors
         * for each callback, and wakes the sender thread so queued data for the node can be sent.
         */
        @Override
        public void onComplete(ClientResponse response) {
            AddPartitionsToTxnManager.this.inflightNodes.remove(this.node);
            if (response.authenticationException() != null) {
                AddPartitionsToTxnManager.this.log.error("AddPartitionsToTxnRequest failed for node {} with an authentication exception.", response.destination(), response.authenticationException());
                sendCallbacksToAll(Errors.forException(response.authenticationException()).code());
            } else if (response.versionMismatch() != null) {
                // Verification is not supported by the node: report no errors so the caller proceeds.
                AddPartitionsToTxnManager.this.log.warn("AddPartitionsToTxnRequest failed for node {} with invalid version exception. This suggests verification is not supported. Continuing handling the produce request.", response.destination());
                this.transactionDataAndCallbacks.callbacks().forEach((txnId, callback) ->
                    AddPartitionsToTxnManager.this.sendCallback(callback, Map.of(), this.transactionDataAndCallbacks.startTimeMs().get(txnId)));
            } else if (response.wasDisconnected() || response.wasTimedOut()) {
                AddPartitionsToTxnManager.this.log.warn("AddPartitionsToTxnRequest failed for node {} with a network exception.", response.destination());
                sendCallbacksToAll(Errors.NETWORK_EXCEPTION.code());
            } else {
                handleResponseBody(response, ((AddPartitionsToTxnResponse) response.responseBody()).data());
            }
            AddPartitionsToTxnManager.this.wakeup();
        }

        /** Handles a response that arrived with a body: either a top-level error or per-transaction results. */
        private void handleResponseBody(ClientResponse response, AddPartitionsToTxnResponseData responseData) {
            if (responseData.errorCode() != 0) {
                AddPartitionsToTxnManager.this.log.error("AddPartitionsToTxnRequest for node {} returned with error {}.", response.destination(), Errors.forCode(responseData.errorCode()));
                // A cluster authorization failure is reported to callers as INVALID_TXN_STATE.
                short finalError = responseData.errorCode() == Errors.CLUSTER_AUTHORIZATION_FAILED.code()
                    ? Errors.INVALID_TXN_STATE.code()
                    : responseData.errorCode();
                sendCallbacksToAll(finalError);
                return;
            }

            for (AddPartitionsToTxnResponseData.AddPartitionsToTxnResult txnResult : responseData.resultsByTransaction()) {
                String transactionalId = txnResult.transactionalId();
                AppendCallback callback = this.transactionDataAndCallbacks.callbacks().get(transactionalId);
                Long startTimeMs = this.transactionDataAndCallbacks.startTimeMs().get(transactionalId);
                if (callback == null || startTimeMs == null) {
                    // The response names a transaction that was not part of this batch. Skip it
                    // rather than fail with a NullPointerException when unboxing the start time.
                    AddPartitionsToTxnManager.this.log.warn("AddPartitionsToTxnResponse from node {} contained unexpected transactional ID {}; ignoring the result.", response.destination(), transactionalId);
                    continue;
                }

                Map<TopicPartition, Errors> unverified = new HashMap<>();
                for (AddPartitionsToTxnResponseData.AddPartitionsToTxnTopicResult topicResult : txnResult.topicResults()) {
                    for (AddPartitionsToTxnResponseData.AddPartitionsToTxnPartitionResult partitionResult : topicResult.resultsByPartition()) {
                        short partitionErrorCode = partitionResult.partitionErrorCode();
                        if (partitionErrorCode == Errors.NONE.code()) {
                            continue;
                        }
                        TopicPartition tp = new TopicPartition(topicResult.name(), partitionResult.partitionIndex());
                        unverified.put(tp, Errors.forCode(translatePartitionError(partitionErrorCode)));
                    }
                }
                AddPartitionsToTxnManager.this.verificationFailureRate.mark(unverified.size());
                AddPartitionsToTxnManager.this.sendCallback(callback, unverified, startTimeMs);
            }
        }

        /**
         * Maps a partition-level error code to the code reported to the caller:
         * {@code PRODUCER_FENCED} becomes {@code INVALID_PRODUCER_EPOCH}; {@code TRANSACTION_ABORTABLE}
         * becomes {@code INVALID_TXN_STATE} when the batch's operation is {@code DEFAULT_ERROR};
         * every other code is passed through unchanged.
         */
        private short translatePartitionError(short partitionErrorCode) {
            if (partitionErrorCode == Errors.PRODUCER_FENCED.code()) {
                return Errors.INVALID_PRODUCER_EPOCH.code();
            }
            if (partitionErrorCode == Errors.TRANSACTION_ABORTABLE.code()
                    && this.transactionDataAndCallbacks.transactionSupportedOperation() == TransactionSupportedOperation.DEFAULT_ERROR) {
                return Errors.INVALID_TXN_STATE.code();
            }
            return partitionErrorCode;
        }

        /** Builds a map assigning the given error to every partition of the given transaction in this batch. */
        private Map<TopicPartition, Errors> buildErrorMap(String transactionalId, short errorCode) {
            AddPartitionsToTxnRequestData.AddPartitionsToTxnTransaction transactionData =
                this.transactionDataAndCallbacks.transactionData().find(transactionalId);
            return AddPartitionsToTxnManager.this.topicPartitionsToError(transactionData, Errors.forCode(errorCode));
        }

        /** Completes every callback in the batch with the same error for all of its partitions. */
        private void sendCallbacksToAll(short errorCode) {
            this.transactionDataAndCallbacks.callbacks().forEach((txnId, callback) ->
                AddPartitionsToTxnManager.this.sendCallback(callback, buildErrorMap(txnId, errorCode), this.transactionDataAndCallbacks.startTimeMs().get(txnId)));
        }
    }
}

