/*
 * Decompiled with CFR 0.152.
 */
package com.openexchange.logback.extensions.appenders.kafka;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.spi.ContextAwareBase;
import com.openexchange.logback.extensions.appenders.AbstractRemoteAppender;
import com.openexchange.logback.extensions.appenders.ExceptionHandler;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.SerializationException;

/**
 * {@link ExceptionHandler} for the Kafka appender. Reports failures through the
 * owning appender's status channel and, for the exception types registered in
 * the constructor, additionally writes the affected logging event to the
 * appender's fallback output.
 *
 * <p>Exceptions with no registered handler anywhere in their superclass chain
 * are reported as a connection error to the configured peer, and the event is
 * not written to the fallback output.
 */
public class KafkaExceptionHandler
implements ExceptionHandler {
    /** Immutable dispatch table: exception type to the action taken for it. */
    private final Map<Class<? extends Exception>, KafkaFuture.BiConsumer<Exception, ILoggingEvent>> exceptionHandlers;
    /** The owning appender, used for status reporting of unmatched exceptions. */
    private final ContextAwareBase appender;
    /** Identifier of the remote peer, included in the connection error message. */
    private final String peerId;

    /**
     * Creates a handler bound to the given appender.
     *
     * @param appender the appender that receives error statuses and fallback writes
     * @param peerId identifier of the remote peer, used in the connection error message
     */
    public KafkaExceptionHandler(final AbstractRemoteAppender<Producer<byte[], byte[]>> appender, String peerId) {
        this.appender = appender;
        this.peerId = peerId;

        Map<Class<? extends Exception>, KafkaFuture.BiConsumer<Exception, ILoggingEvent>> handlers =
            new HashMap<Class<? extends Exception>, KafkaFuture.BiConsumer<Exception, ILoggingEvent>>();

        // The three generic failures deliberately share a single handler instance.
        KafkaFuture.BiConsumer<Exception, ILoggingEvent> common = fallbackHandler(appender,
            "An error occurred while sending records to Kafka. Flushing record to fallback output.");
        handlers.put(InterruptedException.class, common);
        handlers.put(IllegalStateException.class, common);
        handlers.put(TimeoutException.class, common);

        handlers.put(SerializationException.class, fallbackHandler(appender,
            "A serialisation error occurred. Flushing record to fallback output."));
        handlers.put(AuthenticationException.class, fallbackHandler(appender,
            "An authentication error occurred. Flushing record to fallback output."));
        handlers.put(AuthorizationException.class, fallbackHandler(appender,
            "An authorization error occurred. Flushing record to fallback output."));
        handlers.put(KafkaException.class, fallbackHandler(appender,
            "An unexpected Kafka error occurred. Flushing record to fallback output."));

        this.exceptionHandlers = Collections.unmodifiableMap(handlers);
    }

    /**
     * Handles an exception that is not associated with a particular logging event.
     *
     * @param e the exception to handle; must not be {@code null}
     */
    @Override
    public void handle(Exception e) {
        this.handle(e, null);
    }

    /**
     * Dispatches the exception to the handler registered for its type, or for the
     * nearest registered supertype. If none is registered, a connection error
     * naming the peer is reported instead.
     *
     * @param e the exception to handle; must not be {@code null}
     * @param event the logging event being sent when the failure occurred, or
     *              {@code null} if there is none
     */
    @Override
    public void handle(Exception e, ILoggingEvent event) {
        KafkaFuture.BiConsumer<Exception, ILoggingEvent> exceptionHandler = this.findHandler(e.getClass());
        if (exceptionHandler == null) {
            this.appender.addError("Connection error to " + this.peerId + ".", e);
            return;
        }
        exceptionHandler.accept(e, event);
    }

    /**
     * Looks up the handler for the given type by walking its superclass chain,
     * so the most specific registered type wins.
     *
     * <p>An exact-class lookup would miss subclasses of the registered types
     * (several registered keys, such as {@link KafkaException}, are base types),
     * and those failures would then never reach the fallback output.
     *
     * @param type the runtime class of the exception
     * @return the matching handler, or {@code null} if no supertype is registered
     */
    private KafkaFuture.BiConsumer<Exception, ILoggingEvent> findHandler(Class<?> type) {
        for (Class<?> current = type; current != null; current = current.getSuperclass()) {
            KafkaFuture.BiConsumer<Exception, ILoggingEvent> handler = this.exceptionHandlers.get(current);
            if (handler != null) {
                return handler;
            }
        }
        return null;
    }

    /**
     * Builds a handler that reports {@code message} with the exception on the
     * appender and then writes the event to the appender's fallback output.
     *
     * @param appender the appender to report to and to write the fallback record with
     * @param message the error message to report
     * @return the handler
     */
    private static KafkaFuture.BiConsumer<Exception, ILoggingEvent> fallbackHandler(
            final AbstractRemoteAppender<Producer<byte[], byte[]>> appender, final String message) {
        return new KafkaFuture.BiConsumer<Exception, ILoggingEvent>() {

            @Override
            public void accept(Exception e, ILoggingEvent event) {
                appender.addError(message, e);
                // handle(Exception) forwards a null event; there is no record to flush then.
                if (event != null) {
                    appender.writeEventToFallbackOutput(event);
                }
            }
        };
    }
}

