package org.redisson.spring.data.connection;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.hibernate.validator.internal.metadata.core.ConstraintHelper;
import org.reactivestreams.Publisher;
import org.redisson.api.StreamMessageId;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.decoder.CodecDecoder;
import org.redisson.client.protocol.decoder.ListMultiDecoder2;
import org.redisson.client.protocol.decoder.ObjectMapDecoder;
import org.redisson.client.protocol.decoder.StreamInfoDecoder;
import org.redisson.reactive.CommandReactiveExecutor;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveStreamCommands;
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.PendingMessage;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;

/* loaded from: input_file:BOOT-INF/lib/redisson-spring-data-23-3.13.6.jar:org/redisson/spring/data/connection/RedissonReactiveStreamCommands.class */
public class RedissonReactiveStreamCommands extends RedissonBaseReactive implements ReactiveStreamCommands {
    // XGROUP command declared with a String reply type; xGroup uses it for the CREATE sub-command.
    private static final RedisStrictCommand<String> XGROUP_STRING = new RedisStrictCommand<>("XGROUP");

    /**
     * Creates the stream command implementation on top of the given reactive executor, which is
     * passed unchanged to the superclass constructor.
     *
     * <p>NOTE(review): the decompiler (JADX) reported that the access modifier was changed from
     * package-private; confirm the intended visibility before relying on it being public.
     *
     * @param commandReactiveExecutor executor handed to the superclass
     */
    public RedissonReactiveStreamCommands(CommandReactiveExecutor commandReactiveExecutor) {
        super(commandReactiveExecutor);
    }

    /**
     * Converts record ids to their textual values, preserving the order of the input.
     *
     * @param list record ids to convert
     * @return a new list containing {@code getValue()} of every element
     */
    private static List<String> toStringList(List<RecordId> list) {
        return list.stream().map(RecordId::getValue).collect(Collectors.toList());
    }

