RABBITMQ(Maven)

简单模式(一个生产者,一个队列,一个消费者)

1.导入jar包

              <dependency>

                     <groupId>com.rabbitmq</groupId>

                     <artifactId>amqp-client</artifactId>

                     <version>3.4.1</version>

              </dependency>

2.创建rabbitmq连接的工具类

package com.rabbitmq.util;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

/**

 *   创建rabbitmq连接的工具类

 * @author KFS

 *

 */

public class ConnectionUtil {

      

       public static Connection getConnection() throws Exception{

              //创建连接工厂

              ConnectionFactory connectionFactory=new ConnectionFactory();

              //设置参数

              connectionFactory.setHost("127.0.0.1");//主机ip

              connectionFactory.setVirtualHost("/taotao");//虚拟主机名

              connectionFactory.setUsername("admin");//账号

              connectionFactory.setPassword("admin");//密码

              //创建连接

              Connection newConnection = connectionFactory.newConnection();

              return newConnection;

       }

}

3.simple模式发送消息

package com.simple.rabbitmq;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.util.ConnectionUtil;

/**

 *   simple模式发送消息

 * @author KFS

 *

 */

public class Send {

       public static void main(String[] args) throws Exception{

              //通过rabbitmq工具类得到连接

              Connection connection=ConnectionUtil.getConnection();

              //创建通道

              Channel channel = connection.createChannel();

              /*

               *   创建消息队列(如果有可以不用创建,但创建会覆盖之前的)

               *   第一参数:队列名称

               *   第二参数:队列是否持久化(存储到磁盘)

               *   第三参数:队列是否被独占

               *   第四参数:队列是否自动删除

               *   第五参数:

               */

              channel.queueDeclare("test_simple_queue", false, false, false, null);

              //创建消息

              String message="simple_queue";

              /*

               *   发送消息

               *   第一参数:交换机名(简单模式不用交换机,但不能用null)

               *   第二参数:队列名称

               *   第三参数:

               *   第四参数:消息(字节流)

               *

               */

              channel.basicPublish("", "test_simple_queue", null, message.getBytes());

              System.out.println("发送的消息:"+message);

              //关闭资源

              channel.close();

              connection.close();

       }

}

4.simple模式接受消息

package com.simple.rabbitmq;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import com.rabbitmq.client.QueueingConsumer.Delivery;

import com.rabbitmq.util.ConnectionUtil;

/**

 *   simple模式接受消息

 * @author KFS

 *

 */

public class Receive {

       public static void main(String[] args) throws Exception{

              //通过rabbitmq工具类得到连接

              Connection connection=ConnectionUtil.getConnection();

              //创建通道

              Channel channel = connection.createChannel();

              /*

               *   创建消息队列(如果有可以不用创建,但创建会覆盖之前的)

               *   第一参数:队列名称

               *   第二参数:队列是否持久化(存储到磁盘)

               *   第三参数:队列是否被独占

               *   第四参数:队列是否自动删除

               *   第五参数:

               */

              channel.queueDeclare("test_simple_queue", false, false, false, null);

              //定义消费者

              QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

              /*

               *   监听队列

               *   第一参数:队列名称

               *   第二参数:是否自动回复完成接受

               *   第三参数:消费者名称

               */

              channel.basicConsume("test_simple_queue",true, queueingConsumer);

             

              while(true) {

                     //获取消息

                     Delivery nextDelivery = queueingConsumer.nextDelivery();

                     //打印消息

                     String message=new String(nextDelivery.getBody());

                     System.out.println(message);

              }

             

       }

}

work模式(一个生产者,一个队列,多个消费者)

1.work模式发送消息

package com.work.rabbitmq;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.util.ConnectionUtil;

/**

 *   work模式发送消息

 * @author KFS

 *

 */

public class Send {

