/*
 * Decompiled with CFR 0.152.
 */
package com.openexchange.logback.extensions.appenders.kafka;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.util.CloseUtil;
import com.openexchange.logback.extensions.appenders.AbstractRemoteAppender;
import com.openexchange.logback.extensions.appenders.kafka.KafkaAppenderMBean;
import com.openexchange.logback.extensions.appenders.kafka.KafkaExceptionHandler;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class KafkaAppender
extends AbstractRemoteAppender<Producer<byte[], byte[]>>
implements KafkaAppenderMBean {
    private static final AtomicReference<KafkaAppender> REF = new AtomicReference();
    private static final String NAME = "kafka";
    private volatile Producer<byte[], byte[]> producer;
    final Map<String, Object> producerConfiguration = new HashMap<String, Object>();
    private byte[] hostnameHashKey;
    private String servers;
    private String topic;
    private String key;

    public static KafkaAppender getInstance() {
        return REF.get();
    }

    @Override
    protected String getAppenderName() {
        return NAME;
    }

    @Override
    public void start() {
        REF.set(this);
        if (this.isStarted()) {
            return;
        }
        if (this.initialiseConfiguration() > 0) {
            return;
        }
        this.exceptionHandler = new KafkaExceptionHandler(this, this.servers);
        super.start();
    }

    @Override
    protected void postStop() {
        REF.set(null);
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public void setServers(String servers) {
        this.servers = servers;
    }

    public void setKey(String key) {
        this.key = key;
    }

    private int initialiseConfiguration() {
        int errorCount = 0;
        if (this.servers == null) {
            this.addError("No 'servers' set for the '" + this.name + "' appender.");
            ++errorCount;
        }
        if (this.topic == null) {
            this.addError("No 'topic' set for the '" + this.name + "' appender.");
            ++errorCount;
        }
        this.calculateHostnamePartitioningKey();
        this.producerConfiguration.put("bootstrap.servers", this.servers);
        this.producerConfiguration.put("key.serializer", ByteArraySerializer.class.getName());
        this.producerConfiguration.put("value.serializer", ByteArraySerializer.class.getName());
        return errorCount;
    }

    private void calculateHostnamePartitioningKey() {
        String hostname = this.key;
        if (hostname == null || hostname.isEmpty()) {
            this.addInfo("Partitioning key is not set. Falling back to 'HOSTNAME'");
            hostname = this.context.getProperty("HOSTNAME");
            if (hostname == null || hostname.isEmpty()) {
                this.addWarn("The hostname is not set in the context. No hostname partitioning will be used.");
                return;
            }
        }
        this.addInfo("Partitioning key is set to: " + hostname);
        this.hostnameHashKey = ByteBuffer.allocate(4).putInt(hostname.hashCode()).array();
    }

    @Override
    protected void dispatchEvents() {
        ILoggingEvent event = null;
        this.addInfo("Dispatching events...");
        try {
            try {
                while (true) {
                    event = (ILoggingEvent)this.queue.take();
                    this.writeEvent(event);
                    event = null;
                }
            }
            catch (Exception e) {
                this.exceptionHandler.handle(e, event);
                event = null;
                CloseUtil.closeQuietly(this.producer);
                this.addInfo("Connection to " + this.servers + " closed.");
                if (event != null) {
                    this.doAppend(event);
                }
            }
        }
        catch (Throwable throwable) {
            CloseUtil.closeQuietly(this.producer);
            this.addInfo("Connection to " + this.servers + " closed.");
            if (event != null) {
                this.doAppend(event);
            }
            throw throwable;
        }
    }

    private void writeEvent(ILoggingEvent event) {
        ((Producer)this.getConnector()).send(new ProducerRecord(this.topic, (Object)this.hostnameHashKey, (Object)this.encoder.encode((Object)event)));
    }

    @Override
    protected Callable<Producer<byte[], byte[]>> createConnector() {
        this.addInfo("Creating Kafka producer for " + this.servers + "...");
        ProducerInitialiseTask task = new ProducerInitialiseTask();
        this.addInfo("Kafka producer for " + this.servers + " created.");
        return task;
    }

    @Override
    protected Producer<byte[], byte[]> waitForConnectorInitialisation() throws Exception {
        return (Producer)this.connectorTask.get(this.getConnectionTimeout(), TimeUnit.MILLISECONDS);
    }

    private final class ProducerInitialiseTask
    implements Callable<Producer<byte[], byte[]>> {
        @Override
        public Producer<byte[], byte[]> call() throws Exception {
            return new KafkaProducer(KafkaAppender.this.producerConfiguration);
        }
    }
}

