golang连接activemq,发送接收数据

介绍

使用golang连接activemq发送数据的话,需要使用一个叫做stomp的包,直接go get github.com/go-stomp/stomp即可

代码

生产者

package main

import (
	"fmt"
	"github.com/go-stomp/stomp"
	"time"
)

func main(){
	// 调用Dial方法,第一个参数是"tcp",第二个参数则是ip:port
	// 返回conn(连接)和err(错误)
	conn,err:=stomp.Dial("tcp", "47.adsasaads89:61613")
	// 错误判断
	if err!=nil{
		fmt.Println("err =", err)
		return
	}
	//发送十条数据
	for i:=0;i<10;i++ {
		// 调用conn下的send方法,接收三个参数
		//参数一:队列的名字
		//参数二:数据类型,一般是文本类型,直接写text/plain即可
		//参数三:内容,记住要转化成byte数组的格式
		//返回一个error
		err := conn.Send("testQ", "text/plain",[]byte(fmt.Sprintf("message:%d", i)))
		if err!=nil{
			fmt.Println("err =", err)
		}
	}
	/*
	这里为什么要sleep一下,那就是conn.Send这个过程是不阻塞的
	相当于Send把数据放到了一个channel里面
	另一个goroutine从channel里面去取数据再放到消息队列里面
	但是还没等到另一个goroutine放入数据,此时循环已经结束了
	因此最好要sleep一下,根据测试,如果不sleep,那么发送1000条数据,
	最终进入队列的大概是980条数据,这说明了什么
	说明了当程序把1000条数据放到channel里面的时候,另一个goroutine只往队列里面放了980条
	剩余的20条还没有来得及放入,程序就结束了
	 */
	time.Sleep(time.Second * 1)
}

消费者

package main

import (
	"fmt"
	"github.com/go-stomp/stomp"
	"time"
)

func recv_data(ch chan *stomp.Message) {
	//不断地循环,从channel里面获取数据
	for {
		v := <-ch
		//这里是打印当然还可以做其他的操作,比如写入hdfs平台
		//v是*stomp.Message类型,属性都在这里面
		
		/*
		type Message struct {
			// Indicates whether an error was received on the subscription.
			// The error will contain details of the error. If the server
			// sent an ERROR frame, then the Body, ContentType and Header fields
			// will be populated according to the contents of the ERROR frame.
			Err error
		
			// Destination the message has been sent to.
			Destination string
		
			// MIME content type.
			ContentType string // MIME content
		
			// Connection that the message was received on.
			Conn *Conn
		
			// Subscription associated with the message.
			Subscription *Subscription
		
			// Optional header entries. When received from the server,
			// these are the header entries received with the message.
			Header *frame.Header
		
			// The ContentType indicates the format of this body.
			Body []byte // Content of message
		}
		 */
		fmt.Println(string(v.Body))
	}
}

func main() {
	//创建一个channel,存放的是*stomp.Message类型
	ch := make(chan *stomp.Message)
	//将管道传入函数中
	go recv_data(ch)
	//和生产者一样,调用Dial方法,返回conn和err
	conn, err := stomp.Dial("tcp", "47.dsdsadsa9:61613")
	if err != nil {
		fmt.Println("err =", err)
	}
	//消费者订阅这个队列
	//参数一:队列名
	//参数二:确认信息,直接填默认地即可
	sub, err := conn.Subscribe("testQ", stomp.AckMode(stomp.AckAuto))
	for { //无限循环
		select {
		//sub.C是一个channel,如果订阅的队列有数据就读取
		case v := <-sub.C:
			//读取的数据是一个*stomp.Message类型
			ch <- v
			//如果30秒还没有人发数据的话,就结束
		case <-time.After(time.Second * 30):
			return
		}
	}
}
message:0
message:1
message:2
message:3
message:4
message:5
message:6
message:7
message:8
message:9
原文地址:https://www.cnblogs.com/traditional/p/11193524.html