       public static void main(String[] args) throws Exception{

              //通过rabbitmq连接工具类得到连接

              Connection connection=ConnectionUtil.getConnection();

              //创建通道

              Channel channel = connection.createChannel();

              /*

               * 创建消息队列(如果有就不用创建,但创建不会错)

               * 第一参数:队列名称

               * 第二参数:该队列是否持久化

               * 第三参数:该队列是否被独占

               * 第四参数:该队列是否自动删除

               * 第五参数:

               *

               */

              channel.queueDeclare("test_work_queue", false, false, false, null);

              for(int i=1;i<=100;i++) {

                     //创建消息

                     String message="work_queue"+i;

                     /*

                      * 发送消息

                      * 第一参数:交换机名

                      * 第二参数:队列名称

                      * 第三参数:

                      * 第四参数:消息(字节流)

                      *

                      */

                     channel.basicPublish("", "test_work_queue", null, message.getBytes());

                     System.out.println("work_发送的消息:"+message);

              }

              //关闭资源

              channel.close();

              connection.close();

       }

}

a)      work模式平均接受消息(一个就是一个消费者,可以创建多个消费者,不过是平均分配消息。)

package com.work.rabbitmq;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import com.rabbitmq.client.QueueingConsumer.Delivery;

import com.rabbitmq.util.ConnectionUtil;

/**

 *   work模式接受消息1

 * @author KFS

 *

 */

public class Receive1 {

       public static void main(String[] args) throws Exception{

              //得到连接

              Connection connection=ConnectionUtil.getConnection();

              //创建通道

              Channel channel = connection.createChannel();

              /*

               * 创建消息队列(如果有就不用创建,但创建不会错)

               * 第一参数:队列名称

               * 第二参数:该队列是否持久化

               * 第三参数:该队列是否被独占

               * 第四参数:该队列是否自动删除

               * 第五参数:

               *

               */

              channel.queueDeclare("test_work_queue", false, false, false, null);

              //创建队列消费者

              QueueingConsumer queueingConsumer=new QueueingConsumer(channel);

              /*

               * 监听队列中的内容

               * 第一参数:队列名称

               * 第二参数:是否自动回复

               *

               */

              channel.basicConsume("test_work_queue",true, queueingConsumer);

             

              //获取消息

              while(true) {

                     Delivery nextDelivery = queueingConsumer.nextDelivery();

                     String message=new String(nextDelivery.getBody());

                     Thread.sleep(1000);

                     System.out.println("接受消息:"+message);

              }

       }

}

b)      work模式能者多劳接受消息(一个生产者,一个队列,可以创建多个消费者,不过这是能者多劳的)

package com.work.rabbitmq;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import com.rabbitmq.client.QueueingConsumer.Delivery;

import com.rabbitmq.util.ConnectionUtil;

/**

 *   work模式接受消息1

 * @author KFS

 *

 */

public class Receive1 {

       public static void main(String[] args) throws Exception{

              //得到连接

              Connection connection=ConnectionUtil.getConnection();

              //创建通道

              Channel channel = connection.createChannel();

              /*

               * 创建消息队列(如果有就不用创建,但创建不会错)

               * 第一参数:队列名称

               * 第二参数:该队列是否持久化

               * 第三参数:该队列是否被独占

               * 第四参数:该队列是否自动删除

               * 第五参数:

               *

               */

              channel.queueDeclare("test_work_queue", false, false, false, null);

              //创建队列消费者

              QueueingConsumer queueingConsumer=new QueueingConsumer(channel);

              //设置同一时刻只会发送一条消息给消费者(要放在监听队列内容之上)

              channel.basicQos(1);

              /*

               * 监听队列中的内容

               * 第一参数:队列名称

               * 第二参数:是否自动回复(能者多劳需要手动回复)

               *

               */

              channel.basicConsume("test_work_queue",false, queueingConsumer);

             

             

              //获取消息

              while(true) {

                     Delivery nextDelivery = queueingConsumer.nextDelivery();

                     String message=new String(nextDelivery.getBody());

                     Thread.sleep(10);

                     System.out.println("接受消息:"+message);

                     //手动回复

                     channel.basicAck(nextDelivery.getEnvelope().getDeliveryTag(), false);

              }

       }

}

订阅模式,交换机,exchange:fanout(一个生产者,一个交换机,多个队列,多个消费者)

1.订阅模式发送消息

package com.fanout.rabbitmq;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.util.ConnectionUtil;

/**

 *   订阅模式(exchange,交换机)发送消息

 * @author KFS

 *

 */

public class Send {

