使用并发来提高数据抓取的效率

在做项目的时候,有一个增强是需要把两个列从一个表迁移到另一个表,在做正式的迁移之前需要对原始数据进行备份,备份的实现也比较简单,就是把数据从数据库中读出来然后写到CSV文件中,主键以及列与列之间用分号分隔。我count了一下,总共是有559行数据,数据量其实挺小,之前的实现并没有使用多线程并发取数据,因为真实代码是受公司保护的,我用伪代码来描述一下之前的实现:

public void foo() throws Exception {
    PrintWriter pw = new PrintWriter(xxx);
    List<Map> ml = service.getData();
    for (Map m : ml) {
        // compose string from map
       pw.write(string);
pw.write(" "); } pw.close(); }

在for循环里面,数据需要依照ml的顺序一行一行写入文件,下一个写入操作必须等待上一个写入操作完成之后才能执行,这样执行完整个写入的时间就有点长,才559行数据花了16秒。

想想如果有几百万行数据呢,这个耗费的时间就很可观了,于是决定使用多线程来解决这个问题,在这个例子里面我想比较一下Callable和Runnable接口的性能,我先用Callable来实现并发:

 1 public void foo() throws Exception {
 2     final PrintWriter pw = new PrintWriter(xxx);
 3     List<Map> ml = service.getData();
 4     ExecutorService exec = Executors.newCachedThreadPool();
 5     for (final Map m : ml) {
 6         Callable cal = new Callable() {
 7             @Override
 8             public Object call() throws Exception {
 9                 // compose string
10                 pw.write(string);
11                 pw.write("
");
12             }
13         };
14         exec.submit(cal);
15     }
16     pw.write("completed");
17     pw.close();
18 }

从运行结果可以看到,用了多线程之后该方法的执行只花费了不到1秒,没看错,性能整整提高了16倍之多,但是打开结果文件看的时候发现有几个问题需要修复:

问题一: 有的行有空格,有的行包含两条数据

问题二: 线程还没有完全结束,completed就打印出来了

可见在并发编程中,有的地方还是要注意,否则得到的结果就不是我们想要的。先来看看问题一,这个问题产生的原因是pw.write(string);和pw.write(" ");两者不满足原子性,因为线程是并发执行的,如果线程1和线程2同时在写string,这样就会把两行数据写到一行,如果同时写换行,则会出现多个换行的情况,这个问题的解决方案有两个:

1. 在pw.write(" ");后让当前正在执行的线程小睡一会儿,例如Thread.sleep(500);

2. 让pw.write(string);和pw.write(" ");两个写入操作满足原子性,也就是这两个操作分隔为两步。

第一种解决方案会造成运行时间加长,所以我采用第二种方案,就是这样:pw.write(sb.toString()+" "); 这下满足原子性要求了吧

问题二出现的原因是并发的线程还没有完全执行完,completed就打印了,处理起来也倍儿简单,在打印completed之前让当前线程小睡半秒,再执行,果然问题都没有了。

刚才是用Callable实现并发的,大家知道callable跟runnable的区别是,Callable是concurrent包中,JDK1.5后新增的,Callable的call方法可以有返回值也能抛出检查型异常,Runnable的run方法就不能了。现在使用Runnable来看看:

public void foo() throws Exception {
    final PrintWriter pw = new PrintWriter(xxx);
    List<Map> ml = service.getData();
    for (final Map m : ml) {
        Runnable cal = new Runnable() {
            @Override
            public void call() {
                // compose string
                pw.write(string);
                pw.write("
");
            }
        };
        new Thread(cal).start();
    }
    pw.write("completed");
    pw.close();
}

可以看到,Runnable跟Callable的性能并没有什么区别。

原文地址:https://www.cnblogs.com/stonefeng/p/5829307.html