// 确认处理的消息
default Mono<Long> acknowledge(K key, String group, String... recordIds) {
return this.acknowledge(key, group, (RecordId[])Arrays.stream(recordIds).map(RecordId::of).toArray((x$0) -> {
return new RecordId[x$0];
}));
}
// 确认处理的消息,使用 RecordId
Mono<Long> acknowledge(K key, String group, RecordId... recordIds); // 确认处理的消息,使用 RecordId
// 确认处理的消息,使用 Record 对象
default Mono<Long> acknowledge(String group, Record<K, ?> record) {
return this.acknowledge(record.getRequiredStream(), group, record.getId());
}
// 添加多个记录
default Flux<RecordId> add(K key, Publisher<? extends Map<? extends HK, ? extends HV>> bodyPublisher) {
return Flux.from(bodyPublisher).flatMap((it) -> {
return this.add(key, it);
});
}
// 添加记录,使用 Map
default Mono<RecordId> add(K key, Map<? extends HK, ? extends HV> content) {
return this.add(StreamRecords.newRecord().in(key).ofMap(content));
}
// 添加 MapRecord
default Mono<RecordId> add(MapRecord<K, ? extends HK, ? extends HV> record) {
return this.add((Record)record);
}
// 添加记录
Mono<RecordId> add(Record<K, ?> record); // 添加记录
// 声明消费组
default Flux<MapRecord<K, HK, HV>> claim(K key, String consumerGroup, String newOwner, Duration minIdleTime, RecordId... recordIds) {
return this.claim(key, consumerGroup, newOwner, XClaimOptions.minIdle(minIdleTime).ids(recordIds));
}
// 声明消费组,使用 XClaimOptions
Flux<MapRecord<K, HK, HV>> claim(K key, String consumerGroup, String newOwner, RedisStreamCommands.XClaimOptions xClaimOptions);
// 删除记录
default Mono<Long> delete(K key, String... recordIds) {
return this.delete(key, (RecordId[])Arrays.stream(recordIds).map(RecordId::of).toArray((x$0) -> {
return new RecordId[x$0];
}));
}
// 删除记录,使用 Record 对象
default Mono<Long> delete(Record<K, ?> record) {
return this.delete(record.getStream(), record.getId());
}
// 删除记录,使用 RecordId
Mono<Long> delete(K key, RecordId... recordIds); // 删除记录
// 创建消费组
default Mono<String> createGroup(K key, String group) {
return this.createGroup(key, ReadOffset.latest(), group);
}
// 创建消费组,使用 ReadOffset
Mono<String> createGroup(K key, ReadOffset readOffset, String group); // 创建消费组
// 删除消费者
Mono<String> deleteConsumer(K key, Consumer consumer); // 删除消费者
// 销毁消费组
Mono<String> destroyGroup(K key, String group); // 销毁消费组
// 获取消费者信息
Flux<StreamInfo.XInfoConsumer> consumers(K key, String group); // 获取消费者信息
// 获取消费组信息
Flux<StreamInfo.XInfoGroup> groups(K key); // 获取消费组信息
// 获取流信息
Mono<StreamInfo.XInfoStream> info(K key); // 获取流信息
// 获取待处理消息摘要
@Nullable
Mono<PendingMessagesSummary> pending(K key, String group); // 获取待处理消息摘要
// 获取待处理消息
default Mono<PendingMessages> pending(K key, Consumer consumer) {
return this.pending(key, consumer, Range.unbounded(), -1L);
}
// 获取待处理消息,使用范围和计数
Mono<PendingMessages> pending(K key, String group, Range<?> range, long count); // 获取待处理消息
// 获取待处理消息,使用消费者、范围和计数
Mono<PendingMessages> pending(K key, Consumer consumer, Range<?> range, long count); // 获取待处理消息
// 获取流的大小
Mono<Long> size(K key); // 获取流的大小
// 获取指定范围内的记录
default Flux<MapRecord<K, HK, HV>> range(K key, Range<String> range) {
return this.range(key, range, Limit.unlimited());
}
// 获取指定范围内的记录,带限制
Flux<MapRecord<K, HK, HV>> range(K key, Range<String> range, Limit limit); // 获取指定范围内的记录
// 获取指定范围内的记录,使用目标类型
default <V> Flux<ObjectRecord<K, V>> range(Class<V> targetType, K key, Range<String> range) {
return this.range(targetType, key, range, Limit.unlimited());
}
// 获取指定范围内的记录,使用目标类型和限制
default <V> Flux<ObjectRecord<K, V>> range(Class<V> targetType, K key, Range<String> range, Limit limit) {
Assert.notNull(targetType, "Target type must not be null");
return this.range(key, range, limit).map((it) -> {
return this.map(it, targetType);
});
}
// 读取流的记录
default Flux<MapRecord<K, HK, HV>> read(StreamOffset<K> stream) {
Assert.notNull(stream, "StreamOffset must not be null");
return this.read(StreamReadOptions.empty(), stream);
}
// 读取流的记录,使用目标类型
default <V> Flux<ObjectRecord<K, V>> read(Class<V> targetType, StreamOffset<K> stream) {
Assert.notNull(stream, "StreamOffset must not be null");
return this.read(targetType, StreamReadOptions.empty(), stream);
}
// 读取多个流的记录
default Flux<MapRecord<K, HK, HV>> read(StreamOffset<K>... streams) {
return this.read(StreamReadOptions.empty(), streams);
}
// 读取多个流的记录,使用目标类型
default <V> Flux<ObjectRecord<K, V>> read(Class<V> targetType, StreamOffset<K>... streams) {
return this.read(targetType, StreamReadOptions.empty(), streams);
}
// 读取流的记录,使用读取选项
Flux<MapRecord<K, HK, HV>> read(StreamReadOptions readOptions, StreamOffset<K>... streams); // 读取流的记录
// 读取流的记录,使用读取选项和目标类型
default <V> Flux<ObjectRecord<K, V>> read(Class<V> targetType, StreamReadOptions readOptions, StreamOffset<K>... streams) {
Assert.notNull(targetType, "Target type must not be null");
return this.read(readOptions, streams).map((it) -> {
return this.map(it, targetType);
});
}
// 读取流的记录,使用消费者
default Flux<MapRecord<K, HK, HV>> read(Consumer consumer, StreamOffset<K>... streams) {
return this.read(consumer, StreamReadOptions.empty(), streams);
}
// 读取流的记录,使用消费者和目标类型
default <V> Flux<ObjectRecord<K, V>> read(Class<V> targetType, Consumer consumer, StreamOffset<K>... streams) {
return this.read(targetType, consumer, StreamReadOptions.empty(), streams);
}
// 读取流的记录,使用消费者和读取选项
Flux<MapRecord<K, HK, HV>> read(Consumer consumer, StreamReadOptions readOptions, StreamOffset<K>... streams); // 读取流的记录
// 读取流的记录,使用消费者、读取选项和目标类型
default <V> Flux<ObjectRecord<K, V>> read(Class<V> targetType, Consumer consumer, StreamReadOptions readOptions, StreamOffset<K>... streams) {
Assert.notNull(targetType, "Target type must not be null");
return this.read(consumer, readOptions, streams).map((it) -> {
return this.map(it, targetType);
});
}
// 获取反向范围内的记录
default Flux<MapRecord<K, HK, HV>> reverseRange(K key, Range<String> range) {
return this.reverseRange(key, range, Limit.unlimited());
}
// 获取反向范围内的记录,带限制
Flux<MapRecord<K, HK, HV>> reverseRange(K key, Range<String> range, Limit limit); // 获取反向范围内的记录
// 获取反向范围内的记录,使用目标类型
default <V> Flux<ObjectRecord<K, V>> reverseRange(Class<V> targetType, K key, Range<String> range) {
return this.reverseRange(targetType, key, range, Limit.unlimited());
}
// 获取反向范围内的记录,使用目标类型和限制
default <V> Flux<ObjectRecord<K, V>> reverseRange(Class<V> targetType, K key, Range<String> range, Limit limit) {
Assert.notNull(targetType, "Target type must not be null");
return this.reverseRange(key, range, limit).map((it) -> {
return this.map(it, targetType);
});
}
// 修剪流的记录
Mono<Long> trim(K key, long count); // 修剪流的记录
// 修剪流的记录,带近似修剪
Mono<Long> trim(K key, long count, boolean approximateTrimming); // 修剪流的记录,带近似修剪
// 获取 HashMapper
<V> HashMapper<V, HK, HV> getHashMapper(Class<V> targetType); // 获取 HashMapper
// 将 MapRecord 转换为 ObjectRecord
default <V> ObjectRecord<K, V> map(MapRecord<K, HK, HV> record, Class<V> targetType) {
Assert.notNull(record, "Records must not be null");
Assert.notNull(targetType, "Target type must not be null");
return StreamObjectMapper.toObjectRecord(record, this, targetType);
}
// 反序列化记录
MapRecord<K, HK, HV> deserializeRecord(ByteBufferRecord record); // 反序列化记录