       public static void main(String[] args) throws Exception{

              //通过rabbitmq连接工具类得到连接

              Connection connection=ConnectionUtil.getConnection();

              //创建通道

              Channel channel = connection.createChannel();

              /*

               * 创建交换机exchange

               * 第一参数:交换机名称

               * 第二参数:交换机类型:

               *

               */

              channel.exchangeDeclare("test_fanout", "fanout");

             

              //消息内容

              String message="testFanout";

              /*

               * 发送消息

               * 第一参数:交换机名称

               * 第二参数:

               * 第三参数:

               * 第四参数:消息(字节流)

               *

               */

              channel.basicPublish("test_fanout", "", null, message.getBytes());

              System.out.println("发送消息:"+message);

             

              //关闭资源

              channel.close();

              connection.close();

       }

}

2.订阅模式接收消息(一个生产者,一个交换机,多个队列,队列名称不同,多个消费者,消费者都能收到消息)

package com.fanout.rabbitmq;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import com.rabbitmq.client.QueueingConsumer.Delivery;

import com.rabbitmq.util.ConnectionUtil;

/**

 *   订阅模式接受消息

 * @author KFS

 *

 */

public class Receive1 {

       public static void main(String[] args) throws Exception{

              //通过rabbitmq连接工具类得到连接

              Connection connection=ConnectionUtil.getConnection();

              //创建通道

              Channel channel = connection.createChannel();

              /*

               * 创建交换机exchange

               * 第一参数:交换机名称

               * 第二参数:交换机类型:

               *

               */

              channel.exchangeDeclare("test_fanout", "fanout");

              //创建队列

              channel.queueDeclare("test_fanout_queue1", false, false, false, null);

              /*

               * 绑定队列到交换机

               * 第一参数:队列名称

               * 第二参数:交换机名称

               * 第三参数:

               *

               */

              channel.queueBind("test_fanout_queue1", "test_fanout", "");

        // 同一时刻服务器只会发一条消息给消费者

        channel.basicQos(1);

       

        //定义消费者

        QueueingConsumer queueingConsumer=new QueueingConsumer(channel);

        //监听队列

        channel.basicConsume("test_fanout_queue1", false,queueingConsumer);

       

        while(true) {

               //获取消息

               Delivery nextDelivery = queueingConsumer.nextDelivery();

               String message=new String(nextDelivery.getBody());

               System.out.println(message);

               Thread.sleep(10);

              

               //手动回复完成

               channel.basicAck(nextDelivery.getEnvelope().getDeliveryTag(), false);

        }

       

       

       }

}

路由模式,exchange:direct(和订阅模式有相同点,一个生产者,一个交换机,多个队列,不同的队列名,多个消费者)

1.路由模式发送消息

package com.direct.rabbitmq;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.util.ConnectionUtil;

/**

 *   路由模式redirect发送消息

 * @author KFS

 *

 */

public class Send {

       public static void main(String[] args) throws Exception{

              //1.通过rabbitmq连接工具类创建连接

              Connection connection = ConnectionUtil.getConnection();

              //2通过连接.创建通道

              Channel channel = connection.createChannel();

              /*

               * 3.创建交换机

               * 第一参数:交换机名称

               * 第二参数:交换机类型

               *

               */

              channel.exchangeDeclare("test_direct_exchange", "direct");

             

              String message="test_direct";

              /*

               * 4.发送消息

               * 第一参数:交换机名称

               * 第二参数:钥匙(接受方的是这个的才能接受)

               * 第三参数:

               * 第四参数:消息(字节流)

               *

               */

              channel.basicPublish("test_direct_exchange", "key1", null, message.getBytes());

              System.out.println("发送消息:"+message);

             

              //关闭资源

              channel.close();

              connection.close();

       }

}

2.路由模式接收消息(一个生产者,一个交换机,多个队列,队列名称不同,多个消费者,消费者都能收到消息)

package com.direct.rabbitmq;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import com.rabbitmq.client.QueueingConsumer.Delivery;

import com.rabbitmq.util.ConnectionUtil;

/**

 *   路由模式接收消息1

 * @author KFS

 *

 */

public class Receive1 {

