spring Batch实现数据库大数据量读写

1. data-source-context.xml

Xml代码  收藏代码
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"  
  3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"  
  4.     xmlns:tx="http://www.springframework.org/schema/tx"  
  5.     xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
  6.     http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd  
  7.     http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">  
  8.   
  9.     <!-- 1) USE ANNOTATIONS TO IDENTIFY AND WIRE SPRING BEANS. -->  
  10.     <context:component-scan base-package="net.etongbao.vasp.ac" />  
  11.       
  12.     <!-- 2) DATASOURCE, TRANSACTION MANAGER AND JDBC TEMPLATE -->  
  13.    
  14.     <bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource"  
  15.         destroy-method="close" abstract="false" scope="singleton">  
  16.         <!-- oracle.jdbc.driver.oracleDriver -->  
  17.         <property name="driverClass" value="oracle.jdbc.OracleDriver" />  
  18.         <property name="jdbcUrl" value="jdbc:oracle:thin:@192.168.1.23:1521:orcl01" />  
  19.         <property name="user" value="USR_DEV01" />  
  20.         <property name="password" value="2AF0829C" />  
  21.         <property name="checkoutTimeout" value="30000" />  
  22.         <property name="maxIdleTime" value="120" />  
  23.         <property name="maxPoolSize" value="100" />  
  24.         <property name="minPoolSize" value="2" />  
  25.         <property name="initialPoolSize" value="2" />  
  26.         <property name="maxStatements" value="0" />  
  27.         <property name="maxStatementsPerConnection" value="0" />  
  28.         <property name="idleConnectionTestPeriod" value="30" />     
  29.     </bean>  
  30.     <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">  
  31.         <property name="dataSource" ref="dataSource" />  
  32.     </bean>  
  33.   
  34.     <bean id="transactionManager"  
  35.         class="org.springframework.jdbc.datasource.DataSourceTransactionManager">  
  36.         <property name="dataSource" ref="dataSource" />  
  37.     </bean>  
  38.       
  39.     <tx:annotation-driven transaction-manager="transactionManager" />  
  40. </beans>  

 2. quartz-context.xml       commit-interval="10000"每次批量数据的条数,数值越大效率越高,可在此处添加事物处理,

每次回滚数就是commit-interval数

Xml代码  收藏代码
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"  
  3.     xmlns:batch="http://www.springframework.org/schema/batch"  
  4.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"  
  5.     xmlns:tx="http://www.springframework.org/schema/tx"  
  6.     xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
  7.     http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd  
  8.     http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd  
  9.     http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">  
  10.   
  11.     <import resource="data-source-context.xml"/>  
  12.       
  13.     <!--   JOB REPOSITORY - WE USE IN-MEMORY REPOSITORY FOR OUR EXAMPLE -->  
  14.     <bean id="jobRepository"  
  15.         class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">  
  16.         <property name="transactionManager" ref="transactionManager" />  
  17.     </bean>  
  18.       
  19.     <!-- batch config -->  
  20.     <bean id="jobLauncher"  
  21.         class="org.springframework.batch.core.launch.support.SimpleJobLauncher">  
  22.         <property name="jobRepository" ref="jobRepository" />  
  23.     </bean>  
  24.   
  25.     <!--  FINALLY OUR JOB DEFINITION. THIS IS A 1 STEP JOB -->  
  26.     <batch:job id="ledgerJob">  
  27.         <batch:listeners>  
  28.             <batch:listener ref="appJobExecutionListener" />  
  29.         </batch:listeners>  
  30.         <batch:step id="step1">  
Xml代码  收藏代码
  1. <span style="white-space: pre;">    </span>   <batch:tasklet transaction-manager="transactionManager">  
  2.         <batch:tasklet>  
  3.             <batch:listeners>  
  4.                 <batch:listener ref="itemFailureLoggerListener" />  
  5.             </batch:listeners>  
  6.             <batch:chunk reader="ledgerReader" writer="ledgerWriter"  
  7.                 commit-interval="10000" /> <!-- 1万条进行一次commit -->  
  8.         </batch:tasklet>  
