package org.dromara.hutool.extra.mq.engine.kafka;

import java.io.IOException;
import java.time.Duration;
import java.util.Iterator;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.dromara.hutool.core.collection.ListUtil;
import org.dromara.hutool.core.io.IoUtil;
import org.dromara.hutool.extra.mq.Consumer;
import org.dromara.hutool.extra.mq.Message;
import org.dromara.hutool.extra.mq.MessageHandler;

/* loaded from: input_file:org/dromara/hutool/extra/mq/engine/kafka/KafkaConsumer.class */
public class KafkaConsumer implements Consumer {
    private final org.apache.kafka.clients.consumer.Consumer<String, byte[]> consumer;

    /* loaded from: input_file:org/dromara/hutool/extra/mq/engine/kafka/KafkaConsumer$ConsumerRecordMessage.class */
    private static class ConsumerRecordMessage implements Message {
        private final ConsumerRecord<String, byte[]> record;

        private ConsumerRecordMessage(ConsumerRecord<String, byte[]> consumerRecord) {
            this.record = consumerRecord;
        }

        @Override // org.dromara.hutool.extra.mq.Message
        public String topic() {
            return this.record.topic();
        }

        @Override // org.dromara.hutool.extra.mq.Message
        public byte[] content() {
            return (byte[]) this.record.value();
        }
    }

    public KafkaConsumer(Properties properties) {
        this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(properties);
    }

    public KafkaConsumer(org.apache.kafka.clients.consumer.Consumer<String, byte[]> consumer) {
        this.consumer = consumer;
    }

    public KafkaConsumer setTopics(String... strArr) {
        this.consumer.subscribe(ListUtil.of(strArr));
        return this;
    }

    public KafkaConsumer setTopicPattern(Pattern pattern) {
        this.consumer.subscribe(pattern);
        return this;
    }

    @Override // org.dromara.hutool.extra.mq.Consumer
    public void subscribe(MessageHandler messageHandler) {
        Iterator it = this.consumer.poll(Duration.ofMillis(3000L)).iterator();
        while (it.hasNext()) {
            messageHandler.handle(new ConsumerRecordMessage((ConsumerRecord) it.next()));
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        IoUtil.nullSafeClose(this.consumer);
    }
}
