activiti主要组件解析

Activiti内部实现中,各主要部件关系

image

对外,提供Service服务,它是无状态的。

这些Service包括:

  • protected RepositoryService repositoryService = new RepositoryServiceImpl();
  • protected RuntimeService runtimeService = new RuntimeServiceImpl();
  • protected HistoryService historyService = new HistoryServiceImpl();
  • protected IdentityService identityService = new IdentityServiceImpl();
  • protected TaskService taskService = new TaskServiceImpl();
  • protected FormService formService = new FormServiceImpl();
  • protected ManagementService managementService = new ManagementServiceImpl();

 

对service提供的服务,基本上每一个需要执行具体任务的方法,都有一个对应的Command实现与之对应。

  • 一般来说,一个Command对应一个完整事物;
  • 调用Command之前,都会经过CommandInterceptor,这个在初始化时就确定了,如:LogInterceptor >> CommandContextInterceptor >> CommandInvoker

 

如果Command不涉及节点相关内容,而是直接对数据库进行操作,则直接关联DB,入DbSqlSession的缓存;

否则,一般会通过ExecutionEntity来完成具体的操作,这里,封装了一些基本的原子操作,它们都是AtomicOperation的接口实现:

流程实例类:

  • AtomicOperationProcessStart
  • AtomicOperationProcessStartInitial
  • AtomicOperationProcessEnd

流程活动类:

  • AtomicOperationActivityStart
  • AtomicOperationActivityExecute
  • AtomicOperationActivityEnd

流程活动流转类:

  • AtomicOperationTransitionNotifyListenerEnd
  • AtomicOperationTransitionDestroyScope
  • AtomicOperationTransitionNotifyListenerTake
  • AtomicOperationTransitionCreateScope
  • AtomicOperationTransitionNotifyListenerStart

流程执行树清理类:

  • AtomicOperationDeleteCascade
  • AtomicOperationDeleteCascadeFireActivityEnd

 

其中,活动执行时,具体动作是由ActivityBehavior的实现类来决定的:

image

关于默认ID生成器

Activiti的默认生成器是DbIdGenerator,实际是一次从数据库中取一块数据,然后慢慢用,用完后再取。

/**
 * @author Tom Baeyens
 */
public class DbIdGenerator implements IdGenerator {

  protected int idBlockSize;
  protected long nextId = 0;
  protected long lastId = -1;
  
  protected CommandExecutor commandExecutor;
  protected CommandConfig commandConfig;
  
  public synchronized String getNextId() {
    if (lastId<nextId) {
      getNewBlock();
    }
    long _nextId = nextId++;
    return Long.toString(_nextId);
  }

  protected synchronized void getNewBlock() {
    IdBlock idBlock = commandExecutor.execute(commandConfig, new GetNextIdBlockCmd(idBlockSize));
    this.nextId = idBlock.getNextId();
    this.lastId = idBlock.getLastId();
  }
  • 在ProcessEngineConfiguration中定义的块默认大小100;
  • 在ProcessEngineComfigurationImpl中,完成初始化:
  // id generator /////////////////////////////////////////////////////////////
  
  protected void initIdGenerator() {
    if (idGenerator==null) {
      CommandExecutor idGeneratorCommandExecutor = null;
      if (idGeneratorDataSource!=null) {
        ProcessEngineConfigurationImpl processEngineConfiguration = new StandaloneProcessEngineConfiguration();
        processEngineConfiguration.setDataSource(idGeneratorDataSource);
        processEngineConfiguration.setDatabaseSchemaUpdate(DB_SCHEMA_UPDATE_FALSE);
        processEngineConfiguration.init();
        idGeneratorCommandExecutor = processEngineConfiguration.getCommandExecutor();
      } else if (idGeneratorDataSourceJndiName!=null) {
        ProcessEngineConfigurationImpl processEngineConfiguration = new StandaloneProcessEngineConfiguration();
        processEngineConfiguration.setDataSourceJndiName(idGeneratorDataSourceJndiName);
        processEngineConfiguration.setDatabaseSchemaUpdate(DB_SCHEMA_UPDATE_FALSE);
        processEngineConfiguration.init();
        idGeneratorCommandExecutor = processEngineConfiguration.getCommandExecutor();
      } else {
        idGeneratorCommandExecutor = getCommandExecutor();
      }
      
      DbIdGenerator dbIdGenerator = new DbIdGenerator();
      dbIdGenerator.setIdBlockSize(idBlockSize);
      dbIdGenerator.setCommandExecutor(idGeneratorCommandExecutor);
      dbIdGenerator.setCommandConfig(getDefaultCommandConfig().transactionRequiresNew());
      idGenerator = dbIdGenerator;
    }
  }

注意:此处对getNextId()方法加了synchronize关键字,它在单机部署下,确定不会出现网上分析的什么ID重复问题。

关于task的start_time_字段取值问题

在TaskEntity中:

