package de.unibamberg.minf.processing.service;

import de.unibamberg.minf.dme.model.datamodel.base.DatamodelNature;
import de.unibamberg.minf.processing.consumption.ResourceConsumptionService;
import de.unibamberg.minf.processing.exception.ProcessingConfigException;
import de.unibamberg.minf.processing.exception.ResourceProcessingException;
import de.unibamberg.minf.processing.listener.ProcessingListener;
import de.unibamberg.minf.processing.listener.ResourceProcessingListener;
import de.unibamberg.minf.processing.model.SerializableResource;
import de.unibamberg.minf.processing.model.base.Resource;
import de.unibamberg.minf.processing.service.base.BaseResourceProcessingServiceImpl;
import de.unibamberg.minf.processing.service.base.ProcessingService;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

/* loaded from: input_file:BOOT-INF/lib/processing-core-4.5.8-SNAPSHOT.jar:de/unibamberg/minf/processing/service/ParallelFileProcessingService.class */
public class ParallelFileProcessingService<T extends DatamodelNature> extends BaseResourceProcessingServiceImpl<T> implements ResourceConsumptionService, ApplicationContextAware, ProcessingListener {
    protected ApplicationContext applicationContext;
    private BlockingQueue<File> fileQueue;
    private static ExecutorService executorService;
    private MatchingFileCollector fileCollector;
    private LinkedHashMap<UUID, ProcessingService> services;
    private Class<? extends BaseResourceProcessingServiceImpl<T>> wrappedServiceType;
    private BaseResourceProcessingServiceImpl<T> wrappedServiceTemplate;
    private int maxParallelThreads = 2;
    private String path;
    private int filesProcessing;

    private static ExecutorService getExecutorServiceInstance() {
        if (executorService == null) {
            executorService = Executors.newFixedThreadPool(10);
        }
        return executorService;
    }

    public static void setExecutorServiceInstance(int i, ThreadFactory threadFactory) {
        executorService = Executors.newFixedThreadPool(i, threadFactory);
    }

    public String getPath() {
        return this.path;
    }

    public void setPath(String str) {
        this.path = str;
    }

    public int getMaxParallelThreads() {
        return this.maxParallelThreads;
    }

    public void setMaxParallelThreads(int i) {
        this.maxParallelThreads = i;
    }

    public Class<? extends BaseResourceProcessingServiceImpl<T>> getWrappedServiceType() {
        return this.wrappedServiceType;
    }

    public void setWrappedServiceType(Class<? extends BaseResourceProcessingServiceImpl<T>> cls) {
        this.wrappedServiceType = cls;
    }

    public MatchingFileCollector getFileCollector() {
        return this.fileCollector;
    }

    public void setFileCollector(MatchingFileCollector matchingFileCollector) {
        this.fileCollector = matchingFileCollector;
    }

    @Override // org.springframework.context.ApplicationContextAware
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @Override // de.unibamberg.minf.processing.service.base.BaseProcessingService, de.unibamberg.minf.processing.service.base.ProcessingService
    public synchronized void requestCancellation() {
        super.requestCancellation();
        this.fileQueue.clear();
        Iterator<UUID> it = this.services.keySet().iterator();
        while (it.hasNext()) {
            this.services.get(it.next()).requestCancellation();
        }
    }

    @Override // de.unibamberg.minf.processing.service.base.BaseResourceProcessingServiceImpl, de.unibamberg.minf.processing.service.base.BaseProcessingService, de.unibamberg.minf.processing.service.base.ProcessingService
    public void init() throws ProcessingConfigException {
        if (this.path == null) {
            throw new ProcessingConfigException("Path not set");
        }
        super.init();
        this.fileQueue = new LinkedBlockingQueue();
        this.wrappedServiceTemplate = (BaseResourceProcessingServiceImpl) this.applicationContext.getBean(this.wrappedServiceType);
        this.services = new LinkedHashMap<>();
        this.logger.info("Initialized for path [{}], maximum threads: {}", this.path, Integer.valueOf(getMaxParallelThreads()));
    }

