RabbitMq

配置文件 

 1 <project xmlns="http://maven.apache.org/POM/4.0.0"
 2     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 4     <modelVersion>4.0.0</modelVersion>
 5     <groupId>com.mmr.myrabbitmq</groupId>
 6     <artifactId>myrabbitmq</artifactId>
 7     <version>0.0.1-SNAPSHOT</version>
 8 
 9     <dependencies>
10         <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
11         <dependency>
12             <groupId>com.rabbitmq</groupId>
13             <artifactId>amqp-client</artifactId>
14             <version>4.0.2</version>
15         </dependency>
16         <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
17         <dependency>
18             <groupId>org.slf4j</groupId>
19             <artifactId>slf4j-api</artifactId>
20             <version>2.0.0-alpha1</version>
21         </dependency>
22         <!-- https://mvnrepository.com/artifact/log4j/log4j -->
23         <dependency>
24             <groupId>log4j</groupId>
25             <artifactId>log4j</artifactId>
26             <version>1.2.17</version>
27         </dependency>
28         <!-- https://mvnrepository.com/artifact/junit/junit -->
29         <dependency>
30             <groupId>junit</groupId>
31             <artifactId>junit</artifactId>
32             <version>4.12</version>
33             <scope>test</scope>
34         </dependency>
35 
36     </dependencies>
37 
38 
39 </project>
View Code

L04____简单队列__0101_Send

 1 package com.mmr.rabbitmq.utils;
 2  3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5  6 import com.rabbitmq.client.AMQP.Channel;
 7 import com.rabbitmq.client.Connection;
 8 import com.rabbitmq.client.ConnectionFactory;
 9 10 public class L04____简单队列__0101_Send {
11     public static String queueName =  "Hello";
12     public static void main(String[] args) {
13         // TODO 自动生成的方法存根
14         ConnectionFactory factory = new ConnectionFactory();
15         factory.setHost("118.24.13.38");
16         factory.setPort(5672);
17         factory.setVirtualHost("/vhost");
18         factory.setUsername("guest");
19         factory.setPassword("guest");
20         Connection connection = null;
21         com.rabbitmq.client.Channel channel = null;
22         
23         
24         
25         
26         try {
27             connection = factory.newConnection();
28         } catch (IOException e) {
29             // TODO 自动生成的 catch 块
30             e.printStackTrace();
31         } catch (TimeoutException e) {
32             // TODO 自动生成的 catch 块
33             e.printStackTrace();
34         }
35         
36         String msgString = "Hello aaa kkkk坎坎坷坷 SSD对对对  000kk超出";
37         if(connection != null) {
38             try {
39                 channel = connection.createChannel();
40                 if(channel != null) {
41                     channel.queueDeclare(queueName, false, false, false, null);
42                     channel.basicPublish("", queueName, null, msgString.getBytes());
43                     System.out.println("发送完成: " + msgString);
44                 }
45             } catch (IOException e) {
46                 // TODO 自动生成的 catch 块
47                 e.printStackTrace();
48             }
49         }
50         
51 52         
53         
54         try {
55             channel.close();
56             connection.close();
57         } catch (IOException e) {
58             // TODO 自动生成的 catch 块
59             e.printStackTrace();
60         } catch (TimeoutException e) {
61             // TODO 自动生成的 catch 块
62             e.printStackTrace();
63         }
64         
65     }
66 67 }
68
View Code

L04____简单队列__0102_Recv

 1 package com.mmr.rabbitmq.utils;
 2  3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5  6 import com.rabbitmq.client.AMQP.Channel;
 7 import com.rabbitmq.client.Connection;
 8 import com.rabbitmq.client.ConnectionFactory;
 9 import com.rabbitmq.client.Consumer;
10 import com.rabbitmq.client.QueueingConsumer;
11 import com.rabbitmq.client.QueueingConsumer.Delivery;
12 13 public class L04____简单队列__0102_Recv {
14     public static String queueName =  "Hello";
15     public static void main(String[] args) {
16         // TODO 自动生成的方法存根
17         ConnectionFactory factory = new ConnectionFactory();
18         factory.setHost("118.24.13.38");
19         factory.setPort(5672);
20         factory.setVirtualHost("/vhost");
21         factory.setUsername("guest");
22         factory.setPassword("guest");
23         Connection connection = null;
24         com.rabbitmq.client.Channel channel = null;
25         
26         
27         
28         
29         try {
30             connection = factory.newConnection();
31         } catch (IOException e) {
32             // TODO 自动生成的 catch 块
33             e.printStackTrace();
34         } catch (TimeoutException e) {
35             // TODO 自动生成的 catch 块
36             e.printStackTrace();
37         }
38         
39         
40         if(connection != null) {
41             try {
42                 channel = connection.createChannel();
43                 if(channel != null) {
44                     QueueingConsumer consumer = new QueueingConsumer(channel);
45                     // 监听队列
46                     channel.basicConsume(queueName, true, consumer);
47                     while(true) {
48                         System.out.println("----");
49                         Delivery delivery = consumer.nextDelivery();
50                         String msgString = new String(delivery.getBody(), "utf-8");
51                         System.out.println("【rcv】 msg:" + msgString);
52                     }
53                 }
54             } catch (Exception e) {
55                 // TODO 自动生成的 catch 块
56                 e.printStackTrace();
57             }
58         }
59         
60 61         
62         
63         try {
64             channel.close();
65             connection.close();
66         } catch (IOException e) {
67             // TODO 自动生成的 catch 块
68             e.printStackTrace();
69         } catch (TimeoutException e) {
70             // TODO 自动生成的 catch 块
71             e.printStackTrace();
72         }
73         
74     }
75 76 }
View Code

