rabbitmq+java入门(一)hello world

 参考:https://www.rabbitmq.com/tutorials/tutorial-one-java.html

源码:https://github.com/zuzhaoyue/JavaDemo

先决条件

本教程假定RabbitMQ 在标准端口(5672)上的本地主机上安装并运行。如果您使用不同的主机,端口或证书,则连接设置需要进行调整。

介绍

RabbitMQ是一个消息代理:它接受和转发消息。你可以把它想象成一个邮局:当你把你想要发送的邮件放在邮箱里时,你可以确定邮递员最终会将邮件递交你的收件人。与此类似,RabbitMQ既是邮政信箱、又是邮局和邮递员。

RabbitMQ和邮局的主要区别在于它不处理纸张,而是接受、存储和转发二进制数据块 - 消息

RabbitMQ和一般的消息传递会使用一些术语:

  • 生产(Producing)意味着发送。一个发送消息的程序就是一个生产者

  • 队列(queue)相当于上面例子里的邮箱尽管消息流经RabbitMQ和您的应用程序,但它们只能存储在队列中一个队列只受主机内存和磁盘限制的约束,它本质上是一个很大的消息缓冲区。可以同时有许多生产者(producer)向一个队列发送消息,当然也可以同时有许多消费者尝试从一个队列接收数据以下是队列的表示方式:

  • 消费(consuming)与接受(receiving)意思差不多。一个消费者的主要功能是等待接收信息:

请注意,生产者,消费者和broker不必驻留在同一主机上; 

在本教程中,我们将用Java编写两个程序; 一个生产者,用于发送单个消息。一个消费者,用于接收消息并将其打印出来。

在下图中,“P”是我们的生产者,“C”是我们的消费者。中间的盒子是一个队列

(P)→[|||]→(C)

知道了上面这些后就可以写代码了~

发出

(P) - > [|||]

我们会调用消息发布者(生产者)来发送,调用消息使用者(消费者) Recv来消费发布者将连接到RabbitMQ,发送一条消息,然后退出。

发布者的代码如下:

/**
 * Created by zuzhaoyue on 18/5/15.
 */
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class Send {
    private final static String QUEUE_NAME = "hello1";

    public static void main(String[] argv) throws Exception {
        //创建一个连接服务器的连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //为了发送信息,我们必须声明一个队列,然后才可以向这个队列中发消息,这个声明是幂等的,也就是说仅会在这个队列不存在时,才会创建一个队列。

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");

        //最后,我们关闭连接和队列
        channel.close();
        connection.close();
    }

}

 

接收

Rabbitmq会推送消息给消费者,因此与发布单个消息的发布者不同,我们不会关闭消费者,而是让它一直进行以监听消息并将其打印出来。

(C)
 

代码如下

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Created by zuzhaoyue on 18/5/15.
 */
public class Recv {
    private final static String QUEUE_NAME = "hello1";
    public static void main(String[] argv) throws java.io.IOException,
            java.lang.InterruptedException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        //以下的defaultconsumer实现了consumer这个接口,这个接口被用来缓冲服务器推送过来的信息
        //一开始的set up和刚刚的send.java里的相似:1.打开一个连接,2.声明一个队列(这个队列名要和刚刚的队列名相同)
        //注意:我们在这里声明队列,因为我们可能在生产者之前开始消费
        //我们告诉服务器从队列向我们传送消息,既然它会异步传送,我们以对象的形式提供一个回调,来缓冲这些消息,直到我们准备使用它们,这正是defaultconsumer做的事情
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        String  result = channel.basicConsume(QUEUE_NAME, true, consumer);
        System.out.println("result:" + result);

    }
}

执行send.java的main()方法后,访问127.0.0.1:15672

显示:

说明队列中有了一个消息,点开后我们可以看到该消息内容是hello world,如下图:

然后运行recv.java的main()方法,打开消费者,页面显示如下:

该消息已经被成功消费。

原文地址:https://www.cnblogs.com/zuxiaoyuan/p/9041288.html