/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.internals.AbstractSegments;
import org.apache.kafka.streams.state.internals.KeyValueIterators;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.Segment;
import org.apache.kafka.streams.state.internals.SegmentIterator;
import org.apache.kafka.streams.state.internals.SegmentedBytesStore;
import org.apache.kafka.streams.state.internals.StoreQueryUtils;
import org.apache.kafka.streams.state.internals.WindowKeySchema;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AbstractRocksDBSegmentedBytesStore<S extends Segment>
implements SegmentedBytesStore {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBSegmentedBytesStore.class);
    private final String name;
    private final AbstractSegments<S> segments;
    private final String metricScope;
    private final long retentionPeriod;
    private final SegmentedBytesStore.KeySchema keySchema;
    private ProcessorContext context;
    private StateStoreContext stateStoreContext;
    private Sensor expiredRecordSensor;
    private long observedStreamTime = -1L;
    private boolean consistencyEnabled = false;
    private Position position;
    protected OffsetCheckpoint positionCheckpoint;
    private volatile boolean open;

    AbstractRocksDBSegmentedBytesStore(String name, String metricScope, long retentionPeriod, SegmentedBytesStore.KeySchema keySchema, AbstractSegments<S> segments) {
        this.name = name;
        this.metricScope = metricScope;
        this.retentionPeriod = retentionPeriod;
        this.keySchema = keySchema;
        this.segments = segments;
    }

    @Override
    public KeyValueIterator<Bytes, byte[]> fetch(Bytes key, long from, long to) {
        return this.fetch(key, from, to, true);
    }

    @Override
    public KeyValueIterator<Bytes, byte[]> backwardFetch(Bytes key, long from, long to) {
        return this.fetch(key, from, to, false);
    }

    KeyValueIterator<Bytes, byte[]> fetch(Bytes key, long from, long to, boolean forward) {
        long actualFrom = this.getActualFrom(from);
        if (this.keySchema instanceof WindowKeySchema && to < actualFrom) {
            LOG.debug("Returning no records for key {} as to ({}) < actualFrom ({}) ", new Object[]{key.toString(), to, actualFrom});
            return KeyValueIterators.emptyIterator();
        }
        List<S> searchSpace = this.keySchema.segmentsToSearch(this.segments, actualFrom, to, forward);
        Bytes binaryFrom = this.keySchema.lowerRangeFixedSize(key, actualFrom);
        Bytes binaryTo = this.keySchema.upperRangeFixedSize(key, to);
        return new SegmentIterator<S>(searchSpace.iterator(), this.keySchema.hasNextCondition(key, key, actualFrom, to, forward), binaryFrom, binaryTo, forward);
    }

    private long getActualFrom(long from) {
        return Math.max(from, this.observedStreamTime - this.retentionPeriod + 1L);
    }

    @Override
    public KeyValueIterator<Bytes, byte[]> fetch(Bytes keyFrom, Bytes keyTo, long from, long to) {
        return this.fetch(keyFrom, keyTo, from, to, true);
    }

    @Override
    public KeyValueIterator<Bytes, byte[]> backwardFetch(Bytes keyFrom, Bytes keyTo, long from, long to) {
        return this.fetch(keyFrom, keyTo, from, to, false);
    }

    KeyValueIterator<Bytes, byte[]> fetch(Bytes keyFrom, Bytes keyTo, long from, long to, boolean forward) {
        if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) {
            LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. This may be due to range arguments set in the wrong order, or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers");
            return KeyValueIterators.emptyIterator();
        }
        long actualFrom = this.getActualFrom(from);
        if (this.keySchema instanceof WindowKeySchema && to < actualFrom) {
            LOG.debug("Returning no records for keys {}/{} as to ({}) < actualFrom ({}) ", new Object[]{keyFrom, keyTo, to, actualFrom});
            return KeyValueIterators.emptyIterator();
        }
        List<S> searchSpace = this.keySchema.segmentsToSearch(this.segments, actualFrom, to, forward);
        Bytes binaryFrom = keyFrom == null ? null : this.keySchema.lowerRange(keyFrom, actualFrom);
        Bytes binaryTo = keyTo == null ? null : this.keySchema.upperRange(keyTo, to);
        return new SegmentIterator<S>(searchSpace.iterator(), this.keySchema.hasNextCondition(keyFrom, keyTo, actualFrom, to, forward), binaryFrom, binaryTo, forward);
    }

    @Override
    public KeyValueIterator<Bytes, byte[]> all() {
        long actualFrom = this.getActualFrom(0L);
        List<S> searchSpace = this.keySchema.segmentsToSearch(this.segments, actualFrom, Long.MAX_VALUE, true);
        return new SegmentIterator<S>(searchSpace.iterator(), this.keySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, true), null, null, true);
    }

    @Override
    public KeyValueIterator<Bytes, byte[]> backwardAll() {
        long actualFrom = this.getActualFrom(0L);
        List<S> searchSpace = this.keySchema.segmentsToSearch(this.segments, actualFrom, Long.MAX_VALUE, false);
        return new SegmentIterator<S>(searchSpace.iterator(), this.keySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, false), null, null, false);
    }

    @Override
    public KeyValueIterator<Bytes, byte[]> fetchAll(long timeFrom, long timeTo) {
        long actualFrom = this.getActualFrom(timeFrom);
        if (this.keySchema instanceof WindowKeySchema && timeTo < actualFrom) {
            LOG.debug("Returning no records for as timeTo ({}) < actualFrom ({}) ", (Object)timeTo, (Object)actualFrom);
            return KeyValueIterators.emptyIterator();
        }
        List<S> searchSpace = this.segments.segments(actualFrom, timeTo, true);
        return new SegmentIterator<S>(searchSpace.iterator(), this.keySchema.hasNextCondition(null, null, actualFrom, timeTo, true), null, null, true);
    }

    @Override
    public KeyValueIterator<Bytes, byte[]> backwardFetchAll(long timeFrom, long timeTo) {
        long actualFrom = this.getActualFrom(timeFrom);
        if (this.keySchema instanceof WindowKeySchema && timeTo < actualFrom) {
            LOG.debug("Returning no records for as timeTo ({}) < actualFrom ({}) ", (Object)timeTo, (Object)actualFrom);
            return KeyValueIterators.emptyIterator();
        }
        List<S> searchSpace = this.segments.segments(actualFrom, timeTo, false);
        return new SegmentIterator<S>(searchSpace.iterator(), this.keySchema.hasNextCondition(null, null, actualFrom, timeTo, false), null, null, false);
    }

    @Override
    public void remove(Bytes key) {
        long timestamp = this.keySchema.segmentTimestamp(key);
        this.observedStreamTime = Math.max(this.observedStreamTime, timestamp);
        S segment = this.segments.getSegmentForTimestamp(timestamp);
        if (segment == null) {
            return;
        }
        segment.delete((Bytes)key);
    }

    @Override
    public void remove(Bytes key, long timestamp) {
        Bytes keyBytes = this.keySchema.toStoreBinaryKeyPrefix(key, timestamp);
        S segment = this.segments.getSegmentForTimestamp(timestamp);
        if (segment != null) {
            segment.deleteRange(keyBytes, keyBytes);
        }
    }

    @Override
    public void put(Bytes key, byte[] value) {
        long timestamp = this.keySchema.segmentTimestamp(key);
        this.observedStreamTime = Math.max(this.observedStreamTime, timestamp);
        long segmentId = this.segments.segmentId(timestamp);
        S segment = this.segments.getOrCreateSegmentIfLive(segmentId, this.context, this.observedStreamTime);
        if (segment == null) {
            this.expiredRecordSensor.record(1.0, ProcessorContextUtils.currentSystemTime(this.context));
            LOG.warn("Skipping record for expired segment.");
        } else {
            StoreQueryUtils.updatePosition(this.position, this.stateStoreContext);
            segment.put((Bytes)key, (byte[])value);
        }
    }

    @Override
    public byte[] get(Bytes key) {
        long timestampFromKey = this.keySchema.segmentTimestamp(key);
        if (timestampFromKey < this.observedStreamTime - this.retentionPeriod + 1L) {
            LOG.debug("Record with key {} is expired as timestamp from key ({}) < actual stream time ({})", new Object[]{key.toString(), timestampFromKey, this.observedStreamTime - this.retentionPeriod + 1L});
            return null;
        }
        S segment = this.segments.getSegmentForTimestamp(timestampFromKey);
        if (segment == null) {
            return null;
        }
        return (byte[])segment.get((Bytes)key);
    }

    @Override
    public String name() {
        return this.name;
    }

    @Override
    @Deprecated
    public void init(ProcessorContext context, StateStore root) {
        this.context = context;
        StreamsMetricsImpl metrics = ProcessorContextUtils.getMetricsImpl(context);
        String threadId = Thread.currentThread().getName();
        String taskName = context.taskId().toString();
        this.expiredRecordSensor = TaskMetrics.droppedRecordsSensor(threadId, taskName, metrics);
        this.segments.openExisting(this.context, this.observedStreamTime);
        File positionCheckpointFile = new File(context.stateDir(), this.name() + ".position");
        this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
        this.position = StoreQueryUtils.readPositionFromCheckpoint(this.positionCheckpoint);
        this.stateStoreContext.register(root, this::restoreAllInternal, () -> StoreQueryUtils.checkpointPosition(this.positionCheckpoint, this.position));
        this.open = true;
        this.consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(context.appConfigs(), "__iq.consistency.offset.vector.enabled__", false);
    }

    @Override
    public void init(StateStoreContext context, StateStore root) {
        this.stateStoreContext = context;
        this.init(StoreToProcessorContextAdapter.adapt(context), root);
    }

    @Override
    public void flush() {
        this.segments.flush();
    }

    @Override
    public void close() {
        this.open = false;
        this.segments.close();
    }

    @Override
    public boolean persistent() {
        return true;
    }

    @Override
    public boolean isOpen() {
        return this.open;
    }

    List<S> getSegments() {
        return this.segments.allSegments(false);
    }

    void restoreAllInternal(Collection<ConsumerRecord<byte[], byte[]>> records) {
        try {
            Map<S, WriteBatch> writeBatchMap = this.getWriteBatches(records);
            for (Map.Entry<S, WriteBatch> entry : writeBatchMap.entrySet()) {
                Segment segment = (Segment)entry.getKey();
                WriteBatch batch = entry.getValue();
                segment.write(batch);
                batch.close();
            }
        }
        catch (RocksDBException e) {
            throw new ProcessorStateException("Error restoring batch to store " + this.name, e);
        }
    }

    Map<S, WriteBatch> getWriteBatches(Collection<ConsumerRecord<byte[], byte[]>> records) {
        for (ConsumerRecord<byte[], byte[]> record : records) {
            long timestamp = this.keySchema.segmentTimestamp(Bytes.wrap((byte[])((byte[])record.key())));
            this.observedStreamTime = Math.max(this.observedStreamTime, timestamp);
        }
        HashMap<Segment, WriteBatch> writeBatchMap = new HashMap<Segment, WriteBatch>();
        for (ConsumerRecord<byte[], byte[]> record : records) {
            long timestamp = this.keySchema.segmentTimestamp(Bytes.wrap((byte[])((byte[])record.key())));
            long segmentId = this.segments.segmentId(timestamp);
            S segment = this.segments.getOrCreateSegmentIfLive(segmentId, this.context, this.observedStreamTime);
            if (segment == null) continue;
            ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(record, this.consistencyEnabled, this.position);
            try {
                WriteBatch batch = writeBatchMap.computeIfAbsent((Segment)segment, s -> new WriteBatch());
                segment.addToBatch(new KeyValue<Object, Object>(record.key(), record.value()), batch);
            }
            catch (RocksDBException e) {
                throw new ProcessorStateException("Error restoring batch to store " + this.name, e);
            }
        }
        return writeBatchMap;
    }

    @Override
    public Position getPosition() {
        return this.position;
    }
}

