1 背景
最近在研究nsq的相关源码。在研究源码之前,要搞明白nsq的使用,因此需要搭建nsq的环境。从nsq的官方文档上看,nsq安装一般也就是如下几种方法。首先需要说明的是,本人是mac电脑,安装过程也许不适用与非mac系统!
2 二进制方式
对于mac系统来说,只需要如下指令即可搞定:
brew install nsq
这种方法很简单,自己没有实际操作,仅供参考!
3 源码方式
直接clone源码:git clone https://github.com/nsqio/nsq;源码结构如下所示:

如果用源码运行,而不是Make后将可执行文件放到bin目录的这种方法,那么下载后,解决完所有的依赖包后,cd
进入到 nsqio/nsq/apps/nsqd
目录后,如果直接运行go run main.go
,则会提示:
nsqio/nsq/apps/nsqd/main.go:44:13: undefined: nsqdFlagSet
nsqio/nsq/apps/nsqd/main.go:54:10: undefined: config
......................
正确的做法是:go run ./
或 go run main.go options.go
,这样对应的服务才能运行起来!
首先启动nsqlookud,
然后启动nsqd, nsqd --lookupd-tcp-address=127.0.0.1:4160 --tcp-address=0.0.0.0:4150 --http-address=0.0.0.0:4151
最后启动nqsadmin ,nsqadmin --lookupd-http-address=127.0.0.1:4161
4 Docker
需要说明的是,我按照官方文档中提供的docker-compose 文件来启动nsq,失败了!没有找到具体原因,于是自己重写了。
version: '3'
services:
nsqlookupd:
image: nsqio/nsq
command: /nsqlookupd
ports:
- 4160:4160
- 4161:4161
nsqd:
image: nsqio/nsq
command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 --broadcast-address=nsqd
depends_on:
- nsqlookupd
ports:
- 4150:4150
- 4151:4151
nsqadmin:
image: nsqio/nsq
command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
depends_on:
- nsqlookupd
ports:
- 4171:4171
执行docker-compose up
命名后,出现了界面:

新建文件,安装go-nsq依赖,构造生产者和消费者的demo(源码从网上来,实操后,是可以正常运行的)
package main
import (
"fmt"
"time"
"github.com/nsqio/go-nsq"
)
type ConsumerHandler struct{}
func (*ConsumerHandler) HandleMessage(msg *nsq.Message) error {
fmt.Println(string(msg.Body))
return nil
}
func Producer() {
producer, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig())
if err != nil {
fmt.Println("NewProducer", err)
panic(err)
}
i := 1
for {
if err := producer.Publish("test", []byte(fmt.Sprintf("Hello World %d", i))); err != nil {
fmt.Println("Publish", err)
panic(err)
}
time.Sleep(time.Second * 1)
i++
}
}
func ConsumerA() {
consumer, err := nsq.NewConsumer("test", "test-channel-a", nsq.NewConfig())
if err != nil {
fmt.Println("NewConsumer", err)
panic(err)
}
consumer.AddHandler(&ConsumerHandler{})
if err := consumer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {
fmt.Println("ConnectToNSQLookupd", err)
panic(err)
}
}
func ConsumerB() {
consumer, err := nsq.NewConsumer("test", "test-channel-b", nsq.NewConfig())
if err != nil {
fmt.Println("NewConsumer", err)
panic(err)
}
consumer.AddHandler(&ConsumerHandler{})
if err := consumer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {
fmt.Println("ConnectToNSQLookupd", err)
panic(err)
}
}
func main() {
ConsumerA()
ConsumerB()
Producer()
}
执行后

最后在nsqadmin里面查看,已有数据:
