Activiti 分布式方案实现探讨

一、运行环境

public interface ProcessEngine extends EngineServices {

  /** the version of the activiti library */
  public static String VERSION = "5.20.0.1";

  /** The name as specified in 'process-engine-name' in 
   * the activiti.cfg.xml configuration file.
   * The default name for a process engine is 'default */
  String getName();

  void close();
} 

二、不支持分布式原因分析

在Activiti工作流的act_ge_property表中通常情况下有3条记录:

  1. next.dbid
  2. schema.history
  3. schema.version
其中next.dbid对应的值为数据库中当前最近一次增长后的最大记录id,每次增长的步长为2500。
protected int idBlockSize = 2500;   //在ProcessEngineConfiguration类中

Activiti中所有的id(如:Task的id,Execution的id,ProcessInstance的id等)都是通过IdGenerator来生成的

/**
 * generates {@link IdBlock}s that are used to assign ids to new objects.
 * 
 * The scope of an instance of this class is process engine,
 * which means that there is only one instance in one process engine instance.
 */
public interface IdGenerator {

  String getNextId();

}

IdGenerator的默认实现是

/**
 * @author Tom Baeyens
 */
public class DbIdGenerator implements IdGenerator {

  protected int idBlockSize;
  protected long nextId = 0;
  protected long lastId = -1;
  
  protected CommandExecutor commandExecutor;
  protected CommandConfig commandConfig;
  
  public synchronized String getNextId() {
    if (lastId<nextId) {
      getNewBlock();
    }
    long _nextId = nextId++;
    return Long.toString(_nextId);
  }

  protected synchronized void getNewBlock() {
    IdBlock idBlock = commandExecutor.execute(commandConfig, new GetNextIdBlockCmd(idBlockSize));
    this.nextId = idBlock.getNextId();
    this.lastId = idBlock.getLastId();
  }
}
从上面的代码可以看出,获取下一个id的方法是加锁的,也就是在一台服务器上id的增长是没有问题的,但是如果将Activiti部署在多台服务器上就会有两个问题
  1. 从代码的第17,18行可以看出id是本地自增,如果有多台服务器就会出现id相同的情况(由并发写造成的);
  2. 获取lastId的方法是操作同一个数据库的,会有问题,代码22中通过执行GetNextIdBlockCmd来获取数据库中的next.dbid的值,如果在多台服务器上由于一台服务器修改后,其他服务器无法知道
/**
 * @author Tom Baeyens
 */
public class GetNextIdBlockCmd implements Command<IdBlock> {
  
  private static final long serialVersionUID = 1L;
  protected int idBlockSize;
  
  public GetNextIdBlockCmd(int idBlockSize) {
    this.idBlockSize = idBlockSize;
  }

  public IdBlock execute(CommandContext commandContext) {
    PropertyEntity property = (PropertyEntity) commandContext
      .getPropertyEntityManager()
      .findPropertyById("next.dbid");
    long oldValue = Long.parseLong(property.getValue());
    long newValue = oldValue+idBlockSize;
    property.setValue(Long.toString(newValue));
    return new IdBlock(oldValue, newValue-1);
  }
} 

三、解决方案

要想解决Activiti分布式的问题,就需要解决id生成的问题,也就是要自己实现IdGenerator接口,因此要有一个地方来生成一个全局唯一的id才行。可以通过redis来实现,redis做集群不需要考虑单点问题,具体方案如下:
/**
 * 分布式id生成器
 * 
 */
public class DistributedIdGenerator implements IdGenerator {

    public DistributedIdGenerator(RedisService redisService) {
        this.redisService = redisService;
    }

    private RedisService redisService;

    @Override
    public String getNextId() {
        return String.format("%sX%s", D.formatDate(Constants.ACTIVITI_ENGINE_DISTRIBUTED_ID_PREFIX),
            this.redisService.incrby(Constants.ACTIVITI_ENGINE_DISTRIBUTED_ID_KEY, 1L));
    }
}
其中,D.formatDate(Constants.ACTIVITI_ENGINE_DISTRIBUTED_ID_PREFIX)是通过服务器时间来生成id的前缀,this.redisService.incrby(MainRK.ACTIVITI_ENGINE_DISTRIBUTED_ID_KEY, 1L)在redis中获取key (Constants.ACTIVITI_ENGINE_DISTRIBUTED_ID_KEY)对应的值,并自增1。 在实际工作中通过该方案可以解决Activiti分布式问题。
原文地址:https://www.cnblogs.com/johnvwan/p/15630018.html