使用 DataX 将 MySQL 数据导入 Elasticsearch(ES)

1、MySQL 数据表(user_t)结构

 

2、安装 Elasticsearch(可参考我之前的文章中的步骤)

3、下载Datax

wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

4、由于 DataX 的预编译包默认不包含 elasticsearchwriter 插件,需要自行从源码编译打包(注意:需要 JDK 1.8 和 Maven 3.x)

1、git clone https://github.com/alibaba/DataX.git

2、修改源码根目录 pom.xml 中的 <modules>,只保留需要编译的子模块:

<modules>
        <module>common</module>
        <module>core</module>
        <module>transformer</module>

        <!-- reader -->
        <module>mysqlreader</module>
        
        <!-- writer -->
        <module>elasticsearchwriter</module>

        <!-- common support module -->
        <module>plugin-rdbms-util</module>
        <module>plugin-unstructured-storage-util</module>
        <module>hbase20xsqlreader</module>
        <module>hbase20xsqlwriter</module>
</modules>


3、mvn clean install -Dmaven.test.skip=true

4、将编译生成的 elasticsearchwriter/target/datax/plugin/writer/elasticsearchwriter 目录复制到 DataX 安装目录的 plugin/writer/ 目录下

5、编写 job.json(注意:JSON 不支持 # 注释。writer 中的 index 为 ES 的索引名,type 为 ES 的类型名)

{
    "job": {
      "setting": {
          "speed": {
              "channel": 1
          },
          "errorLimit": {
              "percentage": 0
          }
      },
      "content": [
          {
            "reader": {
                "name": "mysqlreader",
                "parameter": {
                    "username": "root",
                    "password": "yang156122",
                    "connection": [{
                        "querySql": ["select * from user_t"],
                        "jdbcUrl": ["jdbc:mysql://localhost:3306/test"]
                    }]
                }
            },
              "writer": {
                  "name": "elasticsearchwriter",
                  "parameter": {
                      "endpoint":"http://hadoop100:9200",
                      "accessId":"root",
                      "accessKey":"root",
                      "index": "mysql2es", # es index
                      "type": "id",  # es table
                      "cleanup": false,
                      "discovery":false,
                      "column": [
                          {
                            "name": "id",
                            "type": "integer"
                          },
                        {
                            "name": "user_name",
                            "type": "keyword"
                        },
                                                 {
                            "name": "phone_number",
                            "type": "keyword"
                        },
                                                 {
                            "name": "password",
                            "type": "keyword"
                        },
                                                 {
                            "name": "age",
                            "type": "keyword"
                        }
                                                ,
                                                 {
                            "name": "sex",
                            "type": "keyword"
                        }

                      ]
                  }
              }
            }
        ]
        }
    }

6、在运行 DataX 之前,用 ES Java 客户端创建索引 mysql2es 及其映射(字段类型与 job.json 中 writer 的 column 保持一致)

 /**
     * Creates index "mysql2es" with type "id" and an explicit mapping whose fields and types
     * match the "column" list of the elasticsearchwriter in job.json.
     *
     * <p>Uses the ES {@code client} defined outside this snippet (presumably a TransportClient
     * field of the enclosing test class).
     *
     * @throws IOException if the mapping JSON cannot be built
     * @throws IllegalStateException if the cluster does not acknowledge the index creation
     */
    @Test
    public void mysql2Es() throws IOException {
        // 1: index settings.
        // NOTE(review): 2 replicas cannot be allocated on a single-node cluster (the index then
        // stays yellow); lower number_of_replicas if ES runs on one node.
        HashMap<String, Object> settingsMap = new HashMap<>(2);
        settingsMap.put("number_of_shards", 3);
        settingsMap.put("number_of_replicas", 2);

        // 2: mapping (schema). "id" is an integer; every other column is a non-analysed keyword,
        // matching the "column" types declared for elasticsearchwriter in job.json.
        // The old {"type": "string", "index": "not_analyzed"} form is ES 2.x syntax: deprecated
        // in 5.x and rejected by 6.x, while "keyword" (used by job.json) exists from 5.x on.
        XContentBuilder builder = XContentFactory.jsonBuilder()
                .startObject()
                .field("dynamic", "true")
                .startObject("properties")
                .startObject("id")
                .field("type", "integer")
                .endObject();
        String[] keywordFields = {"user_name", "phone_number", "password", "age", "sex"};
        for (String field : keywordFields) {
            builder.startObject(field).field("type", "keyword").endObject();
        }
        builder.endObject() // closes "properties"
                .endObject(); // closes the root object

        // 3: create index "mysql2es" with type "id" (the "index"/"type" used in job.json)
        // and fail loudly instead of silently ignoring a non-acknowledged request.
        CreateIndexRequestBuilder prepareCreate = client.admin().indices().prepareCreate("mysql2es");
        boolean acknowledged = prepareCreate.setSettings(settingsMap)
                .addMapping("id", builder)
                .get()
                .isAcknowledged();
        if (!acknowledged) {
            throw new IllegalStateException("creation of index mysql2es was not acknowledged");
        }
    }

}

7、运行(在 DataX 安装目录下的子目录(如 job/)中执行,job.json 放在该目录中):

 python ../bin/datax.py  job.json 

8、实验结果

原文地址:https://www.cnblogs.com/ywjfx/p/13580123.html