java ee wildfly spring 在线程池的线程中注入

public class RtmpSpyingTests extends AbstractTransactionalJUnit4SpringContextTests {
    @Autowired
    ThreadPoolTaskExecutor rtmpSpyingTaskExecutor;

    @Autowired
    ApplicationContext ctx;

    @Autowired
    RtmpSourceRepository rtmpRep;

    @Test
    public void test() {
            RtmpSource rtmpSourceSample = new RtmpSource("test");

            rtmpRep.save(rtmpSourceSample);
            rtmpRep.flush();

            List<RtmpSource> rtmpSourceList = rtmpRep.findAll();  // Here I get a list containing rtmpSourceSample

            RtmpSpyingTask rtmpSpyingTask = ctx.getBean(RtmpSpyingTask.class, 
                        "arg1","arg2");
                rtmpSpyingTaskExecutor.execute(rtmpSpyingTask);

    }
}

public class RtmpSpyingTask implements Runnable {

    @Autowired
    RtmpSourceRepository rtmpRep;

    String nameIdCh;
    String rtmpUrl;

    public RtmpSpyingTask(String nameIdCh, String rtmpUrl) {
        this.nameIdCh = nameIdCh;
        this.rtmpUrl = rtmpUrl;
    }

    public void run() {
        // Here I should get a list containing rtmpSourceSample, but instead of that
        // I get an empty list
        List<RtmpSource> rtmpSource = rtmpRep.findAll();  
    }
}

应该用
@Service
public class AsyncTransactionService {

    @Autowired
    RtmpSourceRepository rtmpRep;

    @Transactional(readOnly = true)
    public List<RtmpSource> getRtmpSources() {
        return rtmpRep.findAll();
    }

    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void insertRtmpSource(RtmpSource rtmpSource) {
        rtmpRep.save(rtmpSource);
    }
}

或者

用内部类。

package com.italktv.platform.audioDist.service;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.italktv.platform.audioDist.mongo.CustomerRepository;
import com.italktv.platform.audioDist.mongo.PlayUrl;
import com.italktv.platform.audioDist.mongo.PlayUrl.MyUrl;
import com.italktv.platform.audioDist.mongo.PlayUrlRepository;
import com.italktv.platform.audioDist.mysql.SubSet;
import com.italktv.platform.audioDist.mysql.UserRepository;
import com.italktv.platform.audioDist.task.MyTask;
import com.italktv.platform.audioDist.task.TaskManager;

@Component
public class ScheduleJobs {
    private static final Logger log = LoggerFactory.getLogger(ScheduleJobs.class);

    public final static long SECOND = 1 * 1000;
    LocalDateTime nowDate = LocalDateTime.now();

    @Autowired
    // This means to get the bean called userRepository
    // Which is auto-generated by Spring, we will use it to handle the data
    private UserRepository userRepository;

    @Autowired
    private PlayUrlRepository repository;
    @Autowired
    private CustomerRepository cc;
    
    @Autowired
    private UserRepository user;

      @Autowired 
    TaskManager taskManager;

    @Scheduled(fixedRate = SECOND * 400)
    public void fixedRateJob() {
        nowDate = LocalDateTime.now();
        System.out.println("=== start distribution: " + nowDate);
        dotask();
    }

//    @PostConstruct
//    public void init() {
//
//        taskManager = new TaskManager();
//        taskManager.init();
//    }
//
//    @PreDestroy
//    void destroy() {
//        taskManager.destroy();
//    }

    void dotask() {

        Map<Integer, List<SubSet>> map = userRepository.getUploadFileMap();
        for (Entry<Integer, List<SubSet>> subject : map.entrySet()) {
            int subjectId = subject.getKey();
            log.info(" subject id:" + subjectId);
            List<SubSet> allsub = subject.getValue();
            for (SubSet item : allsub) {
                log.info(" sub:" + item.toString());
                taskManager.add(new MessagePublish(item.id, item.path));
            }
            
            //wait them finished
            //TODO:
            
            //update subject status
            //TODO
            
        }

    }
    
    ////////////////////////内部类////////////////////////
    public class MessagePublish  extends MyTask implements Serializable{
        public MessagePublish() {
            super();
        }
        public  MessagePublish(int id,String name ){
            this.srcFile = name;
            this.partId=id;
        }
        
        @Value("${platform.audio.dist.domain}") private String domain;
        
        @Override
        public String call() {
            System.out.println(srcFile + " is uploading...");
            try {
                //获取消息发布的区域
                TimeUnit.SECONDS.sleep(new Random().nextInt(10)+1);
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            System.out.println(srcFile + " uploaded.");
            
            //2.RECORD TO MONGO DB
            PlayUrl play=new PlayUrl();
            play.programid="programid fake"+ "";
            play.domain=domain;
            play.protocol="HTTP";
            MyUrl myurl=new MyUrl();
            myurl.high="http://xxx.xxx/xi//";
            play.url=myurl;
            repository.save(play);
            //TODO:
            
            //IF FAILED, RETRY, RECORD RETRY TIMES.
            //TODO:
            
            return "ok";
        }
        
    }
}


package com.italktv.platform.audioDist.task;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;



@Component
public class TaskManager {
    
    private static final org.slf4j.Logger logger = LoggerFactory.getLogger(TaskManager.class);

//    @Resource(lookup = "java:comp/DefaultManagedScheduledExecutorService")
//    ManagedScheduledExecutorService executor;

    Map<String, Future<String>> tasks;
    ExecutorService executor ;
    @PostConstruct
    public void init() {
        logger.info(" === init TaskManager===");
        tasks = new HashMap<String, Future<String>>();
        executor =   Executors.newFixedThreadPool(3);
    }

    public void add(MyTask task) {
        logger.info("add delay:"+ task.partId+task.srcFile);
          Future<String> future = executor.submit(task);
        tasks.put(task.srcFile, future);
    }

    public boolean cancel(String name) {
        logger.info("cancel "+ name);
        boolean ret = false;
        Future<String> future = tasks.get(name);
        if (future == null) {
            logger.info("Not found name:" + name);
        } else {
            ret = future.cancel(true);
            logger.info("cancel "+ name+":"+ret);
            tasks.remove(name);
        }
        return ret;
    }

    public void waitTaskDone(){
        Collection<Future<String>> futuretasks = tasks.values();
        for(Future<String> future: futuretasks ){
            System.out.println("future done? " + future.isDone());

            String result="";
            try {
                result = future.get();
            } catch (InterruptedException | ExecutionException e) {
                logger.error("future exec failed.");
                e.printStackTrace();
            }

            System.out.println("future done? " + future.isDone());
            System.out.print("result: " + result);
        }
    }
    @PreDestroy
    public void destroy(){
        try {
            System.out.println("attempt to shutdown executor");
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
            }
        catch (InterruptedException e) {
            System.err.println("tasks interrupted");
        }
        finally {
            if (!executor.isTerminated()) {
                System.err.println("cancel non-finished tasks");
            }
            executor.shutdownNow();
            System.out.println("shutdown finished");
        }
    }
}


package com.italktv.platform.audioDist.task;

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public abstract class MyTask implements Callable<String> {
    protected String srcFile;
    protected int partId;
    String programId;

    protected MyTask() {

    }

}
原文地址:https://www.cnblogs.com/bigben0123/p/7458684.html