Xml代码  收藏代码
  1.                   </batch:tasklet>  
  2.     </batch:step>  
  3. </batch:job>  
  4.    
  5. <!--  READER -->  
  6. <bean id="ledgerReader"  
  7.     class="org.springframework.batch.item.database.JdbcCursorItemReader">  
  8.     <property name="dataSource" ref="dataSource" />  
  9.     <property name="sql" value="select * from ledger" />   
  10.     <property name="rowMapper" ref="ledgerRowMapper" />  
  11. </bean>  
  12.    
  13. <!-- Spring Batch Job同一个job instance,成功执行后是不允许重新执行的【失败后是否允许重跑,可通过配置Job的restartable参数来控制,默认是true】,如果需要重新执行,可以变通处理,  
  14.     添加一个JobParameters构建类,以当前时间作为参数,保证其他参数相同的情况下却是不同的job instance -->  
  15. <bean id="jobParameterBulider" class="org.springframework.batch.core.JobParametersBuilder" />  
  16.   
  17. <!-- 定时任务 开始 -->    
  18. <bean id="ledgerJobDetail" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">    
  19.     <property name="targetObject">    
  20.         <!-- 定时执行的类 -->    
  21.         <ref bean="quartzLedgerJob" />    
  22.     </property>    
  23.     <property name="targetMethod">    
  24.         <!-- 定时执行的类方法 -->    
  25.         <value>execute</value>    
  26.     </property>    
  27. </bean>    
  28.    
  29. <bean id="ledgerCronTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean" >    
  30.     <!-- 这里不可以直接在属性jobDetail中引用taskJob,因为他要求的是一个jobDetail类型的对象,所以我们得通过MethodInvokingJobDetailFactoryBean来转一下 -->    
  31.     <property name="jobDetail" >    
  32.         <ref bean="ledgerJobDetail" />    
  33.     </property>    
  34.     <!--在每天下午18点到下午18:59期间的每1分钟触发  -->    
  35.     <!--在每天上午10点40分准时触发  -->    
  36.     <property name="cronExpression" >    
  37.         <!-- <value>0 * 15 * * ?</value> -->  
  38.         <value>0 45 10 * * ? * </value>   
  39.     </property>    
  40.          
  41. </bean>    
  42.        
  43. <!-- 触发器工厂,将所有的定时任务都注入工厂-->    
  44. <bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">    
  45.     <!-- 添加触发器 -->    
  46.     <property name="triggers">    
  47.         <list>    
  48.             <!-- 将上面定义的测试定时任务注入(可以定义多个定时任务,同时注入)-->    
  49.             <ref local="ledgerCronTrigger" />    
  50.         </list>    
  51.     </property>    
  52. </bean>    
  53. <!-- 定时任务 结束 -->    
  54. lt;/beans>    

3.定时调度job类 QuartzLedgerJob.java

 package net.etongbao.vasp.ac.quartz;