L04____简单队列__0103_Recv

package com.mmr.rabbitmq.utils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class L04____简单队列__0103_Recv {
    public static String queueName =  "Hello";
    public static void main(String[] args) {
        // TODO 自动生成的方法存根
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("118.24.13.38");
        factory.setPort(5672);
        factory.setVirtualHost("/vhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = null;
        com.rabbitmq.client.Channel channel = null;
        
        
        
        
        try {
            connection = factory.newConnection();
        } catch (IOException e) {
            // TODO 自动生成的 catch 块
            e.printStackTrace();
        } catch (TimeoutException e) {
            // TODO 自动生成的 catch 块
            e.printStackTrace();
        }
        
        
        if(connection != null) {
            try {
                channel = connection.createChannel();
                if(channel != null) {
                    //创建频道
                    channel.queueDeclare(queueName, false, false, false, null);
                    DefaultConsumer consumer = new DefaultConsumer(channel) {
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                                byte[] body) throws IOException {
                            // TODO 自动生成的方法存根
                            //super.handleDelivery(consumerTag, envelope, properties, body);
                            String msgString = new String(body, "utf-8");
                            System.out.println("------------");
                            System.out.println("【rcv】 msg:" + msgString);
                            
                        }
                    };
                    //频道监听
                    channel.basicConsume(queueName, true, consumer);
                }
            } catch (Exception e) {
                // TODO 自动生成的 catch 块
                e.printStackTrace();
            }
        }
        

        
        
        /*
         * try { channel.close(); connection.close(); } catch (IOException e) { // TODO
         * 自动生成的 catch 块 e.printStackTrace(); } catch (TimeoutException e) { // TODO
         * 自动生成的 catch 块 e.printStackTrace(); }
         */
        
    }

}
View Code

L05____WorkQueue__0101_Send

 1 package com.mmr.rabbitmq.utils;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 import com.rabbitmq.client.AMQP.Channel;
 7 import com.rabbitmq.client.Connection;
 8 import com.rabbitmq.client.ConnectionFactory;
 9 
10 
11 /**
12  *  轮巡分发
13  * @author Administrator
14  *
15  */
16 public class L05____WorkQueue__0101_Send {
17     public static String queueName =  "Hello";
18     public static void main(String[] args) {
19         // TODO 自动生成的方法存根
20         ConnectionFactory factory = new ConnectionFactory();
21         factory.setHost("118.24.13.38");
22         factory.setPort(5672);
23         factory.setVirtualHost("/vhost");
24         factory.setUsername("guest");
25         factory.setPassword("guest");
26         Connection connection = null;
27         com.rabbitmq.client.Channel channel = null;
28         
29         
30         
31         
32         try {
33             connection = factory.newConnection();
34         } catch (IOException e) {
35             // TODO 自动生成的 catch 块
36             e.printStackTrace();
37         } catch (TimeoutException e) {
38             // TODO 自动生成的 catch 块
39             e.printStackTrace();
40         }
41         
42         
43         if(connection != null) {
44             try {
45                 channel = connection.createChannel();
46                 if(channel != null) {
47                     channel.queueDeclare(queueName, false, false, false, null);
48                     for(int i=0; i<50; i++) {
49                         String msgString = "Hello aaa kkkk坎坎坷坷 SSD对对对  000kk超出";
50                         msgString = msgString + " " + i;
51                         channel.basicPublish("", queueName, null, msgString.getBytes());
52                         System.out.println("发送完成: " + msgString);
53                     }
54                     
55                 }
56             } catch (IOException e) {
57                 // TODO 自动生成的 catch 块
58                 e.printStackTrace();
59             }
60         }
61         
62 
63         
64         
65         try {
66             channel.close();
67             connection.close();
68         } catch (IOException e) {
69             // TODO 自动生成的 catch 块
70             e.printStackTrace();
71         } catch (TimeoutException e) {
72             // TODO 自动生成的 catch 块
73             e.printStackTrace();
74         }
75         
76     }
77 
78 }
View Code

 

原文地址:https://www.cnblogs.com/wujianbo123/p/12112067.html