Multi-threaded consumption of interface data from queues, where the interface data comes from Kafka

First take a look at the flow chart, then follow one of the interfaces (the express-delivery interface) as an example to walk through the whole flow in detail. In outline: the Kafka consumer deserializes each record into an entity, puts it into a queue in QueueFactory keyed by the worker-thread class, ThreadFactory keeps a fixed number of those worker threads running in a shared thread pool, and each worker drains its queue and calls the express-delivery interface.

1 Consumer message-handling class

package com.aspire.ca.prnp.service.impl;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.aspire.ca.prnp.db.service.impl.Constants;
import com.aspire.ca.prnp.db.service.impl.InsertDealRealIDCheckRecordThread;
import com.aspire.ca.prnp.db.service.impl.QueueFactory;
import com.aspire.ca.prnp.db.service.impl.ThreadFactory;
import com.aspire.ca.prnp.domain.express.status.RealIDCheckRecord;
import com.aspire.ca.prnp.kafka.AbstractPrnpConsumer;
import com.aspire.ca.prnp.service.PrnpKernelServer;
import com.aspire.prnp.util.JsonUtil;

public class PostalStatusConsumer extends AbstractPrnpConsumer {

    private Logger logger = LoggerFactory.getLogger(PostalStatusConsumer.class);

    private QueueFactory queueFactory;

    // record is the consumed Kafka message; it is handed on to InsertDealRealIDCheckRecordThread
    @Override
    public Boolean execute(ConsumerRecord<String, String> record) throws Exception {
        if (Constants.KAFKA_CONSUM_MESSAGE_COUNT.get() == 0) {
            Constants.LAST_START_COUNT_TIME = new AtomicLong(System.currentTimeMillis());
        }
        logger.info("consumer handling message offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
        // convert the ConsumerRecord<String, String> value into the entity class
        RealIDCheckRecord realIDCheckRecord = JsonUtil.jsonToBean(record.value(), RealIDCheckRecord.class);

        // obtain the queue factory bean
        queueFactory = (QueueFactory) PrnpKernelServer.getCtx().getBean("queueFactory");

        // put the record into the queue keyed by InsertDealRealIDCheckRecordThread,
        // start the worker threads and, once, the monitoring thread (steps 6 and 2 in the flow chart)
        queueFactory.addKafkaRecord(InsertDealRealIDCheckRecordThread.class, realIDCheckRecord);

        // raise the number of InsertDealRealIDCheckRecordThread workers to 20 in the thread pool (step 4 in the flow chart)
        ThreadFactory.getInstance().exec(InsertDealRealIDCheckRecordThread.class, 20);

        if (Constants.KAFKA_CONSUM_MESSAGE_COUNT.get() + 5 >= Integer.MAX_VALUE) {
            long costTime = System.currentTimeMillis() - Constants.LAST_START_COUNT_TIME.get();
            if (costTime != 0) {
                logger.info(">>KafkaConsumSpeedMonitor: messages consumed in this counting window: "
                        + Constants.KAFKA_CONSUM_MESSAGE_COUNT.get()
                        + ", elapsed: " + (costTime / 1000) + "s, rate: "
                        + (Constants.KAFKA_CONSUM_MESSAGE_COUNT.get() * 1000L / costTime) + " msg/s");
            }
            Constants.KAFKA_CONSUM_MESSAGE_COUNT = new AtomicInteger(0);
        } else {
            long costTime = (System.currentTimeMillis() - Constants.LAST_START_COUNT_TIME.get()) / 1000;
            if (Constants.KAFKA_CONSUM_MESSAGE_COUNT.get() % 1000 == 0 && costTime != 0) {
                logger.info(">>KafkaConsumSpeedMonitor: messages consumed since startup: "
                        + Constants.KAFKA_CONSUM_MESSAGE_COUNT.get()
                        + ", rate: " + (Constants.KAFKA_CONSUM_MESSAGE_COUNT.get() / costTime) + " msg/s");
            }
        }
        Constants.KAFKA_CONSUM_MESSAGE_COUNT.incrementAndGet();
        return true;
    }

}
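
The Constants class referenced above is not included in the article. The sketch below is only a guess at its shape, reconstructed from how the fields are used in these listings; the field names come from the code, while the exact types, initial values and the concrete REFRESH_DB_TIME_STEP value are assumptions.

package com.aspire.ca.prnp.db.service.impl;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: not the original class, reconstructed from usage in this article.
public class Constants {

    // number of Kafka messages consumed since the counter was last reset (reassigned in the consumer, so not final)
    public static AtomicInteger KAFKA_CONSUM_MESSAGE_COUNT = new AtomicInteger(0);

    // timestamp at which the current counting window started (reassigned in the consumer, so not final)
    public static AtomicLong LAST_START_COUNT_TIME = new AtomicLong(System.currentTimeMillis());

    // how long a worker may idle before it processes anyway; 60 seconds is an assumed value
    public static final Long REFRESH_DB_TIME_STEP = 60 * 1000L;
}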

2 Queue factory class

package com.aspire.ca.prnp.db.service.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * Queue factory: one queue per worker-thread class.
 */
@Service
public class QueueFactory {

    private Logger logger = LoggerFactory.getLogger(QueueFactory.class);

    private Map<Class, Queue> QUEUE_CONTAINER = null;

    private static Integer DEFAULT_THREAD_SIZE = null;

    private static AtomicInteger DEFAULT_OBJECT_SIZE = null;

    // flag: has the queue-monitoring thread been started?
    private Boolean IS_OBSERVER_THREAD_START = Boolean.FALSE;

    private List<String> ES_THREAD_CLASS_NAME = null;

    private QueueFactory() {
        init();
    }

    // Initialize the queue factory: 3 consumer threads per queue by default,
    // and a soft cap of 2000 queued objects (DEFAULT_OBJECT_SIZE).
    private void init() {
        if (QUEUE_CONTAINER == null) {
            QUEUE_CONTAINER = new ConcurrentHashMap<Class, Queue>();
        }
        if (DEFAULT_THREAD_SIZE == null) {
            DEFAULT_THREAD_SIZE = 3;
        }
        if (DEFAULT_OBJECT_SIZE == null) {
            DEFAULT_OBJECT_SIZE = new AtomicInteger(2000);
        }
        if (ES_THREAD_CLASS_NAME == null) {
            ES_THREAD_CLASS_NAME = new ArrayList<String>();
        }
    }

    private static class QueueFactoryHolder {
        public final static QueueFactory INSTANCE = new QueueFactory();
    }

    public static QueueFactory getInstance() {
        return QueueFactoryHolder.INSTANCE;
    }

    /**
     * Add a single object to the queue.
     * @param key   worker-thread class that owns the queue
     * @param value object to enqueue
     */
    synchronized public void add(Class key, Object value) {
        if (ES_THREAD_CLASS_NAME.contains(key.getName())) {
            return;
        }
        Queue queue = getQueue(key);
        if (queue.size() > DEFAULT_OBJECT_SIZE.get()) {
            try {
                Thread.sleep(3000L);
                logger.debug(">>queue " + key.getName() + " sleeping, queued objects: " + queue.size());
            } catch (InterruptedException e) {
                logger.error(">>queue " + key.getName() + " sleep interrupted", e);
            }
        }
        queue.add(value);
        if (queue.size() % 50 == 0) {
            logger.debug(">>queue " + key.getName() + " queued objects: " + queue.size());
        }
        this.startThreadFactory(key);
    }

    /**
     * Add a consumed Kafka message to the queue.
     * @param key   worker-thread class that owns the queue
     * @param value entity deserialized from the Kafka record
     */
    public void addKafkaRecord(Class key, Object value) {
        if (ES_THREAD_CLASS_NAME.contains(key.getName())) {
            return;
        }
        Queue queue = getQueue(key);
        if (queue.size() > DEFAULT_OBJECT_SIZE.get()) {
            try {
                Thread.sleep(10000L);
                logger.debug(">>queue " + key.getName() + " sleeping, queued objects: " + queue.size());
            } catch (InterruptedException e) {
                logger.error(">>queue " + key.getName() + " sleep interrupted", e);
            }
        }
        queue.add(value);
        if (queue.size() % 50 == 0) {
            logger.debug(">>queue " + key.getName() + " queued objects: " + queue.size());
        }
        this.startThreadFactory(key);
    }

    /**
     * Add a batch of objects to the queue.
     * @param key    worker-thread class that owns the queue
     * @param values objects to enqueue
     */
    synchronized public void add(Class key, Collection values) {
        Queue queue = getQueue(key);
        if (queue.size() > DEFAULT_OBJECT_SIZE.get()) {
            try {
                Thread.sleep(3000L);
                logger.debug(">>queue " + key.getName() + " sleeping, queued objects: " + queue.size());
            } catch (InterruptedException e) {
                logger.error(">>queue " + key.getName() + " sleep interrupted", e);
            }
        }
        queue.addAll(values);
        if (queue.size() % 50 == 0) {
            logger.debug(">>queue " + key.getName() + " queued objects: " + queue.size());
        }
        this.startThreadFactory(key);
    }

    /**
     * Take a single object from the queue.
     * @param key worker-thread class that owns the queue
     * @return the polled object, or null if the queue is empty
     */
    synchronized public Object get(Class key) {
        Object obj = null;
        Queue queue = this.getQueue(key);
        if (queue.size() > 0) {
            obj = queue.poll();
        }
        return obj;
    }

    /**
     * Take a batch of objects from the queue.
     * @param key  worker-thread class that owns the queue
     * @param size maximum number of objects to take
     * @return up to size objects, fewer if the queue runs dry
     */
    synchronized public List get(Class key, int size) {
        List result = new ArrayList(0);
        for (int i = 0; i < size; i++) {
            Object obj = get(key);
            if (obj != null) {
                result.add(obj);
            } else {
                break;
            }
        }
        return result;
    }

    synchronized public int getSize(Class key) {
        int size = 0;
        if (!isEmpty(key)) {
            size = this.getQueue(key).size();
        }
        return size;
    }

    /**
     * Is the queue for this key empty?
     */
    synchronized public Boolean isEmpty(Class key) {
        Boolean res = Boolean.TRUE;
        if (this.getQueue(key) != null && this.getQueue(key).size() > 0) {
            res = Boolean.FALSE;
        }
        return res;
    }

    /**
     * Get the queue for a key: return it from the container if it exists,
     * otherwise create it and store it in the container.
     * @param key worker-thread class that owns the queue
     * @return the queue for this key
     */
    synchronized private Queue getQueue(Class key) {
        Queue queue = null;
        if (QUEUE_CONTAINER.containsKey(key)) {
            queue = QUEUE_CONTAINER.get(key);
            logger.debug(">>Got existing queue for key: " + key + ", current size: " + queue.size());
        } else {
            logger.debug(">>No queue found for key: " + key + ", creating a new one and adding it to the container.");
            queue = new ConcurrentLinkedQueue();
            QUEUE_CONTAINER.put(key, queue);
        }
        return queue;
    }

    public Map<Class, Queue> getAllQueue() {
        return QUEUE_CONTAINER;
    }

    /**
     * Start the worker threads (and, once, the queue-monitoring thread).
     * @param clazz worker-thread class
     */
    private void startThreadFactory(Class clazz) {
        if (!IS_OBSERVER_THREAD_START) {
            Runnable runnable = new CheckQueueFactoryThread();
            // run the monitor in the shared thread pool
            ExecuteUserCountThreadPool.getInstance().submitTask(runnable);
            // mark the monitor as started so it is only submitted once
            IS_OBSERVER_THREAD_START = Boolean.TRUE;
        }
        ThreadFactory.getInstance().exec(clazz, DEFAULT_THREAD_SIZE);
    }

}

3 CheckQueueFactoryThread: queue factory monitoring thread

package com.aspire.ca.prnp.db.service.impl;

import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.aspire.ca.prnp.service.PrnpKernelServer;

/**
 * Queue factory monitoring thread: periodically logs the size of every queue.
 */
public class CheckQueueFactoryThread implements Runnable {

    private static Logger logger = LoggerFactory.getLogger(CheckQueueFactoryThread.class);
    private QueueFactory queueFactory = null;
    private static Long LAST_OPERATE_DB_TIME = null;
    private static Integer LOOP_COUNT = 0;

    public CheckQueueFactoryThread() {
        init();
    }

    @Override
    public void run() {
        while (true) {
            StringBuilder sb = new StringBuilder();
            sb.append(">>>CheckQueueFactoryThread Trace:");
            for (int i = 0; i < 30; i++) {
                sb.append("-");
            }
            sb.append("thread factory observer");
            for (int i = 0; i < 30; i++) {
                sb.append("-");
            }
            sb.append(" ");
            Map<Class, Queue> allQueue = queueFactory.getAllQueue();
            if (allQueue != null && !allQueue.isEmpty()) {
                Set set = allQueue.keySet();
                if (set != null) {
                    Iterator<Class> keyIterator = set.iterator();
                    while (keyIterator.hasNext()) {
                        Class clazz = keyIterator.next();
                        Queue queue = allQueue.get(clazz);
                        sb.append(">>>CheckQueueFactoryThread Trace:");
                        sb.append("Class is:" + clazz.getName());
                        sb.append(",queue size is:" + queue.size());
                        for (int i = 0; i < 10; i++) {
                            sb.append(" ");
                        }
                        sb.append(" ");
                    }
                }
            }
            sb.append(">>>CheckQueueFactoryThread Trace:");
            for (int i = 0; i < 30; i++) {
                sb.append("-");
            }
            sb.append("end of thread factory observer");
            for (int i = 0; i < 30; i++) {
                sb.append("-");
            }
            sb.append(" ");
            logger.debug(">>>Trace Queue:" + sb.toString());
            try {
                Thread.sleep(30000L);
            } catch (InterruptedException e) {
                logger.error(">>DO Multi Data Save:", e);
            }
        }
    }

    private void init() {
        queueFactory = (QueueFactory) PrnpKernelServer.getCtx().getBean("queueFactory");
        LAST_OPERATE_DB_TIME = System.currentTimeMillis();
    }

    // debug helper (not called in this class)
    private void debug(String msg) {
        String line = ">>batch save EcIndivAddr thread:" + msg + " current loop count is:" + LOOP_COUNT;
        logger.debug(line);
    }

}

4 Thread factory class

package com.aspire.ca.prnp.db.service.impl;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.beans.BeanUtils;

/**
 * Thread factory: tracks how many worker threads of each class have been started.
 */
public class ThreadFactory {

    private static Map<String, Integer> THREAD_CONTAINER = null;

    private static class ThreadFactoryHolder {
        public final static ThreadFactory INSTANCE = new ThreadFactory();
    }

    private ThreadFactory() {
        init();
    }

    private void init() {
        if (THREAD_CONTAINER == null) {
            THREAD_CONTAINER = new ConcurrentHashMap<String, Integer>();
        }
    }

    // singleton accessor
    public static ThreadFactory getInstance() {
        return ThreadFactoryHolder.INSTANCE;
    }

    /**
     * Start worker threads until size threads of the given class are running.
     * @param clazz worker-thread class
     * @param size  target number of threads
     */
    public void exec(Class clazz, int size) {
        Integer runThreadSize = this.getThreadSize(clazz);
        int need2CreateThreadSize = size - runThreadSize;
        if (need2CreateThreadSize > 0) {
            for (int i = 0; i < need2CreateThreadSize; i++) {
                // instantiate the target thread class as a Runnable
                Runnable runnable = BeanUtils.instantiate(clazz);
                // hand it to the shared thread pool for execution
                ExecuteUserCountThreadPool.getInstance().submitTask(runnable);
            }
            THREAD_CONTAINER.put(clazz.getName(), size);
        }
    }

    private Integer getThreadSize(Class clazz) {
        Integer size = 0;
        String className = clazz.getName();
        if (THREAD_CONTAINER.containsKey(className)) {
            size = THREAD_CONTAINER.get(className);
        }
        return size;
    }

}

5 Thread pool

package com.aspire.ca.prnp.db.service.impl;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.aspire.prnp.util.ConstantUtil;
import com.aspire.webbas.configuration.config.ConfigurationHelper;

/**
 * Thread pool manager for sending downstream messages.
 */
public class ExecuteUserCountThreadPool {

    private static ExecuteUserCountThreadPool userCountThreadPool = new ExecuteUserCountThreadPool();

    /**
     * The thread pool itself.
     */
    private static ExecutorService threadPool;

    /**
     * Singleton constructor: the pool size is read from configuration, defaulting to 500.
     */
    private ExecuteUserCountThreadPool() {
        int threadCount = ConfigurationHelper.getConfiguration(ConstantUtil.CONFIG_FILE_NAME_CONFIG)
                .getInt("send_outmessage_thread_count", 500);
        // initialize a fixed thread pool of threadCount threads
        threadPool = Executors.newFixedThreadPool(threadCount);
    }

    /**
     * Get the thread pool manager instance.
     * @return the singleton instance
     */
    public static ExecuteUserCountThreadPool getInstance() {
        return userCountThreadPool;
    }

    /**
     * Submit a task to the thread pool for execution.
     * @param task the task to run
     */
    public void submitTask(Runnable task) {
        threadPool.submit(task);
    }

    /**
     * Shut down the thread pool; no new tasks are accepted.
     */
    public void shutdown() {
        threadPool.shutdown();
    }
}
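
The pool size is read through ConfigurationHelper from the configuration file named by ConstantUtil.CONFIG_FILE_NAME_CONFIG. That file is not shown in the article; assuming an ordinary key=value configuration entry, it would presumably contain something like the following (500 is only the fallback hard-coded above):

send_outmessage_thread_count=500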

6 Express-delivery interface business thread

package com.aspire.ca.prnp.db.service.impl;

import java.util.List;

import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.aspire.ca.prnp.domain.express.status.RealIDCheckRecord;
import com.aspire.ca.prnp.interfaces.IExpressDeliveryStatus;
import com.aspire.ca.prnp.service.PrnpKernelServer;

/**
 * Business worker thread for the express-delivery interface.
 */
public class InsertDealRealIDCheckRecordThread implements Runnable {

    private static Logger logger = LoggerFactory.getLogger(InsertDealRealIDCheckRecordThread.class);
    private QueueFactory queueFactory = null;
    // express-delivery interface service
    private IExpressDeliveryStatus<RealIDCheckRecord, Boolean> service = null;
    private static Long LAST_OPERATE_DB_TIME = null;
    // loop counter for this thread
    private static Integer LOOP_COUNT = 0;
    private static Long TIME_STEP = Constants.REFRESH_DB_TIME_STEP;

    public InsertDealRealIDCheckRecordThread() {
        init();
    }

    @Override
    public void run() {
        while (true) {
            long timeStep = System.currentTimeMillis() - LAST_OPERATE_DB_TIME;
            if (queueFactory.getSize(InsertDealRealIDCheckRecordThread.class) > 1 || timeStep > TIME_STEP) {
                // fetching the entity here is synchronized: take one record from the consumption queue
                List<RealIDCheckRecord> realIDCheckRecordList = queueFactory.get(InsertDealRealIDCheckRecordThread.class, 1);
                if (CollectionUtils.isNotEmpty(realIDCheckRecordList)) {
                    try {
                        // express-delivery interface business logic
                        service.expressStatusInfo(realIDCheckRecordList.get(0));
                        logger.debug("InsertDealRealIDCheckRecordThread processed an express-status message taken from Kafka");
                    } catch (Exception e) {
                        logger.error(">>DO Multi Data Save:", e);
                        // on failure, put the batch back into the queue for a retry
                        queueFactory.add(InsertDealRealIDCheckRecordThread.class, realIDCheckRecordList);
                    } finally {
                        LAST_OPERATE_DB_TIME = System.currentTimeMillis();
                    }
                }
            }
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                logger.error(">>DO Multi Data Save:", e);
            }
            LOOP_COUNT++;
        }
    }

    private void init() {
        queueFactory = (QueueFactory) PrnpKernelServer.getCtx().getBean("queueFactory");
        service = (IExpressDeliveryStatus<RealIDCheckRecord, Boolean>) PrnpKernelServer.getCtx().getBean("expressDeliveryStatusImpl2");
        LAST_OPERATE_DB_TIME = System.currentTimeMillis();
    }

    // debug helper (not called in this class)
    private void debug(String msg) {
        String line = ">>batch save EcIndivAddr thread:" + msg + " current loop count is:" + LOOP_COUNT;
        logger.debug(line);
    }

}

7 Express-delivery interface: the code is not pasted here; refer to the flow chart for the queue and thread operations of thread 1, thread 2, thread 3 and thread 4. A rough sketch of what the interface might look like follows below.
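
Purely as an illustration, the sketch below guesses at the interface's shape from the way section 6 declares and calls it (IExpressDeliveryStatus<RealIDCheckRecord, Boolean> and service.expressStatusInfo(record)); the real interface may declare additional methods or use different signatures.

package com.aspire.ca.prnp.interfaces;

/**
 * Hypothetical sketch reconstructed from the usage in InsertDealRealIDCheckRecordThread;
 * not the original source.
 */
public interface IExpressDeliveryStatus<T, R> {

    // process one express-delivery status record and report whether it was handled
    R expressStatusInfo(T record);
}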

Original article (Chinese): https://www.cnblogs.com/fuqiang-terry/p/6909506.html