Java代码  收藏代码
  1. import java.util.Date;  
  2. import org.slf4j.Logger;  
  3. import org.slf4j.LoggerFactory;  
  4. import org.springframework.batch.core.Job;  
  5. import org.springframework.batch.core.JobParametersBuilder;  
  6. import org.springframework.batch.core.launch.JobLauncher;  
  7. import org.springframework.beans.factory.annotation.Autowired;  
  8. import org.springframework.stereotype.Component;  
  9. import org.springframework.util.StopWatch;  
  10.   
  11. /** 
  12.  * 定时调度类 
  13.  * @author Fu Wei 
  14.  * 
  15.  */  
  16.   
  17. @Component("quartzLedgerJob")  
  18. public class QuartzLedgerJob {  
  19.   
  20.     private static final Logger LOG = LoggerFactory.getLogger(QuartzLedgerJob.class);  
  21.   
  22.     @Autowired  
  23.     private JobLauncher jobLauncher;  
  24.   
  25.     @Autowired  
  26.     private Job ledgerJob;  
  27.   
  28.     @Autowired  
  29.     JobParametersBuilder jobParameterBulider;  
  30.   
  31.     private static long counter = 0l;  
  32.       
  33.     /** 
  34.      * 执行业务方法 
  35.      * @throws Exception 
  36.      */  
  37.     public void execute() throws Exception {  
  38.         LOG.debug("start...");  
  39.         StopWatch sw = new StopWatch();  
  40.         sw.start();  
  41.         /* 
  42.          * Spring Batch Job同一个job instance,成功执行后是不允许重新执行的【失败后是否允许重跑, 
  43.          * 可通过配置Job的restartable参数来控制,默认是true】,如果需要重新执行,可以变通处理, 
  44.          * 添加一个JobParameters构建类,以当前时间作为参数,保证其他参数相同的情况下却是不同的job instance 
  45.          */  
  46.         jobParameterBulider.addDate("date", new Date());  
  47.         jobLauncher.run(ledgerJob, jobParameterBulider.toJobParameters());  
  48.         sw.stop();  
  49.         LOG.debug("Time elapsed:{},Execute quartz ledgerJob:{}", sw.prettyPrint(), ++counter);  
  50.     }  
  51. }  
 

 4.程序启动类 StartQuartz.java

 package net.etongbao.vasp.ac.quartz;

Java代码  收藏代码
  1. import java.io.FileNotFoundException;  
  2.   
  3. import org.springframework.context.support.ClassPathXmlApplicationContext;  
  4.   
  5. /** 
  6.  * 启动定时调度 
  7.  * @author Fu Wei 
  8.  * 
  9.  */  
  10. public class StartQuartz {  
  11.   
  12.     public static void main(String[] args) throws FileNotFoundException {  
  13.   
  14.         new ClassPathXmlApplicationContext("/net/etongbao/vasp/ac/resources/quartz-context.xml");  
  15.     }  
  16. }  

5.pojo类 Ledger.java

