package com.hikvision.artemis.sdk.kafka.data.task.processor;

import com.hikvision.artemis.sdk.kafka.config.ProcessorConfig;
import com.hikvision.artemis.sdk.kafka.data.parse.DataMapper;
import com.hikvision.artemis.sdk.kafka.data.push.ExecuteCommand;
import com.hikvision.artemis.sdk.kafka.data.result.ExecuteResult;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/hikvision/artemis/sdk/kafka/data/task/processor/MessageProcessor.class */
/**
 * Parses raw string records with a {@link DataMapper} and runs the parsed values through an
 * {@link ExecuteCommand} on a bounded thread pool.
 *
 * <p>For every record handed to {@link #process} or {@link #processRetryData}, exactly one of two
 * things is reported back to the owning {@link PartitionTaskInfo}: either
 * {@code completedDataCountIncrease()} is called, or the record is queued again through
 * {@code addRetryData(...)}.
 *
 * <p>The instance registers itself as the pool's {@link RejectedExecutionHandler}: when the work
 * queue is full the submitting thread blocks until space is available instead of dropping work.
 *
 * @param <T> type produced by the data mapper and consumed by the command
 */
public class MessageProcessor<T> implements RejectedExecutionHandler {
    /** Defaults used when no {@link ProcessorConfig} is supplied. */
    private static final int DEFAULT_CORE_SIZE = 10;
    private static final int DEFAULT_MAX_SIZE = 10;
    private static final long DEFAULT_KEEP_ALIVE_SECONDS = 60L;
    private static final int DEFAULT_QUEUE_SIZE = 1000;

    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final int coreSize;
    private final int maxSize;
    private final long keepAliveTime;
    private final int queueMaxSize;
    private final BlockingQueue<Runnable> queue;
    private final DataMapper<T> dataMapper;
    private final ExecuteCommand<T> command;
    private ExecutorService executorService;

    /**
     * Creates the worker pool from the sizes captured by the constructor. The keep-alive time is
     * interpreted in seconds and this instance is installed as the rejection handler.
     *
     * <p>Invoked by the constructor. An executor that was assigned earlier is not shut down here.
     */
    public void initThreadPool() {
        this.executorService = new ThreadPoolExecutor(
                this.coreSize, this.maxSize, this.keepAliveTime, TimeUnit.SECONDS, this.queue, this);
    }

    /**
     * Creates a processor with the default pool settings.
     *
     * @param dataMapper     converts a raw record into a {@code T}
     * @param executeCommand business command run for each parsed value
     */
    public MessageProcessor(DataMapper<T> dataMapper, ExecuteCommand<T> executeCommand) {
        this(dataMapper, executeCommand, null);
    }

    /**
     * Creates a processor and starts its thread pool.
     *
     * @param dataMapper      converts a raw record into a {@code T}
     * @param executeCommand  business command run for each parsed value
     * @param processorConfig pool settings; {@code null} selects the defaults
     */
    public MessageProcessor(DataMapper<T> dataMapper, ExecuteCommand<T> executeCommand, ProcessorConfig processorConfig) {
        int core = DEFAULT_CORE_SIZE;
        int max = DEFAULT_MAX_SIZE;
        long keepAlive = DEFAULT_KEEP_ALIVE_SECONDS;
        int queueSize = DEFAULT_QUEUE_SIZE;
        if (processorConfig != null) {
            core = processorConfig.getThreadPoolCoreSize();
            max = processorConfig.getThreadPoolMaxSize();
            keepAlive = processorConfig.getThreadPoolKeepAliveTime();
            int configuredQueueSize = processorConfig.getThreadPoolQueueSize();
            // The configured queue size used to be ignored entirely, so a non-positive value
            // must keep working; ArrayBlockingQueue itself rejects capacities below 1.
            if (configuredQueueSize > 0) {
                queueSize = configuredQueueSize;
            }
        }
        this.coreSize = core;
        this.maxSize = max;
        this.keepAliveTime = keepAlive;
        this.queueMaxSize = queueSize;
        // Build the queue only after the configuration has been applied so that the
        // configured capacity actually takes effect.
        this.queue = new ArrayBlockingQueue<>(this.queueMaxSize);
        this.dataMapper = dataMapper;
        this.command = executeCommand;
        initThreadPool();
    }

