/*
 * Decompiled with CFR 0.152.
 */
package com.openexchange.tools.pipesnfilters.internal;

import com.openexchange.threadpool.ThreadPoolService;
import com.openexchange.tools.pipesnfilters.DataSource;
import com.openexchange.tools.pipesnfilters.Filter;
import com.openexchange.tools.pipesnfilters.PipesAndFiltersException;
import com.openexchange.tools.pipesnfilters.internal.DataSink;
import com.openexchange.tools.pipesnfilters.internal.FilterTask;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Queue-backed connection between two pipeline stages.
 * <p>
 * Elements, an end-of-data marker and exceptions are written through the
 * {@link DataSink} methods and are handed out again, in insertion order, through
 * the {@link DataSource} methods. All three kinds of entries travel through the same
 * queue, which is why the queue is typed as {@code Object}.
 *
 * @param <T> type of the elements transported by this pipe
 */
final class Pipe<T> implements DataSource<T>, DataSink<T> {
    private static final Logger LOG = LoggerFactory.getLogger(Pipe.class);

    private final ThreadPoolService threadPool;
    /** Holds elements of type {@code T}, the {@link #eof} marker and {@link PipesAndFiltersException}s. */
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
    /** Unique marker object enqueued by {@link #finished()}; recognised by identity. */
    private final Object eof = new Object();
    /** Set once the end-of-data marker or an exception has been taken out of the queue. */
    private boolean finished = false;

    /**
     * Creates an empty pipe.
     *
     * @param threadPool thread pool that filter tasks created by {@link #addFilter(Filter)} are submitted to
     */
    public Pipe(ThreadPoolService threadPool) {
        this.threadPool = threadPool;
    }

    /**
     * Tells whether further calls to {@link #getData(Collection)} can yield something.
     *
     * @return {@code true} while the end of data has not been seen yet or entries are still queued
     */
    @Override
    public boolean hasData() {
        return !this.finished || !this.queue.isEmpty();
    }

    /**
     * Moves all currently queued elements into the given collection. If nothing is queued
     * and the pipe is not finished yet, the call blocks until at least one entry arrives.
     *
     * @param col collection receiving the elements
     * @return number of elements that were added to {@code col}; the end-of-data marker is not counted
     * @throws PipesAndFiltersException if an exception was passed through the pipe, or if the
     *         calling thread is interrupted while waiting for data
     */
    @Override
    public int getData(Collection<T> col) throws PipesAndFiltersException {
        if (this.finished && this.queue.isEmpty()) {
            // Nothing more will be consumed; waiting on take() here would block forever.
            return 0;
        }
        int retval = 0;
        if (this.queue.isEmpty()) {
            try {
                // Block for the first entry and count it like every other one.
                retval += this.pass(this.queue.take(), col);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new PipesAndFiltersException(e);
            }
        }
        Collection<Object> drained = new ArrayList<Object>(this.queue.size());
        this.queue.drainTo(drained);
        for (Object tmp : drained) {
            // Only pass() decides what counts as an element; the drainTo() result would
            // also include the end-of-data marker and lead to double counting.
            retval += this.pass(tmp, col);
        }
        return retval;
    }

    /**
     * Handles a single queue entry: marks the pipe as finished on the end-of-data marker,
     * rethrows a transported exception, or adds a regular element to the collection.
     *
     * @param tmp entry taken from the queue
     * @param col collection receiving regular elements
     * @return {@code 1} if an element was added to {@code col}, {@code 0} otherwise
     * @throws PipesAndFiltersException if the entry is an exception passed through the pipe
     */
    private int pass(Object tmp, Collection<T> col) throws PipesAndFiltersException {
        if (tmp == this.eof) {
            this.finished = true;
            return 0;
        }
        if (tmp instanceof PipesAndFiltersException) {
            this.finished = true;
            throw (PipesAndFiltersException) tmp;
        }
        // Besides the marker and exceptions only put(T) writes into the queue,
        // so every remaining entry is a T.
        @SuppressWarnings("unchecked")
        T element = (T) tmp;
        col.add(element);
        return 1;
    }

    /**
     * Appends an element to the pipe.
     *
     * @param element element to enqueue
     * @throws PipesAndFiltersException if the calling thread is interrupted while enqueueing
     */
    @Override
    public void put(T element) throws PipesAndFiltersException {
        try {
            this.queue.put(element);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PipesAndFiltersException(e);
        }
    }

    /**
     * Appends an exception to the pipe; it is rethrown by {@link #getData(Collection)} once
     * it is taken out of the queue. An interruption while enqueueing is only logged because
     * this method does not declare a checked exception.
     *
     * @param e exception to pass on to the reading side
     */
    @Override
    public void exception(PipesAndFiltersException e) {
        try {
            this.queue.put(e);
        }
        catch (InterruptedException e1) {
            Thread.currentThread().interrupt();
            LOG.error("Interrupted while passing an exception through the pipe; the exception is dropped.", e1);
        }
    }

    /**
     * Appends the end-of-data marker to the pipe.
     *
     * @throws PipesAndFiltersException if the calling thread is interrupted while enqueueing
     */
    @Override
    public void finished() throws PipesAndFiltersException {
        try {
            this.queue.put(this.eof);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PipesAndFiltersException(e);
        }
    }

    /**
     * Attaches a filter to this pipe. A {@link FilterTask} built from this pipe, the filter
     * and a new downstream pipe is submitted to the thread pool.
     *
     * @param filter filter to attach
     * @param <O> type of the elements produced by the filter
     * @return the downstream pipe as data source for the filter's output
     */
    @Override
    public <O> DataSource<O> addFilter(Filter<T, O> filter) {
        // The downstream pipe carries the filter's output type, not this pipe's element type.
        Pipe<O> pipe = new Pipe<O>(this.threadPool);
        FilterTask<T, O> task = new FilterTask<T, O>(this, filter, pipe);
        this.threadPool.submit(task);
        return pipe;
    }
}

