Streams
Redis 5.0 版本新增了 Stream 数据结构,主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅(pub/sub)来实现消息队列的功能,但它有一个缺点就是消息无法持久化,如果网络断开、Reids 宕机等,消息就会被丢弃。
而 Redis Stream 提供了消息的持久化和主备份复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。
Redis Stream 的结构如下所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和 对应的内容。
每个 Stream 都有一个唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 执行追加消息时自动创建:
Message content
:消息内容Consumer Group
:消费组,使用XGROUP CREATE
命令创建,一个消费组有多个消费者(Consumer)last_delivered_id
:游标,每个消费组会有个游标last_delivered_id
,任意一个消费者读取了消息都会使游标往前移动。pending_ids
:消费者(Consumer)的状态变量,作用是维护消费者的未确定的id,pending_ids 记录了当前已经被客户端读取的消息,但是还没有ack
(Acknowledge character:确认字符 )
消息队列相关命令
XADD 【添加消息到末尾】
如果指定的队列不存在,则创建一个队列,XADD
语法格式:
XADD key ID filed value [filed value ...]
key
:队列名称,如果不存在就创建ID
:消息 id,我们使用*
表示由redis
生成,可以自定义,但是要保证递增性。生成的消息 ID 由两部分组成,毫秒时间戳-该毫秒内产生的第N条消息。field value
:记录。结构类似 Hash 结构,以key-value
的形式存在。
127.0.0.1:6379> XADD log * txt 'hello world' timestamp 1643775142000
"1725430527683-0"
127.0.0.1:6379> XADD log * txt 'hello world' timestamp 1643775143000
"1725430532903-0"
127.0.0.1:6379> XLEN log
(integer) 2
127.0.0.1:6379> XRANGE log - +
1) 1) "1725430527683-0"
2) 1) "txt"
2) "hello world"
3) "timestamp"
4) "1643775142000"
2) 1) "1725430532903-0"
2) 1) "txt"
2) "hello world"
3) "timestamp"
4) "1643775143000"
127.0.0.1:6379>
XTRIM:对流进行修剪,限制长度
语法格式:
XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count]
key
:队列名称MAXLEN
:长度MINID:允许的最小 ID
count
:数量
127.0.0.1:6379> XTRIM log MAXLEN 1
(integer) 1
127.0.0.1:6379> XRANGE log - +
1) 1) "1725430532903-0"
2) 1) "txt"
2) "hello world"
3) "timestamp"
4) "1643775143000"
127.0.0.1:6379>
XDEL:删除消息
XDEL key ID [ID ...]
XLEN:获取流包含的元素数量,即消息长度。
XRANGE 【获取消息列表】
获取消息列表,会自动过滤已删除的消息
XRANGE key start end [COUNT count]
key
:队列名称start
:开始值,-
表示最小值end
:结束值,+
表示最大值count
:表示获取多少条数据
XREVRANGE【反向获取消息列表】
反向获取消息列表,ID 从大到小。
XREVRANGE key end start [COUNT count]
key
:队列名称start
:开始值,-
表示最小值end
:结束值,+
表示最大值count
:数量
XREAD:以阻塞或非阻塞方式获取消息列表。
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
count
:读取的消息数量。milliseconds
:可选,阻塞毫秒数,没有设置就是非阻塞模式,设置为 0 表示永远阻塞。key
:队列名id
:消息 ID,返回大于指定 ID 的消息,
$ 表示特殊 ID,表示当前 Stream 已经存储的最大的 ID 作为最后一个ID,当前Stream中不存在大于当前ID的消息,因此此时返沪nil
0-0 表示从最小的ID开始获取Stream中的消息,当不指定count,将会返回所有消息,也可以使用0(00/00也都是可以的)。
127.0.0.1:6379> xread streams log $
(nil)
127.0.0.1:6379> xread streams log 0
1) 1) "log"
2) 1) 1) "1725432909707-0"
2) 1) "txt"
2) "hello world"
3) "timestamp"
4) "1643775142000"
2) 1) "1725432915247-0"
2) 1) "txt"
2) "hello world"
3) "timestamp"
4) "1643775143000"
127.0.0.1:6379>
消费者组相关命令
XGROUP CREATE:创建消费者组
XGROUP CREATE key groupname id|$ [MKSTREAM] [ENTRIESREAD entries_read]
key
:队列名称,如果不存在就创建groupname
:组名$
:表示从尾部开始消费,直接收最新消息,当前 Stream 消息会被全部忽略
# 从头开始消费
XGROUP CREATE log consumer-group-name 0
# 从尾部开始消费
XGROUP CREATE log consumer-group-name $
XREADGROUP GROUP:读取消费者组中的消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
group
:消费组名consumer
:消费者名count
:读取数量milliseconds
:阻塞毫秒数。key
:队列名ID
:消息ID
XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >
Stream 中的消息一旦被消费者组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了,即同一个消费组里的消费者不能消费同一条消息。但是不同消费组内的消费者可以消费同一条消息。
XACK:将消息标记为“已处理”
XGROUP SETID:为消费者组设置新的最后递送消息 ID
XGROUP DELCONSUMER:删除消费者。
XGROUP DESTROY:删除消费者组。
XPENDING:显示待处理消息的相关信息。
XCLAIM:转移消息的归属权
XINFO:查看流和消费者组的相关信息
XINFO GROUPS:打印消费者组的信息
XINFO STREAM:打印流信息
最后更新于
这有帮助吗?