生产者消费者案例
我们这里展示一个生产者和一个消费者的案例
生产者代码:
package com.layton; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer_HelloWorld { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置连接参数 factory.setHost("127.0.0.1");//默认值为localhost factory.setPort(5672);//默认值为5672 factory.setVirtualHost("/layton");//虚拟机默认为/ factory.setUsername("layton");//guest factory.setPassword("layton");//guest //3.获取对应连接connection Connection connection = factory.newConnection(); //4.创建channel Channel channel = connection.createChannel(); //5.创建队列queue,名字有则创建,没有则不会创建 channel.queueDeclare("hello_world", true, false, false, null); //6.发消息 String body = "hello world2"; channel.basicPublish("","hello_world",null,body.getBytes()); //7.释放资源 channel.close(); connection.close(); } } /* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments 参数: queue 队列名称 durable 是否持久化,当mq重启之后还在 exclusive 是否独占:只能有一个消费者监听队列 当connection关闭时是否删除队列 autoDelete 是否自动删除:当没有consumer会自动删除自己 arguments 不讲 */ /* String exchange, String routingKey, BasicProperties props, byte[] body 参数: exchange 交换机的名称 简单模式下交换机会使用默认的 routingKey 路由名称 props 配置信息 body 字节数组 真实的消息数据 */
运行后该生产者就会将消息放到mq里
消费者代码:
package com.layton; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer_HelloWorld { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.设置连接参数 factory.setHost("127.0.0.1");//默认值为localhost factory.setPort(5672);//默认值为5672 factory.setVirtualHost("/layton");//虚拟机默认为/ factory.setUsername("layton");//guest factory.setPassword("layton");//guest //3.获取对应连接connection Connection connection = factory.newConnection(); //4.创建channel Channel channel = connection.createChannel(); //5.创建队列queue,名字有则创建,没有则不会创建 channel.queueDeclare("hello_world", true, false, false, null); //6.收消息 Consumer consumer = new DefaultConsumer(channel) { //回调方法,当收到消息后会自动执行该方法 /** * @param consumerTag 标识 * @param envelope 获取一些信息,交换机,路由key * @param properties 配置信息 * @param body 真实的数据 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag = " + consumerTag); System.out.println("envelopeExchange = " + envelope.getExchange()); System.out.println("envelopeRoutingKey = " + envelope.getRoutingKey()); System.out.println("properties = " + properties); System.out.println("body = " + new String(body)); } }; channel.basicConsume("hello_world",true,consumer); //7.不要释放资源 } } /* String queue, boolean autoAck, Consumer callback 参数: queue 队列名 autoAck 是否自动确认,和mq说一声我收到了 callback 回调对象 */
运行完消费者就会从mq里取出消息