Redis Stream

Redis支持Pub/Sub,但如果发布消息时订阅者没上线,那信息就会丢失。Redis在5.0中引入了Stream,提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

Stream在Redis中是一种数据结构,可以简单的理解为一个消息链表。

消息(Entry)

每条消息除了内容外,都有一个唯一的ID。格式为{Unix毫秒时间戳}-{序号},比如1681365707499-0

创建

使用XADD命令向Stream追加一条消息。

1
2
redis> XADD persons * name Jim surname Green
"1681365707499-0"

*代表用户不指定,由redis来生成ID。

如果目标Stream不存在,会创建一个新的Stream。

1
2
3
4
redis> KEYS *
1) "persons"
redis> TYPE persons
stream

读取

使用XREAD来读取消息。

1
2
3
4
5
6
7
redis> XREAD STREAMS persons 0
1) 1) "persons"
2) 1) 1) "1681365707499-0"
2) 1) "name"
2) "Jim"
3) "surname"
4) "Green"

XREAD可以指定返回的消息条数,以及消息起始ID。
再添加三条数据:

1
2
3
4
5
6
redis> XADD persons * name Geoffrey surname Hinton
"1681365756516-0"
redis> XADD persons * name Yann surname LeCun
"1681365759975-0"
redis> XADD persons * name Andrew surname Ng
"1681365768735-0"

然后可以这样读取:

1
2
3
4
5
6
7
8
9
10
11
12
redis> XREAD COUNT 2 STREAMS persons 1681365707499-0
1) 1) "persons"
2) 1) 1) "1681365756516-0"
2) 1) "name"
2) "Geoffrey"
3) "surname"
4) "Hinton"
2) 1) "1681365759975-0"
2) 1) "name"
2) "Yann"
3) "surname"
4) "LeCun"

COUNT 2代表最多返回2条消息,最后的1681365707499-0代表从这条消息后(不含这条消息)开始读取。

除了XREAD外,还有XRANGEXREVRANGE可以读取消息。

查看Stream信息

创建后可使用XINFO STREAM查看Stream的信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
redis> XINFO STREAM persons
1) "length"
2) (integer) 4
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "last-generated-id"
8) "1681365768735-0"
9) "groups"
10) (integer) 0
11) "first-entry"
12) 1) "1681365707499-0"
2) 1) "name"
2) "Jim"
3) "surname"
4) "Green"
13) "last-entry"
14) 1) "1681365768735-0"
2) 1) "name"
2) "Andrew"
3) "surname"
4) "Ng"

消息修剪

如果堆积的消息过多,会造成内存浪费,我们有两种方式来修剪。

  1. 在添加时指定最大消息条数

先添加一些测试数据。

1
2
3
4
5
6
7
8
redis> XADD tempstream * message "hello, 1"
"1681369081881-0"
redis> XADD tempstream * message "hello, 2"
"1681369084603-0"
redis> XADD tempstream * message "hello, 3"
"1681369086480-0"
redis> XLEN tempstream
(integer) 3

在添加时指定最大消息条数。

1
2
3
4
5
6
7
8
9
10
11
12
13
redis> XADD tempstream MAXLEN 3 * message "hello, 4"
"1681369112438-0"
redis> XREAD STREAMS tempstream 0
1) 1) "tempstream"
2) 1) 1) "1681369084603-0"
2) 1) "message"
2) "hello, 2"
2) 1) "1681369086480-0"
2) 1) "message"
2) "hello, 3"
3) 1) "1681369112438-0"
2) 1) "message"
2) "hello, 4"

因为指定了最多保留3条消息,所以最老的第1条消息被修剪。

  1. 使用XTRIM修剪
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
redis> XADD tempstream * message "hello, 5"
"1681369225281-0"
redis> XLEN tempstream
(integer) 4
redis> XTRIM tempstream MINID 1681369086480
(integer) 1
redis> XREAD STREAMS tempstream 0
1) 1) "tempstream"
2) 1) 1) "1681369086480-0"
2) 1) "message"
2) "hello, 3"
2) 1) "1681369112438-0"
2) 1) "message"
2) "hello, 4"
3) 1) "1681369225281-0"
2) 1) "message"
2) "hello, 5"

指定了MINID后,在这条消息之前的消息全部会被修剪。

同时,XTRIM也可以使用MAXLEN

1
2
3
4
5
6
7
8
9
10
11
12
13
redis> XTRIM tempstream MAXLEN ~ 2
(integer) 1
redis> XREAD STREAMS tempstream 0
1) 1) "tempstream"
2) 1) 1) "1681369086480-0"
2) 1) "message"
2) "hello, 3"
2) 1) "1681369112438-0"
2) 1) "message"
2) "hello, 4"
3) 1) "1681369225281-0"
2) 1) "message"
2) "hello, 5"

注意那个~号,它会采用更高效的算法来决定修剪多少条,在这里它觉得当前条数已经足够少决定不执行修剪。

消费者(Consumer)与消费者组(Consumer Group)

消费者组主要包含一个last_delivered_id,记录了最后一次成功消费的消息ID,用于跟踪消费者组在流中的进度。

创建