Java代码  收藏代码
  1. package net.etongbao.vasp.ac.pojo;  
  2.   
  3. import java.io.Serializable;  
  4. import java.util.Date;  
  5.   
  6. public class Ledger implements Serializable {  
  7.   
  8.     private int id;  
  9.     private Date receiptDate;  
  10.     private String memberName;  
  11.     private String checkNumber;  
  12.     private Date checkDate;  
  13.     private String paymentType;  
  14.     private double depositAmount;  
  15.     private double paymentAmount;  
  16.     private String comments;  
  17.   
  18.     public Ledger() {  
  19.         super();  
  20.     }  
  21.   
  22.     public Ledger(int id, Date receiptDate, String memberName, String checkNumber, Date checkDate, String paymentType,  
  23.             double depositAmount, double paymentAmount, String comments) {  
  24.         super();  
  25.         this.id = id;  
  26.         this.receiptDate = receiptDate;  
  27.         this.memberName = memberName;  
  28.         this.checkNumber = checkNumber;  
  29.         this.checkDate = checkDate;  
  30.         this.paymentType = paymentType;  
  31.         this.depositAmount = depositAmount;  
  32.         this.paymentAmount = paymentAmount;  
  33.         this.comments = comments;  
  34.     }  
  35.   
  36.     public int getId() {  
  37.         return id;  
  38.     }  
  39.   
  40.     public void setId(int id) {  
  41.         this.id = id;  
  42.     }  
  43.   
  44.     public Date getReceiptDate() {  
  45.         return receiptDate;  
  46.     }  
  47.   
  48.     public void setReceiptDate(Date receiptDate) {  
  49.         this.receiptDate = receiptDate;  
  50.     }  
  51.   
  52.     public String getMemberName() {  
  53.         return memberName;  
  54.     }  
  55.   
  56.     public void setMemberName(String memberName) {  
  57.         this.memberName = memberName;  
  58.     }  
  59.   
  60.     public String getCheckNumber() {  
  61.         return checkNumber;  
  62.     }  
  63.   
  64.     public void setCheckNumber(String checkNumber) {  
  65.         this.checkNumber = checkNumber;  
  66.     }  
  67.   
  68.     public Date getCheckDate() {  
  69.         return checkDate;  
  70.     }  
  71.   
  72.     public void setCheckDate(Date checkDate) {  
  73.         this.checkDate = checkDate;  
  74.     }  
  75.   
  76.     public String getPaymentType() {  
  77.         return paymentType;  
  78.     }  
  79.   
  80.     public void setPaymentType(String paymentType) {  
  81.         this.paymentType = paymentType;  
  82.     }  
  83.   
  84.     public double getDepositAmount() {  
  85.         return depositAmount;  
  86.     }  
  87.   
  88.     public void setDepositAmount(double depositAmount) {  
  89.         this.depositAmount = depositAmount;  
  90.     }  
  91.   
  92.     public double getPaymentAmount() {  
  93.         return paymentAmount;  
  94.     }  
  95.   
  96.     public void setPaymentAmount(double paymentAmount) {  
  97.         this.paymentAmount = paymentAmount;  
  98.     }  
  99.   
  100.     public String getComments() {  
  101.         return comments;  
  102.     }  
  103.   
  104.     public void setComments(String comments) {  
  105.         this.comments = comments;  
  106.     }  
  107. }  

 6. LedgerDaoImpl.java

 package net.etongbao.vasp.ac.dao.impl;

Java代码  收藏代码
  1. import java.sql.PreparedStatement;  
  2. import java.sql.SQLException;  
  3. import net.etongbao.vasp.ac.dao.LedgerDao;  
  4. import net.etongbao.vasp.ac.pojo.Ledger;  
  5. import org.springframework.beans.factory.annotation.Autowired;  
  6. import org.springframework.jdbc.core.JdbcTemplate;  
  7. import org.springframework.jdbc.core.PreparedStatementSetter;  
  8. import org.springframework.stereotype.Repository;  
  9.   
  10. /** 
  11.  * ledger数据操作类 
  12.  *  
  13.  * @author Fu Wei 
  14.  *  
  15.  */  
  16.   
  17. @Repository  
  18. public class LedgerDaoImpl implements LedgerDao {  
  19.   
  20.     private static final String SAVE_SQL = "insert into ledger_temp (rcv_dt, mbr_nm, chk_nbr, chk_dt, pymt_typ, dpst_amt, pymt_amt, comments) values(?,?,?,?,?,?,?,?)";  
  21.   
  22.     @Autowired  
  23.     private JdbcTemplate jdbcTemplate;  
  24.   
  25.     @Override  
  26.     public void save(final Ledger item) {  
  27.         jdbcTemplate.update(SAVE_SQL, new PreparedStatementSetter() {  
  28.             public void setValues(PreparedStatement stmt) throws SQLException {  
  29.                 stmt.setDate(1, new java.sql.Date(item.getReceiptDate().getTime()));  
  30.                 stmt.setString(2, item.getMemberName());  
  31.                 stmt.setString(3, item.getCheckNumber());  
  32.                 stmt.setDate(4, new java.sql.Date(item.getCheckDate().getTime()));  
  33.                 stmt.setString(5, item.getPaymentType());  
  34.                 stmt.setDouble(6, item.getDepositAmount());  
  35.                 stmt.setDouble(7, item.getPaymentAmount());  
  36.                 stmt.setString(8, item.getComments());  
  37.             }  
  38.         });  
  39.     }  
  40.   
  41. }  
 

7.接口 LedgerDao .java

