RabbitMQ Hello world(二)

简介:

RabbitMQ 是消息代理中间件,它接收并转发消息。你可以把它想象成一个邮局:当你把邮件放到邮箱时,你可以确定某一位邮递员会准确地把邮件送到收件人手中。在这个比喻中,RabbitMQ 同时扮演邮箱、邮局和邮递员的角色。
RabbitMQ 和邮局的区别是,它不处理纸张,而是接收、存储、转发二进制数据块——也就是消息。
 

Rabbitmq 消息服务中几个术语。 

  • 生产者:发送消息。消息的产生者。
  • 队列:位于 RabbitMQ 内部,如同邮箱。尽管消息在 RabbitMQ 和你的应用之间流转,但它们只能被存储在队列里。队列只受主机内存和磁盘容量的限制,本质上是一个大的消息缓冲区。生产者发送消息到队列中,消费者从队列中接收数据。
  • 消费者:接收消息,大部分接收程序等待接收消息。
 

Hello world

Maven
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.3</version>
  </dependency> 
RabitMqConnectionConfig.java
package com.rabitmq.config;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Central place for the RabbitMQ connection settings used by the demo
 * producer and consumer, plus helpers to open and close a connection.
 */
public class RabitMqConnectionConfig {

    /** Name of the queue shared by the demo producer and consumer. */
    public static final String RABIT_TEST_QUEUE = "test_rabit_mq";

    /** Broker host name. */
    private static final String RABIT_MQ_HOST = "localhost";

    /** Broker port. */
    private static final int RABIT_MQ_PORT = 5672;

    /** User name used to authenticate against the broker. */
    private static final String RABIT_MQ_USER_NAME = "guest";

    /** Password used to authenticate against the broker. */
    private static final String RABIT_MQ_USER_PASSWD = "guest";

    /**
     * Opens a new connection to the broker using the settings declared in this class.
     * Every call creates a fresh {@link ConnectionFactory} and a fresh connection.
     *
     * @return a newly created connection
     * @throws IOException      propagated from {@link ConnectionFactory#newConnection()}
     * @throws TimeoutException propagated from {@link ConnectionFactory#newConnection()}
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(RABIT_MQ_HOST);
        factory.setPort(RABIT_MQ_PORT);
        factory.setUsername(RABIT_MQ_USER_NAME);
        factory.setPassword(RABIT_MQ_USER_PASSWD);
        return factory.newConnection();
    }

    /**
     * Closes the given connection; does nothing when it is {@code null}.
     *
     * @param connection the connection to close, may be {@code null}
     * @throws IOException propagated from {@link Connection#close()}
     */
    public static void closeConnection(Connection connection) throws IOException {
        if (connection == null) {
            return;
        }
        connection.close();
    }
}
RabitMqProducer.java
package com.rabitmq.producer;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabitmq.config.RabitMqConnectionConfig;

/**
 * Demo producer: declares the shared test queue and publishes a batch of
 * text messages to it.
 */
public class RabitMqProducer {

    /** Number of messages published by a single {@link #send(String)} call. */
    private static final int MESSAGES_PER_SEND = 1000;

    /** Name of this producer; embedded in every message it publishes. */
    private final String producerName;

    /**
     * @param producerName name that identifies this producer inside each message
     */
    public RabitMqProducer(String producerName) {
        this.producerName = producerName;
    }

    /**
     * Opens a connection and a channel, declares the test queue and publishes
     * {@value #MESSAGES_PER_SEND} messages of the form
     * {@code producerName:<name>,message:<message>----<index>}, echoing each one to stdout.
     *
     * <p>The connection and channel are released via try-with-resources, so the
     * connection is closed even if closing the channel fails with an unchecked
     * exception. I/O and timeout failures are printed, not rethrown.
     *
     * @param message payload text included in every published message
     */
    public void send(String message) {
        // Resources are closed in reverse order: channel first, then connection.
        try (Connection connection = RabitMqConnectionConfig.getConnection();
             Channel channel = connection.createChannel()) {
            Map<String, Object> queueArguments = new HashMap<String, Object>();
            queueArguments.put("x-queue-type", "classic");
            // Flags follow the Channel.queueDeclare parameter order: durable, exclusive, autoDelete.
            channel.queueDeclare(RabitMqConnectionConfig.RABIT_TEST_QUEUE, true, false, false, queueArguments);
            for (int i = 0; i < MESSAGES_PER_SEND; i++) {
                String sendMessage = "producerName:" + this.producerName + ",message:" + message + "----" + i;
                // Empty exchange name with the queue name as routing key; no message properties.
                channel.basicPublish("", RabitMqConnectionConfig.RABIT_TEST_QUEUE, null,
                        sendMessage.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                System.out.println("sendMessage:["+ sendMessage +"]");
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

    /** Runs four producers one after another, each sending the same text. */
    public static void main(String[] args) {
        String message = "你好棒呀";
        new RabitMqProducer("pro_aaa").send(message);
        new RabitMqProducer("pro_bbb").send(message);
        new RabitMqProducer("pro_ccc").send(message);
        new RabitMqProducer("pro_ddd").send(message);
    }
}
RabitMqConsumer.java
package com.rabitmq.consumer;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabitmq.config.RabitMqConnectionConfig;

/**
 * Demo consumer: subscribes to the shared test queue and prints every
 * delivered message to stdout.
 */
public class RabitMqConsumer {

    public RabitMqConsumer() {
    }

    /**
     * Opens a connection and a channel, declares the test queue and registers
     * callbacks that print each delivery and each cancellation.
     *
     * <p>On success the connection is intentionally left open so the callbacks
     * keep being invoked after this method returns. If anything fails before the
     * subscription is registered, the connection is closed here so that a failed
     * start does not leak it. I/O and timeout failures are printed, not rethrown.
     *
     * @param consumerName label included in every line this consumer prints
     * @return always the empty string
     */
    public String receive(final String consumerName) {
        System.out.println( consumerName + "  receive start" );
        Connection connection = null;
        boolean subscribed = false;
        try {
            connection = RabitMqConnectionConfig.getConnection();
            Channel channel = connection.createChannel();
            Map<String, Object> queueArguments = new HashMap<>();
            queueArguments.put("x-queue-type", "classic");
            // Flags follow the Channel.queueDeclare parameter order: durable, exclusive, autoDelete.
            channel.queueDeclare(RabitMqConnectionConfig.RABIT_TEST_QUEUE, true, false, false, queueArguments);
            // Second argument is autoAck: deliveries are acknowledged automatically.
            channel.basicConsume(RabitMqConnectionConfig.RABIT_TEST_QUEUE, true, new DeliverCallback() {

                @Override
                public void handle(String consumerTag, Delivery message) {
                    String messageStr = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("consumerName :" + consumerName + " ,deliver call back" + consumerTag + ",message:" + messageStr);
                }
            }, new CancelCallback() {

                @Override
                public void handle(String consumerTag) throws IOException {
                    System.out.println("consumerName :" + consumerName + " ,Cancel call back :" + consumerTag);
                }
            });
            subscribed = true;
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            // Only clean up on failure; a live subscription needs its connection.
            if (!subscribed && connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return "";
    }

    /** Starts four consumers, each on its own connection, against the same queue. */
    public static void main(String[] args) {
        new RabitMqConsumer().receive("consumer_aaa");
        new RabitMqConsumer().receive("consumer_bbb");
        new RabitMqConsumer().receive("consumer_ccc");
        new RabitMqConsumer().receive("consumer_ddd");
    }
}
 
 
 
 
 
 
 
 
 
 
 

原文地址:https://www.cnblogs.com/--net/p/12806820.html