Impala Catalog加载

转载:https://blog.csdn.net/hezh914/article/details/52810985

1. 问题
  在一个大型的应用系统,每天都有上百亿甚至上千亿的数据需要加载到Hadoop中,随着数据量达到海量的级别,原本可以轻松搞定的事情,现在都变得非常棘手,不管是在Oracle中还是以Impala作为实时查询引擎的Hadoop中,都会遇到很多让你日思夜想,难以入眠的问题。其中的一个问题就是Impala的元数据同步问题,比如:

为什么HIVE中有表,但在IMPALA中查询却提示表不存在,不能自动刷新
为什么HDFS中有数据文件,查询却没有数据,show partitions也不能看到完整的表分区
  这一个个诡异的问题都能让你茶不思,饭不想,下面我们就来分析一下Impala的元数据同步过程和它到底同步了什么

2. 分析
  Impala的源代码可以在GitHub上下载或在impala.io网站上在线查看。
  所有的Impala Catalog相关的操作都可以在fe目录下的代码包:com.cloudera.impala.catalog中找到。Impala有几种情况会涉及到元数据的同步操作:
  1. Impala启动时
  所有表都会放到 TableLoadingMgr.tableLoadingDeque_ 队列中,按队列进行加载,队列的声明为:

private final LinkedBlockingDeque<TTableName> tableLoadingDeque_ =
new LinkedBlockingDeque<TTableName>();

  因此可以在catalog.INFO日志中看到如下日志信息:
  TableLoadingMgr.java:278] Loading next table. Remaining items in queue: 16124
  TableLoadingMgr类的以下方法可以看到启动多线程进行表的Metadata加载工作:

private void startTableLoadingThreads() {
ExecutorService loadingPool = Executors.newFixedThreadPool(numLoadingThreads_);
try {
for (int i = 0; i < numLoadingThreads_; ++i) {
loadingPool.execute(new Runnable() {
@Override
public void run() {
while (true) {
try {
loadNextTable();
} catch (Exception e) {
LOG.error("Error loading table: ", e);
// Ignore exception.
}
}
}
});
}
} finally {
loadingPool.shutdown();
}
}


  2. 执行INVALIDATE METADATA, REFRESH SQL语句时
  这两个语句都会调用com.cloudera.impala.service.CatalogOpExecutor类的execResetMetadata方法,方法中的以下代码可以看出两个语句分别调用的方法:

if (req.isIs_refresh()) {
modifiedObjects.second = catalog_.reloadTable(req.getTable_name()); //执行REFRESH语句
} else {
wasRemoved = catalog_.invalidateTable(req.getTable_name(), modifiedObjects); //执行INVALIDATE语句,
}
//catalog_对象为:com.cloudera.impala.catalog.CatalogServiceCatalog

  3.执行DDL操作,除了会执行DDL操作外也会执行Metadata刷新操作
  比如执行增加分区操作,仍会调用com.cloudera.impala.service.CatalogOpExecutor的alterTableAddPartition方法,并在完成增加HIVE表分区后,调用本类的addHdfsPartition方法完成后续的Impala的分区刷新工作。

  不管那种操作,涉及到从Hive MetaData中加载表元数据信息时,最终都会调用到HdfsTable的load方法,从这个方法中就可以看到Impala到底需要加载和缓存那些元数据:

public void load(Table cachedEntry, HiveMetaStoreClient client,
org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
numHdfsFiles_ = 0;
totalHdfsBytes_ = 0;
// 打印加载日志,该日志在catalog.INFO日志中是经常看见的
LOG.debug("load table: " + db_.getName() + "." + name_);

// turn all exceptions into TableLoadingException
try {
// set nullPartitionKeyValue from the hive conf.
nullPartitionKeyValue_ = client.getConfigValue(
"hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__");

// set NULL indicator string from table properties
nullColumnValue_ =
msTbl.getParameters().get(serdeConstants.SERIALIZATION_NULL_FORMAT);
if (nullColumnValue_ == null) nullColumnValue_ = DEFAULT_NULL_COLUMN_VALUE;

// populate with both partition keys and regular columns
List<FieldSchema> partKeys = msTbl.getPartitionKeys();
List<FieldSchema> tblFields = Lists.newArrayList();
String inputFormat = msTbl.getSd().getInputFormat();
if (HdfsFileFormat.fromJavaClassName(inputFormat) == HdfsFileFormat.AVRO) {
.... 这里省去一段AVRO表的表列信息加载逻辑
} else {
fs.setType(avroType);
}
fs.setComment("from deserializer");
tblFields.add(fs);
i++;
}
}
} else {
tblFields.addAll(msTbl.getSd().getCols());
}
List<FieldSchema> fieldSchemas = new ArrayList<FieldSchema>(
partKeys.size() + tblFields.size());
fieldSchemas.addAll(partKeys);
fieldSchemas.addAll(tblFields);
// The number of clustering columns is the number of partition keys.
numClusteringCols_ = partKeys.size();
//这儿加载列以及列的统计信息,其中也包括分区列, 上面两行代码有添加
loadColumns(fieldSchemas, client);

