/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.trogdor.workload;

import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.apache.kafka.trogdor.workload.RoundTripWorkerBase;
import org.apache.kafka.trogdor.workload.RoundTripWorkloadSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ShareRoundTripWorker
extends RoundTripWorkerBase {
    private static final Logger log = LoggerFactory.getLogger(ShareRoundTripWorker.class);
    KafkaShareConsumer<byte[], byte[]> consumer;

    ShareRoundTripWorker(String id, RoundTripWorkloadSpec spec) {
        this.id = id;
        this.spec = spec;
    }

    @Override
    public void initializeConsumer(HashSet<TopicPartition> partitions) {
        Properties props = new Properties();
        props.put("bootstrap.servers", this.spec.bootstrapServers());
        props.put("client.id", "consumer." + this.id);
        props.put("auto.offset.reset", "earliest");
        props.put("request.timeout.ms", (Object)105000);
        props.put("max.poll.interval.ms", (Object)100000);
        WorkerUtils.addConfigsToProperties(props, this.spec.commonClientConf(), this.spec.consumerConf());
        String groupId = "round-trip-share-group-" + this.id;
        props.put("group.id", groupId);
        try (Admin adminClient = WorkerUtils.createAdminClient(this.spec.bootstrapServers(), this.spec.commonClientConf(), this.spec.adminClientConf());){
            this.alterShareAutoOffsetReset(groupId, "earliest", adminClient);
        }
        catch (Exception e) {
            log.warn("Failed to set share.auto.offset.reset config to 'earliest' mode", (Throwable)e);
            throw e;
        }
        this.consumer = new KafkaShareConsumer(props, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer());
        this.consumer.subscribe(this.spec.activeTopics().materialize().keySet());
    }

    @Override
    protected ConsumerRecords<byte[], byte[]> fetchRecords(Duration duration) {
        return this.consumer.poll(duration);
    }

    @Override
    protected void shutdownConsumer() {
        Utils.closeQuietly(this.consumer, (String)"consumer");
        this.consumer = null;
    }

    private void alterShareAutoOffsetReset(String groupId, String newValue, Admin adminClient) {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
        HashMap<ConfigResource, List<AlterConfigOp>> alterEntries = new HashMap<ConfigResource, List<AlterConfigOp>>();
        alterEntries.put(configResource, List.of(new AlterConfigOp(new ConfigEntry("share.auto.offset.reset", newValue), AlterConfigOp.OpType.SET)));
        AlterConfigsOptions alterOptions = new AlterConfigsOptions();
        try {
            adminClient.incrementalAlterConfigs(alterEntries, alterOptions).all().get(60L, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            throw new RuntimeException("Exception was thrown while attempting to set share.auto.offset.reset config: ", e);
        }
    }
}

