[TOC]
0x00 Go语言项目操作之NSQ分布式消息队列实践 描述: 目前比较流行的一个分布式的消息队列是RabbitMQ、ZeroMQ、Kafka(大项目中推荐)、NSQ
,本章将着重介绍 NSQ 基础概念、安装部署及Go语言如何操作NSQ分布式消息队列,因为NSQ是采用Go语言进行开发使用的。
NSQ 快速了解 Q: 什么是NSQ? 答: NSQ 是一个基于Go语言的分布式实时消息平台, 具有分布式,易于水平扩展,易于安装,易于集成(主流语言都有对应的客户端库)的特点。
其主要核心概念
Topic: 在生产者publish时会创建topic,一个topic就是程序发布消息的一个逻辑键。
Channels: 通道组与消费者相关,是消费者之间的负载均衡,channel在某种意义上来说是一个“队列”。每当一个发布者发送一条消息到一个topic,消息会被复制到所有消费者连接的channel上,消费者通过这个特殊的channel读取消息,实际上,在消费者第一次订阅时就会创建channel。(Channel会将消息进行排列,如果没有消费者读取消息,消息首先会在内存中排队,当量太大时就会被保存到磁盘中)
Messages: 消息构成了我们数据流的中坚力量,消费者可以选择结束消息,表明它们正在被正常处理,或者重新将他们排队待到后面再进行处理。每个消息包含传递尝试的次数,当消息传递超过一定的阀值次数时,我们应该放弃这些消息,或者作为额外消息进行处理。
NSQ 的优势 :
安装运行简单: 易于配置和部署,并且内置了管理界面。
协议简单: NSQ 支持多种语言客户端接入,其有一个快速的二进制协议,通过短短的几天工作量就可以很简单地实现这些协议,我们还自己创建了我们的纯JS驱动(当时只存在coffeescript驱动)
在线扩容:NSQ 支持横向扩展,没有任何集中式代理。
分布式 : 提倡分布式和分散的拓扑,没有单点故障,支持容错和高可用性,并提供可靠的消息交付保证
NSQ 的特性 :
持久化模式方案: 采用的方式时内存+硬盘的模式,当内存到达一定程度时就会将数据持久化到硬盘, 如果设置了--mem-queue-size=0
则所有的消息将会存储到磁盘。
队列中的每条消息至少传递一次。
队列中消息不保证有序的。
NSQ 四个重要组件构成 : (1) nsqd
:一个负责接收、排队、转发消息到客户端的守护进程,它可以独立运行,不过通常它是由 nsqlookupd 实例所在集群配置的, 其默认监听端口4150
和4151
。
[TOC]
0x00 Go语言项目操作之NSQ分布式消息队列实践 描述: 目前比较流行的一个分布式的消息队列是RabbitMQ、ZeroMQ、Kafka(大项目中推荐)、NSQ
,本章将着重介绍 NSQ 基础概念、安装部署及Go语言如何操作NSQ分布式消息队列,因为NSQ是采用Go语言进行开发使用的。
NSQ 快速了解 Q: 什么是NSQ? 答: NSQ 是一个基于Go语言的分布式实时消息平台, 具有分布式,易于水平扩展,易于安装,易于集成(主流语言都有对应的客户端库)的特点。
其主要核心概念
Topic: 在生产者publish时会创建topic,一个topic就是程序发布消息的一个逻辑键。
Channels: 通道组与消费者相关,是消费者之间的负载均衡,channel在某种意义上来说是一个“队列”。每当一个发布者发送一条消息到一个topic,消息会被复制到所有消费者连接的channel上,消费者通过这个特殊的channel读取消息,实际上,在消费者第一次订阅时就会创建channel。(Channel会将消息进行排列,如果没有消费者读取消息,消息首先会在内存中排队,当量太大时就会被保存到磁盘中)
Messages: 消息构成了我们数据流的中坚力量,消费者可以选择结束消息,表明它们正在被正常处理,或者重新将他们排队待到后面再进行处理。每个消息包含传递尝试的次数,当消息传递超过一定的阀值次数时,我们应该放弃这些消息,或者作为额外消息进行处理。
NSQ 的优势 :
安装运行简单: 易于配置和部署,并且内置了管理界面。
协议简单: NSQ 支持多种语言客户端接入,其有一个快速的二进制协议,通过短短的几天工作量就可以很简单地实现这些协议,我们还自己创建了我们的纯JS驱动(当时只存在coffeescript驱动)
在线扩容:NSQ 支持横向扩展,没有任何集中式代理。
分布式 : 提倡分布式和分散的拓扑,没有单点故障,支持容错和高可用性,并提供可靠的消息交付保证
NSQ 的特性 :
持久化模式方案: 采用的方式时内存+硬盘的模式,当内存到达一定程度时就会将数据持久化到硬盘, 如果设置了--mem-queue-size=0
则所有的消息将会存储到磁盘。
队列中的每条消息至少传递一次。
队列中消息不保证有序的。
NSQ 四个重要组件构成 : (1) nsqd
:一个负责接收、排队、转发消息到客户端的守护进程,它可以独立运行,不过通常它是由 nsqlookupd 实例所在集群配置的, 其默认监听端口4150
和4151
。1 2 3 4 5 6 ./nsqd -broadcast-address=192.168.1.2:4160 ./nsqd -broadcast-address=192.168.1.2 -lookupd-tcp-address=192.168.1.2:4160
nsqlookupd
:管理拓扑信息并提供最终一致性的发现服务的守护进程,值得注意其数据并不是持久化保存,也不需要与任何其他nsqlookupd实例协调以满足查询, 因此根据你系统的冗余要求尽可能多地部署nsqlookupd节点(通常三个), 其默认监听端口4160
和4161
。
nsqadmin
:它是一套实时监控集群状态、执行各种管理任务的Web管理平台, 默认监听端口4171
。
1 2 ./nsqadmin -lookupd-http-address=192.168.1.2:4161
utilities
:常见基础功能、数据流处理工具,如 nsq_stat、nsq_tail、nsq_to_file、nsq_to_http、nsq_to_nsq、to_nsq
NSQ 架构及其工作模式 我们首先开看NSQ工作模式图,值得非常注意的一点就是在非集群模式下可以直接连接指定的nsqd, 而集群模式下则通过nsqlookup查询到nsqd地址再连接。
weiyigeek.top-NSQ工作模式
上图中每个nsqd实例旨在一次处理多个数据流, 该数据流称为”topics”,并且topic与channels是1对多的关系, 每个channel都会收到topic所有消息的副本,实际上下游的服务是通过对应的channel来消费topic消息。
topic 在首次使用时创建,方法是将其发布到指定topic,或者订阅指定topic上的channel
channel 是通过订阅指定的channel在第一次使用时创建的。
topic 和 channel 都相互独立地缓冲数据,防止缓慢的消费者导致其他chennel的积压(同样适用于topic级别),但是channel可以并且通常会连接多个客户端。
假设所有连接的客户端都处于准备接收消息的状态,则每条消息将被传递到随机客户端,如下图所示:
weiyigeek.top-NSQ工作模式
总而言之消息是从topic -> channel
(每个channel接收该topic的所有消息的副本)多播的,但是从channel -> consumers
均匀分布(每个消费者接收该channel的一部分消息)。
NSQ 消息队列的应用场景 (1) 异步处理: 我们可以利用消息队列把业务流程中的非关键流程异步化,从而显著降低业务请求的响应时间。
(2) 应用解耦: 通过使用消息队列将不同的业务逻辑解耦,降低系统间的耦合,提高系统的健壮性,后续有其他业务要使用订单数据可直接订阅消息队列,提高系统的灵活性。
weiyigeek.top-应用解耦
(3) 流量削峰: 在类似秒杀(大秒)等场景下,某一时间可能会产生大量的请求,使用消息队列能够为后端处理请求提供一定的缓冲区,保证后端服务的稳定性,例如:秒杀请求 --Write--> 消息队列 --根据规则读取--> 请求处理。
(4) 消息通信: 消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯, 例如实现点对点消息队列,或者聊天室进行消息发布和接收等。1 2 3 4 5 客户端A -->> 消息队列 <<-- 客户端B 客户端A <<-->> 消息队列 <<-->> 客户端B
例如: NSQ接收和发送消息流程如下图所示。
weiyigeek.top-NSQ接收和发送消息流程
附录补充
NSQ 官网地址: https://nsq.io/
NSQ 安装配置 从NSQ官方下载页面(https://nsq.io/deployment/installing.html) , 根据自己的平台下载并解压到指定目录, 然后设置环境变量即可。
此处使用Docker方式安装部署测试 实践环境说明:1 2 3 4 5 6 $ docker --version Docker version 19.03.15, build 99e3ed8919 $ docker-compose --version docker-compose version 1.25.0, build unknown nsq v1.2.1
步骤01.首先创建一个 docker-compose.yml 存放了容器运行配置清单。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 $ vim docker-compose.yml version: '2' services: nsqlookupd: container_name: nsqlookupd image: nsqio/nsq command : /nsqlookupd ports: - "4160:4160" - "4161:4161" nsqd: image: nsqio/nsq container_name: nsqd command : /nsqd --lookupd-tcp-address=nsqlookupd:4160 ports: - "4150:4150" - "4151:4151" nsqadmin: image: nsqio/nsq container_name: nsqadmin command : /nsqadmin --lookupd-http-address=nsqlookupd:4161 ports: - "4171:4171"
Tips:从 上面的docker-compose.yml文件可以看到 nsqd服务
需要注册到 nsqlookupd 的 4160 端口, 而 nsqadmin服务
需要注册到 nsqlookupd 的 4161 端口
步骤02.在该yml文件同级目录下执行如下命令进行创建并后台运行容器。1 2 3 4 5 6 7 8 9 10 $ docker-compose up -d $ docker ps
步骤03.访问nsqadmin提供的消息队列监控的Web管理平台 http://10.10.107.225:4171/lookup。
weiyigeek.top-nsqadmin-Web管理平台
至此安装完毕,通过docker来部署NSQ是非常简单的。
NSQ 实践操作 1.go-nsq 安装 描述: NSQ官方为了开发者提供了Go语言版的客户端go-nsq
(https://github.com/nsqio/go-nsq),更多客户端支持请查看CLIENT LIBRARIES(https://nsq.io/clients/client_libraries.html)。
go-nsq库安装命令如下所示: 1 2 3 4 5 6 7 ➜ src cd weiyigeek.top ➜ weiyigeek.top go get -u github.com/nsqio/go-nsq
2.go-nsq 简单使用 生产者Producer 简单的生产者示例代码如下:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 package mainimport ( "bufio" "fmt" "log" "os" "strings" "github.com/nsqio/go-nsq" ) var Producer *nsq.Producertype NsqProducer struct { nsqd_host string nsqd_port int } func (NP NsqProducer) InitProducer () (*nsq.Producer, error) { config := nsq.NewConfig() nsqdAddr := fmt.Sprintf("%s:%d" , NP.nsqd_host, NP.nsqd_port) Producer, err := nsq.NewProducer(nsqdAddr, config) if err != nil { fmt.Printf("create producer failed, err:%v\n" , err) return nil , err } return Producer, nil } func main () { nsqd := &NsqProducer{ nsqd_host: "10.10.107.225" , nsqd_port: 4150 , } Producer, err := nsqd.InitProducer() if err != nil { fmt.Printf("Init producer failed, err:%v\n" , err) return } else { log.Printf("Init Producer success!" ) } fmt.Println("请输入你要向Topic_Demo消息队列传递的消息:" ) reader := bufio.NewReader(os.Stdin) for { data, err := reader.ReadString('\n' ) if err != nil { fmt.Printf("read string from stdin failed, err:%v\n" , err) continue } data = strings.TrimSpace(data) if strings.ToUpper(data) == "Q" { break } err = Producer.Publish("Topic_Demo" , []byte (data)) if err != nil { fmt.Printf("publish msg to nsq failed, err:%v\n" , err) continue } } }
消费者-Consumer 简单的消费者示例代码如下:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 package mainimport ( "fmt" "os" "os/signal" "syscall" "time" nsq "github.com/nsqio/go-nsq" ) type NsqConsumer struct { Title string } func (m *NsqConsumer) HandleMessage (msg *nsq.Message) (err error) { fmt.Printf("%s : recv from %v, msg:%v\n" , m.Title, msg.NSQDAddress, string (msg.Body)) return } func (NC *NsqConsumer) InitConsumer (topic string , channel string , address string ) (err error) { config := nsq.NewConfig() config.LookupdPollInterval = 15 * time.Second c, err := nsq.NewConsumer(topic, channel, config) if err != nil { fmt.Printf("create consumer failed, err:%v\n" , err) return } c.AddHandler(NC) if err := c.ConnectToNSQLookupd(address); err != nil { return err } return nil } func main () { consumer := &NsqConsumer{ Title: "NSQ_USE" , } err := consumer.InitConsumer("Topic_Demo" , "channel_first" , "10.10.107.225:4161" ) if err != nil { fmt.Printf("init consumer failed, err:%v\n" , err) return } c := make (chan os.Signal) signal.Notify(c, syscall.SIGINT) <-c }
执行结果: 从结果可以看到当开启多个消费者时会一个发送一次消息队列中的信息。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 ➜ demo1 go run . 2021/12/28 21:09:40 Init Producer success! 请输入你要向Topic_Demo消息队列传递的消息: Whoami 2021/12/28 21:21:04 INF 1 (10.10.107.225:4150) connecting to nsqd WeiyiGeek ... topic测试 2021/12/28 21:21:56 INF 1 (10.10.107.225:4150) connecting to nsqd channel test ➜ demo2 go run . 2021/12/28 21:21:37 INF 1 [Topic_Demo/channel_first] (ae4830fd10e1:4150) connecting to nsqd NSQ_USE : recv from ae4830fd10e1:4150, msg:Whoami NSQ_USE : recv from ae4830fd10e1:4150, msg:WeiyiGeek NSQ_USE : recv from ae4830fd10e1:4150, msg:topic测试 NSQ_USE : recv from ae4830fd10e1:4150, msg:channel ➜ demo2 go run . 2021/12/28 21:22:19 INF 1 [Topic_Demo/channel_first] querying nsqlookupd http://10.10.107.225:4161/lookup?topic=Topic_Demo 2021/12/28 21:22:19 INF 1 [Topic_Demo/channel_first] (ae4830fd10e1:4150) connecting to nsqd NSQ_USE : recv from ae4830fd10e1:4150, msg:test
weiyigeek.top-Producer与Consumer实践
Tips: 在客户端执行是如果采用ConnectToNSQLookupd方法即通过lookupd查询连NSQD,需要在hosts绑定对应的容器hostname和宿主机地址(粗暴解决)。例如此处1 2 3 4 $ cat /etc/hosts 127.0.0.1 localhost 127.0.1.1 Ubuntu-PC 10.10.107.225 ae4830fd10e1
Tips: 此处我们可以通过nsqdadmin
提供的Web页面查看到我们Publish的topic,以及我们生产者向队列传递的值,和消费者从通道中接收到的值,点击页面上的Topic_Demo就能进入一个展示更多详细信息的页面, 而在/counter
页面显示处理的消息数量。
weiyigeek.top-Topic&Channel查看
Tips: 在/lookup
界面支持创建topic和channel, 这是提供了一种在将服务部署到生产环境之前设置流层次结构的方法, 如果频道名称为空,则只创建主题。
3.go-nsq 直连方式 描述: 上面实践了通过nsqlookupd的http接口查询后长连接到nsqd, 本节将简单演示直连nsqd(tcp长连接)写法。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 package mainimport ( "flag" "log" "time" "github.com/nsqio/go-nsq" ) func main () { go startConsumer() startProducer() } var url string func init () { flag.StringVar(&url, "url" , "10.10.107.225:4150" , "nsqd" ) flag.Parse() } func startProducer () { cfg := nsq.NewConfig() producer, err := nsq.NewProducer(url, cfg) if err != nil { log.Fatal(err) } for { if err := producer.Publish("DirectConnection" , []byte ("test message" )); err != nil { log.Fatal("publish error: " + err.Error()) } time.Sleep(1 * time.Second) } } func startConsumer () { cfg := nsq.NewConfig() consumer, err := nsq.NewConsumer("DirectConnection" , "first" , cfg) if err != nil { log.Fatal(err) } consumer.AddHandler(nsq.HandlerFunc(func (message *nsq.Message) error { log.Println(string (message.Body)) return nil })) if err := consumer.ConnectToNSQD(url); err != nil { log.Fatal(err) } <-consumer.StopChan }
至此完毕!