前言

  关于NSQ是什么,NSQ是做什么的,怎么启动,网上的资料太多太多,详情请移步分布式实时消息平台NSQ,里面讲的很详细,还附带demo。

客户端/生产者(producer)

  NSQ发送消息非常简单,分两步完成:

  • 创建Producer实例
  • 调用Publish发送一个新的消息到指定的topic
    具体实现如下所示
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    func main(){
    cfg := nsq.NewConfig()
    nsqd := "127.0.0.1:4150"
    producer, err := nsq.NewProducer(nsqd, cfg)
    if err != nil {
    log.Fatal(err)
    }
    if err := producer.Publish("test", []byte("Hello NSQ")); err != nil {
    log.Fatal("publish error:" + err.Error())
    }
    }

    服务端/消费者(consumer)

      消费者用于接收指定topic中的消息,实现需分为3步:
  • 调用NewConsumer为指定的主题/渠道创建消费者的新实例
  • 调用AddHandler为此使用者接收的消息设置处理程序
  • 调用ConnectToNSQD使用nsqd地址直接连接,有多个地址时使用ConnectToNSQDs,这里官方文档推荐使用ConnectToNSQLookupd
    具体实现如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    func main(){
    cfg := nsq.NewConfig()
    c, err := nsq.NewConsumer(topic, channel, cfg)
    if err != nil {
    panic(err)
    }
    c.AddHandler(&ConsumerT{})

    if err := c.ConnectToNSQD(address); err != nil {
    panic(err)
    }
    }
    func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
    fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
    return nil
    }

    消费者生产者搭配使用

      我上面所写的demo虽实现了最基本的NSQ的功能,但是对于一个demo来说,整体不够直观。理想中的状态应该是:服务端一直处于执行状态,客户端发送消息时,服务端接受并处理。
      改造后服务端:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    package main

    import (
    "fmt"

    "github.com/nsqio/go-nsq"
    )

    type ConsumerT struct{}

    func main() {
    InitConsumer("test", "ch1", "127.0.0.1:4150")
    select {}
    }

    func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
    fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
    return nil
    }

    func InitConsumer(topic string, channel string, address string) {
    cfg := nsq.NewConfig()
    c, err := nsq.NewConsumer(topic, channel, cfg)
    if err != nil {
    panic(err)
    }
    c.AddHandler(&ConsumerT{})

    if err := c.ConnectToNSQD(address); err != nil {
    panic(err)
    }
    }

      客户端这边呢,倒是没什么具体要更改的,可是每次都需要重复去运行才能发送消息也确实麻烦,所以做了点小更改,让客户端也一直处于运行状态,通过命令行的输入来发送消息,具体如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    package main

    import (
    "bufio"
    "fmt"
    "github.com/nsqio/go-nsq"
    "log"
    "os"
    "strings"
    )

    func main() {
    cfg := nsq.NewConfig()
    nsqd := "127.0.0.1:4150"
    producer, err := nsq.NewProducer(nsqd, cfg)
    if err != nil {
    log.Fatal(err)
    }

    reader := bufio.NewReader(os.Stdin)
    fmt.Println("Simple Shell")
    fmt.Println("---------------------")

    for {
    fmt.Print("-> topic: ")
    topic, _ := reader.ReadString('\n')
    topic = strings.Replace(topic, "\n", "", -1)
    fmt.Print("-> message: ")
    message, _ := reader.ReadString('\n')
    message = strings.Replace(message, "\n", "", -1)
    fmt.Println("消息发送中\n")

    if err := producer.Publish(topic, []byte(message)); err != nil {
    log.Fatal("publish error:" + err.Error())
    }

    }
    }
    源码请访问本人GitHub下载:https://github.com/InkDP/nsq-demo

    结语

      发送文章那一刻,发现上次发文章已经是5个月前,最近太懒都没学新的东西,希望早日回到高产似母猪的状态。