    /**
     * Claims pending entries for a new owner and emits only the ids of the claimed entries.
     *
     * <p>For every incoming command the arguments {@code key group newOwner minIdleMillis id... JUSTID}
     * are sent with {@code RedisCommands.XCLAIM}, and the keys of the decoded reply are converted
     * to {@link RecordId}s.
     *
     * @param publisher source of claim commands; key, group name, new owner and a non-empty id
     *                  collection are required on each command
     * @return one response per command, pairing the command with a flux of claimed record ids
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XClaimCommand, Flux<RecordId>>> xClaimJustId(Publisher<ReactiveStreamCommands.XClaimCommand> publisher) {
        return execute(publisher, xClaimCommand -> {
            Assert.notNull(xClaimCommand.getKey(), "Key must not be null!");
            Assert.notNull(xClaimCommand.getGroupName(), "Group name must not be null!");
            Assert.notNull(xClaimCommand.getNewOwner(), "NewOwner must not be null!");
            Assert.notEmpty(xClaimCommand.getOptions().getIds(), "Ids collection must not be empty!");

            byte[] key = toByteArray(xClaimCommand.getKey());
            List<Object> params = new ArrayList<>();
            params.add(key);
            params.add(xClaimCommand.getGroupName());
            params.add(xClaimCommand.getNewOwner());
            // XCLAIM's fourth positional argument is min-idle-time, so send the options' minimum
            // idle time rather than the optional IDLE value returned by getIdleTime().
            params.add(Objects.requireNonNull(xClaimCommand.getOptions().getMinIdleTime(), "MinIdleTime must not be null!").toMillis());
            params.addAll(Arrays.asList(xClaimCommand.getOptions().getIdsAsStringArray()));
            params.add("JUSTID");

            // NOTE(review): with JUSTID the server replies with ids only - confirm that the
            // decoder behind RedisCommands.XCLAIM copes with that reply shape.
            return this.<Map<StreamMessageId, Map<byte[], byte[]>>>write(key, ByteArrayCodec.INSTANCE, RedisCommands.XCLAIM, params.toArray())
                    .map(claimed -> new ReactiveRedisConnection.CommandResponse<>(xClaimCommand,
                            Flux.fromStream(claimed.keySet().stream()).map(id -> RecordId.of(id.toString()))));
        });
    }

    /**
     * Claims pending entries for a new owner and emits the claimed entries as byte-buffer records.
     *
     * <p>For every incoming command the arguments {@code key group newOwner minIdleMillis id...}
     * are sent with {@code RedisCommands.XCLAIM}. Each reply entry becomes a record in the
     * command's key whose fields and values are wrapped as {@link ByteBuffer}s.
     *
     * @param publisher source of claim commands; key, group name, new owner and a non-empty id
     *                  collection are required on each command
     * @return one response per command, pairing the command with a flux of claimed records
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XClaimCommand, Flux<ByteBufferRecord>>> xClaim(Publisher<ReactiveStreamCommands.XClaimCommand> publisher) {
        return execute(publisher, xClaimCommand -> {
            Assert.notNull(xClaimCommand.getKey(), "Key must not be null!");
            Assert.notNull(xClaimCommand.getGroupName(), "Group name must not be null!");
            Assert.notNull(xClaimCommand.getNewOwner(), "NewOwner must not be null!");
            Assert.notEmpty(xClaimCommand.getOptions().getIds(), "Ids collection must not be empty!");

            byte[] key = toByteArray(xClaimCommand.getKey());
            List<Object> params = new ArrayList<>();
            params.add(key);
            params.add(xClaimCommand.getGroupName());
            params.add(xClaimCommand.getNewOwner());
            // XCLAIM's fourth positional argument is min-idle-time, so send the options' minimum
            // idle time rather than the optional IDLE value returned by getIdleTime().
            params.add(Objects.requireNonNull(xClaimCommand.getOptions().getMinIdleTime(), "MinIdleTime must not be null!").toMillis());
            params.addAll(Arrays.asList(xClaimCommand.getOptions().getIdsAsStringArray()));

            return this.<Map<StreamMessageId, Map<byte[], byte[]>>>write(key, ByteArrayCodec.INSTANCE, RedisCommands.XCLAIM, params.toArray())
                    .map(claimed -> new ReactiveRedisConnection.CommandResponse<>(xClaimCommand,
                            Flux.fromStream(claimed.entrySet().stream()).map(message -> StreamRecords.newRecord()
                                    .in(xClaimCommand.getKey())
                                    .withId(RecordId.of(message.getKey().toString()))
                                    .ofBuffer(message.getValue().entrySet().stream().collect(Collectors.toMap(
                                            field -> ByteBuffer.wrap(field.getKey()),
                                            field -> ByteBuffer.wrap(field.getValue())))))));
        });
    }

    /**
     * Fetches the pending-entries summary of a consumer group.
     *
     * <p>Sends {@code RedisCommands.XPENDING} with the key and group name and converts the reply
     * (total, lowest and highest id, consumer names) into a {@link PendingMessagesSummary} whose
     * id range is built with {@code Range.open}.
     *
     * @param publisher source of commands; key and group name are required on each command
     * @return one response per command carrying the summary
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.PendingRecordsCommand, PendingMessagesSummary>> xPendingSummary(Publisher<ReactiveStreamCommands.PendingRecordsCommand> publisher) {
        return execute(publisher, pendingRecordsCommand -> {
            Assert.notNull(pendingRecordsCommand.getKey(), "Key must not be null!");
            Assert.notNull(pendingRecordsCommand.getGroupName(), "Group name must not be null!");

            byte[] key = toByteArray(pendingRecordsCommand.getKey());
            return write(key, StringCodec.INSTANCE, RedisCommands.XPENDING, key, pendingRecordsCommand.getGroupName()).map(summary -> {
                Range<String> idRange = Range.open(summary.getLowestId().toString(), summary.getHighestId().toString());
                PendingMessagesSummary result = new PendingMessagesSummary(
                        pendingRecordsCommand.getGroupName(), summary.getTotal(), idRange, summary.getConsumerNames());
                return new ReactiveRedisConnection.CommandResponse<>(pendingRecordsCommand, result);
            });
        });
    }

    /**
     * Lists the pending entries of a consumer group within an id range.
     *
     * <p>The arguments sent with {@code RedisCommands.XPENDING_ENTRIES} are
     * {@code key group start end [count] [consumer]}; an absent lower or upper bound is replaced
     * by {@code "-"} or {@code "+"} respectively. Every reply entry is converted into a
     * {@link PendingMessage}.
     *
     * @param publisher source of commands; key and group name are required on each command
     * @return one response per command carrying the pending messages
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.PendingRecordsCommand, PendingMessages>> xPending(Publisher<ReactiveStreamCommands.PendingRecordsCommand> publisher) {
        return execute(publisher, pendingRecordsCommand -> {
            Assert.notNull(pendingRecordsCommand.getKey(), "Key must not be null!");
            Assert.notNull(pendingRecordsCommand.getGroupName(), "Group name must not be null!");

            byte[] key = toByteArray(pendingRecordsCommand.getKey());
            List<Object> params = new ArrayList<>();
            params.add(key);
            // XPENDING expects the consumer group directly after the key; it was missing before.
            params.add(pendingRecordsCommand.getGroupName());
            // Bounds are rendered through toString() so the expression type-checks whatever the
            // range's element type is.
            params.add(pendingRecordsCommand.getRange().getLowerBound().getValue().map(Object::toString).orElse("-"));
            params.add(pendingRecordsCommand.getRange().getUpperBound().getValue().map(Object::toString).orElse("+"));
            if (pendingRecordsCommand.getCount() != null) {
                params.add(pendingRecordsCommand.getCount());
            }
            if (pendingRecordsCommand.getConsumerName() != null) {
                params.add(pendingRecordsCommand.getConsumerName());
            }

            return write(key, StringCodec.INSTANCE, RedisCommands.XPENDING_ENTRIES, params.toArray()).map(list -> {
                return new ReactiveRedisConnection.CommandResponse<>(pendingRecordsCommand, new PendingMessages(pendingRecordsCommand.getGroupName(), pendingRecordsCommand.getRange(), list.stream().map(pendingEntry -> {
                    return new PendingMessage(
                            RecordId.of(pendingEntry.getId().toString()),
                            Consumer.from(pendingRecordsCommand.getGroupName(), pendingEntry.getConsumerName()),
                            Duration.of(pendingEntry.getIdleTime(), ChronoUnit.MILLIS),
                            pendingEntry.getLastTimeDelivered());
                }).collect(Collectors.toList())));
            });
        });
    }

    /**
     * Retrieves general information about a stream via {@code XINFO STREAM}.
     *
     * <p>The decoded reply is copied into a field map (length, first and last entry, radix tree
     * statistics, group count, last generated id), flattened into an alternating key/value list
     * and handed to {@code StreamInfo.XInfoStream.fromList}.
     *
     * @param publisher source of commands; the key is required on each command
     * @return one response per command carrying the stream information
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XInfoCommand, StreamInfo.XInfoStream>> xInfo(Publisher<ReactiveStreamCommands.XInfoCommand> publisher) {
        return execute(publisher, xInfoCommand -> {
            Assert.notNull(xInfoCommand.getKey(), "Key must not be null!");
            byte[] key = toByteArray(xInfoCommand.getKey());
            return write(key, StringCodec.INSTANCE, new RedisCommand<>("XINFO", "STREAM", new ListMultiDecoder2(new StreamInfoDecoder(), new CodecDecoder(), new ObjectMapDecoder(ByteArrayCodec.INSTANCE, false))), key).map(streamInfo -> {
                Map<String, Object> fields = new HashMap<>();
                fields.put("length", Long.valueOf(streamInfo.getLength()));
                fields.put("first-entry", streamInfo.getFirstEntry().getData());
                fields.put("last-entry", streamInfo.getLastEntry().getData());
                fields.put("radix-tree-keys", Integer.valueOf(streamInfo.getRadixTreeKeys()));
                fields.put("radix-tree-nodes", Integer.valueOf(streamInfo.getRadixTreeNodes()));
                // Plain literal instead of a constant borrowed from an unrelated validation library.
                fields.put("groups", Long.valueOf(streamInfo.getGroups()));
                fields.put("last-generated-id", streamInfo.getLastGeneratedId().toString());

                // Flatten to [key1, value1, key2, value2, ...] as expected by fromList.
                List<Object> flattened = new ArrayList<>(fields.size() * 2);
                for (Map.Entry<String, Object> field : fields.entrySet()) {
                    flattened.add(field.getKey());
                    flattened.add(field.getValue());
                }
                return new ReactiveRedisConnection.CommandResponse<>(xInfoCommand, StreamInfo.XInfoStream.fromList(flattened));
            });
        });
    }

    /**
     * Lists the consumer groups of a stream via {@code RedisCommands.XINFO_GROUPS}.
     *
     * <p>Each group of the reply is copied into a field map (name, consumers, pending,
     * last-delivered-id), flattened into an alternating key/value list and converted with
     * {@code StreamInfo.XInfoGroup.fromList}.
     *
     * @param publisher source of commands; the key is required on each command
     * @return one response per command carrying a flux of group descriptions
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XInfoCommand, Flux<StreamInfo.XInfoGroup>>> xInfoGroups(Publisher<ReactiveStreamCommands.XInfoCommand> publisher) {
        return execute(publisher, xInfoCommand -> {
            Assert.notNull(xInfoCommand.getKey(), "Key must not be null!");
            byte[] key = toByteArray(xInfoCommand.getKey());
            return write(key, StringCodec.INSTANCE, RedisCommands.XINFO_GROUPS, key).map(groups -> {
                return new ReactiveRedisConnection.CommandResponse<>(xInfoCommand, Flux.fromStream(groups.stream()).map(group -> {
                    Map<String, Object> fields = new HashMap<>();
                    fields.put("name", group.getName());
                    fields.put("consumers", Long.valueOf(group.getConsumers()));
                    fields.put("pending", Long.valueOf(group.getPending()));
                    fields.put("last-delivered-id", group.getLastDeliveredId().toString());

                    // Flatten to [key1, value1, key2, value2, ...] in the map's iteration order.
                    List<Object> flattened = new ArrayList<>();
                    for (Map.Entry<String, Object> field : fields.entrySet()) {
                        flattened.add(field.getKey());
                        flattened.add(field.getValue());
                    }
                    return StreamInfo.XInfoGroup.fromList(flattened);
                }));
            });
        });
    }

    /**
     * Lists the consumers of a consumer group via {@code RedisCommands.XINFO_CONSUMERS}.
     *
     * <p>Each consumer of the reply is copied into a field map (name, idle, pending), flattened
     * into an alternating key/value list and wrapped in a {@code StreamInfo.XInfoConsumer}
     * together with the group name.
     *
     * @param publisher source of commands; key and group name are required on each command
     * @return one response per command carrying a flux of consumer descriptions
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XInfoCommand, Flux<StreamInfo.XInfoConsumer>>> xInfoConsumers(Publisher<ReactiveStreamCommands.XInfoCommand> publisher) {
        return execute(publisher, xInfoCommand -> {
            Assert.notNull(xInfoCommand.getKey(), "Key must not be null!");
            Assert.notNull(xInfoCommand.getGroupName(), "Group name must not be null!");
            byte[] key = toByteArray(xInfoCommand.getKey());
            return write(key, StringCodec.INSTANCE, RedisCommands.XINFO_CONSUMERS, key, xInfoCommand.getGroupName()).map(consumers -> {
                return new ReactiveRedisConnection.CommandResponse<>(xInfoCommand, Flux.fromStream(consumers.stream()).map(consumer -> {
                    Map<String, Object> fields = new HashMap<>();
                    fields.put("name", consumer.getName());
                    fields.put("idle", Long.valueOf(consumer.getIdleTime()));
                    fields.put("pending", Long.valueOf(consumer.getPending()));

                    // Flatten to [key1, value1, key2, value2, ...] in the map's iteration order.
                    List<Object> flattened = new ArrayList<>();
                    for (Map.Entry<String, Object> field : fields.entrySet()) {
                        flattened.add(field.getKey());
                        flattened.add(field.getValue());
                    }
                    return new StreamInfo.XInfoConsumer(xInfoCommand.getGroupName(), flattened);
                }));
            });
        });
    }

    /**
     * Acknowledges records of a consumer group.
     *
     * <p>Sends {@code RedisCommands.XACK} with {@code key group id...} and emits the numeric reply.
     *
     * @param publisher source of commands; key, group and record ids are required on each command
     * @return one numeric response per command
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.NumericResponse<ReactiveStreamCommands.AcknowledgeCommand, Long>> xAck(Publisher<ReactiveStreamCommands.AcknowledgeCommand> publisher) {
        return execute(publisher, acknowledgeCommand -> {
            Assert.notNull(acknowledgeCommand.getKey(), "Key must not be null!");
            Assert.notNull(acknowledgeCommand.getGroup(), "Group must not be null!");
            Assert.notNull(acknowledgeCommand.getRecordIds(), "recordIds must not be null!");

            byte[] key = toByteArray(acknowledgeCommand.getKey());
            List<Object> params = new ArrayList<>();
            params.add(key);
            params.add(acknowledgeCommand.getGroup());
            params.addAll(toStringList(acknowledgeCommand.getRecordIds()));

            return this.<Long>write(key, StringCodec.INSTANCE, RedisCommands.XACK, params.toArray())
                    .map(acknowledged -> new ReactiveRedisConnection.NumericResponse<>(acknowledgeCommand, acknowledged));
        });
    }

    /**
     * Appends a record to a stream.
     *
     * <p>Sends {@code RedisCommands.XADD} with {@code key [MAXLEN n] id field value ...}. The id
     * is {@code "*"} when the record id should be auto-generated, otherwise the record's own id
     * value. The reply is emitted as a {@link RecordId}.
     *
     * @param publisher source of commands; key and body are required on each command
     * @return one response per command carrying the id from the reply
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.AddStreamRecord, RecordId>> xAdd(Publisher<ReactiveStreamCommands.AddStreamRecord> publisher) {
        return execute(publisher, addStreamRecord -> {
            Assert.notNull(addStreamRecord.getKey(), "Key must not be null!");
            Assert.notNull(addStreamRecord.getBody(), "Body must not be null!");

            byte[] key = toByteArray(addStreamRecord.getKey());
            List<Object> params = new ArrayList<>();
            params.add(key);
            if (addStreamRecord.getMaxlen() != null) {
                params.add("MAXLEN");
                params.add(addStreamRecord.getMaxlen());
            }

            RecordId recordId = addStreamRecord.getRecord().getId();
            params.add(recordId.shouldBeAutoGenerated() ? "*" : recordId.getValue());

            for (Map.Entry<ByteBuffer, ByteBuffer> field : addStreamRecord.getBody().entrySet()) {
                params.add(toByteArray(field.getKey()));
                params.add(toByteArray(field.getValue()));
            }

            return this.<StreamMessageId>write(key, StringCodec.INSTANCE, RedisCommands.XADD, params.toArray())
                    .map(generatedId -> new ReactiveRedisConnection.CommandResponse<>(addStreamRecord, RecordId.of(generatedId.toString())));
        });
    }

    /**
     * Deletes records from a stream.
     *
     * <p>Sends {@code RedisCommands.XDEL} with {@code key id...} and emits the numeric reply.
     *
     * @param publisher source of commands; key and record ids are required on each command
     * @return one response per command carrying the reply value
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.DeleteCommand, Long>> xDel(Publisher<ReactiveStreamCommands.DeleteCommand> publisher) {
        return execute(publisher, deleteCommand -> {
            Assert.notNull(deleteCommand.getKey(), "Key must not be null!");
            Assert.notNull(deleteCommand.getRecordIds(), "recordIds must not be null!");

            byte[] key = toByteArray(deleteCommand.getKey());
            List<Object> params = new ArrayList<>();
            params.add(key);
            params.addAll(toStringList(deleteCommand.getRecordIds()));

            return this.<Long>write(key, StringCodec.INSTANCE, RedisCommands.XDEL, params.toArray())
                    .map(deleted -> new ReactiveRedisConnection.CommandResponse<>(deleteCommand, deleted));
        });
    }

    /**
     * Queries the length of a stream via {@code RedisCommands.XLEN}.
     *
     * @param publisher source of key commands; the key is required on each command
     * @return one numeric response per command
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.NumericResponse<ReactiveRedisConnection.KeyCommand, Long>> xLen(Publisher<ReactiveRedisConnection.KeyCommand> publisher) {
        return execute(publisher, keyCommand -> {
            Assert.notNull(keyCommand.getKey(), "Key must not be null!");
            byte[] key = toByteArray(keyCommand.getKey());
            return this.<Long>write(key, StringCodec.INSTANCE, RedisCommands.XLEN, key)
                    .map(length -> new ReactiveRedisConnection.NumericResponse<>(keyCommand, length));
        });
    }

    /**
     * Reads records of a stream within an id range by delegating to
     * {@code range(RedisCommands.XRANGE, publisher)}.
     *
     * @param publisher source of range commands
     * @return the responses produced by {@code range}
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.RangeCommand, Flux<ByteBufferRecord>>> xRange(Publisher<ReactiveStreamCommands.RangeCommand> publisher) {
        return range(RedisCommands.XRANGE, publisher);
    }

    /**
     * Shared implementation of the range queries.
     *
     * <p>The arguments are {@code key lower upper} when {@code redisCommand} is
     * {@code RedisCommands.XRANGE} and {@code key upper lower} otherwise; an absent bound is
     * replaced by {@code "-"} (lower) or {@code "+"} (upper). {@code COUNT n} is appended only when
     * the limit's count is positive. Each reply entry becomes a record in the command's key whose
     * fields and values are wrapped as {@link ByteBuffer}s.
     *
     * @param redisCommand command to send
     * @param publisher    source of range commands; key, range and limit are required
     * @return one response per command, pairing the command with a flux of records
     */
    private Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.RangeCommand, Flux<ByteBufferRecord>>> range(RedisCommand<?> redisCommand, Publisher<ReactiveStreamCommands.RangeCommand> publisher) {
        return execute(publisher, rangeCommand -> {
            Assert.notNull(rangeCommand.getKey(), "Key must not be null!");
            Assert.notNull(rangeCommand.getRange(), "Range must not be null!");
            Assert.notNull(rangeCommand.getLimit(), "Limit must not be null!");

            byte[] key = toByteArray(rangeCommand.getKey());
            String lower = rangeCommand.getRange().getLowerBound().getValue().orElse("-");
            String upper = rangeCommand.getRange().getUpperBound().getValue().orElse("+");

            List<Object> params = new ArrayList<>();
            params.add(key);
            if (redisCommand == RedisCommands.XRANGE) {
                params.add(lower);
                params.add(upper);
            } else {
                // Any other command gets the bounds in reverse order.
                params.add(upper);
                params.add(lower);
            }
            if (rangeCommand.getLimit().getCount() > 0) {
                params.add("COUNT");
                params.add(rangeCommand.getLimit().getCount());
            }

            return this.<Map<StreamMessageId, Map<byte[], byte[]>>>write(key, ByteArrayCodec.INSTANCE, redisCommand, params.toArray())
                    .map(messages -> new ReactiveRedisConnection.CommandResponse<>(rangeCommand,
                            Flux.fromStream(messages.entrySet().stream()).map(message -> StreamRecords.newRecord()
                                    .in(rangeCommand.getKey())
                                    .withId(RecordId.of(message.getKey().toString()))
                                    .ofBuffer(message.getValue().entrySet().stream().collect(Collectors.toMap(
                                            field -> ByteBuffer.wrap(field.getKey()),
                                            field -> ByteBuffer.wrap(field.getValue())))))));
        });
    }