    @Override // de.unibamberg.minf.processing.service.base.BaseResourceProcessingServiceImpl, java.lang.Runnable
    public void run() {
        try {
            if (!isInitialized()) {
                throw new ProcessingConfigException("Explicit call to initialization method is required but has not been executed");
            }
            if (isCancellationRequested()) {
                throw new ResourceProcessingException("Service cancellation has been requested");
            }
            updateState(ProcessingService.ProcessingServiceStates.ACTIVE);
            if (getListener() != null) {
                this.listener.start(getUuid());
            }
            try {
                doIndexAsync();
            } catch (Throwable th) {
                this.logger.error("An error occurred while indexing", th);
                updateState(ProcessingService.ProcessingServiceStates.ERROR);
                if (getListener() != null) {
                    this.listener.error(getUuid());
                }
            }
        } catch (Exception e) {
            this.logger.error("An error occurred while starting processing runnable", (Throwable) e);
        }
    }

    @Override // de.unibamberg.minf.processing.service.base.BaseResourceProcessingServiceImpl
    protected void doIndexAsync() throws Throwable {
        File file = new File(this.path);
        this.filesProcessing = 0;
        if (!file.exists() || !file.isDirectory()) {
            if (!file.exists() || file.isDirectory()) {
                this.logger.info(String.format("No accessible files detected at [%s]", file));
                processDone();
                return;
            } else {
                this.logger.info(String.format("Processing file [%s]", file));
                processContent(file, (String) null);
                return;
            }
        }
        ArrayList arrayList = new ArrayList();
        if (this.fileCollector != null) {
            this.logger.info(String.format("Processing directory [%s] with FileCollector instance", this.path));
            this.fileCollector.collectFiles();
            Iterator<Path> it = this.fileCollector.getMatchingFiles().iterator();
            while (it.hasNext()) {
                arrayList.add(it.next().toFile());
            }
        } else {
            this.logger.info(String.format("Processing directory [%s]", this.path));
            for (File file2 : file.listFiles()) {
                if (!file2.isDirectory()) {
                    arrayList.add(file2);
                }
            }
        }
        if (getListener() != null) {
            getListener().updateSize(getUuid(), arrayList.size());
        }
        if (arrayList.size() == 0) {
            processDone();
        }
        boolean z = false;
        for (int i = 0; i < arrayList.size(); i++) {
            if (isCancellationRequested()) {
                throw new ResourceProcessingException("Service cancellation has been requested");
            }
            if (z || this.services.keySet().size() >= getMaxParallelThreads()) {
                this.fileQueue.add((File) arrayList.get(i));
                z = true;
                this.logger.debug(String.format("Enqueued file [%s]; index [%s]", arrayList.get(i), Integer.valueOf(i)));
            } else {
                processContent((File) arrayList.get(i), (String) null);
            }
        }
    }

    @Override // de.unibamberg.minf.processing.service.base.BaseResourceProcessingServiceImpl
    public List<Resource> processContent(File file, String str) throws IOException, ResourceProcessingException {
        if (isCancellationRequested()) {
            throw new ResourceProcessingException("Service cancellation has been requested");
        }
        List<Resource> processContent = processContent(file.getAbsolutePath(), new FileInputStream(file));
        if (this.listener != null && (this.listener instanceof ResourceProcessingListener)) {
            ((ResourceProcessingListener) this.listener).addProcessed(this);
        }
        return processContent;
    }