// Collect the list of partitions to use for the table. Partitions may be reused
// from the existing cached table entry (if one exists), read from the metastore,
// or a mix of both. Whether or not a partition is reused depends on whether
// the table or partition has been modified.
List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions =
Lists.newArrayList();
if (cachedEntry == null || !(cachedEntry instanceof HdfsTable) ||
cachedEntry.lastDdlTime_ != lastDdlTime_) {
//如果还没有缓存或表上有更改,则从HIVE METASTORE中加载所有的分区信息
msPartitions.addAll(MetaStoreUtil.fetchAllPartitions(
client, db_.getName(), name_, NUM_PARTITION_FETCH_RETRIES));
} else {
// The table was already in the metadata cache and it has not been modified.
Preconditions.checkArgument(cachedEntry instanceof HdfsTable);
HdfsTable cachedHdfsTableEntry = (HdfsTable) cachedEntry;
// Set of partition names that have been modified. Partitions in this Set need to
// be reloaded from the metastore.
Set<String> modifiedPartitionNames = Sets.newHashSet();

// If these are not the exact same object, look up the set of partition names in
// the metastore. This is to support the special case of CTAS which creates a
// "temp" table that doesn't actually exist in the metastore.
if (cachedEntry != this) {
// Since the table has not been modified, we might be able to reuse some of the
// old partition metadata if the individual partitions have not been modified.
// First get a list of all the partition names for this table from the
// metastore, this is much faster than listing all the Partition objects.
modifiedPartitionNames.addAll(
client.listPartitionNames(db_.getName(), name_, (short) -1));
}

int totalPartitions = modifiedPartitionNames.size();
// Get all the partitions from the cached entry that have not been modified.
for (HdfsPartition cachedPart: cachedHdfsTableEntry.getPartitions()) {
// Skip the default partition and any partitions that have been modified.
if (cachedPart.isDirty() || cachedPart.getMetaStorePartition() == null ||
cachedPart.getId() == DEFAULT_PARTITION_ID) {
continue; //跳过默认的或需要重新加载的分区
}
org.apache.hadoop.hive.metastore.api.Partition cachedMsPart =
cachedPart.getMetaStorePartition();
Preconditions.checkNotNull(cachedMsPart);

// This is a partition we already know about and it hasn't been modified.
// No need to reload the metadata.
String cachedPartName = cachedPart.getPartitionName();
if (modifiedPartitionNames.contains(cachedPartName)) {
//这儿虽然也将不需要刷新的分区增加到了列表中,但下面的刷新方法会跳过这些分区的一些操作。
msPartitions.add(cachedMsPart);
modifiedPartitionNames.remove(cachedPartName); //将不需要重新加载的分区都从列表中移除
}
}
LOG.info(String.format("Incrementally refreshing %d/%d partitions.",
modifiedPartitionNames.size(), totalPartitions)); //这一行日志打印了增量刷新了多少个分区

// No need to make the metastore call if no partitions are to be updated.
if (modifiedPartitionNames.size() > 0) { //列表中剩下的分区就是需要重新加载的分区
// Now reload the the remaining partitions.
msPartitions.addAll(MetaStoreUtil.fetchPartitionsByName(client,
Lists.newArrayList(modifiedPartitionNames), db_.getName(), name_));
}
}

Map<String, List<FileDescriptor>> oldFileDescMap = null;
if (cachedEntry != null && cachedEntry instanceof HdfsTable) {
HdfsTable cachedHdfsTable = (HdfsTable) cachedEntry;
oldFileDescMap = cachedHdfsTable.fileDescMap_;
hostIndex_.populate(cachedHdfsTable.hostIndex_.getList());
}
/*
* 加载分区,该方法完成的工作包括:加载分区信息,加载分区下的文件列表,及文件的Block信息
* 大家可以在Catalog.INFO日志中看到如下日志信息:
* HdfsTable.java:323] load block md for xxx_table_name file 000094_0
*/
loadPartitions(msPartitions, msTbl, oldFileDescMap);

// load table stats
numRows_ = getRowCount(msTbl.getParameters()); //加载表上的统计信息
LOG.debug("table #rows=" + Long.toString(numRows_));

// For unpartitioned tables set the numRows in its partitions //这个注释和下面的操作不符啊!!
// to the table's numRows.
if (numClusteringCols_ == 0 && !partitions_.isEmpty()) {
// Unpartitioned tables have a 'dummy' partition and a default partition.
// Temp tables used in CTAS statements have one partition.
Preconditions.checkState(partitions_.size() == 2 || partitions_.size() == 1);
for (HdfsPartition p: partitions_) { //这儿为每个分区设置分区记录数
p.setNumRows(numRows_);
}
}
} catch (TableLoadingException e) {
throw e;
} catch (Exception e) {
//这个异常非常非常恼火啊,因为经常看到它。
throw new TableLoadingException("Failed to load metadata for table: " + name_, e);
}
}

3. 最后
  从上面的源码分析,可以看出Impala Catalog干的事情非常多啊,要缓存的信息也非常多,虽然说能者多劳,但这样干早晚也得出事啊。它包括

表信息
表分区信息
表及分区下的文件及Block信息
表及分区的统计信息
  需要加载和缓存的信息非常多,所以在Impala开始的启动时,如果对象太多,加载的时间非常长,就会出现查询时表不存在,但过一会就正常了的情况。文档中说可以通过设置参数:load_catalog_in_background=false,来让Impala在加载CatalogLog完成前不接受客户端连接,来避免此问题发生,但会增加Impala的启动时长(不过经过测试没达到理想效果,版本:CDH5.5)。
  在表的元数据同步失败时,就会造成最开始提出的第2个问题,查不出数据或看不到分区,这种情况可能是内存溢出造成,此时可以在catalog.ERROR中看到OutOfMemoryError异常, 也可能是期问题引起,除了增大Catalog的运行内存外,需要从源头上避免该问题发生,应该在程序设计和执行时采用以下方法:

减少表数量
减少表上的分区数,作为了一个大数据系统,单个分区是可以包含亿级数据并同时保证查询效率的
减少表或分区中的文件数,可以定期命令表或分区中的小文件
及时删除历史数据

原文地址:https://www.cnblogs.com/to-here/p/15507896.html