自定义异步线程池工具,用于执行异步方法

此方法已经过百万级数据量的线上实践验证。



import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.google.common.eventbus.AsyncEventBus;
import org.springframework.context.ApplicationContext;

/**
 * Utility for invoking bean methods asynchronously through a Guava {@code AsyncEventBus}.
 *
 * <p>Every call is wrapped in an {@link Event} and posted to the bus. The single registered
 * {@link EventListener} receives the event on a pool thread and performs the reflective call.
 */
public class EnventBusUtil {

    /**
     * Worker pool backing the bus: 30 core threads, 30 max threads, and a queue bounded at 5000 tasks.
     * A fixed, bounded pool is used on purpose; an unbounded or cached pool risks running out of memory.
     *
     * <p>NOTE(review): no rejection handler is supplied, so the JDK default applies once the queue is
     * full. Confirm that is the desired behavior under overload.
     */
    private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(30, 30, 0L, TimeUnit.MILLISECONDS,
                                                           new LinkedBlockingQueue<Runnable>(5000));

    /**
     * Event bus that dispatches posted events on {@link #executor}.
     */
    private static final AsyncEventBus eventBus = new AsyncEventBus("enventBusUtil", executor);

    /**
     * Listener that performs the reflective invocation for each posted event.
     */
    private static final EventListener listener = new EventListener();

    static {
        // Register the listener on the bus.
        eventBus.register(listener);

        // Start all core threads up front instead of lazily on first submission.
        executor.prestartAllCoreThreads();
    }

    /**
     * Asynchronously invokes {@code methodName} on the Spring bean registered under {@code beanName}.
     *
     * <p>The bean is looked up through {@code SpringContextUtil.getBean(String)} on the calling thread,
     * so a lookup failure surfaces to the caller rather than on a pool thread. Parameter types are
     * derived from {@code args} by the listener.
     *
     * @param beanName   name of the Spring bean to call
     * @param methodName name of the method to invoke
     * @param args       arguments for the method; may be {@code null} for a no-arg method
     */
    public static void invoke(String beanName, String methodName, Object[] args) {
        Object bean = SpringContextUtil.getBean(beanName);
        EnventBusUtil.invoke(bean, methodName, args, null);
    }

    /**
     * Asynchronously invokes {@code methodName} on {@code bean}, deriving the parameter types
     * from {@code args}.
     *
     * @param bean       target object
     * @param methodName name of the method to invoke
     * @param args       arguments for the method; may be {@code null} for a no-arg method
     */
    public static void invoke(Object bean, String methodName, Object[] args) {
        EnventBusUtil.invoke(bean, methodName, args, null);
    }

    /**
     * Asynchronously invokes {@code methodName} on {@code bean} using explicit parameter types.
     *
     * @param bean       target object; to address a bean by name use
     *                   {@link #invoke(String, String, Object[])}
     * @param methodName name of the method to invoke
     * @param args       arguments for the method
     * @param clazzs     declared parameter types of the method, or {@code null} to let the
     *                   listener derive them from {@code args}
     */
    @SuppressWarnings("rawtypes")
    public static void invoke(Object bean, String methodName, Object[] args, Class[] clazzs) {
        // Describe the call as an event.
        Event event = new Event(bean, methodName, args, clazzs);

        // Hand the event to the bus; the listener runs it on a pool thread.
        eventBus.post(event);
    }

}


import java.io.Serializable;

/**
 * Event posted to the bus; carries everything needed for one reflective method call.
 */
public class Event implements Serializable {
    private static final long serialVersionUID = 4161755693819623893L;

    /** Target object the method is invoked on. */
    private Object bean;

    /** Name of the method to invoke. */
    private String methodName;

    /** Arguments passed to the method. */
    private Object[] args;

    /** Parameter types of the method; stays {@code null} unless supplied by the caller. */
    @SuppressWarnings("rawtypes")
    private Class[] clazzs;

    /**
     * Creates an event without parameter types.
     *
     * @param bean       target object
     * @param methodName name of the method to invoke
     * @param args       arguments for the method
     */
    public Event(Object bean, String methodName, Object[] args) {
        this(bean, methodName, args, null);
    }

    /**
     * Creates an event with explicit parameter types.
     *
     * @param bean       target object
     * @param methodName name of the method to invoke
     * @param args       arguments for the method
     * @param clazzs     parameter types of the method
     */
    public Event(Object bean, String methodName, Object[] args, @SuppressWarnings("rawtypes") Class[] clazzs) {
        this.bean = bean;
        this.methodName = methodName;
        this.args = args;
        this.clazzs = clazzs;
    }

    /** @return the target object */
    public Object getBean() {
        return this.bean;
    }

    /** @param bean the target object */
    public void setBean(Object bean) {
        this.bean = bean;
    }