       public static void main(String[] args) throws Exception{

              //1.通过rabbitmq连接工具类创建连接

              Connection connection = ConnectionUtil.getConnection();

              //2.通过连接创建通道

              Channel channel = connection.createChannel();

              /*

               * 3.通过通道创建队列

               *

               */

              channel.queueDeclare("test_direct_queue1", false, false, false, null);

              /*

               * 4.绑定队列和交换机

               * 第一参数:队列名称

               * 第二参数:交换机名称

               * 第三参数:钥匙

               *

               */

              channel.queueBind("test_direct_queue1", "test_direct_exchange", "key1");

              //同一时刻服务器只发一条消息

              channel.basicQos(1);

             

              //定义队列消费者

              QueueingConsumer queueingConsumer=new QueueingConsumer(channel);

              //监听队列,设置手动回复

              channel.basicConsume("test_direct_queue1",false, queueingConsumer);

             

              //获取消息

              Delivery nextDelivery = queueingConsumer.nextDelivery();

              String message=new String(nextDelivery.getBody());

              System.out.println(message);

             

              //手动回复

              channel.basicAck(nextDelivery.getEnvelope().getDeliveryTag(), false);

       }

}

通配符模式(一个发送者,一个交换机,多个队列,多个消费者)

1.通配符发送消息

package com.topic.rabbitmq;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.util.ConnectionUtil;

/**

 *   通配符模式topic发送消息

 * @author KFS

 *

 */

public class Send {

       public static void main(String[] args) throws Exception{

              //1.通过rabbitmq连接工具类创建连接

              Connection connection = ConnectionUtil.getConnection();

              //2.通过连接创建通道

              Channel channel = connection.createChannel();

              //3.通过通道创建交换机

              channel.exchangeDeclare("test_topic_exchange", "topic");

              String message="test_topic";

              //4.发送消息

              channel.basicPublish("test_topic_exchange", "key.1", null, message.getBytes());

              System.out.println("发送消息:"+message);

             

              //关闭资源

              channel.close();

              connection.close();

       }

}

2.通配符接收消息

package com.topic.rabbitmq;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import com.rabbitmq.client.QueueingConsumer.Delivery;

import com.rabbitmq.util.ConnectionUtil;

/**

 *   通配符模式topic接收消息1

 * @author KFS

 *

 */

public class Receive1 {

       public static void main(String[] args) throws Exception{

              //1.通过rabbitmq连接工具类创建连接

              Connection connection = ConnectionUtil.getConnection();

              //2.通过连接创建通道

              Channel channel = connection.createChannel();

              //3.通过通道创建队列

              channel.queueDeclare("test_topic_queue1", false, false, false, null);

              //4.通过通道绑定队列

              channel.queueBind("test_topic_queue1", "test_topic_exchange", "key.*");

              //同一时刻服务器只发送一条消息

              channel.basicQos(1);

              //5.通过通道创建消费者

              QueueingConsumer queueingConsumer=new QueueingConsumer(channel);

              //6.监听队列

              channel.basicConsume("test_topic_queue1", queueingConsumer);

             

              //7.接收消息

              Delivery nextDelivery = queueingConsumer.nextDelivery();

              String message=new String(nextDelivery.getBody());

              System.out.println(message);

             

       }

}

RabbitMQ和Spring整合

1.监听者类方法

package com.spring.rabbitmq;

/**

 *   监听类方法

 * @author KFS

 *

 */

public class SpringReceive1 {

       public void receive1(String msg) {

              System.out.println("接收消息:"+msg);

       }

}

2.spring和rabbitmq整合xml:applicationContent-rabbitmq.xml

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

       xmlns:rabbit="http://www.springframework.org/schema/rabbit"

       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

              http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">

      

       <!-- 发送者 -->

       <!-- 定义rabbitmq连接工厂 -->

       <rabbit:connection-factory id="factory" host="127.0.0.1" port="5672" virtual-host="/taotao" username="admin" password="admin"/>

      

       <!-- 创建rabbitmq管理 -->

       <rabbit:admin connection-factory="factory"/>

       <!-- 创建交换机: -->

       <rabbit:fanout-exchange name="springExchange" auto-declare="true">

       </rabbit:fanout-exchange>

       <!-- 创建rabbitmq模板 -->

       <rabbit:template id="template" exchange="springExchange" connection-factory="factory"/>

      

      

       <!-- 接收者 -->

       <!-- 定义队列:队列和交换机绑定去网页上绑定 -->

       <rabbit:queue name="fanoutQueue" auto-declare="true"/>

      

       <!-- 队列监听 -->

       <rabbit:listener-container connection-factory="factory">

              <rabbit:listener ref="springReceive1" method="receive1" queue-names="fanoutQueue"/>

       </rabbit:listener-container>

       <!-- 定义监听类 -->

       <bean id="springReceive1" class="com.spring.rabbitmq.SpringReceive1"/>

      

