RABBITMQ(小总结 持续更新...

(一)理解消息通信

1.消息通信概念---消费者、生产者和代理

生产者(producer)创建消息,然后发送到代理服务器(RaabitMQ)。

其中消息包括两部分内容:有效载荷(payload)和标签(label)。

     有效载荷就是你想要传输的数据,它可以是任何内容。

  标签描述了有效载荷,并且RabbitMQ用它来决定谁来将获得消息的拷贝。

RaabitMQ会根据标签将消息发送到感兴趣的接收方。----“发后即忘”的单向通信方式。

消费者(consumer)连接到代理服务器(RaabitMQ)并订阅到队列(queue)上。

这里解释一下“连接”的意思。消费者与生产者都必须首先连接到RaabitMQ,才能消费或发布消息。即生产者或消费者与RaabitMQ代理服务器之间创建一条TCP连接。

一旦TCP连接打开(你通过了认证),生产者或消费者就可以创建一条AMQP信道(channel),信道是建立在“真实的”TCP连接内的虚拟连接。AMQP命令都是通过信道发送出去

的。每条信道都会被指派一个唯一ID(AMQP库会帮你记住ID的)。所以,不论发布消息,订阅队列或是接收消息,这些动作都是通过信道完成的。

总之,RaabitMQ可以看做是软件的路由器。

2.AMQP元素---转发器、队列、绑定

生产者将消息发布到转发器上,消息最终到达队列,并被消费者接收;绑定决定了消息如何从路由器路由到特定的队列。

每个rabbitmq-server叫做一个Broker,等着tcp连接进入。

在rabbit-server进程内有Exchange,定义了这个消息的发送类型。(一对多、直连、多对多等)。

Queue是进程内的逻辑队列,有多个,有名字。

Binding联系Exchange与Queue。

Routingkey由生产者指定。Bidingkey由消费者指定。二者联合决定一条消息的来去。

最基本的rabbitmq连接代码:

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("xxxxxx");

factory.setPort(xxxx);

factory.setUsername("xxx");
factory.setPassword("xxx");

Connection connection =factory.newConnection();

final Channel channel =connection.createChannel();

最后这个channel就可以用来收和发消息了。

声明exchange与queue:

channel.exchangeDeclare("logs","fanout");   
(对列名称 类型 是否持久化,不使用时是否自动删除,是否是内部的(不能被客户端使用),其他参数)
String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); channel.queueDeclare (对列名称,是否持久化,独占的queue(仅供此连接),不使用时是否自动删除,其他参数) channel.queueBind (对列名称,交换机名字,此次绑定使用的路由关键字,其他参数)

  

发出消息:

String message="";   

channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());  

  

原文地址:https://www.cnblogs.com/tinmh/p/6138471.html