消费者组可使用XGROUP CREATE命令创建。

1
2
redis> XGROUP CREATE persons mygroup 0
OK

可以使用XINFO GROUPS查看Stream下的消费者组。

1
2
3
4
5
6
7
8
9
redis> XINFO GROUPS persons
1) 1) "name"
2) "mygroup"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "0-0"

在创建消费者组时可以指定last-delivered-id,当然也可以创建完再使用XGROUP SETID修改,效果一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
redis> XGROUP CREATE persons tempgroup 1681365756516-0
OK
redis> XINFO GROUPS persons
1) ...
2) 1) "name"
2) "tempgroup"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1681365756516-0"
redis> XGROUP SETID persons tempgroup 1681365759975-0
OK
redis> XINFO GROUPS persons
1) ...
2) 1) "name"
2) "tempgroup"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1681365759975-0"
redis> XGROUP DESTROY persons tempgroup
(integer) 1

通过消费者读取消息

通过消费者组来读取消息需要改用XREADGROUP命令。

1
2
3
4
5
6
7
8
9
10
11
12
redis> XREADGROUP GROUP mygroup myconsumer COUNT 2 STREAMS persons >
1) 1) "persons"
2) 1) 1) "1681365707499-0"
2) 1) "name"
2) "Jim"
3) "surname"
4) "Green"
2) 1) "1681365756516-0"
2) 1) "name"
2) "Geoffrey"
3) "surname"
4) "Hinton"

在这里我们使用了消费者myconsumerXREADGROUP必须指定消费者组以及消费者。如果消费者不存在则会创建。
在读取完2条消息后,mygrouplast-delivered-id发生了变化:

1
2
3
4
5
6
7
8
9
redis> XINFO GROUPS persons
1) 1) "name"
2) "mygroup"
3) "consumers"
4) (integer) 1
5) "pending"
6) (integer) 2
7) "last-delivered-id"
8) "1681365756516-0"

同时,consumers从0变成了1。一个新的消费者myconsumer被创建。我们可以用XINFO CONSUMERS来查看。

1
2
3
4
5
6
7
redis> XINFO CONSUMERS persons mygroup
1) 1) "name"
2) "myconsumer"
3) "pending"
4) (integer) 2
5) "idle"
6) (integer) 248693

我们可以注意到,mygrouppending从0变成了2,myconsumerpending也等于2。

挂起(pending)

pending代表未被处理的消息。
在上面的操作中,我们分配给了消费者myconsumer2条消息,myconsumer在处理完后,需要标记这2条消息为确认完成或拒绝完成,这样才能保证消息不丢失。

可以使用XPENDING命令来查看消费者组里挂起的消息。

1
2
3
4
5
6
redis> XPENDING persons mygroup
1) (integer) 2
2) "1681365707499-0"
3) "1681365756516-0"
4) 1) 1) "myconsumer"
2) "2"

可以使用XACK命令来确认消息处理完毕。被确认的消息会从待确认条目列表(Pending Entries List (PEL))中移去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
redis> XACK persons mygroup 1681365707499-0
(integer) 1
redis> XPENDING persons mygroup
1) (integer) 1
2) "1681365756516-0"
3) "1681365756516-0"
4) 1) 1) "myconsumer"
2) "1"
redis> XINFO CONSUMERS persons mygroup
1) 1) "name"
2) "myconsumer"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 1251414

我们再创建一个消费者,给它也分配2条消息看看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
redis> XGROUP CREATECONSUMER persons mygroup betterconsumer
(integer) 1
redis> XREADGROUP GROUP mygroup betterconsumer COUNT 2 STREAMS persons >
1) 1) "persons"
2) 1) 1) "1681365759975-0"
2) 1) "name"
2) "Yann"
3) "surname"
4) "LeCun"
2) 1) "1681365768735-0"
2) 1) "name"
2) "Andrew"
3) "surname"
4) "Ng"
redis> XPENDING persons mygroup
1) (integer) 3
2) "1681365756516-0"
3) "1681365768735-0"
4) 1) 1) "betterconsumer"
2) "2"
2) 1) "myconsumer"
2) "1"

标记1681365759975-0完成。

1
2
3
4
5
6
7
8
9
10
11
redis> XACK persons mygroup 1681365759975-0
(integer) 1
redis> XPENDING persons mygroup - + 10
1) 1) "1681365756516-0"
2) "myconsumer"
3) (integer) 2954839
4) (integer) 1
2) 1) "1681365768735-0"
2) "betterconsumer"
3) (integer) 572468
4) (integer) 1

2954839572468代表消费者已经消耗的时间。

超时处理

使用XCLAIM命令将超时的消息换人处理。

1
2
3
4
5
6
7
8
9
10
11
12
redis> XCLAIM persons mygroup betterconsumer 60000 1681365756516-0
1) 1) "1681365756516-0"
2) 1) "name"
2) "Geoffrey"
3) "surname"
4) "Hinton"
redis> XPENDING persons mygroup
1) (integer) 2
2) "1681365756516-0"
3) "1681365768735-0"
4) 1) 1) "betterconsumer"
2) "2"

也可使用XAUTOCLAIM来指定一个托底的消费者。