    /**
     * Reads records from one or more streams, optionally on behalf of a consumer group.
     *
     * <p>The argument list is {@code [GROUP group consumer] [COUNT n] [BLOCK ms] STREAMS key... offset...};
     * COUNT and BLOCK are only added for positive values. The command constant is chosen from
     * {@code XREAD}, {@code XREAD_BLOCKING}, {@code XREADGROUP} and {@code XREADGROUP_BLOCKING}
     * depending on whether a consumer is present and whether a positive block time is set. The key
     * of the first stream offset is passed as the key argument of the base-class {@code read}.
     *
     * @param publisher source of read commands; stream offsets (non-empty) and read options are
     *                  required on each command
     * @return one response per command, pairing the command with a flux of all records read
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.ReadCommand, Flux<ByteBufferRecord>>> read(Publisher<ReactiveStreamCommands.ReadCommand> publisher) {
        return execute(publisher, readCommand -> {
            Assert.notNull(readCommand.getStreamOffsets(), "StreamOffsets must not be null!");
            Assert.notNull(readCommand.getReadOptions(), "ReadOptions must not be null!");
            // The first offset supplies the key below, so an empty list is rejected up front
            // instead of failing later with an index error.
            Assert.notEmpty(readCommand.getStreamOffsets(), "StreamOffsets must not be empty!");

            List<StreamOffset<ByteBuffer>> offsets = readCommand.getStreamOffsets();
            Consumer consumer = readCommand.getConsumer();
            Long count = readCommand.getReadOptions().getCount();
            Long block = readCommand.getReadOptions().getBlock();
            boolean blocking = block != null && block.longValue() > 0;

            List<Object> params = new ArrayList<>();
            if (consumer != null) {
                params.add("GROUP");
                params.add(consumer.getGroup());
                params.add(consumer.getName());
            }
            if (count != null && count.longValue() > 0) {
                params.add("COUNT");
                params.add(count);
            }
            if (blocking) {
                params.add("BLOCK");
                params.add(block);
            }
            params.add("STREAMS");
            // All stream keys first, then the matching offsets in the same order.
            for (StreamOffset<ByteBuffer> offset : offsets) {
                params.add(toByteArray(offset.getKey()));
            }
            for (StreamOffset<ByteBuffer> offset : offsets) {
                params.add(offset.getOffset().getOffset());
            }

            RedisCommand<?> redisCommand;
            if (consumer == null) {
                redisCommand = blocking ? RedisCommands.XREAD_BLOCKING : RedisCommands.XREAD;
            } else {
                redisCommand = blocking ? RedisCommands.XREADGROUP_BLOCKING : RedisCommands.XREADGROUP;
            }
            byte[] firstKey = toByteArray(offsets.get(0).getKey());

            return this.<Map<String, Map<StreamMessageId, Map<byte[], byte[]>>>>read(firstKey, ByteArrayCodec.INSTANCE, redisCommand, params.toArray())
                    .map(streams -> new ReactiveRedisConnection.CommandResponse<>(readCommand,
                            Flux.fromStream(streams.entrySet().stream())
                                    // Outer entry: stream name -> messages; inner entry: message id -> fields.
                                    .map(stream -> stream.getValue().entrySet().stream().map(message -> StreamRecords.newRecord()
                                            .in(stream.getKey())
                                            .withId(RecordId.of(message.getKey().toString()))
                                            .ofBuffer(message.getValue().entrySet().stream().collect(Collectors.toMap(
                                                    field -> ByteBuffer.wrap(field.getKey()),
                                                    field -> ByteBuffer.wrap(field.getValue()))))))
                                    .flatMap(Flux::fromStream)));
        });
    }

    /**
     * Executes a consumer group management command.
     *
     * <p>Supported actions are {@code CREATE} (sent as {@code XGROUP CREATE key group offset MKSTREAM},
     * the reply is passed through), {@code DELETE_CONSUMER} ({@code XGROUP DELCONSUMER key group consumer})
     * and {@code DESTROY} ({@code XGROUP DESTROY key group}). For the latter two a positive numeric
     * reply is reported as {@code "OK"} and anything else as {@code "Error"}.
     *
     * @param publisher source of group commands; key and group name are required on each command
     * @return one response per command carrying the textual result
     * @throws IllegalArgumentException for any other action
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.GroupCommand, String>> xGroup(Publisher<ReactiveStreamCommands.GroupCommand> publisher) {
        return execute(publisher, groupCommand -> {
            Assert.notNull(groupCommand.getKey(), "Key must not be null!");
            Assert.notNull(groupCommand.getGroupName(), "GroupName must not be null!");
            byte[] key = toByteArray(groupCommand.getKey());

            switch (groupCommand.getAction()) {
                case CREATE:
                    Assert.notNull(groupCommand.getReadOffset(), "ReadOffset must not be null!");
                    return this.<String>write(key, StringCodec.INSTANCE, XGROUP_STRING, "CREATE", key, groupCommand.getGroupName(), groupCommand.getReadOffset().getOffset(), "MKSTREAM")
                            .map(reply -> new ReactiveRedisConnection.CommandResponse<>(groupCommand, reply));
                case DELETE_CONSUMER:
                    Assert.notNull(groupCommand.getConsumerName(), "ConsumerName must not be null!");
                    return this.<Long>write(key, StringCodec.INSTANCE, RedisCommands.XGROUP_LONG, "DELCONSUMER", key, groupCommand.getGroupName(), groupCommand.getConsumerName())
                            .map(reply -> new ReactiveRedisConnection.CommandResponse<>(groupCommand, reply.longValue() > 0 ? "OK" : "Error"));
                case DESTROY:
                    return this.<Long>write(key, StringCodec.INSTANCE, RedisCommands.XGROUP_LONG, "DESTROY", key, groupCommand.getGroupName())
                            .map(reply -> new ReactiveRedisConnection.CommandResponse<>(groupCommand, reply.longValue() > 0 ? "OK" : "Error"));
                default:
                    throw new IllegalArgumentException("unknown command " + groupCommand.getAction());
            }
        });
    }

    /**
     * Reads records of a stream within an id range by delegating to
     * {@code range(RedisCommands.XREVRANGE, publisher)}, which sends the upper bound before the
     * lower bound for this command.
     *
     * @param publisher source of range commands
     * @return the responses produced by {@code range}
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.RangeCommand, Flux<ByteBufferRecord>>> xRevRange(Publisher<ReactiveStreamCommands.RangeCommand> publisher) {
        return range(RedisCommands.XREVRANGE, publisher);
    }

    /**
     * Trims a stream to a maximum length.
     *
     * <p>Sends {@code RedisCommands.XTRIM} with {@code key MAXLEN count} and emits the numeric reply.
     *
     * @param publisher source of trim commands; key and count are required on each command
     * @return one numeric response per command
     */
    @Override // org.springframework.data.redis.connection.ReactiveStreamCommands
    public Flux<ReactiveRedisConnection.NumericResponse<ReactiveRedisConnection.KeyCommand, Long>> xTrim(Publisher<ReactiveStreamCommands.TrimCommand> publisher) {
        return execute(publisher, trimCommand -> {
            Assert.notNull(trimCommand.getKey(), "Key must not be null!");
            Assert.notNull(trimCommand.getCount(), "Count must not be null!");
            byte[] key = toByteArray(trimCommand.getKey());
            return this.<Long>write(key, StringCodec.INSTANCE, RedisCommands.XTRIM, key, "MAXLEN", trimCommand.getCount())
                    .map(trimmed -> new ReactiveRedisConnection.NumericResponse<>(trimCommand, trimmed));
        });
    }
}
