消息队列报 堆溢出解决方案

参考文档:

https://www.jianshu.com/p/d05c488b84cc

 /**
     * 用于周期性监控线程池的运行状态
     */
    private final ScheduledExecutorService scheduledExecutorService =
        Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("async thread executor monitor").build());
    /**
     * 自定义异步线程池
     * (1)任务队列使用有界队列
     * (2)自定义拒绝策略
     */
    private final ThreadPoolExecutor threadPoolExecutor =
        new ThreadPoolExecutor(consumerThreadNum, consumerThreadNum, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(consumerThreadNum),
                               new BasicThreadFactory.Builder().namingPattern("Thread-mns-ali-consumer-%d").build(),
                               (r, executor) -> log.error("the async executor pool is full!!"));

    @PostConstruct
    public void processMessage() {
        queue = client.getQueueRef();
        /**
         * 开启多个线程消费消息
         */
        for (int i = 0; i < consumerThreadNum; i++) {
            threadPoolExecutor.submit(new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                         process();
                    } catch (Exception e) {
                      
                    }
                   
                }
            }, "Thread-consumer-" + i));
        }
        
        
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            /**
             * 线程池需要执行的任务数
             */
            long taskCount = threadPoolExecutor.getTaskCount();
            /**
             * 线程池在运行过程中已完成的任务数
             */
            long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
            /**
             * 曾经创建过的最大线程数
             */
            long largestPoolSize = threadPoolExecutor.getLargestPoolSize();
            /**
             * 线程池里的线程数量
             */
            long poolSize = threadPoolExecutor.getPoolSize();
            /**
             * 线程池里活跃的线程数量
             */
            long activeCount = threadPoolExecutor.getActiveCount();
            
            for (; activeCount < consumerThreadNum; activeCount++) {
                threadPoolExecutor.submit(new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                             process();
                        } catch (Exception e) {
                           
                        }
                       
                    }
                }, "Thread-consumer-" + activeCount));
            }
        }, 0, 10, TimeUnit.MINUTES);
        
        
    }

    private void process() {

        while (!Thread.currentThread().isInterrupted()) {
            try {

                
                
                /**
                 * dequeue from queue
                 */
                Message message = queue.popMessage();

                /**
                 * no enough message
                 */
                if (message == null) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    } //没有消息就休眠1秒
                    continue;
                }

                try {
                    String body = message.getMessageBodyAsString("UTF-8");
                    MnsMessage mnsMessage = GsonUtils.jsonToBean(body, MnsMessage.class);
                    
                    Thread.sleep(2000); //没有消息就休眠1秒
                    
                    handleHouse(message);
                    //处理成功删除消息
                    queue.deleteMessage(message.getReceiptHandle());
                } catch (Exception e) {
                    if(e instanceof BaseException){ //监听是不是超时异常
                      }
                    //处理异常,删除消息
                    queue.deleteMessage(message.getReceiptHandle());
                }finally{
                    // 最后执行MDC删除
                    MDC. remove(Common.SESSION_TOKEN_KEY);
                }
            } catch (Exception e) {
              
            }
        }
    }

原文地址:https://www.cnblogs.com/QAZLIU/p/9982986.html