DataX: importing MySQL data into a Hive table

Environment: CDH 5.12.1, MySQL 5.7

1. MySQL table structure

2. MySQL table data (the user table)
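The original post showed the table structure and sample rows as screenshots. Based on the columns referenced in the DataX jobs below, the source table can be assumed to look roughly like this (the column types and the sample row are assumptions, not the original data):

# Sketch of the assumed source table in the test database; types and the
# sample row are guesses based on the columns used in the DataX jobs below.
mysql -h 192.168.75.101 -u root -p test <<'SQL'
CREATE TABLE IF NOT EXISTS user (
    id          INT PRIMARY KEY,
    name        VARCHAR(50),
    age         INT,
    create_time DATETIME
);
INSERT INTO user (id, name, age, create_time)
VALUES (1, 'zhangsan', 20, '2020-10-21 10:00:00');
SQL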

3. Download DataX

wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
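
After downloading, unpack the archive and run the self-check job that ships with DataX to confirm the environment works. The commands assume you run them in the directory where the tarball was downloaded (judging from the paths used in step 7, that appears to be /root/data/soft/datax):

tar -zxvf datax.tar.gz
cd datax
# job/job.json is the stream-to-stream sample job bundled with DataX;
# if it finishes successfully, Python and the DataX runtime are working.
python bin/datax.py job/job.json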

4. Write a mysql2hive.json file in DataX's job directory

a) Full import

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "id",
                            "name",
                            "age",
                            "create_time"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.75.101:3306/test"
                                ],
                                "table": [
                                    "user"
                                ]
                            }
                        ],
                        "password": "yang156122",
                        "username": "root",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "INT"
                            },
                            {
                                "name": "name",
                                "type": "STRING"
                            },
                            {
                                "name": "age",
                                "type": "INT"
                            },
                            {
                                "name": "create_time",
                                "type": "TIMESTAMP"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://192.168.75.101:8020",
                        "fieldDelimiter": "	",
                        "fileName": "user",
                        "fileType": "text",
                        "path": "/user/datax/data/ceshi",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
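hdfswriter generally expects the target HDFS directory to already exist, so create it once before the first run; the job can then be launched with datax.py. A sketch, using the install and job paths assumed elsewhere in this post:

# Create the target directory once; hdfswriter will complain if the path is missing
hadoop fs -mkdir -p /user/datax/data/ceshi
# Run the full import
python /root/data/soft/datax/datax/bin/datax.py /root/data/soft/datax/datax/job/mysql2hive.json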

b) Incremental import, filtered by a specified time

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "id",
                            "name",
                            "age",
                            "create_time"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.75.101:3306/test"
                                ],
                                "table": [
                                    "user"
                                ]
                            }
                        ],
                        "password": "yang156122",
                        "username": "root",
                        "where": "create_time >= '2020-10-21'"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "INT"
                            },
                            {
                                "name": "name",
                                "type": "STRING"
                            },
                            {
                                "name": "age",
                                "type": "INT"
                            },
                            {
                                "name": "create_time",
                                "type": "TIMESTAMP"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://192.168.75.101:8020",
                        "fieldDelimiter": "	",
                        "fileName": "user",
                        "fileType": "text",
                        "path": "/user/datax/data/ceshi",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
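Before running the incremental job, it can help to preview on the MySQL side which rows the where condition will actually pick up (this assumes the mysql command-line client is available on the host):

# Preview the rows the incremental job will read
mysql -h 192.168.75.101 -P 3306 -u root -p test \
  -e "SELECT id, name, age, create_time FROM user WHERE create_time >= '2020-10-21';"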

c) Incremental import with a dynamically passed parameter (recommended)

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
"id",
"name",
                            "age",
                            "create_time"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.75.101:3306/test"
                                ],
                                "table": [
                                    "user"
                                ]
                            }
                        ],
                        "password": "yang156122",
                        "username": "root",
                        "where": "create_time >= '$date'"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "INT"
                            },
                            {
                                "name": "name",
                                "type": "STRING"
                            },
                            {
                                "name": "age",
                                "type": "INT"
                            },
                            {
                                "name": "create_time",
                                "type": "TIMESTAMP"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://192.168.75.101:8020",
                        "fieldDelimiter": "	",
                        "fileName": "user",
                        "fileType": "text",
                        "path": "/user/datax/data/ceshi",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
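
With the ${date} placeholder in the where clause, the actual value is supplied at run time through DataX's -p option, for example:

# ${date} in mysql2hive.json is replaced with the value passed via -D
python /root/data/soft/datax/datax/bin/datax.py \
  -p "-Ddate=2020-10-21" \
  /root/data/soft/datax/datax/job/mysql2hive.json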

5. Create the Hive table

drop table if exists default.user;
create table default.user(
    id INT,
    name STRING,
    age INT,
    create_time TIMESTAMP
) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

6. For incremental imports (including the dynamic-parameter variant), a LOAD DATA statement must be run after every DataX execution

load data inpath '/user/datax/data/ceshi' into table default.user;
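
LOAD DATA INPATH moves the files out of /user/datax/data/ceshi and into the table's warehouse directory, so the staging path is empty again after each load. A quick way to verify (assumes the hadoop and hive clients are available on this host):

# The staging directory should be empty again after the load
hadoop fs -ls /user/datax/data/ceshi
# The rows should now be visible in Hive
hive -e "SELECT COUNT(*) FROM default.user;"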

7. This step applies only to the dynamic-parameter incremental import (step 6 can then be skipped). Create the script with vim start.sh:

#!/bin/bash
# Get yesterday's date in the format 2020-10-21
a=`date -d yesterday -u +%Y-%m-%d`
echo "Starting the DataX job"
python /root/data/soft/datax/datax/bin/datax.py -p "-Ddate=${a}" /root/data/soft/datax/datax/job/mysql2hive.json
sleep 10
echo "Loading the data into the Hive table"
hive -e "load data inpath '/user/datax/data/ceshi' into table default.user;"
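
To run the incremental import automatically every day, the script can be scheduled with cron. The script path below is an assumption; adjust it to wherever start.sh actually lives:

chmod +x /root/data/soft/datax/start.sh
# Add via crontab -e: run the import at 01:00 every day and keep a log (path assumed)
0 1 * * * /bin/bash /root/data/soft/datax/start.sh >> /root/data/soft/datax/start.log 2>&1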

8. Run the script: sh start.sh

9. Check the data

hive 
use default;
select * from user;

For reference only. If you have any questions, please leave a comment.

Original post: https://www.cnblogs.com/ywjfx/p/13853461.html