DataX Source Code Analysis (2)

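Continuing from the previous part: with the splitting done, the next step is to run and monitor the resulting tasks: schedule, post, postHandle, invokeHooks. The first thing schedule does is merge the results of the reader and writer split from the previous step into concrete taskGroupContainer instances.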

int channelsPerTaskGroup = this.configuration.getInt(
        CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, 5);
int taskNumber = this.configuration.getList(
        CoreConstant.DATAX_JOB_CONTENT).size();

this.needChannelNumber = Math.min(this.needChannelNumber, taskNumber);
PerfTrace.getInstance().setChannelNumber(needChannelNumber);

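As the code above shows, the channel count is capped by the number of tasks. So when splitPk is not configured, a single-table ETL job yields only one task, and whatever value greater than 1 is configured for channel, the final channel count is 1. Tasks are then assigned fairly to their taskGroup, which returns a collection of Configuration objects. The execution mode is set to STANDALONE and control is handed to AbstractScheduler, which starts all task threads via startAllTaskGroup(configurations).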
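To make the channel arithmetic concrete, here is a minimal sketch of the capping and a simple round-robin assignment. The class TaskGroupSketch and its methods are hypothetical names made up for illustration; the real DataX assignment logic is more involved, so treat this as a simplified model, not the actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not a DataX class.
class TaskGroupSketch {

    // Effective channel count: never more channels than tasks.
    static int effectiveChannels(int configuredChannels, int taskNumber) {
        return Math.min(configuredChannels, taskNumber);
    }

    // Number of task groups so that each group gets at most channelsPerTaskGroup channels.
    static int taskGroupCount(int channels, int channelsPerTaskGroup) {
        return (channels + channelsPerTaskGroup - 1) / channelsPerTaskGroup;
    }

    // Round-robin assignment of task ids 0..taskNumber-1 to groupCount groups.
    // Assumption: plain round-robin stands in for DataX's "fair" assignment.
    static List<List<Integer>> assign(int taskNumber, int groupCount) {
        List<List<Integer>> groups = new ArrayList<>();
        for (int g = 0; g < groupCount; g++) {
            groups.add(new ArrayList<>());
        }
        for (int taskId = 0; taskId < taskNumber; taskId++) {
            groups.get(taskId % groupCount).add(taskId);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Single table without splitPk: one task, so channel=10 collapses to 1.
        System.out.println(effectiveChannels(10, 1)); // prints 1

        // 20 tasks, channel=12, 5 channels per task group: 3 groups.
        int channels = effectiveChannels(12, 20);
        int groupCount = taskGroupCount(channels, 5);
        System.out.println(assign(20, groupCount));
    }
}
```

With 20 tasks and 3 groups, the sketch puts 7, 7 and 6 tasks into the groups, so no group is more than one task ahead of another.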

@Override
public void startAllTaskGroup(List<Configuration> configurations) {
	this.taskGroupContainerExecutorService = Executors
		.newFixedThreadPool(configurations.size());
	for (Configuration taskGroupConfiguration : configurations) {
	    TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupConfiguration);
	    this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner);
	}
	this.taskGroupContainerExecutorService.shutdown();
}
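The pattern here (a fixed pool sized to the number of task groups, then an immediate shutdown()) can look odd at first glance. A small sketch, using a hypothetical class StartAllSketch that is not part of DataX, shows that shutdown() only stops new submissions while runners already submitted still finish. The awaitTermination call is added only so the sketch can count results; the method shown above has no such wait.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of the execute-then-shutdown pattern; not a DataX class.
class StartAllSketch {

    // Runs one Runnable per group on a pool sized to the group count
    // and returns how many of them completed.
    static int runAll(List<Runnable> groups) {
        AtomicInteger finished = new AtomicInteger();
        // Note: newFixedThreadPool rejects a size of 0, so the list must not be empty.
        ExecutorService pool = Executors.newFixedThreadPool(groups.size());
        for (Runnable group : groups) {
            pool.execute(() -> {
                group.run();
                finished.incrementAndGet();
            });
        }
        // No new tasks are accepted from here on; submitted runners keep going.
        pool.shutdown();
        try {
            // Added for the sketch only, so that the count below is complete.
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return finished.get();
    }
}
```

Because every task group gets its own thread, all groups start right away instead of queueing behind each other.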

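The real work is done by TaskGroupContainer.start(), which handles status reporting and task startup and implements the automatic fault-tolerance mechanism. The actual SQL execution is carried out by the plugin code for the corresponding database.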

 while (true) {
    	// 1. Check the state of each task
    	boolean failedOrKilled = false;
    	Map<Integer, Communication> communicationMap = containerCommunicator.getCommunicationMap();
    	for(Map.Entry<Integer, Communication> entry : communicationMap.entrySet()){
    		Integer taskId = entry.getKey();
    		Communication taskCommunication = entry.getValue();
           	.............
    	
        // 2. If the overall state of the taskExecutors in this taskGroup is failed, report the error
        if (failedOrKilled) {
            lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
                    lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);

            throw DataXException.asDataXException(
                    FrameworkErrorCode.PLUGIN_RUNTIME_ERROR, lastTaskGroupContainerCommunication.getThrowable());
        }
        // 3. There are tasks not yet executed, and the number of running tasks is below the channel limit
        Iterator<Configuration> iterator = taskQueue.iterator();
        while(iterator.hasNext() && runTasks.size() < channelNumber){
            Configuration taskConfig = iterator.next();
            Integer taskId = taskConfig.getInt(CoreConstant.TASK_ID);
            int attemptCount = 1;
            TaskExecutor lastExecutor = taskFailedExecutorMap.get(taskId);
            if(lastExecutor!=null){
                attemptCount = lastExecutor.getAttemptCount() + 1;
                long now = System.currentTimeMillis();
                long failedTime = lastExecutor.getTimeStamp();
                if(now - failedTime < taskRetryIntervalInMsec){  // retry interval not yet elapsed, keep the task in the queue
                    continue;
                }
		.............
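        // 4. Task queue is empty, executors have finished, collected state is SUCCEEDED ---> success
        if (taskQueue.isEmpty() && isAllTaskDone(runTasks) && containerCommunicator.collectState() == State.SUCCEEDED) {
        	// Report once more even on success; otherwise the collected statistics would be inaccurate when tasks finish very quickly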
            lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
                    lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
            LOG.info("taskGroup[{}] completed it's tasks.", this.taskGroupId);
            break;
        }
        // 5. If the reporting interval has already elapsed, report immediately
        long now = System.currentTimeMillis();
        if (now - lastReportTimeStamp > reportIntervalInMillSec) {
            lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
                    lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
            lastReportTimeStamp = now;
           // taskMonitor checks the running tasks every reportIntervalInMillSec
            for(TaskExecutor taskExecutor:runTasks){
                taskMonitor.report(taskExecutor.getTaskId(),this.containerCommunicator.getCommunication(taskExecutor.getTaskId()));
            }
        }
        Thread.sleep(sleepIntervalInMillSec);
    }
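The retry part of step 3 is easy to miss among the elided lines, so here is a small sketch of just that gate: a failed task stays in the queue until the retry interval has passed, and its attempt count goes up by one on the next launch. RetryGateSketch and its method names are hypothetical, and timestamps are passed in explicitly so the behaviour is easy to follow; the real code keeps this information in taskFailedExecutorMap and TaskExecutor.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the retry gate in step 3; not a DataX class.
class RetryGateSketch {
    private final Map<Integer, Long> lastFailureAt = new HashMap<>();
    private final Map<Integer, Integer> attempts = new HashMap<>();
    private final long retryIntervalMs;

    RetryGateSketch(long retryIntervalMs) {
        this.retryIntervalMs = retryIntervalMs;
    }

    // Remember when the task failed and count the failed attempt.
    void recordFailure(int taskId, long nowMs) {
        lastFailureAt.put(taskId, nowMs);
        attempts.merge(taskId, 1, Integer::sum);
    }

    // True if the task never failed, or its last failure is at least
    // retryIntervalMs old (mirrors "now - failedTime < interval -> continue").
    boolean mayStart(int taskId, long nowMs) {
        Long failedAt = lastFailureAt.get(taskId);
        return failedAt == null || nowMs - failedAt >= retryIntervalMs;
    }

    // 1 for a fresh task, previous attempts + 1 for a retried one.
    int nextAttempt(int taskId) {
        return attempts.getOrDefault(taskId, 0) + 1;
    }

    public static void main(String[] args) {
        RetryGateSketch gate = new RetryGateSketch(1000);
        gate.recordFailure(7, 5000);
        System.out.println(gate.mayStart(7, 5500)); // prints false
        System.out.println(gate.mayStart(7, 6000)); // prints true
        System.out.println(gate.nextAttempt(7));    // prints 2
    }
}
```

Since the outer loop sleeps and comes back, a task that is skipped on one pass is simply looked at again on a later pass.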

Meanwhile the scheduler keeps looping, collecting and processing the status passed up through Communication.

	while (true) {
	/**
	 * step 1: collect job stat
	 * step 2: getReport info, then report it
	 * step 3: errorLimit do check
	 * step 4: dealSucceedStat();
	 * step 5: dealKillingStat();
	 * step 6: dealFailedStat();
	 * step 7: refresh last job stat, and then sleep for next while
	 *
	 * above steps, some ones should report info to DS
	 *
	 */
	Communication nowJobContainerCommunication = this.containerCommunicator.collect();
	nowJobContainerCommunication.setTimestamp(System.currentTimeMillis());
	LOG.debug(nowJobContainerCommunication.toString());
	// reporting interval
	long now = System.currentTimeMillis();
	if (now - lastReportTimeStamp > jobReportIntervalInMillSec) {
	    Communication reportCommunication = CommunicationTool
		    .getReportCommunication(nowJobContainerCommunication, lastJobContainerCommunication, totalTasks);
	    this.containerCommunicator.report(reportCommunication);
	    lastReportTimeStamp = now;
	    lastJobContainerCommunication = nowJobContainerCommunication;
	}
	errorLimit.checkRecordLimit(nowJobContainerCommunication);
	if (nowJobContainerCommunication.getState() == State.SUCCEEDED) {
	    LOG.info("Scheduler accomplished all tasks.");
	    break;
	}
	if (isJobKilling(this.getJobId())) {
	    dealKillingStat(this.containerCommunicator, totalTasks);
	} else if (nowJobContainerCommunication.getState() == State.FAILED) {
	    dealFailedStat(this.containerCommunicator, nowJobContainerCommunication.getThrowable());
	}
	Thread.sleep(jobSleepIntervalInMillSec);
	}
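Both loops use the same "sleep a little, report only when the interval has passed" trick. The sketch below models it with a hypothetical ReportThrottleSketch class and a simulated clock, so the numbers can be checked by hand. One detail worth noticing: because the comparison is a strict greater-than and the loop only wakes up every sleep interval, the actual gap between reports can be longer than the configured interval.

```java
// Hypothetical model of the interval check used in the polling loops; not a DataX class.
class ReportThrottleSketch {
    private final long intervalMs;
    private long lastReportAt;

    ReportThrottleSketch(long intervalMs, long startMs) {
        this.intervalMs = intervalMs;
        this.lastReportAt = startMs;
    }

    // Returns true (and moves the marker) only when strictly more than
    // intervalMs has passed since the last report.
    boolean shouldReport(long nowMs) {
        if (nowMs - lastReportAt > intervalMs) {
            lastReportAt = nowMs;
            return true;
        }
        return false;
    }

    // Simulates a loop that wakes up every sleepMs until totalMs and counts the reports.
    static int countReports(long intervalMs, long sleepMs, long totalMs) {
        ReportThrottleSketch throttle = new ReportThrottleSketch(intervalMs, 0);
        int reports = 0;
        for (long now = sleepMs; now <= totalMs; now += sleepMs) {
            if (throttle.shouldReport(now)) {
                reports++;
            }
        }
        return reports;
    }

    public static void main(String[] args) {
        // Interval 300 ms, wake-up every 100 ms, 1 s total: reports at 400 and 800.
        System.out.println(countReports(300, 100, 1000)); // prints 2
    }
}
```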

The post method does nothing for the reader; for the writer, it takes care of what has to happen after the data has been processed.

 if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
    // postSql is configured, so remove it from the config here
    originalConfig.remove(Key.POST_SQL);

    Connection conn = DBUtil.getConnection(this.dataBaseType,
            jdbcUrl, username, password);

    LOG.info(
            "Begin to execute postSqls:[{}]. context info:{}.",
            StringUtils.join(renderedPostSqls, ";"), jdbcUrl);
    WriterUtil.executeSqls(conn, renderedPostSqls, jdbcUrl, dataBaseType);
    DBUtil.closeDBResources(null, null, conn);
}

OK, those are roughly the important methods.

Original article: https://www.cnblogs.com/muzhongjiang/p/13156967.html