</beans>

3.发送和接收代码

package com.spring.rabbitmq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.context.support.AbstractApplicationContext;

import org.springframework.context.support.ClassPathXmlApplicationContext;

/**

 *   rabbitmq和spring整合发送消息和监听接收消息

 * @author KFS

 *

 */

public class SpringSend {

       public static void main(String[] args) throws Exception{

              //读取xml文件

              AbstractApplicationContext applicationContext=new ClassPathXmlApplicationContext("classpath:applicationContent-rabbitmq.xml");

              //获取rabbitmq模板对象

              RabbitTemplate rabbitTemplate=(RabbitTemplate)applicationContext.getBean("template");

              //发送消息

              rabbitTemplate.convertAndSend("spring_send");

              //监听者会一直监听

             

              //休眠

              Thread.sleep(1000);

              //摧毁容器

              applicationContext.destroy();

       }

}

RabbitMQ的使用(后台系统修改,删除,添加商品会触发消息队列,在搜索系统接收消息在进行相应操作)

1.后台系统

a)      需要的rabbitmq与spring的整合文件:applicationContext-rabbitmq.xml

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

       xmlns:rabbit="http://www.springframework.org/schema/rabbit"

       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

              http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">

       <!-- rabbitmq生产者 -->

       <!-- 配置连接工厂 -->

       <rabbit:connection-factory id="factory" host="127.0.0.1" port="5672" virtual-host="/taotao" username="admin" password="admin"/>

      

       <!-- MQ的管理器:管理队列和交换机 -->

       <rabbit:admin connection-factory="factory"/>

       <!-- 定义交换机 -->

       <rabbit:topic-exchange name="taotao_item_topic" auto-declare="true">

       </rabbit:topic-exchange>

       <!-- rabbitmq模板 -->

       <rabbit:template id="rabbitTemplate" connection-factory="factory" exchange="taotao_item_topic"></rabbit:template>

      

</beans>

b)      商品业务层代码

package com.taotao.manage.service;

import java.util.Date;

import java.util.HashMap;

import java.util.Map;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

import com.fasterxml.jackson.core.JsonProcessingException;

import com.fasterxml.jackson.databind.ObjectMapper;

import com.taotao.manage.bean.Item;

import com.taotao.manage.bean.ItemDesc;

import com.taotao.manage.bean.ItemParamItem;

@Service

public class ItemServiceImp extends BaseServiceImp<Item> implements ItemService{

       @Autowired

       private ItemDescService itemDescService;

       @Autowired

       private ItemParamItemService itemParamItemService;

       @Autowired

       private RabbitTemplate rabbitTemplate;

       private ObjectMapper objectMapper=new ObjectMapper();

      

       /**

        *   添加商品和商品描述

        * @param item

        * @param desc

        */

       @Override

       public void insertItem(Item item, String desc,String itemParams) {

              item.setStatus(1);

              this.insert(item);

             

              ItemDesc record=new ItemDesc();

              record.setItemDesc(desc);

              record.setItemId(item.getId());

              itemDescService.insert(record);

             

              ItemParamItem itemParamItem=new ItemParamItem();

              itemParamItem.setItemId(item.getId());

              itemParamItem.setParamData(itemParams);

              itemParamItemService.insert(itemParamItem);

             

              //以下是消息队列

              Map<String, Object> message=new HashMap<>();

              message.put("type", "item.insert");

              message.put("date", new Date());

              try {

                     message.put("item", objectMapper.writeValueAsString(item));

                     message.put("itemDesc", objectMapper.writeValueAsString(record));

                     message.put("itemParamItem", objectMapper.writeValueAsString(itemParamItem));

                     rabbitTemplate.convertAndSend("item.insert", message);

              } catch (JsonProcessingException e) {

                     e.printStackTrace();

              }

             

       }

       /**

        *   修改商品和商品描述

        */

       @Override

