Streams
最后更新于
这有帮助吗?
最后更新于
这有帮助吗?
Redis 5.0 版本新增了 Stream 数据结构,主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅(pub/sub)来实现消息队列的功能,但它有一个缺点就是消息无法持久化,如果网络断开、Reids 宕机等,消息就会被丢弃。
而 Redis Stream 提供了消息的持久化和主备份复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。
Redis Stream 的结构如下所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和 对应的内容。
每个 Stream 都有一个唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 执行追加消息时自动创建:
Message content
:消息内容
Consumer Group
:消费组,使用 XGROUP CREATE
命令创建,一个消费组有多个消费者(Consumer)
last_delivered_id
:游标,每个消费组会有个游标last_delivered_id
,任意一个消费者读取了消息都会使游标往前移动。
pending_ids
:消费者(Consumer)的状态变量,作用是维护消费者的未确定的id,pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack
(Acknowledge character:确认字符 )
如果指定的队列不存在,则创建一个队列,XADD
语法格式:
key
:队列名称,如果不存在就创建
ID
:消息 id,我们使用 *
表示由 redis
生成,可以自定义,但是要保证递增性。生成的消息 ID 由两部分组成,毫秒时间戳-该毫秒内产生的第N条消息。
field value
:记录。结构类似 Hash 结构,以 key-value
的形式存在。
语法格式:
key
:队列名称
MAXLEN
:长度
MINID:允许的最小 ID
count
:数量
获取消息列表,会自动过滤已删除的消息
key
:队列名称
start
:开始值,-
表示最小值
end
:结束值,+
表示最大值
count
:表示获取多少条数据
反向获取消息列表,ID 从大到小。
key
:队列名称
start
:开始值,-
表示最小值
end
:结束值,+
表示最大值
count
:数量
count
:读取的消息数量。
milliseconds
:可选,阻塞毫秒数,没有设置就是非阻塞模式,设置为 0 表示永远阻塞。
key
:队列名
id
:消息 ID,返回大于指定 ID 的消息,
$ 表示特殊 ID,表示当前 Stream 已经存储的最大的 ID 作为最后一个ID,当前Stream中不存在大于当前ID的消息,因此此时返沪nil
0-0 表示从最小的ID开始获取Stream中的消息,当不指定count,将会返回所有消息,也可以使用0(00/00也都是可以的)。
key
:队列名称,如果不存在就创建
groupname
:组名
$
:表示从尾部开始消费,直接收最新消息,当前 Stream 消息会被全部忽略
group
:消费组名
consumer
:消费者名
count
:读取数量
milliseconds
:阻塞毫秒数。
key
:队列名
ID
:消息ID
Stream 中的消息一旦被消费者组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了,即同一个消费组里的消费者不能消费同一条消息。但是不同消费组内的消费者可以消费同一条消息。