Java代码  收藏代码
  1. package net.etongbao.vasp.ac.dao;  
  2.   
  3. import net.etongbao.vasp.ac.pojo.Ledger;  
  4.   
  5. public interface LedgerDao {  
  6.     public void save(final Ledger item) ;  
  7. }  

 8. JdbcTemplete 需要的LedgerRowMapper.java

  package net.etongbao.vasp.ac.batch.writer;

Java代码  收藏代码
  1. import java.sql.ResultSet;  
  2. import java.sql.SQLException;  
  3.   
  4. import net.etongbao.vasp.ac.pojo.Ledger;  
  5.   
  6. import org.springframework.jdbc.core.RowMapper;  
  7. import org.springframework.stereotype.Component;  
  8.    
  9. /** 
  10.  * ledger行的映射类 
  11.  * @author Administrator 
  12.  * 
  13.  */  
  14. @Component("ledgerRowMapper")  
  15. public class LedgerRowMapper implements RowMapper {  
  16.     public Object mapRow(ResultSet rs, int rowNum) throws SQLException {  
  17.         Ledger ledger = new Ledger();  
  18.         ledger.setId(rs.getInt("id"));  
  19.         ledger.setReceiptDate(rs.getDate("rcv_dt"));  
  20.         ledger.setMemberName(rs.getString("mbr_nm"));  
  21.         ledger.setCheckNumber(rs.getString("chk_nbr"));  
  22.         ledger.setCheckDate(rs.getDate("chk_dt"));  
  23.         ledger.setPaymentType(rs.getString("pymt_typ"));  
  24.         ledger.setDepositAmount(rs.getDouble("dpst_amt"));  
  25.         ledger.setPaymentAmount(rs.getDouble("pymt_amt"));  
  26.         ledger.setComments(rs.getString("comments"));  
  27.         return ledger;  
  28.     }  
  29. }  

9.关键类LedgerWriter.java ,写入数据,负责数据的添加 

Java代码  收藏代码
  1. package net.etongbao.vasp.ac.batch.writer;  
  2.   
  3. import java.util.List;  
  4.   
  5. import net.etongbao.vasp.ac.dao.LedgerDao;  
  6. import net.etongbao.vasp.ac.pojo.Ledger;  
  7.   
  8. import org.springframework.batch.item.ItemWriter;  
  9. import org.springframework.beans.factory.annotation.Autowired;  
  10. import org.springframework.stereotype.Component;  
  11.   
  12. /** 
  13.  * ledger写入数据 
  14.  *  
  15.  * @author Fu Wei 
  16.  *  
  17.  */  
  18. @Component("ledgerWriter")  
  19. public class LedgerWriter implements ItemWriter<Ledger> {  
  20.   
  21.     @Autowired  
  22.     private LedgerDao ledgerDao;  
  23.   
  24.     /** 
  25.      * 写入数据 
  26.      *  
  27.      * @param ledgers 
  28.      */  
  29.     public void write(List<? extends Ledger> ledgers) throws Exception {  
  30.         for (Ledger ledger : ledgers) {  
  31.             ledgerDao.save(ledger);  
  32.         }  
  33.     }  
  34.   
  35. }  

 classPath:

 <?xml version="1.0" encoding="UTF-8"?>

