Solr DataImport source code analysis (13)

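This article analyzes the EntityProcessor family of classes, which we can call entity processors. Each kind of data source has its own entity processor, and the entity processor hides the differences between data sources.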

Only the entity processor for database data sources is covered here; the other entity processors work similarly.

EntityProcessor is an abstract class. It defines the methods that fetch data from the data source as Map objects (for added, modified, and deleted rows).

/**
 * <p>
 * An instance of entity processor serves an entity. It is reused throughout the
 * import process.
 * </p>
 * <p/>
 * <p>
 * Implementations of this abstract class must provide a public no-args constructor.
 * </p>
 * <p/>
 * <p>
 * Refer to <a
 * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
 * for more details.
 * </p>
 * <p/>
 * <b>This API is experimental and may change in the future.</b>
 *
 * @version $Id: EntityProcessor.java 824359 2009-10-12 14:31:54Z ehatcher $
 * @since solr 1.3
 */
public abstract class EntityProcessor {

  /**
   * This method is called when it starts processing an entity. When it comes
   * back to the entity it is called again. So it can reset anything at that point.
   * For a rootmost entity this is called only once for an ingestion. For sub-entities, this
   * is called once for each row from its parent entity
   *
   * @param context The current context
   */
  public abstract void init(Context context);

  /**
   * This method helps streaming the data for each row . The implementation
   * would fetch as many rows as needed and gives one 'row' at a time. Only this
   * method is used during a full import
   *
   * @return A 'row'.  The 'key' for the map is the column name and the 'value'
   *         is the value of that column. If there are no more rows to be
   *         returned, return 'null'
   */
  public abstract Map<String, Object> nextRow();

  /**
   * This is used for delta-import. It gives the pks of the changed rows in this
   * entity
   *
   * @return the pk vs value of all changed rows
   */
  public abstract Map<String, Object> nextModifiedRowKey();

  /**
   * This is used during delta-import. It gives the primary keys of the rows
   * that are deleted from this entity. If this entity is the root entity, solr
   * document is deleted. If this is a sub-entity, the Solr document is
   * considered as 'changed' and will be recreated
   *
   * @return the pk vs value of all changed rows
   */
  public abstract Map<String, Object> nextDeletedRowKey();

  /**
   * This is used during delta-import. This gives the primary keys and their
   * values of all the rows changed in a parent entity due to changes in this
   * entity.
   *
   * @return the pk vs value of all changed rows in the parent entity
   */
  public abstract Map<String, Object> nextModifiedParentRowKey();

  /**
   * Invoked for each parent-row after the last row for this entity is processed. If this is the root-most
   * entity, it will be called only once in the import, at the very end.
   * 
   */
  public abstract void destroy();

  /**
   * Invoked after the transformers are invoked. EntityProcessors can add, remove or modify values
   * added by Transformers in this method.
   *
   * @param r The transformed row
   * @since solr 1.4
   */
  public void postTransform(Map<String, Object> r) {
  }

  /**
   * Invoked when the Entity processor is destroyed towards the end of import.
   *
   * @since solr 1.4
   */
  public void close() {
    //no-op
  }
}
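
To make the nextRow() contract concrete, here is a minimal sketch of a caller draining rows until null is returned. InMemoryRowSource and NextRowContractDemo are hypothetical names invented for this illustration; they do not extend the real Solr EntityProcessor and take no Context, so the sketch only shows the "one row per call, null when exhausted" convention described in the Javadoc above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an entity processor; NOT the real Solr class.
class InMemoryRowSource {
  private final List<Map<String, Object>> rows;
  private Iterator<Map<String, Object>> rowIterator;

  InMemoryRowSource(List<Map<String, Object>> rows) {
    this.rows = rows;
  }

  // Plays the role of init(Context): reset state before the entity is processed.
  void init() {
    rowIterator = rows.iterator();
  }

  // Plays the role of nextRow(): one row per call, null when there are no more rows.
  Map<String, Object> nextRow() {
    if (rowIterator == null || !rowIterator.hasNext()) {
      return null;
    }
    return rowIterator.next();
  }
}

class NextRowContractDemo {
  // Drains the source the way a caller of nextRow() would: loop until null.
  static List<Object> drainIds(InMemoryRowSource source) {
    List<Object> ids = new ArrayList<>();
    source.init();
    Map<String, Object> row;
    while ((row = source.nextRow()) != null) {
      ids.add(row.get("id")); // key = column name, value = column value
    }
    return ids;
  }

  public static void main(String[] args) {
    List<Map<String, Object>> rows = new ArrayList<>();
    for (int i = 1; i <= 3; i++) {
      Map<String, Object> row = new LinkedHashMap<>();
      row.put("id", i);
      row.put("name", "item-" + i);
      rows.add(row);
    }
    System.out.println(drainIds(new InMemoryRowSource(rows)));
  }
}
```

Because each call returns a single row, the caller never needs to hold the whole result set in memory.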

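The subclass EntityProcessorBase is the base class of all concrete entity processors and defines their shared methods. The most important one is Map<String, Object> getNext(), which fetches the next Map record from the data iterator Iterator<Map<String, Object>> rowIterator (the DIHCacheSupport cacheSupport object is used for caching).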

protected Map<String, Object> getNext() {
    if(cacheSupport==null) {
      try {
        if (rowIterator == null)
          return null;
        if (rowIterator.hasNext())
          return rowIterator.next();
        query = null;
        rowIterator = null;
        return null;
      } catch (Exception e) {
        SolrException.log(log, "getNext() failed for query '" + query + "'", e);
        query = null;
        rowIterator = null;
        wrapAndThrow(DataImportHandlerException.WARN, e);
        return null;
      }
    } else  {
      return cacheSupport.getCacheData(context, query, rowIterator);
    }      
  }
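
One consequence of the non-cached branch above is that rowIterator is set back to null once it is exhausted, so a processor that creates the iterator lazily will run its query again on the next call. The sketch below illustrates this under simplifying assumptions: RequeryingSource and its queryCount field are hypothetical, the "query" is just a fresh iterator over an in-memory list, and caching and error handling are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical class for illustration; not part of Solr.
class RequeryingSource {
  private final List<Map<String, Object>> data;
  private Iterator<Map<String, Object>> rowIterator;
  int queryCount = 0; // how many times the simulated query has been run

  RequeryingSource(List<Map<String, Object>> data) {
    this.data = data;
  }

  // Lazily "runs the query" whenever no iterator is active.
  Map<String, Object> nextRow() {
    if (rowIterator == null) {
      queryCount++;
      rowIterator = data.iterator();
    }
    return getNext();
  }

  // Same shape as the non-cached branch of getNext(): reset the iterator when exhausted.
  private Map<String, Object> getNext() {
    if (rowIterator == null) {
      return null;
    }
    if (rowIterator.hasNext()) {
      return rowIterator.next();
    }
    rowIterator = null;
    return null;
  }
}

class RequeryDemo {
  public static void main(String[] args) {
    List<Map<String, Object>> data = new ArrayList<>();
    data.add(Collections.<String, Object>singletonMap("id", 1));
    data.add(Collections.<String, Object>singletonMap("id", 2));
    RequeryingSource source = new RequeryingSource(data);

    while (source.nextRow() != null) { /* first pass over the rows */ }
    System.out.println("queries after first pass: " + source.queryCount);

    while (source.nextRow() != null) { /* second pass re-runs the query */ }
    System.out.println("queries after second pass: " + source.queryCount);
  }
}
```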

SqlEntityProcessor is the entity processor for database data sources.

/**
 * <p>
 * An {@link EntityProcessor} instance which provides support for reading from
 * databases. It is used in conjunction with {@link JdbcDataSource}. This is the default
 * {@link EntityProcessor} if none is specified explicitly in data-config.xml
 * </p>
 * <p/>
 * <p>
 * Refer to <a
 * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
 * for more details.
 * </p>
 * <p/>
 * <b>This API is experimental and may change in the future.</b>
 *
 * @version $Id: SqlEntityProcessor.java 1065312 2011-01-30 16:08:25Z rmuir $
 * @since solr 1.3
 */
public class SqlEntityProcessor extends EntityProcessorBase {
  private static final Logger LOG = LoggerFactory.getLogger(SqlEntityProcessor.class);
  //the data source
  protected DataSource<Iterator<Map<String, Object>>> dataSource;
  //initialize the data source
  @Override
  @SuppressWarnings("unchecked")
  public void init(Context context) {
    super.init(context);
    dataSource = context.getDataSource();
  }
  //initialize the row iterator (obtained from the data source by running the query)
  protected void initQuery(String q) {
    try {
      DataImporter.QUERY_COUNT.get().incrementAndGet();
      rowIterator = dataSource.getData(q);
      this.query = q;
    } catch (DataImportHandlerException e) {
      throw e;
    } catch (Exception e) {
      LOG.error( "The query failed '" + q + "'", e);
      throw new DataImportHandlerException(DataImportHandlerException.SEVERE, e);
    }
  }

  @Override
  public Map<String, Object> nextRow() {    
    if (rowIterator == null) {
      String q = getQuery();
      initQuery(context.replaceTokens(q));
    }
    return getNext();
  }

  @Override
  public Map<String, Object> nextModifiedRowKey() {
    if (rowIterator == null) {
      String deltaQuery = context.getEntityAttribute(DELTA_QUERY);
      if (deltaQuery == null)
        return null;
      initQuery(context.replaceTokens(deltaQuery));
    }
    return getNext();
  }

  @Override
  public Map<String, Object> nextDeletedRowKey() {
    if (rowIterator == null) {
      String deletedPkQuery = context.getEntityAttribute(DEL_PK_QUERY);
      if (deletedPkQuery == null)
        return null;
      initQuery(context.replaceTokens(deletedPkQuery));
    }
    return getNext();
  }

  @Override
  public Map<String, Object> nextModifiedParentRowKey() {
    if (rowIterator == null) {
      String parentDeltaQuery = context.getEntityAttribute(PARENT_DELTA_QUERY);
      if (parentDeltaQuery == null)
        return null;
      LOG.info("Running parentDeltaQuery for Entity: "
              + context.getEntityAttribute("name"));
      initQuery(context.replaceTokens(parentDeltaQuery));
    }
    return getNext();
  }
  
  public String getQuery() {
    String queryString = context.getEntityAttribute(QUERY);
    if (Context.FULL_DUMP.equals(context.currentProcess())) {
      return queryString;
    }
    if (Context.DELTA_DUMP.equals(context.currentProcess())) {
      String deltaImportQuery = context.getEntityAttribute(DELTA_IMPORT_QUERY);
      if(deltaImportQuery != null) return deltaImportQuery;
    }
    LOG.warn("'deltaImportQuery' attribute is not specified for entity : "+ entityName);
    return getDeltaImportQuery(queryString);
  }

  public String getDeltaImportQuery(String queryString) {    
    StringBuilder sb = new StringBuilder(queryString);
    if (SELECT_WHERE_PATTERN.matcher(queryString).find()) {
      sb.append(" and ");
    } else {
      sb.append(" where ");
    }
    boolean first = true;
    String[] primaryKeys = context.getEntityAttribute("pk").split(",");
    for (String primaryKey : primaryKeys) {
      if (!first) {
        sb.append(" and ");
      }
      first = false;
      Object val = context.resolve("dataimporter.delta." + primaryKey);
      if (val == null) {
        Matcher m = DOT_PATTERN.matcher(primaryKey);
        if (m.find()) {
          val = context.resolve("dataimporter.delta." + m.group(1));
        }
      }
      sb.append(primaryKey).append(" = ");
      if (val instanceof Number) {
        sb.append(val.toString());
      } else {
        sb.append("'").append(val.toString()).append("'");
      }
    }
    return sb.toString();
  }

  private static Pattern SELECT_WHERE_PATTERN = Pattern.compile(
          "^\\s*(select\\b.*?\\b)(where).*", Pattern.CASE_INSENSITIVE);

  public static final String QUERY = "query";

  public static final String DELTA_QUERY = "deltaQuery";

  public static final String DELTA_IMPORT_QUERY = "deltaImportQuery";

  public static final String PARENT_DELTA_QUERY = "parentDeltaQuery";

  public static final String DEL_PK_QUERY = "deletedPkQuery";

  public static final Pattern DOT_PATTERN = Pattern.compile(".*?\\.(.*)$");
}
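
To see what getDeltaImportQuery produces when no deltaImportQuery attribute is configured, the following sketch re-implements its string building in isolation. The two regular expressions are copied from the class above; everything else is an assumption made for illustration: DeltaQuerySketch is a hypothetical class, the pk attribute is passed in as a plain string, and a Map stands in for context.resolve("dataimporter.delta." + key). Unlike the original, which would throw a NullPointerException on val.toString() when a key cannot be resolved, this sketch renders a missing value as 'null'.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, stand-alone re-implementation for illustration; not the Solr class.
class DeltaQuerySketch {
  // Both patterns are copied from SqlEntityProcessor.
  private static final Pattern SELECT_WHERE_PATTERN = Pattern.compile(
      "^\\s*(select\\b.*?\\b)(where).*", Pattern.CASE_INSENSITIVE);
  private static final Pattern DOT_PATTERN = Pattern.compile(".*?\\.(.*)$");

  // 'delta' stands in for context.resolve("dataimporter.delta." + key).
  static String build(String query, String pkAttribute, Map<String, Object> delta) {
    StringBuilder sb = new StringBuilder(query);
    // Extend an existing WHERE clause, or start a new one.
    sb.append(SELECT_WHERE_PATTERN.matcher(query).find() ? " and " : " where ");
    boolean first = true;
    for (String primaryKey : pkAttribute.split(",")) {
      if (!first) {
        sb.append(" and ");
      }
      first = false;
      Object val = delta.get(primaryKey);
      if (val == null) {
        // For a qualified key such as "item.id", retry with the part after the dot.
        Matcher m = DOT_PATTERN.matcher(primaryKey);
        if (m.find()) {
          val = delta.get(m.group(1));
        }
      }
      sb.append(primaryKey).append(" = ");
      if (val instanceof Number) {
        sb.append(val);                          // numbers are not quoted
      } else {
        sb.append("'").append(val).append("'");  // everything else is quoted
      }
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    Map<String, Object> delta = new LinkedHashMap<>();
    delta.put("id", 5);
    System.out.println(build("select * from item", "id", delta));
    System.out.println(build("select * from item where active = 1", "item.id", delta));
  }
}
```

Note that values are concatenated into the SQL text rather than bound as parameters, so configuring an explicit deltaImportQuery is the more predictable option; the warning logged in getQuery() points in the same direction.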

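Next we look at the EntityProcessorWrapper class, which extends the abstract class EntityProcessor and decorates a concrete entity processor (the decorator pattern).

Its important members are:

 //the decorated entity processor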
 EntityProcessor delegate;
  private DocBuilder docBuilder;

  String onError;
  Context context;
  protected VariableResolverImpl resolver;
  String entityName;

  protected List<Transformer> transformers;

  protected List<Map<String, Object>> rowcache;

Its constructor initializes the decorated member object:

public EntityProcessorWrapper(EntityProcessor delegate, DocBuilder docBuilder) {
    this.delegate = delegate;
    this.docBuilder = docBuilder;
  }

The init method calls the decorated object's init method (which obtains the data source):

@Override
  public void init(Context context) {
    rowcache = null;
    this.context = context;
    resolver = (VariableResolverImpl) context.getVariableResolver();
    //context has to be set correctly . keep the copy of the old one so that it can be restored in destroy
    if (entityName == null) {
      onError = resolver.replaceTokens(context.getEntityAttribute(ON_ERROR));
      if (onError == null) {
          onError = ABORT;
      }
      entityName = context.getEntityAttribute(DataConfig.NAME);
    }
    delegate.init(context);

  }

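The remaining methods all call the corresponding methods of the decorated concrete entity processor and add features such as data transformation on top; they are not analyzed in detail here.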
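
As a rough picture of the decorator idea, the sketch below wraps a row source and applies a list of transformations to every row it hands out. It is a simplified illustration under stated assumptions, not the actual EntityProcessorWrapper code: RowProcessor, TransformingWrapper and upperCaseName are hypothetical names, and java.util.function.UnaryOperator stands in for Solr's Transformer. The real wrapper also deals with onError handling and variable resolution, which are omitted here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical, reduced stand-in for EntityProcessor.
interface RowProcessor {
  Map<String, Object> nextRow();
}

// Hypothetical decorator: delegates to the wrapped processor, then transforms the row.
class TransformingWrapper implements RowProcessor {
  private final RowProcessor delegate;
  private final List<UnaryOperator<Map<String, Object>>> transformers;

  TransformingWrapper(RowProcessor delegate,
                      List<UnaryOperator<Map<String, Object>>> transformers) {
    this.delegate = delegate;
    this.transformers = transformers;
  }

  @Override
  public Map<String, Object> nextRow() {
    Map<String, Object> row = delegate.nextRow();
    if (row == null) {
      return null; // end of data: nothing to transform
    }
    for (UnaryOperator<Map<String, Object>> transformer : transformers) {
      row = transformer.apply(row);
    }
    return row;
  }
}

class WrapperDemo {
  // Example transformation: upper-case the "name" column on a copy of the row.
  static UnaryOperator<Map<String, Object>> upperCaseName() {
    return row -> {
      Map<String, Object> copy = new HashMap<>(row);
      Object name = copy.get("name");
      if (name != null) {
        copy.put("name", name.toString().toUpperCase());
      }
      return copy;
    };
  }

  public static void main(String[] args) {
    List<Map<String, Object>> rows = new ArrayList<>();
    Map<String, Object> row = new HashMap<>();
    row.put("name", "solr");
    rows.add(row);
    Iterator<Map<String, Object>> it = rows.iterator();
    RowProcessor source = () -> it.hasNext() ? it.next() : null;

    List<UnaryOperator<Map<String, Object>>> transformers = new ArrayList<>();
    transformers.add(upperCaseName());
    RowProcessor wrapped = new TransformingWrapper(source, transformers);
    System.out.println(wrapped.nextRow());
    System.out.println(wrapped.nextRow());
  }
}
```

The caller only sees the RowProcessor interface, so a wrapped and an unwrapped processor are interchangeable; that is the property the decorator pattern provides here.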

---------------------------------------------------------------------------

This series, solr dataimport source code analysis, is my own original work.

Please credit the source when reposting: 博客园, 刺猬的温驯

Article link: http://www.cnblogs.com/chenying99/archive/2013/05/04/3059397.html

Original address: https://www.cnblogs.com/chenying99/p/3059397.html