/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.errors;

import java.util.Map;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils;
import org.apache.kafka.streams.processor.api.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Processing exception handler that logs the failure at WARN level and then
 * answers with {@code ProcessingExceptionHandler.Response.resume(...)}.
 *
 * <p>The dead letter queue topic name, if one was picked up by
 * {@link #configure(Map)}, is forwarded to
 * {@code ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords}; whatever that
 * helper returns is attached to the resume response.
 */
public class LogAndContinueProcessingExceptionHandler implements ProcessingExceptionHandler {
    private static final Logger log = LoggerFactory.getLogger(LogAndContinueProcessingExceptionHandler.class);

    /** Configuration key under which the dead letter queue topic name is looked up. */
    private static final String DEAD_LETTER_QUEUE_TOPIC_KEY = "errors.dead.letter.queue.topic.name";

    /** Dead letter queue topic name; stays {@code null} until {@link #configure(Map)} finds a value. */
    private String deadLetterQueueTopic = null;

    /**
     * Logs the processing failure together with the position it occurred at and
     * returns a resume response.
     *
     * @param context   supplies the processor node id, task id, source topic,
     *                  partition and offset for the log line, plus the raw source
     *                  key/value handed to the dead letter queue helper
     * @param record    the record being processed; not read by this handler
     * @param exception the exception raised during processing
     * @return a resume response carrying the result of
     *         {@code ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords}
     */
    @Override
    public ProcessingExceptionHandler.Response handleError(final ErrorHandlerContext context,
                                                           final Record<?, ?> record,
                                                           final Exception exception) {
        // Five placeholders, six arguments: the exception is deliberately the
        // trailing argument without a placeholder of its own.
        log.warn(
            "Exception caught during message processing, processor node: {}, taskId: {}, source topic: {}, source partition: {}, source offset: {}",
            context.processorNodeId(),
            context.taskId(),
            context.topic(),
            context.partition(),
            context.offset(),
            exception
        );

        return ProcessingExceptionHandler.Response.resume(
            ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords(
                deadLetterQueueTopic,
                context.sourceRawKey(),
                context.sourceRawValue(),
                context,
                exception
            )
        );
    }

    /**
     * Reads the dead letter queue topic name from the supplied configuration.
     *
     * <p>A missing or {@code null} entry leaves the currently stored topic name
     * untouched; any other value is stored as its {@link String#valueOf(Object)}
     * representation.
     *
     * @param configs configuration map to read the topic name from
     */
    // NOTE(review): presumably declared by a super-interface of
    // ProcessingExceptionHandler; add @Override once that is confirmed.
    public void configure(final Map<String, ?> configs) {
        final Object configuredTopic = configs.get(DEAD_LETTER_QUEUE_TOPIC_KEY);
        if (configuredTopic == null) {
            return;
        }
        deadLetterQueueTopic = String.valueOf(configuredTopic);
    }
}

