Sender(agumaster_crawler)->RabbitMq->Reciever(agumaster)

发送方:

package com.heyang.agumasterCrawler;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.heyang.agumasterCrawler.codename.BaseCodeNameCrawler;
import com.heyang.agumasterCrawler.codename.FenghuangCrawler;
import com.heyang.agumasterCrawler.entity.Stock;
import com.heyang.agumasterCrawler.entity.StockBundle;
import com.heyang.agumasterCrawler.sender.Sender;

@SpringBootApplication
public class AgumasterCrawlerApplication implements CommandLineRunner {
    @Autowired
    private Sender sender=null;
    
    @Override
    public void run(String... args) throws Exception {
        BaseCodeNameCrawler crawler=new FenghuangCrawler();
        List<Stock> stockList=crawler.getStockList();
        
        StockBundle sb=new StockBundle();
        sb.setSource("凤凰财经");
        sb.setCount(stockList.size());
        sb.setStockList(stockList);
        sb.setType("stock");
        
        ObjectMapper mapper = new ObjectMapper();
        
        String str=mapper.writeValueAsString(sb);
        this.sender.send("stockQueue",str);
    }
    
    public static void main(String[] args) {
        SpringApplication.run(AgumasterCrawlerApplication.class, args);
    }
}

传到MQ的消息:

{"type":"stock","source":"凤凰财经","count":3791,"stockList":[{"id":0,"code":"688466","name":"N金科"},{"id":1,"code":"000825","name":"太钢不锈"},{"id":2,"code":"300022","name":"吉峰科技"},{"id":3,"code":"002536","name":"飞龙股份"},{"id":4,"code":"300459","name":"金科文化"},{"id":5,"code":"00240...
]}

接收方:

package com.ufo.hy.agumaster.mq;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ufo.hy.agumaster.entity.Stock;
import com.ufo.hy.agumaster.service.StockService;

/**
 * Used to receive stock code/names
 * @author Heyang
 *
 */
@Component
@RabbitListener(queues="stockQueue")
public class Receiver {
    private final Logger logger = LoggerFactory.getLogger(Receiver.class);
    
    @Autowired
    protected StockService stkService=null;
    
    @RabbitHandler
    public void QueueReceive(String receivedMsg) {
        logger.info("Got mas:"+receivedMsg);
        ObjectMapper mapper = new ObjectMapper();
        
        try {
            JsonNode node = mapper.readTree(receivedMsg);
            String type=node.get("type").asText();
            
            if("stock".equals(type)) {
                JsonNode listNode=node.path("stockList");
                String source=node.get("source").asText();
                String count=node.get("count").asText();
                logger.info("Got {} stocks from {}.",count,source);

                // 遍历list节点的子节点
                List<Stock> stockList=new ArrayList<Stock>();
                Iterator<JsonNode> iterator = listNode.elements();
                while (iterator.hasNext()) {
                    JsonNode stock = iterator.next();
                    String code=stock.get("code").asText();
                    String name=stock.get("name").asText();
                    
                    stockList.add(new Stock(0,code,name));
                }
                
                int[] arr=stkService.batchUpdate(stockList,source,null);
                int inserted=arr[0],updated=arr[1];
                
                logger.info("Updated {},inserted {}.",updated,inserted);
            }                   
                    
        }catch(Exception ex) {
            ex.printStackTrace();
        }
    }
} 
原文地址:https://www.cnblogs.com/heyang78/p/12862970.html