Install and Run NATS Streaming Server

NATS是一个开源的、轻量级的、高性能的分布式消息通信系统,使用的公司有百度、西门子、VMware、HTC和爱立信。NATS Streaming是以NATS为动力的数据流系统,是用go语言写的,NATS Streaming server可执行文件的名称是nats-streaming-server,NATS Streaming嵌入,扩展,并且与核心的NATS平台无缝的对接,NATS Streaming server是根据麻省理工学院许可提供的开源软件,Apcera积极维护和支持NATS Streaming server。

此图来自官网,其他特性也可在官网查看。

测试环境

os:CentOS release 6.8 (Final) ip:192.168.0.31

Install the NATS Streaming server

我这里使用go环境安装,所以先部署go环境。

Install Go

wget https://storage.googleapis.com/golang/go1.8.3.linux-amd64.tar.gz

tar -C /usr/local -xzvf go1.8.3.linux-amd64.tar.gz

上面将go安装到/usr/local下。

配置环境变量
将下面这句添加到/etc/profile作为全局变量,或者$HOME/bash_profile作为此用户的变量。然后source此文件让环境变量生效。

export PATH=$PATH:/usr/local/go/bin

设置你的GOPATH

你的go工作目录(GOPATH)存储你的go代码。它可以是除了go安装目录(GOROOT)外的任何路径。

  • 为你的工作空间设置GOPATH环境变量
export GOPATH=$HOME/go
  • 还设置GOPATH/bin变量,用于运行编译go程序。
export PATH=$PATH:$GOPATH/bin

然后使用go get来下载源代码并进行编译安装:

go get github.com/nats-io/nats-streaming-server

上面编译安装完成后,会在GOPATH/bin目录下生成一个nats-streaming-server的二进制文件。

Start the NATS Streaming server

可以直接运行上面生成的nats-streaming-server二进制文件,启动一个独立的server。

nats-streaming-server

当启动成功时,你可以看到nats-streaming-server在tcp端口4222上监听客户端连接:

[3132] 2017/07/30 20:10:33.943821 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.6.0
[3132] 2017/07/30 20:10:33.944007 [INF] STREAM: ServerID: TN2MjoOfZd83WZLJIGwtYh
[3132] 2017/07/30 20:10:33.944016 [INF] STREAM: Go version: go1.8.3
[3132] 2017/07/30 20:10:33.944226 [INF] Starting nats-server version 0.9.6
[3132] 2017/07/30 20:10:33.944424 [INF] Listening for client connections on 0.0.0.0:4222
[3132] 2017/07/30 20:10:33.944436 [INF] Server is ready
[3132] 2017/07/30 20:10:34.225391 [INF] STREAM: Message store is MEMORY
[3132] 2017/07/30 20:10:34.225513 [INF] STREAM: ---------- Store Limits ----------
[3132] 2017/07/30 20:10:34.225531 [INF] STREAM: Channels:                  100 *
[3132] 2017/07/30 20:10:34.225543 [INF] STREAM: --------- Channels Limits --------
[3132] 2017/07/30 20:10:34.225555 [INF] STREAM:   Subscriptions:          1000 *
[3132] 2017/07/30 20:10:34.225566 [INF] STREAM:   Messages     :       1000000 *
[3132] 2017/07/30 20:10:34.225577 [INF] STREAM:   Bytes        :     976.56 MB *
[3132] 2017/07/30 20:10:34.225588 [INF] STREAM:   Age          :     unlimited *
[3132] 2017/07/30 20:10:34.225599 [INF] STREAM: ----------------------------------

启动带有NATS监控的NATS Streaming Server

NATS Streaming Server暴露监控界面在嵌入NATS Server(gnatsd)在端口8222。

nats-streaming-server -m 8222

运行可以看到如下信息:

[3143] 2017/07/30 20:20:06.961108 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.6.0
[3143] 2017/07/30 20:20:06.961250 [INF] STREAM: ServerID: EYubV70M1FW4xetDCi8cXJ
[3143] 2017/07/30 20:20:06.961268 [INF] STREAM: Go version: go1.8.3
[3143] 2017/07/30 20:20:06.963097 [INF] Starting nats-server version 0.9.6
[3143] 2017/07/30 20:20:06.963988 [INF] Starting http monitor on 0.0.0.0:8222
[3143] 2017/07/30 20:20:06.964162 [INF] Listening for client connections on 0.0.0.0:4222
[3143] 2017/07/30 20:20:06.964183 [INF] Server is ready
[3143] 2017/07/30 20:20:07.247235 [INF] STREAM: Message store is MEMORY
[3143] 2017/07/30 20:20:07.247366 [INF] STREAM: ---------- Store Limits ----------
[3143] 2017/07/30 20:20:07.247385 [INF] STREAM: Channels:                  100 *
[3143] 2017/07/30 20:20:07.247397 [INF] STREAM: --------- Channels Limits --------
[3143] 2017/07/30 20:20:07.247408 [INF] STREAM:   Subscriptions:          1000 *
[3143] 2017/07/30 20:20:07.247420 [INF] STREAM:   Messages     :       1000000 *
[3143] 2017/07/30 20:20:07.247430 [INF] STREAM:   Bytes        :     976.56 MB *
[3143] 2017/07/30 20:20:07.247457 [INF] STREAM:   Age          :     unlimited *
[3143] 2017/07/30 20:20:07.247471 [INF] STREAM: ----------------------------------

到谷歌浏览器访问192.168.0.31:8222可看到如下监控界面:
监控界面

使用Go NATS Streaming clients的例子

发布者

stan-pub.go

package main

import (
	"github.com/nats-io/go-nats-streaming"
	"runtime"
	"fmt"
	"time"
)

func main() {
	//stan.Connect(clusterID, clientID, ops ...Option)
	//默认clusterID为test-cluster
	ns, err := stan.Connect("test-cluster", "myid", stan.NatsURL("nats://192.168.0.31:4222"))
	if err != nil{
		panic(err)
	}
	// Simple Synchronous Publisher
	// does not return until an ack has been received from NATS Streaming
	t1 :=time.Now().Format("2006-01-02 15:04:05")
	//发布50000条消息
	for i:=0;i<50000;i++ {
		ns.Publish("logp", []byte("hello go"))
	}
	t2 :=time.Now().Format("2006-01-02 15:04:05")
	// 打印发布开始时间和结束时间,可看出发布50000条消息耗时
	fmt.Println("开始时间:"+t1, "结束时间"+t2)
	ns.Close()
	runtime.Goexit()
}

订阅者

stan-sub.go

package main

import (
	"github.com/nats-io/go-nats-streaming"
	"fmt"
	"runtime"
	"log"
)

func main() {
	//stan.Connect(clusterID, clientID, ops ...Option)
	ns, err := stan.Connect("test-cluster", "myid1", stan.NatsURL("nats://192.168.0.31:4222"))
	if err != nil{
		panic(err)
	}
	// Simple Synchronous Publisher
	// does not return until an ack has been received from NATS Streaming
	i:=0
	_, err1 := ns.Subscribe("logp",func(msg *stan.Msg){
		i++
		fmt.Printf("Received a message: %s+%d
", string(msg.Data),i)
	},stan.DurableName("cdn1"))
	if err1 != nil{
		panic(err1)
	}
	log.Printf("Listening on [%s]
", "logp")
	runtime.Goexit()
}

我是在IDEA中同时的运行这两个程序,则会看到发布出去的消息被及时的订阅了。

参考网址:
官网
go客户端

原文地址:https://www.cnblogs.com/zeppelin/p/7261033.html