  /** creates and initializes a new persistent task. */
  public static TaskEntity createAndInsert(ActivityExecution execution) {
    TaskEntity task = create();
    task.insert((ExecutionEntity) execution);
    return task;
  }

  public void insert(ExecutionEntity execution) {
    CommandContext commandContext = Context.getCommandContext();
    DbSqlSession dbSqlSession = commandContext.getDbSqlSession();
    dbSqlSession.insert(this);
    
    if(execution != null) {
      execution.addTask(this);
    }
    
    commandContext.getHistoryManager().recordTaskCreated(this, execution);
  }

/*
* 。。。。
*/

  /**  Creates a new task.  Embedded state and create time will be initialized.
   * But this task still will have to be persisted. See {@link #insert(ExecutionEntity)}. */
  public static TaskEntity create() {
    TaskEntity task = new TaskEntity();
    task.isIdentityLinksInitialized = true;
    task.createTime = ClockUtil.getCurrentTime();
    return task;
  }

由此知道,活动的时间是由ClockUtil.getCurrentTime()决定的。再来看看CockUtil的源码:

/**
 * @author Joram Barrez
 */
public class ClockUtil {
  
  private volatile static Date CURRENT_TIME = null;
  
  public static void setCurrentTime(Date currentTime) {
    ClockUtil.CURRENT_TIME = currentTime;
  }
  
  public static void reset() {
    ClockUtil.CURRENT_TIME = null;
  } 
  
  public static Date getCurrentTime() {
    if (CURRENT_TIME != null) {
      return CURRENT_TIME;
    }
    return new Date();
  }

}

注意:

因为可能多线程情况,而且只有set,不会执行类似++,--这样的操作,所以这里用volatile关键字完全满足需要。

默认实现在分布式中问题

在上面介绍了DbIdGenerator以及ClockUtil之后,可以清楚明白他们的原理,那么在分布式部署中,如果还是使用这种默认的实现而不加以改善,会出现什么问题。

1.DbIdGenerator的getNextId()/getNewBlock()两个方法,在分布式主机中,synchronize不能顺利实现锁控制;

2.ClockUtil严重依赖容器所在服务器时间,但是分布式主机的时间不可能达到完全的同步;

3.在分布式主机中,对同一个任务,可以同时执行,因为他们都是DbSqlSession缓存,不会立马入库;也就是说,可能存在一个任务被同时自行两次的情况。

对1,2两点分析,基本上是确定会存在的,至于第三点,限于猜想,不知道实际是否有相关的解决策略,目前对activiti关于此处的设置猜想还没有完全弄清楚。

其实之所以那么在乎任务Id以及任务执行时间,主要是在流程跟踪图中,需要根据有序的历史任务结果集模仿重现走过的路径,而做到有序,这两个要素是非常关键的。

对分布式应用,如果同库,那ID的生成问题都会是一个问题,常见的解决方式是把他扔给数据库去解决,比如一个序列、一个类似序列的自定义函数等都是不错的选择。

当然,放到DB中以后,那么频繁访问数据库,activiti中设计的blocksize也就基本失效了,这个也算是衡量的结果吧。

实际问题分析

image

上述数据,是在生产上出现的问题数据,环境是为了负载均衡做了两个应用Server,同时连接一个DB;

从数据可以分析出“活动节点会在双机中运行”;

61011(A) >> 60852(B) >> 60866(B) >> 61022(A) >> 61028(A) >> 60889(B) >> 60893(B)

A机器上的61011执行完毕以后,事件如何转到B机器上的60852,这个还不明白,待解决!!

原文地址:https://www.cnblogs.com/huntdream/p/6036125.html