    /** @return the name of the method to invoke */
    public String getMethodName() {
        return this.methodName;
    }

    /** @param methodName the name of the method to invoke */
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    /** @return the arguments for the method */
    public Object[] getArgs() {
        return this.args;
    }

    /** @param args the arguments for the method */
    public void setArgs(Object[] args) {
        this.args = args;
    }

    /** @return the parameter types, or {@code null} when none were supplied */
    @SuppressWarnings("rawtypes")
    public Class[] getClazzs() {
        return this.clazzs;
    }

    /** @param clazzs the parameter types of the method */
    @SuppressWarnings("rawtypes")
    public void setClazzs(Class[] clazzs) {
        this.clazzs = clazzs;
    }

}


import java.lang.reflect.Method;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;

/**
 * Event listener that reflectively invokes the method described by an incoming {@link Event}.
 */
public class EventListener {
    private static final Logger logger = LoggerFactory.getLogger(EventListener.class);

    /**
     * Invokes {@code event.getMethodName()} on {@code event.getBean()} with {@code event.getArgs()}.
     *
     * <p>When the event carries explicit parameter types they are used for an exact lookup. Otherwise
     * the method is first looked up by the runtime classes of the arguments and, failing that, by
     * name and argument compatibility, so {@code null} arguments, primitive parameters and parameters
     * declared as a supertype or interface of the argument all resolve. The lookup covers the bean's
     * class and its superclasses, including non-public methods.
     *
     * @param event the call description; a {@code null} event is logged and ignored
     * @throws IllegalArgumentException if the event has no bean or no method name
     * @throws NoSuchMethodException    if no method matches, or several match equally well
     * @throws Exception                any other failure raised by the reflective call
     */
    @Subscribe
    @AllowConcurrentEvents
    public void listen(Event event) throws Exception {
        if (null == event) {
            logger.warn("EventListener.listen() event is null, do nothing");
            return;
        }

        Object bean = event.getBean();
        String methodName = event.getMethodName();
        if (bean == null || methodName == null) {
            throw new IllegalArgumentException("Event must carry a bean and a method name, beanPresent="
                                               + (bean != null) + ", methodName=" + methodName);
        }

        // Arguments for the target method.
        Object[] args = event.getArgs();

        // Locate the target method, then call it.
        Method method = resolveMethod(bean.getClass(), methodName, event.getClazzs(), args);
        method.setAccessible(true);
        method.invoke(bean, args);
    }

    /**
     * Picks the method to call: exact lookup by explicit types, else by runtime argument types,
     * else by argument compatibility.
     */
    private static Method resolveMethod(Class<?> beanClass, String methodName, Class<?>[] declaredTypes,
                                        Object[] args) throws NoSuchMethodException {
        if (declaredTypes != null) {
            Method exact = findExact(beanClass, methodName, declaredTypes);
            if (exact == null) {
                throw new NoSuchMethodException(beanClass.getName() + "." + methodName
                                                + " with the given parameter types");
            }
            return exact;
        }

        int argCount = (args == null) ? 0 : args.length;
        if (argCount == 0) {
            Method noArg = findExact(beanClass, methodName, new Class<?>[0]);
            if (noArg == null) {
                throw new NoSuchMethodException(beanClass.getName() + "." + methodName + "()");
            }
            return noArg;
        }

        // Prefer the exact runtime-type match so overloads resolve as they did before.
        Class<?>[] runtimeTypes = runtimeTypesOf(args);
        if (runtimeTypes != null) {
            Method exact = findExact(beanClass, methodName, runtimeTypes);
            if (exact != null) {
                return exact;
            }
        }

        Method compatible = findCompatible(beanClass, methodName, args);
        if (compatible == null) {
            throw new NoSuchMethodException(beanClass.getName() + "." + methodName + " accepting "
                                            + argCount + " argument(s) of the given types");
        }
        return compatible;
    }

    /**
     * Returns the runtime class of every argument, or {@code null} if any argument is {@code null}.
     */
    private static Class<?>[] runtimeTypesOf(Object[] args) {
        Class<?>[] types = new Class<?>[args.length];
        for (int i = 0; i < args.length; i++) {
            if (args[i] == null) {
                return null;
            }
            types[i] = args[i].getClass();
        }
        return types;
    }

    /**
     * Looks for a method with exactly these parameter types in the class, then its superclasses.
     */
    private static Method findExact(Class<?> beanClass, String methodName, Class<?>[] parameterTypes) {
        for (Class<?> current = beanClass; current != null; current = current.getSuperclass()) {
            try {
                return current.getDeclaredMethod(methodName, parameterTypes);
            } catch (NoSuchMethodException ignored) {
                // Not declared at this level; keep looking in the superclass.
            }
        }
        return null;
    }

