[Flink] Connecting to Elasticsearch under Kerberos authentication: approaches studied and solutions

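Kerberos, a security authentication scheme that feels hostile to humans, is surprisingly widely used in the big data world.

Elasticsearch is a storage engine. From what I have found so far, both Flink and Spark read from and write to it over HTTP.

The HttpClient approach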

public static HttpClient buildSpengoHttpClient() {
        HttpClientBuilder builder = HttpClientBuilder.create();

        SSLContext sslContext = null;
        try {
            sslContext = SSLContexts.custom().loadTrustMaterial(null, new TrustAllStrategy()).build();
        } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) {
            e.printStackTrace();
        }

        SSLConnectionSocketFactory scsf = new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE);
        builder.setSSLSocketFactory(scsf);
        Lookup<AuthSchemeProvider> authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create().
                register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory(true)).build();
        builder.setDefaultAuthSchemeRegistry(authSchemeRegistry);

        BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(new AuthScope(null, -1, AuthScope.ANY_REALM,AuthSchemes.SPNEGO), new Credentials() {
            @Override
            public Principal getUserPrincipal() {
                return null;
            }

            @Override
            public String getPassword() {
                return null;
            }
        });
        builder.setDefaultCredentialsProvider(credentialsProvider);
        return builder.build();

    }
public static void main(String[] args) {
        String user = "tss@HADOOP.COM";
        String keytab = "user.keytab";
        String krb5Location = "/etc/krb5.conf";
        String url = "https://fusioninsight02:24100";

        System.setProperty("sun.security.spnego.debug", "true");
        System.setProperty("sun.security.krb5.debug", "true");
        System.setProperty("java.security.krb5.conf", krb5Location);
        Configuration config = new Configuration() {
            @Override
            public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
                Map<String, Object> configMap = new HashMap<>();
                configMap.put("useTicketCache", "false");
                configMap.put("useKeyTab", "true");
                configMap.put("keyTab", keytab);
                //Krb5 in GSS API needs to be refreshed so it does not throw the error
                //Specified version of key is not available
                configMap.put("refreshKrb5Config", "true");
                configMap.put("principal", user);
                configMap.put("storeKey", "true");
                configMap.put("doNotPrompt", "true");
                configMap.put("isInitiator", "true");
                configMap.put("debug", "true");
                return new AppConfigurationEntry[]{
                        new AppConfigurationEntry("com.sun.security.auth.module.Krb5LoginModule",
                                AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
                                configMap)
                };
            }
        };


        Set<Principal> principals = new HashSet<>(1);
        principals.add(new KerberosPrincipal(user));
        Subject sub = new Subject(false, principals, new HashSet<>(), new HashSet<>());

        try {
            LoginContext loginContext = new LoginContext("Krb5Login", sub, null, config);
            loginContext.login();
            Subject serviceSubject = loginContext.getSubject();

            HttpResponse response = Subject.doAs(serviceSubject, new PrivilegedAction<HttpResponse>() {
                HttpResponse httpResponse = null;

                @Override
                public HttpResponse run() {

                    try {
                        HttpUriRequest request = new HttpGet(url);
                        HttpClient spnegoHttpClient = buildSpengoHttpClient();
                        httpResponse = spnegoHttpClient.execute(request);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }

                    return httpResponse;
                }
            });
            InputStream is = response.getEntity().getContent();
            System.out.println("Status code " + response.getStatusLine().getStatusCode());
            System.out.println("message is :"+ Arrays.deepToString(response.getAllHeaders()));
            System.out.println("string:\n" + new String(IOUtils.toByteArray(is), "UTF-8"));


        } catch (LoginException | IOException e) {
            e.printStackTrace();
        }

    }

Key points:

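When building the httpClient, set the authentication scheme to SPNEGO. With that scheme registered, the client first sends an ordinary request; once the server returns 401, it switches to the Negotiate scheme and authenticates again.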
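
The 401-then-Negotiate exchange can be sketched with the JDK alone. This is a minimal sketch, not HttpClient's actual implementation: `NegotiateHeaders`, `offersNegotiate`, and `authorizationValue` are hypothetical names, and the token bytes are a placeholder for the GSS-API token that a real client would obtain from `GSSContext.initSecContext`.

```java
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Locale;

/** Hypothetical helper illustrating the SPNEGO (Negotiate) header exchange. */
final class NegotiateHeaders {

    /** True if a 401 response offers the Negotiate scheme in any WWW-Authenticate value. */
    static boolean offersNegotiate(int statusCode, List<String> wwwAuthenticateValues) {
        if (statusCode != 401) {
            return false;
        }
        for (String value : wwwAuthenticateValues) {
            String v = value.trim().toLowerCase(Locale.ROOT);
            if (v.equals("negotiate") || v.startsWith("negotiate ")) {
                return true;
            }
        }
        return false;
    }

    /** Builds the Authorization header value sent with the retried request. */
    static String authorizationValue(byte[] spnegoToken) {
        return "Negotiate " + Base64.getEncoder().encodeToString(spnegoToken);
    }

    public static void main(String[] args) {
        // Placeholder bytes; a real token comes from the GSS-API, not from a literal.
        byte[] fakeToken = {1, 2, 3};
        List<String> challenges = Arrays.asList("Basic realm=\"es\"", "Negotiate");
        if (offersNegotiate(401, challenges)) {
            System.out.println("Authorization: " + authorizationValue(fakeToken));
        }
    }
}
```

The retry only happens when the server actually advertises Negotiate, which is why a server that answers 401 without that challenge never reaches the Kerberos step.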

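This approach has an obvious problem: Subject.doAs guarantees that authentication is valid for the current thread inside the code block, but it stops working once multiple threads are involved.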
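
One possible workaround for the per-thread limitation is to re-enter `Subject.doAs` inside every task handed to a thread pool. A minimal sketch, assuming the `Subject` has already been logged in (an empty one is used here so the example runs without a KDC); `SubjectTasks`, `runAs`, and `demo` are hypothetical names, and newer JDKs deprecate `Subject.doAs` in favor of `Subject.callAs`.

```java
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.security.auth.Subject;

/** Hypothetical helper: run pooled tasks under a given Subject. */
final class SubjectTasks {

    /** Wraps a task so it runs under the Subject on whichever thread executes it. */
    static <T> Callable<T> runAs(Subject subject, Callable<T> task) {
        return () -> {
            try {
                return Subject.doAs(subject, (PrivilegedExceptionAction<T>) task::call);
            } catch (PrivilegedActionException e) {
                // Unwrap so callers see the task's own exception.
                throw e.getException();
            }
        };
    }

    /** Runs one wrapped task on a pool thread and returns its result. */
    static String demo() {
        Subject subject = new Subject(); // stands in for loginContext.getSubject()
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<String> task =
                    runAs(subject, () -> "ran on " + Thread.currentThread().getName());
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

This only helps when the caller controls task submission; it does not cover threads created internally by an asynchronous client.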

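Also, in my tests this approach works with httpclient versions below 4.5.5. I have not yet found a way to use it with higher versions.

The HttpAsyncClient approach

Because the RestClient provided by Elasticsearch is a wrapper around HttpAsyncClient, connecting to ES from Flink and Spark makes it necessary to study this approach.

Because this approach is asynchronous, Subject.doAs is no longer sufficient.

Here I mainly followed the JDK documentation: http://www.jtech.ua.es/j2ee/restringido/documents/jdk-6u21/technotes/guides/security/jgss/single-signon.html

Use doAsPrivileged and reuse the ticket generated for the Subject.

Rough code: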

package cn.es;

import org.apache.http.auth.AuthSchemeProvider;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.Credentials;
import org.apache.http.auth.KerberosCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.AuthSchemes;
import org.apache.http.config.Lookup;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.impl.auth.SPNegoSchemeFactory;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClientBuilder;
import org.ietf.jgss.*;

import javax.net.ssl.SSLContext;
import javax.security.auth.Subject;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.kerberos.KerberosPrincipal;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginContext;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SpnegoHttpClientConfigCallbackHandler implements RestClientBuilder.HttpClientConfigCallback {

    private static final String SUN_KRB5_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule";
    private static final String CRED_CONF_NAME = "ESClientLoginConf";
    private static final Oid SPNEGO_OID = getSpnegoOid();

    private static Oid getSpnegoOid() {
        Oid oid = null;
        try {
            oid = new Oid("1.3.6.1.5.5.2");
        } catch (GSSException e) {
            e.printStackTrace();
        }
        return oid;
    }

    private final String userPrincipalName = "tss@HADOOP.COM";
    private final String keytabPath = "user.keytab";
    private LoginContext loginContext;

    @Override
    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
        setupSpnegoAuthSchemeSupport(httpClientBuilder);
        return httpClientBuilder;
    }

    private void setupSpnegoAuthSchemeSupport(HttpAsyncClientBuilder httpClientBuilder) {
        final Lookup<AuthSchemeProvider> authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create()
                .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory()).build();

        final GSSManager gssManager = GSSManager.getInstance();

        try {

            // Use the configured principal instead of repeating the literal.
            final GSSName gssName = gssManager.createName(userPrincipalName, GSSName.NT_USER_NAME);
            login();
            final AccessControlContext acc = AccessController.getContext();
            final GSSCredential credential = doAsPrivilegedWrapper(loginContext.getSubject(), () -> gssManager.createCredential(gssName,
                    GSSCredential.DEFAULT_LIFETIME, SPNEGO_OID, GSSCredential.INITIATE_ONLY), acc);

            final KerberosCredentialsProvider credentialsProvider = new KerberosCredentialsProvider();
            credentialsProvider.setCredentials(new AuthScope(AuthScope.ANY_HOST, AuthScope.ANY_PORT, AuthScope.ANY_REALM, AuthSchemes.SPNEGO), new KerberosCredentials(credential));
            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
        } catch (GSSException | PrivilegedActionException e) {
            e.printStackTrace();
        }

        SSLContext sc = null;
        try {
            javax.net.ssl.TrustManager[] trustAllCerts = new javax.net.ssl.TrustManager[1];
            javax.net.ssl.TrustManager tm = new miTM();
            trustAllCerts[0] = tm;
            sc = SSLContext.getInstance("SSL");
            sc.init(null, trustAllCerts, null);
        } catch (Exception ex) {
            // Do not swallow the failure silently; sc stays null in this case.
            ex.printStackTrace();
        }
        httpClientBuilder.setSSLContext(sc);
        httpClientBuilder.setSSLHostnameVerifier((s, sslSession) -> true);

        httpClientBuilder.setDefaultAuthSchemeRegistry(authSchemeRegistry);
    }

    public synchronized LoginContext login() throws PrivilegedActionException {
        if (this.loginContext == null) {
            AccessController.doPrivileged((PrivilegedExceptionAction<Void>) () -> {
                final Subject subject = new Subject(false, Collections.singleton(new KerberosPrincipal(userPrincipalName)),
                        Collections.emptySet(), Collections.emptySet());
                Configuration conf;
                final CallbackHandler callback;
                conf = new KeytabJaasConf(userPrincipalName, keytabPath, true);
                callback = null;
                loginContext = new LoginContext(CRED_CONF_NAME, subject, callback, conf);
                loginContext.login();
                return null;
            });
        }

        return loginContext;
    }

    static <T> T doAsPrivilegedWrapper(final Subject subject, final PrivilegedExceptionAction<T> action, final AccessControlContext acc)
            throws PrivilegedActionException {
        try {
            return AccessController.doPrivileged((PrivilegedExceptionAction<T>) () -> Subject.doAsPrivileged(subject, action, acc));
        } catch (PrivilegedActionException pae) {
            if (pae.getCause() instanceof PrivilegedActionException) {
                throw (PrivilegedActionException) pae.getCause();
            }
            throw pae;
        }
    }

    private static class KerberosCredentialsProvider implements CredentialsProvider {
        private AuthScope authScope;
        private Credentials credentials;

        @Override
        public void setCredentials(AuthScope authscope, Credentials credentials) {
            if (authscope.getScheme().regionMatches(true, 0, AuthSchemes.SPNEGO, 0, AuthSchemes.SPNEGO.length()) == false) {
                throw new IllegalArgumentException("Only " + AuthSchemes.SPNEGO + " auth scheme is supported in AuthScope");
            }
            this.authScope = authscope;
            this.credentials = credentials;
        }

        @Override
        public Credentials getCredentials(AuthScope authscope) {
            assert this.authScope != null && authscope != null;
            return authscope.match(this.authScope) > -1 ? this.credentials : null;
        }

        @Override
        public void clear() {
            this.authScope = null;
            this.credentials = null;
        }
    }

    private abstract static class AbstractJaasConf extends Configuration {
        private final String userPrincipalName;
        private final boolean enableDebugLogs;

        AbstractJaasConf(final String userPrincipalName, final boolean enableDebugLogs) {
            this.userPrincipalName = userPrincipalName;
            this.enableDebugLogs = enableDebugLogs;
        }

        @Override
        public AppConfigurationEntry[] getAppConfigurationEntry(final String name) {
            final Map<String, String> options = new HashMap<>();
            options.put("principal", userPrincipalName);
            options.put("refreshKrb5Config", Boolean.TRUE.toString());
            options.put("isInitiator", Boolean.TRUE.toString());
            options.put("storeKey", Boolean.TRUE.toString());
            options.put("renewTGT", Boolean.FALSE.toString());
            options.put("debug", Boolean.toString(enableDebugLogs));
            addOptions(options);
            return new AppConfigurationEntry[]{new AppConfigurationEntry(SUN_KRB5_LOGIN_MODULE,
                    AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, Collections.unmodifiableMap(options))};
        }

        abstract void addOptions(Map<String, String> options);
    }

    private static class KeytabJaasConf extends AbstractJaasConf {
        private final String keytabFilePath;

        KeytabJaasConf(final String userPrincipalName, final String keytabFilePath, final boolean enableDebugLogs) {
            super(userPrincipalName, enableDebugLogs);
            this.keytabFilePath = keytabFilePath;
        }

        public void addOptions(final Map<String, String> options) {
            options.put("useKeyTab", Boolean.TRUE.toString());
            options.put("keyTab", keytabFilePath);
            options.put("doNotPrompt", Boolean.TRUE.toString());
        }
    }

    public static class miTM implements javax.net.ssl.TrustManager, javax.net.ssl.X509TrustManager {
        public java.security.cert.X509Certificate[] getAcceptedIssuers() {
            return null;
        }

        public boolean isServerTrusted(java.security.cert.X509Certificate[] certs) {
            return true;
        }

        public boolean isClientTrusted(java.security.cert.X509Certificate[] certs) {
            return true;
        }

        public void checkServerTrusted(java.security.cert.X509Certificate[] certs, String authType)
                throws java.security.cert.CertificateException {
            return;
        }

        public void checkClientTrusted(java.security.cert.X509Certificate[] certs, String authType)
                throws java.security.cert.CertificateException {
            return;
        }
    }
}
public static void main(String[] args) {

        SpnegoHttpClientConfigCallbackHandler callbackHandler = new SpnegoHttpClientConfigCallbackHandler();
        RestClientBuilder restClientBuilder = RestClient.builder(HttpHost.create("https://fusioninsight02:24100"));
        restClientBuilder.setMaxRetryTimeoutMillis(600000);
        restClientBuilder.setHttpClientConfigCallback(callbackHandler);

        try (RestClient restClient = restClientBuilder.build()) {

            Request request = new Request("GET", "/jucun_eoi/_search");
            request.setJsonEntity("{\n" +
                    "    \"query\":{\n" +
                    "        \"match_all\":{}\n" +
                    "    },\n" +
                    "    \"size\":10\n" +
                    "}");
            Response response = restClient.performRequest(request);
            System.out.println(EntityUtils.toString(response.getEntity()));


        } catch (IOException e) {
            e.printStackTrace();
        }

    }

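This approach keeps the Subject a singleton; for the same Subject, the login state is retained.

RestClient, the Huawei way

The previous approach can pretty much be injected into RestClient as is. But there is another, rather brain-dead approach: the RestClient customized for Huawei's FusionInsight platform.

"Customized" is a flattering name; in substance it is a modification of the source code. The decompiled result shows that Huawei uses a single-threaded Subject.doAs to generate the ticket: it first obtains the JAAS file through a system property, then keeps the ticket in a global variable, and finally puts it into the httpclient request header. This approach is highly invasive, easily pollutes global state, and at deployment time the RestClient jar has to be pulled out separately. Judging from test results so far, though, it is indeed fairly simple and stable.

Core code: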

 private void setKerberosConfig() throws IOException {
        String finalPath = "es.jaas.conf";
        File fileCopy = new File(finalPath);
        if (!fileCopy.exists()) {
            fileCopy.createNewFile();
        }
        FileWriter writer = new FileWriter(fileCopy);
        String jaasConfigContent = "EsClient {\n" + this.jaas + "\n};\n";
        ElasticSearchSinkJob.logger.info("writing jaas config to {}, content: {}", fileCopy.getAbsolutePath(), jaasConfigContent);
        writer.write(jaasConfigContent);
        writer.flush();
        writer.close();
        System.setProperty("es.security.indication", "true");
        System.setProperty("java.security.auth.login.config", fileCopy.getAbsolutePath());
        Configuration.getConfiguration().refresh();
    }

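Huawei's customized RestClient reads the path defined by java.security.auth.login.config and parses that file. The file is usually written to the current directory, because YARN and standalone environments generally grant read and write permission there.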
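
The value of `this.jaas` is not shown in the code above. As an illustration only, a keytab-based entry body could be assembled as follows, reusing the Krb5LoginModule options that appear in the earlier examples. `JaasEntry` and `keytabEntry` are hypothetical names, and the exact set of options Huawei's client expects is an assumption.

```java
/** Hypothetical helper that builds the body of a JAAS entry for keytab login. */
final class JaasEntry {

    /** Returns the text that goes between the braces of a JAAS entry. */
    static String keytabEntry(String principal, String keytabPath) {
        return "  com.sun.security.auth.module.Krb5LoginModule required\n"
                + "  useKeyTab=true\n"
                + "  keyTab=\"" + keytabPath + "\"\n"
                + "  principal=\"" + principal + "\"\n"
                + "  useTicketCache=false\n"
                + "  storeKey=true\n"
                + "  debug=false;";
    }

    public static void main(String[] args) {
        // Wrapped in a named entry the same way setKerberosConfig() does.
        String body = keytabEntry("tss@HADOOP.COM", "user.keytab");
        System.out.println("EsClient {\n" + body + "\n};");
    }
}
```

Because the entry is published through the JVM-wide `java.security.auth.login.config` property, every component in the same JVM sees it, which is the global pollution mentioned earlier.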

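Integration with Flink and Spark

  • The Flink source already runs correctly with the HttpAsyncClient approach.

The Flink sink currently fails in testing. Tracing leads to this code: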

RestHighLevelClient rhlClient = new RestHighLevelClient(builder);

        if (LOG.isInfoEnabled()) {
            LOG.info("Pinging Elasticsearch cluster via hosts {} ...", httpHosts);
        }

        if (!rhlClient.ping()) {
            throw new RuntimeException("There are no reachable Elasticsearch nodes!");
        }

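rhlClient.ping fails. The cause is that this method makes the httpClient issue a HEAD request.

The underlying reason is that my server is the Elasticsearch bundled with the Huawei platform, and Huawei's modifications are ugly. According to the HTTP protocol, the response to a HEAD request must not contain a body, yet Huawei's modified server returns one.

Error: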

org.apache.http.impl.conn.DefaultHttpResponseParser] - Garbage in response: {"status":401,"error":{"reason":"Authentication required"}}

I eventually ended up in the handleRequest() method of org.apache.http.protocol.HttpService:

if (canResponseHaveBody(request, response)) {
            conn.sendResponseEntity(response);
        }
private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
        if (request != null && "HEAD".equalsIgnoreCase(request.getRequestLine().getMethod())) {
            return false;
        }
        final int status = response.getStatusLine().getStatusCode();
        return status >= HttpStatus.SC_OK
                && status != HttpStatus.SC_NO_CONTENT
                && status != HttpStatus.SC_NOT_MODIFIED
                && status != HttpStatus.SC_RESET_CONTENT;
    }

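If the request method is HEAD, nothing after the headers is parsed. But if the server did return a body and the HEAD handling left it unconsumed, the next response read on that connection starts with the previous body instead of the real headers.

In the end the client concludes that this request still did not return status 200.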
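
The desynchronization can be reproduced with a toy parser. This is a simulation under stated assumptions, not HttpClient's code: `HeadDesyncDemo`, `readHead`, and the canned responses are hypothetical, and the body is assumed to end with a newline so that it shows up as a line of its own.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

/** Toy model of a client reading HEAD responses from a kept-alive connection. */
final class HeadDesyncDemo {

    static final String BODY =
            "{\"status\":401,\"error\":{\"reason\":\"Authentication required\"}}\n";
    // A HEAD response that, against the HTTP spec, is followed by a body.
    static final String FIRST = "HTTP/1.1 401 Unauthorized\r\n"
            + "Content-Length: " + BODY.length() + "\r\n\r\n" + BODY;
    static final String SECOND = "HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n";

    /** Reads the status line and headers and, as for HEAD, leaves any body unread. */
    static String readHead(BufferedReader conn) throws IOException {
        String statusLine = conn.readLine();
        String line;
        while ((line = conn.readLine()) != null && !line.isEmpty()) {
            // Skip header lines up to the blank separator.
        }
        return statusLine;
    }

    /** What the second read sees when both responses share one connection. */
    static String secondStatusOnReusedConnection() {
        try (BufferedReader conn = new BufferedReader(new StringReader(FIRST + SECOND))) {
            readHead(conn);        // first HEAD: headers parsed, body left behind
            return readHead(conn); // second HEAD starts at the leftover body
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    /** What the second read sees when each response gets a fresh connection. */
    static String secondStatusOnFreshConnection() {
        try (BufferedReader conn = new BufferedReader(new StringReader(SECOND))) {
            return readHead(conn);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("reused: " + secondStatusOnReusedConnection());
        System.out.println("fresh:  " + secondStatusOnFreshConnection());
    }
}
```

On the reused connection the second "status line" is the leftover JSON body, which matches the "Garbage in response" log entry; with one connection per request the leftover bytes are discarded together with the connection.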

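Solution: add the header (Connection, close) to the default headers of RestClientBuilder, for example through setDefaultHeaders. This costs some performance.

  • Spark: see the official documentation: https://www.elastic.co/guide/en/elasticsearch/hadoop/6.7/kerberos.html

This test did not succeed either; the official configuration is unreliable. Tracing the source code showed that several parameters are wrong.

Original article: https://www.cnblogs.com/zhouwenyang/p/14477427.html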