Xml代码  收藏代码
  1. <classpath>  
  2.     <classpathentry kind="src" path="src"/>  
  3.     <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/jrockit-jdk1.6.0_24-R28.1.3-4.0.1"/>  
  4.     <classpathentry kind="lib" path="lib/aopalliance-1.0.jar"/>  
  5.     <classpathentry kind="lib" path="lib/c3p0-0.9.1.2.jar"/>  
  6.     <classpathentry kind="lib" path="lib/commons-collections-3.2.1.jar"/>  
  7.     <classpathentry kind="lib" path="lib/commons-lang-2.3.jar"/>  
  8.     <classpathentry kind="lib" path="lib/commons-logging-1.1.1.jar"/>  
  9.     <classpathentry kind="lib" path="lib/etb-log4j-1.2.16.jar"/>  
  10.     <classpathentry kind="lib" path="lib/etb-slf4j-api-1.5.8.jar"/>  
  11.     <classpathentry kind="lib" path="lib/etb-slf4j-log4j12-1.5.8.jar"/>  
  12.     <classpathentry kind="lib" path="lib/ojdbc6.jar"/>  
  13.     <classpathentry kind="lib" path="lib/org.springframework.aop-3.0.5.RELEASE.jar"/>  
  14.     <classpathentry kind="lib" path="lib/org.springframework.asm-3.0.5.RELEASE.jar"/>  
  15.     <classpathentry kind="lib" path="lib/org.springframework.aspects-3.0.5.RELEASE.jar"/>  
  16.     <classpathentry kind="lib" path="lib/org.springframework.beans-3.0.5.RELEASE.jar"/>  
  17.     <classpathentry kind="lib" path="lib/org.springframework.context-3.0.5.RELEASE.jar"/>  
  18.     <classpathentry kind="lib" path="lib/org.springframework.context.support-3.0.5.RELEASE.jar"/>  
  19.     <classpathentry kind="lib" path="lib/org.springframework.core-3.0.5.RELEASE.jar"/>  
  20.     <classpathentry kind="lib" path="lib/org.springframework.expression-3.0.5.RELEASE.jar"/>  
  21.     <classpathentry kind="lib" path="lib/org.springframework.instrument-3.0.5.RELEASE.jar"/>  
  22.     <classpathentry kind="lib" path="lib/org.springframework.instrument.tomcat-3.0.5.RELEASE.jar"/>  
  23.     <classpathentry kind="lib" path="lib/org.springframework.jdbc-3.0.5.RELEASE.jar"/>  
  24.     <classpathentry kind="lib" path="lib/org.springframework.jms-3.0.5.RELEASE.jar"/>  
  25.     <classpathentry kind="lib" path="lib/org.springframework.orm-3.0.5.RELEASE.jar"/>  
  26.     <classpathentry kind="lib" path="lib/org.springframework.oxm-3.0.5.RELEASE.jar"/>  
  27.     <classpathentry kind="lib" path="lib/org.springframework.test-3.0.5.RELEASE.jar"/>  
  28.     <classpathentry kind="lib" path="lib/org.springframework.transaction-3.0.5.RELEASE.jar"/>  
  29.     <classpathentry kind="lib" path="lib/quartz-all-1.6.5.jar"/>  
  30.     <classpathentry kind="lib" path="lib/spring-batch-core-2.1.6.RELEASE.jar"/>  
  31.     <classpathentry kind="lib" path="lib/spring-batch-infrastructure-2.1.6.RELEASE.jar"/>  
  32.     <classpathentry kind="lib" path="lib/spring-batch-test-2.1.6.RELEASE.jar"/>  
  33.     <classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>  
  34.     <classpathentry kind="output" path="bin"/>  
  35. </classpath>  
 

  总结: 测试数据8万多条,响应时间3分多钟。

关键在于quartz-context.xml 中<bean id="ledgerReader"

class="org.springframework.batch.item.database.JdbcCursorItemReader">

<property name="dataSource" ref="dataSource" />

<property name="sql" value="select * from ledger" /> 

<property name="rowMapper" ref="ledgerRowMapper" />

</bean> 负责读取数据 ,在程序执行时一次性抓取全部数据后在批量的交给LedgerWriter进行写操作。当然也可以使用分页读取JdbcPagingItemReader,但要分页数量与写入数量要大写相同,还可以对分页出来的数据进行添加悲观锁

LedgerWriter.java 负责写入数据,每次写入1000条。

原文地址:https://www.cnblogs.com/developer-ios/p/5828955.html