package com.viontech.batch.item.kafka;

import com.viontech.batch.listener.KafkaAckChunkListener;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.kafka.core.ConsumerFactory;

/* loaded from: input_file:com/viontech/batch/item/kafka/KafkaReader.class */
public class KafkaReader<K, V> implements ItemStreamReader<ConsumerRecord<K, V>> {
    private volatile Iterator<ConsumerRecord<K, V>> iterator;
    private Consumer consumer;
    public static final long DEFAULT_AWAIT_IN_MILLIS = 3000;
    private long lastCommitTime = System.currentTimeMillis();
    private int tryCount = 0;
    private long awaitInMillis = DEFAULT_AWAIT_IN_MILLIS;
    private final Logger logger = LoggerFactory.getLogger(KafkaReader.class);
    private ThreadLocal<ExecutionContext> executionContextThreadLocal = new ThreadLocal<>();

    public KafkaReader(ConsumerFactory consumerFactory, String str, String str2) {
        HashMap hashMap = new HashMap(consumerFactory.getConfigurationProperties());
        if (str != null) {
            hashMap.put("group.id", str);
        }
        hashMap.put("auto.offset.reset", "earliest");
        hashMap.put("enable.auto.commit", false);
        this.consumer = new KafkaConsumer(hashMap, new StringDeserializer(), new StringDeserializer());
        this.consumer.subscribe(Arrays.asList(str2));
    }

    /* renamed from: read, reason: merged with bridge method [inline-methods] */
    public ConsumerRecord<K, V> m0read() {
        while (System.currentTimeMillis() - this.lastCommitTime <= TimeUnit.SECONDS.toMillis(10L)) {
            if (this.tryCount > 5) {
                return null;
            }
            if (this.tryCount > 2) {
                if (this.awaitInMillis < 9000) {
                    this.awaitInMillis += DEFAULT_AWAIT_IN_MILLIS;
                }
                this.logger.debug("尝试 {} 次都无法在kafka中读取到数据,程序等待 {} 秒，让出CPU", Integer.valueOf(this.tryCount), Long.valueOf(this.awaitInMillis));
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(this.awaitInMillis));
            }
            if (this.iterator == null) {
                ConsumerRecords poll = this.consumer.poll(Duration.ofSeconds(1L));
                this.logger.debug("从kafka获取数据成功，获取到 {} 条数据", Integer.valueOf(poll.count()));
                if (!poll.isEmpty()) {
                    this.tryCount = 0;
                    this.awaitInMillis = DEFAULT_AWAIT_IN_MILLIS;
                }
                if (poll.isEmpty()) {
                    this.tryCount++;
                } else {
                    this.iterator = poll.iterator();
                }
            } else {
                if (this.iterator.hasNext()) {
                    ConsumerRecord<K, V> next = this.iterator.next();
                    this.logger.debug("返回kafka数据 {} - {}:{}", new Object[]{Long.valueOf(next.offset()), next.key(), next.value()});
                    return next;
                }
                this.logger.debug("从kafka读取的数据已经全部返回了");
                this.iterator = null;
            }
        }
        this.logger.debug("已经 10 秒没有提交数据了，赶紧提交一波，不要丢失了");
        this.lastCommitTime = System.currentTimeMillis();
        return null;
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        this.executionContextThreadLocal.set(executionContext);
        executionContext.put(KafkaAckChunkListener.ATTRIBUTE_CONSUMER, this.consumer);
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
    }

    public void close() throws ItemStreamException {
        ExecutionContext executionContext = this.executionContextThreadLocal.get();
        try {
            try {
                ((Consumer) executionContext.get(KafkaAckChunkListener.ATTRIBUTE_CONSUMER)).commitSync();
                executionContext.remove(KafkaAckChunkListener.ATTRIBUTE_CONSUMER);
                this.executionContextThreadLocal.remove();
            } catch (Exception e) {
                e.printStackTrace();
                throw new RuntimeException("ack fail");
            }
        } catch (Throwable th) {
            executionContext.remove(KafkaAckChunkListener.ATTRIBUTE_CONSUMER);
            this.executionContextThreadLocal.remove();
            throw th;
        }
    }
}
