GO语言使用redis stream队列demo
package main
import (
context
fmt
github.com/go-redis/redis/v8
time
)
var client *redis.Client
var ctx context.Context
var key = my_streamKey //key
var myConsumer = my_consumer //消费者
var group = my_group // 消费者组的名称
func main() {
Init()
for {
XAdd()
time.Sleep(time.Second * 1)
}
}
func Init() {
// 创建Redis客户端
client = redis.NewClient(&redis.Options{
Addr: 127.0.0.1:6379, // Redis服务器地址和端口
Password: 123456, // Redis服务器密码,如果有的话
DB: 12, // Redis数据库索引
PoolSize: 200, // 连接池大小
})
ctx = client.Context()
groupInit()
// 启动消费者
go func() {
for {
streams, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: group, // 消费者组的名称
Consumer: myConsumer, // 消费者的名称
Streams: []string{key, >}, // Stream的名称和ID
Count: 100, // 要读取的消息数量
Block: time.Second * 1, // 阻塞时间,0表示不阻塞
}).Result()
if err != nil {
//fmt.Println(XReadGroup error:, err)
continue
}
for _, stream := range streams {
streamName := stream.Stream
for _, message := range stream.Messages {
messageID := message.ID
messageValues := message.Values
fmt.Printf(Stream: %s, Message ID: %s, Values: %v\n, streamName, messageID, messageValues)
// 标记消息已经被消费者读取
err := client.XAck(ctx, key, group, messageID).Err()
if err != nil {
fmt.Println(XAck error:, err)
return
}
fmt.Println(消息已经被标记为已读取)
}
}
}
}()
}
func XAdd() {
// 发布消息到Stream
streamID, err := client.XAdd(ctx, &redis.XAddArgs{
Stream: key, // Stream的名称
Values: map[string]interface{}{
key1: value1,
key2: value2,
},
}).Result()
if err != nil {
fmt.Println(XAdd error:, err)
return
}
fmt.Println(Stream ID:, streamID)
}
func groupInit() {
// 判断key是否存在
existsKey, err := client.Exists(ctx, key).Result()
if err != nil {
fmt.Println(Exists error:, err)
return
}
if existsKey == 1 {
fmt.Println(Key存在)
// 获取所有消费者组信息
groups, err := client.XInfoGroups(ctx, key).Result()
if err != nil {
fmt.Println(XInfoGroups error:, err)
return
}
// 判断目标消费者组是否存在
exists := false
for _, g := range groups {
if g.Name == group {
exists = true
break
}
}
if exists {
fmt.Println(消费者组存在)
} else {
fmt.Println(消费者组不存在)
// 创建Stream
streamCreated, err := client.XGroupCreateMkStream(ctx, key, group, 0).Result()
if err != nil {
fmt.Println(创建消费组 XGroupCreateMkStream error:, err)
}
fmt.Println(创建消费组:, streamCreated)
}
} else if existsKey == 0 {
fmt.Println(Key不存在)
// 创建Stream
streamCreated, err := client.XGroupCreateMkStream(ctx, key, group, 0).Result()
if err != nil {
fmt.Println(创建消费组 XGroupCreateMkStream error:, err)
}
fmt.Println(创建消费组:, streamCreated)
} else {
fmt.Println(Exists返回值异常)
}
}