    /**
     * Parses every record of the batch and submits each parsed value to the pool. Records that
     * the mapper turns into {@code null} are counted as completed straight away.
     *
     * @param partitionTaskInfo batch whose records are processed and whose counters are updated
     */
    public void process(PartitionTaskInfo<String, T> partitionTaskInfo) {
        for (String raw : partitionTaskInfo.getData()) {
            T parsed = this.dataMapper.parse(raw);
            if (parsed == null) {
                this.logger.debug("解析后数据为空，或在二次识别配置项中该数据不满足配置要求而丢弃。接收到的源数据为:{}", raw);
                partitionTaskInfo.completedDataCountIncrease();
            } else {
                submit(partitionTaskInfo, parsed, null);
            }
        }
    }

    /**
     * Drains the batch's retry queue. Entries that report {@code limited()} are counted as
     * completed; all others get their retry counter increased and are submitted again.
     *
     * @param partitionTaskInfo batch whose pending retries are re-submitted
     */
    public void processRetryData(PartitionTaskInfo<String, T> partitionTaskInfo) {
        if (partitionTaskInfo.getRetryData().isEmpty()) {
            return;
        }
        RetryData<T> pending;
        while ((pending = partitionTaskInfo.getRetryData().poll()) != null) {
            if (pending.limited()) {
                partitionTaskInfo.completedDataCountIncrease();
            } else {
                pending.retryTimeIncrease();
                submit(partitionTaskInfo, null, pending);
            }
        }
    }

    /**
     * Applies back-pressure when the pool cannot accept a task: the submitting thread blocks until
     * the work queue has room. Tasks rejected because the pool is shut down are discarded, since
     * nothing would ever take them from the queue.
     *
     * @param runnable           the rejected task
     * @param threadPoolExecutor the executor that rejected it
     */
    @Override // java.util.concurrent.RejectedExecutionHandler
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
        if (threadPoolExecutor.isShutdown()) {
            this.logger.warn("Thread pool is shut down; rejected task discarded.");
            return;
        }
        try {
            threadPoolExecutor.getQueue().put(runnable);
        } catch (InterruptedException e) {
            // Restore the flag so the caller can observe the interruption; the task is not queued.
            Thread.currentThread().interrupt();
            this.logger.warn("Interrupted while waiting for queue space; rejected task discarded.");
        }
    }

    /**
     * Stops the pool via {@code shutdownNow()}.
     */
    public void close() {
        if (this.executorService != null) {
            this.executorService.shutdownNow();
            this.logger.info("业务处理线程池已关闭！");
        }
    }

    /**
     * Submits one command execution to the pool.
     *
     * @param partitionTaskInfo batch that receives the outcome
     * @param freshData         value for a first attempt; ignored when {@code retryData} is given
     * @param retryData         retry entry being re-executed, or {@code null} for a first attempt
     */
    private void submit(PartitionTaskInfo<String, T> partitionTaskInfo, T freshData, RetryData<T> retryData) {
        this.executorService.submit(() -> {
            runCommand(partitionTaskInfo, freshData, retryData);
        });
    }

    /**
     * Runs the command and records the outcome on the batch: success and non-retryable failure
     * (including any exception) count as completed, a retryable failure is queued for retry.
     */
    private void runCommand(PartitionTaskInfo<String, T> partitionTaskInfo, T freshData, RetryData<T> retryData) {
        try {
            T payload = (retryData != null) ? retryData.getData() : freshData;
            ExecuteResult result = this.command.execute(payload);
            if (!result.isSuccess() && result.isRetry()) {
                // A first attempt creates the retry entry; a retry re-queues the existing one
                // so its retry counter is preserved.
                RetryData<T> next = (retryData != null)
                        ? retryData
                        : new RetryData<>(payload, result.getMaxRetryTimes());
                partitionTaskInfo.addRetryData(next);
            } else {
                partitionTaskInfo.completedDataCountIncrease();
            }
        } catch (Exception e) {
            this.logger.error("Command execute error.", e);
            partitionTaskInfo.completedDataCountIncrease();
        }
    }
}
