博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Docker部属Nsq集群
阅读量:6070 次
发布时间:2019-06-20

本文共 5300 字,大约阅读时间需要 17 分钟。

  用一了段时间NSQ还是很稳定的。除了稳定,还有一个特别值的说的就是部署非常简单。总想写点什么推荐给大家使用nsq来做一些东西。但是就是因为他太简单易用,文档也比较简单易懂。一直不知道要写啥!!!!!

  nsq官网: 

  

  为了容灾需要对nsqd多机器部属,有了Docker后,快速扩还是很方便的。

  部署完后我会用go和c#写一些代码方便大家学习。

 

  准备工作:

  》两台服务器:192.168.0.49; 192.168.0.105.

  》需要在两台机器上安装好Docker

  》两台机器上镜像的拉取 

docker pull nsqio/nsq

  我们在105上启动lookup, nsqd和客户端都需要连接这个lookup。  

docker run --name lookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd

  

  在105和49上启动nsqd, lookup的地址要写105

docker run --name nsqd -p 4150:4150 -p 4151:4151     nsqio/nsq /nsqd     --broadcast-address=192.168.0.105     --lookupd-tcp-address=192.168.0.105:4160
docker run --name nsqd -p 4150:4150 -p 4151:4151     nsqio/nsq /nsqd     --broadcast-address=192.168.0.49     --lookupd-tcp-address=192.168.0.105:4160

 

 

 

  到了这一步就可以写代码发送和接收信息了。但是还有一个管理系统需要启动一下。nsqadmin 

docker run --name nsqadmin -p 4171:4171 nsqio/nsq /nsqadmin --lookupd-http-address=192.168.0.105:4161

 

   用浏览器看一下管理端:http://192.168.0.105:4171/nodes。找开Nodes标签里面有两个节点。192.168.0.105  和 192.168.0.49。其他的你可以点开看看。

  我用go语言 简单写一个发送信息的例子:

  go使用的库是 go-nsq 地址  : 

  

func main() {    config := nsq.NewConfig()    // 随便给哪个ip发都可以    //w1, _ := nsq.NewProducer("192.168.0.105:4150", config)    w1, _ := nsq.NewProducer("192.168.0.49:4150", config)    err1 := w1.Ping()    if err1 != nil {        log.Fatal("should not be able to ping after Stop()")        return    }    defer w1.Stop()    topicName := "publishtest"    msgCount := 2    for i := 1; i < msgCount; i++ {        err1 := w1.Publish(topicName, []byte("测试测试publis test case"))        if err1 != nil {            log.Fatal("error")        }    }}

  可以尝试给49和105都发送一次试试。再看一下我们的管理页面:

  publishtest被ip105和49都发送过。但是还没有channel:

 

 客户端golang代码

package mainimport (    "fmt"    "github.com/nsqio/go-nsq"    "log"    "os"    "os/signal"    "strconv"    "time"    "sync")func main() {    topicName := "publishtest"    msgCount := 2    for i := 0; i < msgCount; i++ {        //time.Sleep(time.Millisecond * 20)        go readMessage(topicName, i)    }    //cleanup := make(chan os.Signal, 1)    cleanup := make(chan os.Signal)    signal.Notify(cleanup, os.Interrupt)    fmt.Println("server is running....")    quit := make(chan bool)    go func() {                select {            case <- cleanup:                fmt.Println("Received an interrupt , stoping service ...")                for _, ele := range consumers {                    ele.StopChan <- 1                    ele.Stop()                }                quit <- true        }    }()    <-quit    fmt.Println("Shutdown server....")}type ConsumerHandle struct {    q       *nsq.Consumer    msgGood int}var consumers []*nsq.Consumer = make([]*nsq.Consumer, 0)var mux *sync.Mutex = &sync.Mutex{}func (h *ConsumerHandle) HandleMessage(message *nsq.Message) error {    msg := string(message.Body) + "  " + strconv.Itoa(h.msgGood)    fmt.Println(msg)    return nil}func readMessage(topicName string, msgCount int) {    defer func() {        if err := recover(); err != nil {            fmt.Println("error: ", err)        }    }()    config := nsq.NewConfig()    config.MaxInFlight = 1000    config.MaxBackoffDuration = 500 * time.Second    //q, _ := nsq.NewConsumer(topicName, "ch" + strconv.Itoa(msgCount), config)    //q, _ := nsq.NewConsumer(topicName, "ch" + strconv.Itoa(msgCount) + "#ephemeral", config)    q, _ := nsq.NewConsumer(topicName, "ch"+strconv.Itoa(msgCount), config)    h := &ConsumerHandle{q: q, msgGood: msgCount}    q.AddHandler(h)    err := q.ConnectToNSQLookupd("192.168.0.105:4161")    //err := q.ConnectToNSQDs([]string{"192.168.0.105:4161"})    //err := q.ConnectToNSQD("192.168.0.49:4150")    //err := q.ConnectToNSQD("192.168.0.105:4415")    if err != nil {        fmt.Println("conect nsqd error")        log.Println(err)    }    mux.Lock()    consumers = append(consumers, q)    mux.Unlock()    <-q.StopChan    fmt.Println("end....")}

 

 

 

  运行一下,会启动两个终端:

  用我们的发送代码发送信息,再看我们的客户端

  

 

  c# 使用的库为NsqSharp.Core 地址为:

  

 

  简单客户端代码为:

 

class Program    {        static void Main()        {            // Create a new Consumer for each topic/channel            var consumerCount = 2;            var listC = new  List
(); for (var i = 0; i < consumerCount; i++) { var consumer = new Consumer("publishtest", $"channel{i}" ); consumer.ChangeMaxInFlight(2500); consumer.AddHandler(new MessageHandler()); consumer.ConnectToNsqLookupd("192.168.0.105:4161"); listC.Add(consumer); } var exitEvent = new ManualResetEvent(false); Console.CancelKeyPress += (sender, eventArgs) => { eventArgs.Cancel = true; listC.ForEach(x => x.Stop()); exitEvent.Set(); }; exitEvent.WaitOne(); } } public class MessageHandler : IHandler { ///
Handles a message. public void HandleMessage(IMessage message) { string msg = Encoding.UTF8.GetString(message.Body); Console.WriteLine(msg); } ///
/// Called when a message has exceeded the specified
. ///
///
The failed message. public void LogFailedMessage(IMessage message) { // Log failed messages } }

 

转载地址:http://tybgx.baihongyu.com/

你可能感兴趣的文章
构建内网的MySQL的yum源
查看>>
Ansible之十一:变量详解
查看>>
LeetCode283. Move ZeroesC语言
查看>>
Loadrunner进行md5加密方法
查看>>
Essential Grid for ASP.NET MVC
查看>>
Mobiscroll 三级联动地区选择
查看>>
使用kubeadm部署k8s集群00-缓存gcr.io镜像
查看>>
策略模式Strategy (分离算法,选择实现)
查看>>
Server 2012私有云之高可用——”瑞友杯”虚拟化征文
查看>>
django新建支持中文mysql数据库
查看>>
html之marquee详解
查看>>
十个糟糕的程序员的行为
查看>>
《淘宝技术这十年》笔记 (大图,手机勿入)
查看>>
Java中日期转换问题
查看>>
我的友情链接
查看>>
事件捕获/事件冒泡
查看>>
Android中通过typeface设置字体
查看>>
httpd主配置文档的介绍及小练习
查看>>
Centos 7.1 快速搭建postfix邮件系统
查看>>
学 Win32 汇编[32] - 子程序进阶
查看>>