       public void updateItem(Item item, String desc,String itemParams,Long itemParamId) {

              item.setStatus(1);

              this.update(item);

             

              ItemDesc itemDesc=new ItemDesc();

              itemDesc.setItemId(item.getId());

              itemDesc.setItemDesc(desc);

              itemDescService.update(itemDesc);

             

              ItemParamItem itemParamItem=new ItemParamItem();

              itemParamItem.setId(itemParamId);

              itemParamItem.setParamData(itemParams);

              itemParamItemService.update(itemParamItem);

             

              //以下是消息队列

              Map<String, Object> message=new HashMap<>();

              message.put("type", "item.update");

              message.put("date", new Date());

              try {

                     message.put("item", objectMapper.writeValueAsString(item));

                     message.put("itemDesc", objectMapper.writeValueAsString(itemDesc));

                     message.put("itemParamItem", objectMapper.writeValueAsString(itemParamItem));

                     rabbitTemplate.convertAndSend("item.update", message);

              } catch (JsonProcessingException e) {

                     e.printStackTrace();

              }

       }

       /**

        *   批量修改商品状态status:1-正常,2-下架,3-删除

        */

       @Override

       public void updateStatus(Long[] ids,Integer status) {

              Item item=new Item();

              item.setStatus(status);

              for(Long id:ids) {

                     item.setId(id);

                     this.update(item);

                    

                     //以下是消息队列

                     Map<String, Object> message=new HashMap<>();

                     message.put("type", "item.delete");

                     message.put("date", new Date());

                     try {

                            message.put("item", objectMapper.writeValueAsString(item));

                            rabbitTemplate.convertAndSend("item.delete", message);

                     } catch (JsonProcessingException e) {

                            e.printStackTrace();

                     }

              }

       }

      

}

2.搜索系统

a)      rabbitmq与spring的整合文件:applicationContext-rabbitmq.xml

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

       xmlns:rabbit="http://www.springframework.org/schema/rabbit"

       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

              http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">

       <!-- rabbitmq消费者 -->

       <!-- 配置连接工厂 -->

       <rabbit:connection-factory id="factory" host="127.0.0.1" port="5672" virtual-host="/taotao" username="admin" password="admin"/>

      

       <!-- MQ的管理器:管理队列和交换机 -->

       <rabbit:admin connection-factory="factory"/>

       <!-- 配置队列 -->

       <rabbit:queue name="taotao_item_search" auto-declare="true"></rabbit:queue>

       <bean id="itemsListener" class="com.taotao.search.listener.ItemsListener">

       </bean>

       <!-- 队列监听 -->

       <rabbit:listener-container connection-factory="factory">

              <rabbit:listener ref="itemsListener" method="execut" queue-names="taotao_item_search"/>

       </rabbit:listener-container>

      

</beans>

b)      监听代码:rabbitmq传消息可以传Object类,但监听只能监听到String,其他的会无限循环。

package com.taotao.search.listener;

import java.util.Map;

import org.apache.commons.lang3.StringUtils;

import org.springframework.beans.factory.annotation.Autowired;

import com.fasterxml.jackson.databind.ObjectMapper;

import com.taotao.manage.bean.Item;

import com.taotao.search.service.SearchService;

/**

 *   rabbitmq监听类

 * @author KFS

 *

 */

public class ItemsListener {

       @Autowired

       private SearchService searchService;

       private ObjectMapper objectMapper=new ObjectMapper();

       public void execut(Map<String, Object> map) throws Exception{

              /*

               * rabbitmq传消息可以传Object类,但监听只能监听到String,其他的会无限循环。

               */

              String type=map.get("type").toString();

              if(StringUtils.equals(type, "item.insert") || StringUtils.equals(type, "item.update")) {

                     Item item = objectMapper.readValue(map.get("item").toString(), Item.class);

                     com.taotao.search.bean.Item record=new com.taotao.search.bean.Item();

                     record.setId(item.getId());

                     record.setTitle(item.getTitle());

                     record.setSellPoint(item.getSellPoint());

                     record.setPrice(item.getPrice());

                     record.setNum(item.getNum());

                     record.setBarcode(item.getBarcode());

                     record.setImage(item.getImage());

                     record.setCid(item.getCid());

                     record.setStatus(item.getStatus());

                     searchService.insert(record);

              }else if(StringUtils.equals(type, "item.delete")) {

                     Item item = objectMapper.readValue(map.get("item").toString(), Item.class);

                     searchService.delete(item.getId());

              }

             

       }

}

原文地址:https://www.cnblogs.com/kfsrex/p/11853645.html