    public List<Resource> processContent(String str, InputStream inputStream) throws ResourceProcessingException {
        try {
            if (isCancellationRequested()) {
                throw new ResourceProcessingException("Service cancellation has been requested");
            }
            BaseResourceProcessingServiceImpl baseResourceProcessingServiceImpl = (BaseResourceProcessingServiceImpl) this.applicationContext.getBean(this.wrappedServiceType);
            if (str != null) {
                try {
                    baseResourceProcessingServiceImpl.setResourceMetadata(new ArrayList(1));
                    baseResourceProcessingServiceImpl.getResourceMetadata().add(new SerializableResource("_filename", str.substring(this.path.length() + 1)));
                } catch (Exception e) {
                    this.logger.warn("Failed to set _filename", (Throwable) e);
                }
            }
            baseResourceProcessingServiceImpl.setMdcUid(getMdcUid());
            baseResourceProcessingServiceImpl.setDebug(isDebug());
            baseResourceProcessingServiceImpl.setRoot(getRoot());
            baseResourceProcessingServiceImpl.setSchema(getSchema());
            baseResourceProcessingServiceImpl.addConsumptionService(this);
            baseResourceProcessingServiceImpl.setListener(this);
            baseResourceProcessingServiceImpl.setInputStream(inputStream);
            baseResourceProcessingServiceImpl.setExecutionContext(getExecutionContext());
            baseResourceProcessingServiceImpl.init();
            this.services.put(baseResourceProcessingServiceImpl.getUuid(), baseResourceProcessingServiceImpl);
            getExecutorServiceInstance().execute(baseResourceProcessingServiceImpl);
            return null;
        } catch (Exception e2) {
            throw new ResourceProcessingException("Failed to setup wrapped service for parallel processing", e2);
        }
    }

    @Override // de.unibamberg.minf.processing.service.base.BaseResourceProcessingServiceImpl
    public List<Resource> processContent(InputStream inputStream) throws ResourceProcessingException {
        return processContent((String) null, inputStream);
    }

    @Override // de.unibamberg.minf.processing.service.base.BaseResourceProcessingServiceImpl, de.unibamberg.minf.processing.consumption.ResourceConsumptionService
    public void init(String str) throws ProcessingConfigException {
    }

    @Override // de.unibamberg.minf.processing.service.base.BaseResourceProcessingServiceImpl, de.unibamberg.minf.processing.consumption.ResourceConsumptionService
    public synchronized boolean consume(Resource resource) {
        if (getConsumptionServices() == null) {
            return true;
        }
        Iterator<ResourceConsumptionService> it = getConsumptionServices().iterator();
        while (it.hasNext()) {
            it.next().consume(resource);
        }
        return true;
    }

    @Override // de.unibamberg.minf.processing.service.base.BaseResourceProcessingServiceImpl, de.unibamberg.minf.processing.consumption.ResourceConsumptionService
    public synchronized int commit() {
        return 0;
    }

    @Override // de.unibamberg.minf.processing.service.base.BaseResourceProcessingServiceImpl
    public synchronized String getContentAsString(Resource resource) {
        return this.wrappedServiceTemplate.getContentAsString(resource);
    }

    @Override // de.unibamberg.minf.processing.listener.ProcessingListener
    public synchronized void start(UUID uuid) {
    }

    @Override // de.unibamberg.minf.processing.listener.ProcessingListener
    public synchronized void updateSize(UUID uuid, long j) {
    }

    @Override // de.unibamberg.minf.processing.listener.ProcessingListener
    public synchronized void processed(UUID uuid, long j) {
    }

    @Override // de.unibamberg.minf.processing.listener.ProcessingListener
    public synchronized void finished(UUID uuid) {
        this.services.remove(uuid);
        processDone();
    }

    @Override // de.unibamberg.minf.processing.listener.ProcessingListener
    public synchronized void error(UUID uuid) {
        this.services.remove(uuid);
        processDone();
    }

    protected synchronized void processDone() {
        ProcessingListener listener = getListener();
        UUID uuid = getUuid();
        int i = this.filesProcessing + 1;
        this.filesProcessing = i;
        listener.processed(uuid, i);
        try {
            if (!this.fileQueue.isEmpty()) {
                processContent(this.fileQueue.take(), (String) null);
                return;
            }
        } catch (Exception e) {
            this.logger.error("Failed to take file from queue", (Throwable) e);
        }
        if (this.services.isEmpty()) {
            super.commitConsumptionServices();
            updateState(ProcessingService.ProcessingServiceStates.COMPLETE);
            if (getListener() != null) {
                this.listener.finished(getUuid());
            }
        }
    }

    void shutdownAndAwaitTermination(ExecutorService executorService2) {
        executorService2.shutdown();
        try {
            if (!executorService2.awaitTermination(60L, TimeUnit.SECONDS)) {
                executorService2.shutdownNow();
                if (!executorService2.awaitTermination(60L, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        } catch (InterruptedException e) {
            executorService2.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
