Spring AMQP:MessageListenerAdapter MessageConverter

一.MessageListenerAdapter

消息监听适配器

配置:

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        //设置监听的队列
        container.setQueues(queue());
        //设置当前消费者数量
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(5);
        //重回队列
        container.setDefaultRequeueRejected(false);
        //签收机制
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //消费端标签策略
        container.setConsumerTagStrategy(queue-> queue+"_"+ UUID.randomUUID().toString());
        //消息适配器
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        container.setMessageListener(adapter);
        return container;
    }
MessageDelegate类:
public class MessageDelegate {

    public void handleMessage(byte[] messageBody){
        System.out.println("默认方法,消息:"+new String(messageBody));
    }

    public void handleMessage(String messageBody){
        System.out.println("字符串方法,消息"+messageBody);
    }
}

注意,自定义的Delegate类,默认方法名是handlermessage

当然,想自定义方法名,可以setDefaultListenerMethod(),这里不再演示。

测试字符串: 

    @Test
    public void testSend1(){
        rabbitTemplate.convertAndSend("amqp.bean.topic", "amqp.send","hello spring");
    }

测试字节数组:

    @Test
    public void testSend2(){
        rabbitTemplate.convertAndSend("amqp.bean.topic", "amqp.send","hello spring2".getBytes());
    }

此外也可以自定义转换器

public class MyMessageConvert implements MessageConverter {

    /**
     * java对象转换为Message对象
     */
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        return new Message(object.toString().getBytes(),messageProperties);
    }

    /**
     * message对象转换为java对象
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        return new String(message.getBody());
    }
}

 在适配器中配置转换器

        //消息适配器
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setMessageConverter(new MyMessageConvert());
        container.setMessageListener(adapter);

配置了消息转换器后,现在无论发送的消息是字符串还是字节数组,适配器都直接进入handleMessage(String messageBody)方法。

 适配器还可以将队列名称与方法名称进行一一匹配。

需要传入一个map,key是队列名,value是方法名

adapter.setQueueOrTagToMethodName(queueName,method);

 源码:

/**
	 * Set the mapping of queue name or consumer tag to method name. The first lookup
	 * is by queue name, if that returns null, we lookup by consumer tag, if that
	 * returns null, the {@link #setDefaultListenerMethod(String) defaultListenerMethod}
	 * is used.
	 * @param queueOrTagToMethodName the map.
	 * @since 1.5
	 */
	public void setQueueOrTagToMethodName(Map<String, String> queueOrTagToMethodName) {
		this.queueOrTagToMethodName.putAll(queueOrTagToMethodName);
	}

当队列与方法绑定后,队列里的消息会被绑定的方法所处理。 

二.MessageConverter

消息转换器,一般自定义转换器需要实现这个接口,重写toMessage和fromMessage方法。

Json转换器:Jackson2JsonMessageConverter,可以进行java对象的转换功能

先加上jackson依赖:

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.9.9</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.9</version>
        </dependency>

json测试:

在delegate类中定义处理json的方法:

    public void handleMessage(Map messageBody){
        System.out.println("json转换,消息:"+messageBody);
    }

在adapter中配置Jackson2JsonMessageConverter

        //消息适配器
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        //adapter.setMessageConverter(new MyMessageConvert());
        //json形式转换器
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        adapter.setMessageConverter(converter);
        container.setMessageListener(adapter);

测试发送消息:

    @Test
    public void testJson() throws JsonProcessingException {
        User user = new User("张三",22);
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(user);
        MessageProperties properties = new MessageProperties();
        properties.setContentType("application/json");
        Message message = new Message(json.getBytes(), properties);
        rabbitTemplate.send("amqp.bean.topic", "amqp.send",message);
    }

测试结果:

 配置DefaultJackson2JavaTypeMapper映射器:进行java对象的映射关系

        //消息适配器
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        //adapter.setMessageConverter(new MyMessageConvert());
        //json形式转换器
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        DefaultJackson2JavaTypeMapper mapper = new DefaultJackson2JavaTypeMapper();
        //设置安全包
        mapper.setTrustedPackages("com.wj.springamqp.domain");
        converter.setJavaTypeMapper(mapper);
        adapter.setMessageConverter(converter);
        container.setMessageListener(adapter);

注意要setTrustedPackages,否则会报如下错误。

 测试类:

    @Test
    public void testJson() throws JsonProcessingException {
        User user = new User("张三",22);
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(user);
        MessageProperties properties = new MessageProperties();
        properties.setContentType("application/json");
        //key:_TypeId_ value:类的全路径
        //加上去后,就可以支持json转换为java对象
        properties.getHeaders().put("__TypeId__","com.wj.springamqp.domain.User");
        Message message = new Message(json.getBytes(), properties);
        rabbitTemplate.send("amqp.bean.topic", "amqp.send",message);
    }

注意!!!"__TypeId__"这里下划线是四个,前面两个后面两个,不然无法转为java对象。

Jackson2JsonMessageConverter和DefaultJackson2JavaTypeMapper支持java对象多映射转换。

        //json形式转换器
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        DefaultJackson2JavaTypeMapper mapper = new DefaultJackson2JavaTypeMapper();
        //设置安全包
        mapper.setTrustedPackages("com.wj.springamqp.domain");

        Map<String, Class<?>> map = new HashMap<>();
        map.put("user", com.wj.springamqp.domain.User.class);
        map.put("stu", com.wj.springamqp.domain.Stu.class);
        mapper.setIdClassMapping(map);

        converter.setJavaTypeMapper(mapper);

 此外MessageConverter支持pdf,image的转换,在发送消息的时候,需要将文件转换为二进制字节流,然后在转换器中生成文件。

原文地址:https://www.cnblogs.com/wwjj4811/p/13036210.html