【Flink系列五】Flink-hive-connector 使用的HiveMetastoreClient不支持Kerberos代理

Flink-hive-connector

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>compile</scope>
</dependency>

由于目前项目中连接的是Hive 1.1.0,而HADOOP_PROXY_USER是 2.3.0才支持的 HIVE-COMMIT

可以看到HiveMetaStoreClient.java中增加了如下代码:


    //If HADOOP_PROXY_USER is set in env or property,
    //then need to create metastore client that proxies as that user.
    String HADOOP_PROXY_USER = "HADOOP_PROXY_USER";
    String proxyUser = System.getenv(HADOOP_PROXY_USER);
    if (proxyUser == null) {
      proxyUser = System.getProperty(HADOOP_PROXY_USER);
    }
    //if HADOOP_PROXY_USER is set, create DelegationToken using real user
    if(proxyUser != null) {
      LOG.info(HADOOP_PROXY_USER + " is set. Using delegation "
          + "token for HiveMetaStore connection.");
      try {
        UserGroupInformation.getLoginUser().getRealUser().doAs(
            new PrivilegedExceptionAction<Void>() {
              @Override
              public Void run() throws Exception {
                open();
                return null;
              }
            });
        String delegationTokenPropString = "DelegationTokenForHiveMetaStoreServer";
        String delegationTokenStr = getDelegationToken(proxyUser, proxyUser);
        Utils.setTokenStr(UserGroupInformation.getCurrentUser(), delegationTokenStr,
            delegationTokenPropString);
        this.conf.setVar(ConfVars.METASTORE_TOKEN_SIGNATURE, delegationTokenPropString);
        close();
      } catch (Exception e) {
        LOG.error("Error while setting delegation token for " + proxyUser, e);
        if(e instanceof MetaException) {
          throw (MetaException)e;
        } else {
          throw new MetaException(e.getMessage());
        }
      }
    }
原文地址:https://www.cnblogs.com/slankka/p/14250440.html