NSQ是Go语言编写的,开源的分布式消息队列中间件,具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征,同时支持横向扩展,操作友好,是一个成熟的、已在大规模生成环境下应用的产品。
2. 消息中间件应用场景 2.1 异步处理参照下图利用消息队列把业务流程中的非关键流程异步化,从而显著降低业务请求的响应时间。
]
通过使用消息队列将不同的业务逻辑解耦,降低系统间的耦合,提高系统的健壮性。后续有其他业务要使用订单数据可直接订阅消息队列,提高系统的灵活性。
2.3 流量削峰 类似秒杀(大秒)等场景下,某一时间可能会产生大量的请求,使用消息队列能够为后端处理请求提供一定的缓冲区,保证后端服务的稳定性。
3. 组件 3.1 nsqd nsqd是一个守护进程,它接收、排队并向客户端发送消息。默认监听了两个tcp端口,一个(4150)用来服务客户端,一个(4151)用来提供http端口,此外还可以选择性地监听一个可选的https端口。
3.2 nsqlookupd nsqlookupd是维护所有nsqd状态、提供服务发现的守护进程。它能为消费者查找特定topic下的nsqd提供了运行时的自动发现服务。 **它不维持持久状态,也不需要与任何其他nsqlookupd实例协调以满足查询。**因此根据你系统的冗余要求尽可能多地部nsqlookupd节点。它们消耗的资源很少,可以与其他服务共存。我们的建议是为每个数据中心运行至少3个集群。
3.3 nsqadmin一个实时监控集群状态、执行各种管理任务的Web管理平台。
4. NSQ架构设计 4.1 流程图 4.2 Topic和Channel-
每个nsqd实例旨在一次处理多个数据流。这些数据流称为“topics”,一个topic具有1个或多个“channels”。每个channel都会收到topic所有消息的副本,实际上下游的服务是通过对应的channel来消费topic消息。
-
topic和channel不是预先配置的。topic在首次使用时创建,方法是将其发布到指定topic,或者订阅指定topic上的channel。channel是通过订阅指定的channel在第一次使用时创建的。
-
topic和channel都相互独立地缓冲数据,防止缓慢的消费者导致其他chennel的积压(同样适用于topic级别)。
-
channel可以并且通常会连接多个客户端。假设所有连接的客户端都处于准备接收消息的状态,则每条消息将被传递到随机客户端。例如:
总而言之,消息是从topic -> channel(每个channel接收该topic的所有消息的副本)多播的,但是从channel -> consumers均匀分布(每个消费者接收该channel的一部分消息)。
- 消息默认不持久化,可以配置成持久化模式。nsq采用的方式时内存+硬盘的模式,当内存到达一定程度时就会将数据持久化到硬盘。
- 如果将--mem-queue-size设置为0,所有的消息将会存储到磁盘。
- 服务器重启时也会将当时在内存中的消息持久化。
- 每条消息至少传递一次。
- 消息不保证有序。
5.2 使用教程下载地址: https://nsq.io/deployment/installing.html,选择对应的环境去下载NSQ
双击nsqlookupd.exe启动
监听了4160的tcp端口和4161的http端口
打开bin的cmd界面 输入命令指定nsqlookupd地址
nsqd --lookupd-tcp-address=127.0.0.1:4160
指定nsqadmin地址 打开nsqadmin
nsqadmin -lookupd-http-address=127.0.0.1:4161
发现监听的4171端口的http服务我们去打开web管理页面,如下:
上述如果遇见admin管理页面打开有红色报错,说明上述服务中,有其中的进程没有启动成功!请按照上面的顺序重新再启动一遍!
6. Demo演示 6.1 生产者// Package nsq
// nsq_producer/main.go
package main
import (
"bufio"
"fmt"
"os"
"strings"
"github.com/nsqio/go-nsq"
)
// NSQ Producer Demo
var producer *nsq.Producer
// 初始化生产者
func initProducer(str string) (err error) {
config := nsq.NewConfig()
producer, err = nsq.NewProducer(str, config)
if err != nil {
fmt.Printf("create producer failed, err:%vn", err)
return err
}
return nil
}
func main() {
nsqAddress := "127.0.0.1:4150"
err := initProducer(nsqAddress)
if err != nil {
fmt.Printf("init producer failed, err:%vn", err)
return
}
reader := bufio.NewReader(os.Stdin) // 从标准输入读取
for {
data, err := reader.ReadString('n')
if err != nil {
fmt.Printf("read string from stdin failed, err:%vn", err)
continue
}
data = strings.TrimSpace(data)
if strings.ToUpper(data) == "Q" { // 输入Q退出
break
}
// 向 'topic_demo' publish 数据
err = producer.Publish("topic_demo", []byte(data))
if err != nil {
fmt.Printf("publish msg to nsq failed, err:%vn", err)
continue
}
}
}
- 运行,且在控制台随便输入一串数据
- 发现admin页面有Demo代码中生产者所发布的Topic 【topic_demo】
- 点击Topic进来发现确实有一条发布的数据信息
- 去控制台再发一段信息,admin确实会加入一条Messages
这个代表信息的消费数量,0则为未消费
下面也可以在admin为集群地址的nsqlookupd手动配置nsqd 的topic和对应的channel
下面可以看到对应主机连接的nsqd节点以及和生产者的关系
6.2 消费者// Package main
// nsq_consumer/main.go
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/nsqio/go-nsq"
)
// NSQ Consumer Demo
// MyHandler 是一个消费者类型
type MyHandler struct {
Title string
}
// HandleMessage 是需要实现的处理消息的方法
func (m *MyHandler) HandleMessage(msg *nsq.Message) (err error) {
fmt.Printf("%s recv from %v, msg:%vn", m.Title, msg.NSQDAddress, string(msg.Body))
return
}
// 初始化消费者
func initConsumer(topic string, channel string, address string) (err error) {
config := nsq.NewConfig()
config.LookupdPollInterval = 15 * time.Second
c, err := nsq.NewConsumer(topic, channel, config)
if err != nil {
fmt.Printf("create consumer failed, err:%vn", err)
return
}
consumer := &MyHandler{
Title: "沙河1号",
}
c.AddHandler(consumer)
// if err := c.ConnectToNSQD(address); err != nil { // 直接连NSQD
if err := c.ConnectToNSQLookupd(address); err != nil { // 通过lookupd查询
return err
}
return nil
}
func main() {
err := initConsumer("topic_demo", "first", "127.0.0.1:4161")
if err != nil {
fmt.Printf("init consumer failed, err:%vn", err)
return
}
c := make(chan os.Signal) // 定义一个信号的通道
signal.Notify(c, syscall.SIGINT) // 转发键盘中断信号到c
<-c // 阻塞
}
- 运行,拿到之前生产者的两条数据
- 下面这个是消费者的策略,代码中定义每15秒去查询一遍nsq中对应topic的数据
- 现在生产者再输入数据,消费依然能拿到
发现消费者的channel First消费了3条数据,是对的,且生产者有一个消费者的channel
6.3 第二个消费者将消费者代码copy一份,并将channel换为two,再次启动,查看生产者在发布一条数据后,两名消费者会表现如何?
后来启动的消费者无法拿到已经被第一个消费者消费的数据了,这时候发现生产者的topic多了一个two的channel绑定上了
文末生产者发送消息"消费者1号",发现两名消费者都收到了,说明nsqd是广播模式,会发送副本给到topic下的每个channel
NSQ的基本使用和功能介绍完毕,进阶功能则是利用好NSQ作为消息中间件的功能去做一个demo使用一下,这将会对掌握有极大的帮助!以及对NSQ的策略可以研究一下,和Demo示例如何更好的写出通用的消息中间件去发布订阅!



