数据库
PythonJava前端运维
Redis
Redis
  • Redis 简介
  • 安装和连接
  • Redis 键(key)
  • Redis 数据类型
    • 基本数据类型
      • String
      • Lists
      • Sets
      • zset
      • Hashs
    • 特殊类型
      • Bitmaps
      • HyperLogLog
      • Geospatial
    • Sorted sets
    • Streams
    • Geospatial
    • Bitfields
  • Redis 配置
  • Redis CLI
  • Redis 持久化
  • Redis 事务
  • Redis 管道
  • Redis 发布订阅
  • Redis 脚本
  • Redis 数据备份与恢复
  • Redis 缓存问题
  • Redis 运维监控
由 GitBook 提供支持
在本页
  • 消息队列相关命令
  • XADD 【添加消息到末尾】
  • XTRIM:对流进行修剪,限制长度
  • XDEL:删除消息
  • XLEN:获取流包含的元素数量,即消息长度。
  • XRANGE 【获取消息列表】
  • XREVRANGE【反向获取消息列表】
  • XREAD:以阻塞或非阻塞方式获取消息列表。
  • 消费者组相关命令
  • XGROUP CREATE:创建消费者组
  • XREADGROUP GROUP:读取消费者组中的消息
  • XACK:将消息标记为“已处理”
  • XGROUP SETID:为消费者组设置新的最后递送消息 ID
  • XGROUP DELCONSUMER:删除消费者。
  • XGROUP DESTROY:删除消费者组。
  • XPENDING:显示待处理消息的相关信息。
  • XCLAIM:转移消息的归属权
  • XINFO:查看流和消费者组的相关信息
  • XINFO GROUPS:打印消费者组的信息
  • XINFO STREAM:打印流信息

这有帮助吗?

  1. Redis 数据类型

Streams

上一页Sorted sets下一页Geospatial

最后更新于8个月前

这有帮助吗?

Redis 5.0 版本新增了 Stream 数据结构,主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅(pub/sub)来实现消息队列的功能,但它有一个缺点就是消息无法持久化,如果网络断开、Reids 宕机等,消息就会被丢弃。

而 Redis Stream 提供了消息的持久化和主备份复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

Redis Stream 的结构如下所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和 对应的内容。

每个 Stream 都有一个唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 执行追加消息时自动创建:

  • Message content:消息内容

  • Consumer Group:消费组,使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer)

  • last_delivered_id:游标,每个消费组会有个游标last_delivered_id,任意一个消费者读取了消息都会使游标往前移动。

  • pending_ids:消费者(Consumer)的状态变量,作用是维护消费者的未确定的id,pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符 )

消息队列相关命令

XADD 【添加消息到末尾】

如果指定的队列不存在,则创建一个队列,XADD 语法格式:

XADD key ID filed value [filed value ...]
  • key:队列名称,如果不存在就创建

  • ID:消息 id,我们使用 * 表示由 redis 生成,可以自定义,但是要保证递增性。生成的消息 ID 由两部分组成,毫秒时间戳-该毫秒内产生的第N条消息。

  • field value:记录。结构类似 Hash 结构,以 key-value 的形式存在。

127.0.0.1:6379> XADD log * txt 'hello world' timestamp 1643775142000
"1725430527683-0"
127.0.0.1:6379> XADD log * txt 'hello world' timestamp 1643775143000
"1725430532903-0"
127.0.0.1:6379> XLEN log
(integer) 2
127.0.0.1:6379> XRANGE log - +
1) 1) "1725430527683-0"
   2) 1) "txt"
      2) "hello world"
      3) "timestamp"
      4) "1643775142000"
2) 1) "1725430532903-0"
   2) 1) "txt"
      2) "hello world"
      3) "timestamp"
      4) "1643775143000"
127.0.0.1:6379> 

XTRIM:对流进行修剪,限制长度

语法格式:

XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count]
  • key:队列名称

  • MAXLEN:长度

  • MINID:允许的最小 ID

  • count:数量

127.0.0.1:6379> XTRIM log MAXLEN 1
(integer) 1
127.0.0.1:6379> XRANGE log - +
1) 1) "1725430532903-0"
   2) 1) "txt"
      2) "hello world"
      3) "timestamp"
      4) "1643775143000"
127.0.0.1:6379> 

XDEL:删除消息

XDEL key ID [ID ...]

XLEN:获取流包含的元素数量,即消息长度。

XRANGE 【获取消息列表】

获取消息列表,会自动过滤已删除的消息

XRANGE key start end [COUNT count]
  • key:队列名称

  • start:开始值,- 表示最小值

  • end:结束值,+ 表示最大值

  • count:表示获取多少条数据

XREVRANGE【反向获取消息列表】

反向获取消息列表,ID 从大到小。

XREVRANGE key end start [COUNT count]
  • key:队列名称

  • start:开始值,- 表示最小值

  • end:结束值,+ 表示最大值

  • count:数量

XREAD:以阻塞或非阻塞方式获取消息列表。

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
  • count :读取的消息数量。

  • milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式,设置为 0 表示永远阻塞。

  • key :队列名

  • id :消息 ID,返回大于指定 ID 的消息,

$ 表示特殊 ID,表示当前 Stream 已经存储的最大的 ID 作为最后一个ID,当前Stream中不存在大于当前ID的消息,因此此时返沪nil

0-0 表示从最小的ID开始获取Stream中的消息,当不指定count,将会返回所有消息,也可以使用0(00/00也都是可以的)。

127.0.0.1:6379> xread streams log  $
(nil)
127.0.0.1:6379> xread streams log  0
1) 1) "log"
   2) 1) 1) "1725432909707-0"
         2) 1) "txt"
            2) "hello world"
            3) "timestamp"
            4) "1643775142000"
      2) 1) "1725432915247-0"
         2) 1) "txt"
            2) "hello world"
            3) "timestamp"
            4) "1643775143000"
127.0.0.1:6379> 

消费者组相关命令

XGROUP CREATE:创建消费者组

XGROUP CREATE key groupname id|$ [MKSTREAM] [ENTRIESREAD entries_read]
  • key:队列名称,如果不存在就创建

  • groupname:组名

  • $:表示从尾部开始消费,直接收最新消息,当前 Stream 消息会被全部忽略

# 从头开始消费
XGROUP CREATE log consumer-group-name 0

# 从尾部开始消费
XGROUP CREATE log consumer-group-name $

XREADGROUP GROUP:读取消费者组中的消息

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
  • group:消费组名

  • consumer:消费者名

  • count:读取数量

  • milliseconds:阻塞毫秒数。

  • key:队列名

  • ID:消息ID

XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >

Stream 中的消息一旦被消费者组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了,即同一个消费组里的消费者不能消费同一条消息。但是不同消费组内的消费者可以消费同一条消息。

XACK:将消息标记为“已处理”

XGROUP SETID:为消费者组设置新的最后递送消息 ID

XGROUP DELCONSUMER:删除消费者。

XGROUP DESTROY:删除消费者组。

XPENDING:显示待处理消息的相关信息。

XCLAIM:转移消息的归属权

XINFO:查看流和消费者组的相关信息

XINFO GROUPS:打印消费者组的信息

XINFO STREAM:打印流信息

Redis Stream 数据结构
Drawing