    /**
     * Looks for the single method whose parameters can accept the arguments, nearest class first.
     */
    private static Method findCompatible(Class<?> beanClass, String methodName, Object[] args)
            throws NoSuchMethodException {
        for (Class<?> current = beanClass; current != null; current = current.getSuperclass()) {
            Method match = null;
            for (Method candidate : current.getDeclaredMethods()) {
                // Compiler-generated methods would only produce false ambiguities.
                if (candidate.isBridge() || candidate.isSynthetic()) {
                    continue;
                }
                if (!candidate.getName().equals(methodName) || !accepts(candidate.getParameterTypes(), args)) {
                    continue;
                }
                if (match != null) {
                    throw new NoSuchMethodException("Ambiguous overloads of " + current.getName() + "."
                                                    + methodName + "; pass explicit parameter types");
                }
                match = candidate;
            }
            if (match != null) {
                return match;
            }
        }
        return null;
    }

    /**
     * Tells whether each argument can be passed to the parameter at the same position.
     */
    private static boolean accepts(Class<?>[] parameterTypes, Object[] args) {
        if (parameterTypes.length != args.length) {
            return false;
        }
        for (int i = 0; i < args.length; i++) {
            Object arg = args[i];
            Class<?> parameterType = parameterTypes[i];
            if (arg == null) {
                // null fits any reference parameter but never a primitive one.
                if (parameterType.isPrimitive()) {
                    return false;
                }
            } else if (!boxed(parameterType).isInstance(arg)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Maps a primitive type to its wrapper class; other types are returned unchanged.
     */
    private static Class<?> boxed(Class<?> type) {
        if (!type.isPrimitive()) {
            return type;
        }
        if (type == int.class) {
            return Integer.class;
        }
        if (type == long.class) {
            return Long.class;
        }
        if (type == boolean.class) {
            return Boolean.class;
        }
        if (type == double.class) {
            return Double.class;
        }
        if (type == float.class) {
            return Float.class;
        }
        if (type == char.class) {
            return Character.class;
        }
        if (type == byte.class) {
            return Byte.class;
        }
        if (type == short.class) {
            return Short.class;
        }
        return Void.class;
    }
}

调用方法

参数: bean 需要传入bean对象

    methodName 传入执行的方法

    args 方法参数数组;需要注意传数组

    clazzs 方法参数的类型数组;

EnventBusUtil.invoke(bean,"methodName", list.toArray(),c);

 根据bean名称获取bean

/**
 * Static access point to the Spring {@link ApplicationContext} for code that cannot use injection.
 *
 * <p>The context is captured through {@link ApplicationContextAware}. Until that callback has run,
 * {@link #getApplicationContext()} returns {@code null}.
 */
@Component
public class SpringContextUtil implements ApplicationContextAware {

    private static ApplicationContext applicationContext = null;

    /**
     * Stores the context the first time it is supplied; later calls leave the stored context unchanged.
     *
     * @param applicationContext the context this bean runs in
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (SpringContextUtil.applicationContext == null) {
            SpringContextUtil.applicationContext = applicationContext;
        }
    }

    /**
     * @return the stored application context, or {@code null} if none has been set yet
     */
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    /**
     * Looks up a bean by name, e.g. a bean declared with {@code @Service("XXXService")}.
     *
     * @param beanName the bean name, e.g. {@code XXXService}
     * @return the bean instance
     */
    public static Object getBean(String beanName) {
        return getApplicationContext().getBean(beanName);
    }

    /**
     * Looks up a bean by type, e.g. a bean declared with a plain {@code @Service}.
     * Pass the interface ({@code XXXService.class}), not the implementation
     * ({@code XXXServiceImpl.class}).
     *
     * @param c   the type to look up
     * @param <T> the bean type
     * @return the bean instance
     */
    public static <T> T getBean(Class<T> c) {
        return getApplicationContext().getBean(c);
    }

    /**
     * Looks up a bean by name and type.
     *
     * @param name  the bean name
     * @param clazz the expected type of the bean
     * @param <T>   the bean type
     * @return the bean instance
     */
    public static <T> T getBean(String name, Class<T> clazz) {
        return getApplicationContext().getBean(name, clazz);
    }

    /**
     * Returns the first active profile of the current environment.
     *
     * @return the first active profile, or {@code null} when no profile is active
     */
    public static String getActiveProfile() {
        String[] activeProfiles = getApplicationContext().getEnvironment().getActiveProfiles();
        // The array is empty when no profile has been activated; indexing it blindly would throw.
        return activeProfiles.length > 0 ? activeProfiles[0] : null;
    }
}
原文地址:https://www.cnblogs.com/li-lun/p/13399053.html