




// Command pubsub is an example of a fanout exchange with dynamic reliable
// membership, reading from stdin, writing to stdout.
// This example shows how to implement reconnect logic independent from a
// publish/subscribe loop with bridges to application types.

package main

import (


// url is the AMQP endpoint taken from the -url flag.  The same value is dialed
// for both the publisher and the subscriber sessions.
var url = flag.String("url", "amqp://guest:guest@", "AMQP url for both the publisher and subscriber")

// exchange binds the publishers to the subscribers
const exchange = "pubsub"

// message is the application type for a message.  This can contain identity,
// or a reference to the receiver chan for further demuxing.
type message []byte

// session composes an amqp.Connection with an amqp.Channel
type session struct {

// Close tears the connection down, taking the channel with it.
func (s session) Close() error {
	if s.Connection == nil {
		return nil
	return s.Connection.Close()

// redial continually connects to the URL, exiting the program when no longer possible
func redial(ctx context.Context, url string) chan chan session {
	sessions := make(chan chan session)

	go func() {
		sess := make(chan session)
		defer close(sessions)

		for {
			select {
			case sessions <- sess:
			case <-ctx.Done():
				log.Println("shutting down session factory")

			conn, err := amqp.Dial(url)
			if err != nil {
				log.Fatalf("cannot (re)dial: %v: %q", err, url)

			ch, err := conn.Channel()
			if err != nil {
				log.Fatalf("cannot create channel: %v", err)

			if err := ch.ExchangeDeclare(exchange, "fanout", false, true, false, false, nil); err != nil {
				log.Fatalf("cannot declare fanout exchange: %v", err)

			select {
			case sess <- session{conn, ch}:
			case <-ctx.Done():
				log.Println("shutting down new session")

	return sessions

// publish publishes messages to a reconnecting session to a fanout exchange.
// It receives from the application specific source of messages.
func publish(sessions chan chan session, messages <-chan message) {
	for session := range sessions {
		var (
			running bool
			reading = messages
			pending = make(chan message, 1)
			confirm = make(chan amqp.Confirmation, 1)

		pub := <-session

		// publisher confirms for this channel/connection
		if err := pub.Confirm(false); err != nil {
			log.Printf("publisher confirms not supported")
			close(confirm) // confirms not supported, simulate by always nacking
		} else {


		for {
			var body message
			select {
			case confirmed, ok := <-confirm:
				if !ok {
					break Publish
				if !confirmed.Ack {
					log.Printf("nack message %d, body: %q", confirmed.DeliveryTag, string(body))
				reading = messages

			case body = <-pending:
				routingKey := "ignored for fanout exchanges, application dependent for other exchanges"
				err := pub.Publish(exchange, routingKey, false, false, amqp.Publishing{
					Body: body,
				// Retry failed delivery on the next session
				if err != nil {
					pending <- body
					break Publish

			case body, running = <-reading:
				// all messages consumed
				if !running {
				// work on pending delivery until ack'd
				pending <- body
				reading = nil

// identity returns the same host/process unique string for the lifetime of
// this process so that subscriber reconnections reuse the same queue name.
//
// The result is the hex-encoded SHA-1 digest of the hostname, the error from
// looking the hostname up, and the process id.
func identity() string {
	hostname, err := os.Hostname()
	h := sha1.New()
	fmt.Fprint(h, hostname)
	// The lookup error is hashed rather than returned: a failed lookup still
	// yields a stable identifier for this process.
	fmt.Fprint(h, err)
	fmt.Fprint(h, os.Getpid())
	return fmt.Sprintf("%x", h.Sum(nil))
}

// subscribe consumes deliveries from an exclusive queue from a fanout exchange and sends to the application specific messages chan.
func subscribe(sessions chan chan session, messages chan<- message) {
	queue := identity()

	for session := range sessions {
		sub := <-session

		if _, err := sub.QueueDeclare(queue, false, true, true, false, nil); err != nil {
			log.Printf("cannot consume from exclusive queue: %q, %v", queue, err)

		routingKey := "application specific routing key for fancy toplogies"
		if err := sub.QueueBind(queue, routingKey, exchange, false, nil); err != nil {
			log.Printf("cannot consume without a binding to exchange: %q, %v", exchange, err)

		deliveries, err := sub.Consume(queue, "", false, true, false, false, nil)
		if err != nil {
			log.Printf("cannot consume from: %q, %v", queue, err)


		for msg := range deliveries {
			messages <- message(msg.Body)
			sub.Ack(msg.DeliveryTag, false)

// read is this application's translation to the message format, scanning from
// stdin.
func read(r io.Reader) <-chan message {
	lines := make(chan message)
	go func() {
		defer close(lines)
		scan := bufio.NewScanner(r)
		for scan.Scan() {
			lines <- message(scan.Bytes())
	return lines

// write is this application's subscriber of application messages, printing to
// stdout.
func write(w io.Writer) chan<- message {
	lines := make(chan message)
	go func() {
		for line := range lines {
			fmt.Fprintln(w, string(line))
	return lines

func main() {

	ctx, done := context.WithCancel(context.Background())

	go func() {
		publish(redial(ctx, *url), read(os.Stdin))

	go func() {
		subscribe(redial(ctx, *url), write(os.Stdout))






  1. 发布的每条消息都会存进内部维护的数据仓库,但这个仓库其实只是一个切片


  2. RabbitMQ 的思想看着很高级,但是内部维护了太多复杂的数据管道传递

  3. Go 这里把它停掉再重启,会有严重的 bug

  4. 下篇文章测试并发,观察消息的延迟时间
