Integrating Elasticsearch 7.6.2 with canal 1.1.4

canal 1.1.4 does not support Elasticsearch 7.x out of the box.

Many thanks to Alibaba for open-sourcing this project, which lets us stand on their shoulders and make a few changes of our own.

We can download the source code of the canal 1.1.4 client-adapter and debug it.

1. Elasticsearch recommends operating on ES through its REST interface, so we use rest mode when configuring canal.

2. Edit pom.xml and change the Elasticsearch dependency versions to 7.6.2:

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.6.2</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>7.6.2</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.6.2</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.6.2</version>
</dependency>

3. While debugging, you will find that when connecting to Elasticsearch 7.6.2, com.alibaba.otter.canal.client.adapter.es.support.ESConnection#getMapping throws an exception.

So this method has to be modified first:

Original code:

public MappingMetaData getMapping(String index, String type) {
    MappingMetaData mappingMetaData = null;
    if (mode == ESClientMode.TRANSPORT) {
        ......
    } else {
        ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings;
        try {
            GetMappingsRequest request = new GetMappingsRequest();
            request.indices(index);
            GetMappingsResponse response;
            // try {
            // response = restHighLevelClient
            // .indices()
            // .getMapping(request, RequestOptions.DEFAULT);
            // // calling this API directly fails on versions below 6.4
            // } catch (Exception e) {
            // logger.warn("Low ElasticSearch version for getMapping");
            response = RestHighLevelClientExt.getMapping(restHighLevelClient, request, RequestOptions.DEFAULT);
            // }

            mappings = response.mappings();
        } catch (NullPointerException e) {
            throw new IllegalArgumentException("Not found the mapping info of index: " + index);
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
            return null;
        }
        mappingMetaData = mappings.get(index).get(type);
    }
    return mappingMetaData;
}

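The part that fails (highlighted in red in the original post) is mainly the call to RestHighLevelClientExt.getMapping.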

Modified code:

public MappingMetaData getMapping(String index, String type) {
    MappingMetaData mappingMetaData = null;
    if (mode == ESClientMode.TRANSPORT) {
        ......
    } else {
        ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings;
        //Map<String, MappingMetaData> mappings;
        try {
            GetMappingsRequest request = new GetMappingsRequest();
            request.indices(index);
            // try {
            // response = restHighLevelClient
            // .indices()
            // .getMapping(request, RequestOptions.DEFAULT);
            // // calling this API directly fails on versions below 6.4
            // } catch (Exception e) {
            // logger.warn("Low ElasticSearch version for getMapping");
            //GetMappingsResponse mapping = restHighLevelClient.indices().getMapping(request, RequestOptions.DEFAULT);
            //response = RestHighLevelClientExt.getMapping(restHighLevelClient, request, RequestOptions.DEFAULT);
            // }
            GetMappingsResponse response = restHighLevelClient.indices()
                    .getMapping(request, RequestOptions.DEFAULT);
            mappings = response.mappings();
        } catch (NullPointerException e) {
            throw new IllegalArgumentException("Not found the mapping info of index: " + index);
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
            return null;
        }
        mappingMetaData = mappings.get(index).get(type);
    }
    return mappingMetaData;
}

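With just this one statement changed, CRUD operations against Elasticsearch 7.6.2 work without problems; I have tested this myself. I have not yet tested more complex cases such as parent-child documents.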
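
One detail worth noting in the modified method: the final lookup, mappings.get(index).get(type), sits outside the try block, so a missing index or type would surface as a bare NullPointerException instead of the IllegalArgumentException the method otherwise throws. The following is only a minimal sketch of a null-safe two-level lookup, not canal's code. MappingLookup and find are hypothetical names, plain java.util.Map stands in for the ImmutableOpenMap returned by response.mappings(), and the index and type names are made up for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of canal: plain Maps stand in for the
// ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> used above.
class MappingLookup {

    /**
     * Returns the mapping stored under index -> type.
     * Throws IllegalArgumentException when either level is missing,
     * rather than letting a NullPointerException escape.
     */
    static <T> T find(Map<String, Map<String, T>> mappings, String index, String type) {
        Map<String, T> byType = (mappings == null) ? null : mappings.get(index);
        if (byType == null) {
            throw new IllegalArgumentException("Not found the mapping info of index: " + index);
        }
        T mapping = byType.get(type);
        if (mapping == null) {
            throw new IllegalArgumentException(
                    "Not found the mapping info of index: " + index + ", type: " + type);
        }
        return mapping;
    }

    public static void main(String[] args) {
        // Illustrative data only: index "user" with a single type "_doc".
        Map<String, Map<String, String>> mappings = new HashMap<>();
        mappings.put("user", Collections.singletonMap("_doc", "{\"properties\":{}}"));

        System.out.println(find(mappings, "user", "_doc"));
        try {
            find(mappings, "user", "user_type");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In getMapping itself, the same two checks could be applied directly to the value returned by response.mappings() before reading the type entry.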

Original post: https://www.cnblogs.com/elvinle/p/13934356.html