package com.hikvision.artemis.sdk.kafka.consumer;

import com.hikvision.artemis.sdk.kafka.config.KafkaConfig;
import com.hikvision.artemis.sdk.kafka.data.task.processor.MessageProcessor;
import com.hikvision.artemis.sdk.kafka.data.task.processor.PartitionTaskInfo;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/hikvision/artemis/sdk/kafka/consumer/ArtemisKafkaConsumer.class */
public class ArtemisKafkaConsumer {
    private Properties props;
    private KafkaConsumer<String, String> consumer;
    private MessageProcessor messageProcessor;
    private final Logger logger = LoggerFactory.getLogger((Class<?>) ArtemisKafkaConsumer.class);
    private int taskMaxCount = 5;
    private int dataMaxCount = 500;
    private Map<Integer, PartitionTaskInfo> taskMap = new HashMap(8);
    private Queue<PartitionTaskInfo> taskQueue = new LinkedList();
    private volatile boolean isRunning = true;
    private volatile boolean consumersRunning = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/hikvision/artemis/sdk/kafka/consumer/ArtemisKafkaConsumer$TaskStatus.class */
    public class TaskStatus {
        int dataCount;
        int taskCount;

        TaskStatus() {
        }
    }

    public ArtemisKafkaConsumer(String str, List<String> list, String str2, MessageProcessor messageProcessor, KafkaConfig kafkaConfig) {
        this.props = null;
        this.messageProcessor = messageProcessor;
        this.props = new Properties();
        this.props.put("bootstrap.servers", str);
        this.props.put(ConsumerConfig.GROUP_ID_CONFIG, str2);
        this.props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        this.props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        this.props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.valueOf(kafkaConfig.getSessionTimeout()));
        this.props.put("request.timeout.ms", Integer.valueOf(kafkaConfig.getRequestTimeout()));
        this.props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, Integer.valueOf(kafkaConfig.getHeartbeatInterval()));
        this.props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.valueOf(kafkaConfig.getMaxPollRecords()));
        this.props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        this.props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(this.props);
        this.consumer.subscribe(list);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            this.isRunning = false;
            messageProcessor.close();
            do {
            } while (this.consumersRunning);
            this.consumer.close();
            this.logger.info("Kafka 消费者已关闭！");
        }));
    }

    public void receive() {
        this.logger.info("开始订阅消息！");
        while (!Thread.interrupted() && this.isRunning) {
            TaskStatus taskDispatch = taskDispatch();
            if (taskDispatch.dataCount >= this.dataMaxCount || taskDispatch.taskCount >= this.taskMaxCount) {
                try {
                    Thread.sleep(50L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            } else {
                this.consumersRunning = true;
                ConsumerRecords<String, String> poll = this.consumer.poll(100L);
                this.consumersRunning = false;
                if (poll.count() > 0) {
                    for (TopicPartition topicPartition : poll.partitions()) {
                        List<ConsumerRecord<String, String>> records = poll.records(topicPartition);
                        this.taskQueue.add(new PartitionTaskInfo(topicPartition, records.get(records.size() - 1).offset(), (List) records.stream().map(consumerRecord -> {
                            return (String) consumerRecord.value();
                        }).collect(Collectors.toList())));
                    }
                }
            }
        }
    }

    private TaskStatus taskDispatch() {
        int i = 0;
        int i2 = 0;
        Iterator<PartitionTaskInfo> it = this.taskQueue.iterator();
        while (it.hasNext()) {
            i2++;
            i += it.next().getDataCount();
        }
        Iterator<Map.Entry<Integer, PartitionTaskInfo>> it2 = this.taskMap.entrySet().iterator();
        while (it2.hasNext()) {
            PartitionTaskInfo value = it2.next().getValue();
            if (value != null) {
                i += value.getDataCount() - value.getcompletedDataCount();
                if (value.getDataCount() == value.getcompletedDataCount()) {
                    this.logger.debug("任务已完成:" + value);
                    this.consumersRunning = true;
                    this.consumer.commitSync(Collections.singletonMap(value.getTopicPartition(), new OffsetAndMetadata(value.getMaxOffset() + 1)));
                    this.consumersRunning = false;
                    it2.remove();
                } else {
                    if (!value.getRetryData().isEmpty()) {
                        this.messageProcessor.processRetryData(value);
                    }
                    i2++;
                }
            }
        }
        if (!this.taskQueue.isEmpty()) {
            int size = this.taskQueue.size();
            for (int i3 = 0; i3 < size; i3++) {
                PartitionTaskInfo poll = this.taskQueue.poll();
                if (this.taskMap.containsKey(Integer.valueOf(poll.getTopicPartition().partition()))) {
                    this.taskQueue.add(poll);
                } else {
                    this.logger.debug("执行任务:" + poll);
                    this.taskMap.put(Integer.valueOf(poll.getTopicPartition().partition()), poll);
                    this.messageProcessor.process(poll);
                }
            }
        }
        TaskStatus taskStatus = new TaskStatus();
        taskStatus.dataCount = i;
        taskStatus.taskCount = i2;
        return taskStatus;
    }
}
