Flink SQL Client初探

注:sql_lib目录所需的依赖jar包,可在参考博客(见文末原文地址)的留言区找到

1、运行Flink SQL

(1)首先进入Flink目录,启动Flink集群:bin/start-cluster.sh
(2)然后启动Flink SQL Client,并通过 -l 指定依赖jar目录:bin/sql-client.sh embedded -l sql_lib

2、启动界面

3、测试demo

-- Flink table `user`, backed through JDBC by the MySQL table `user` in database `test`.
-- `user` is a reserved word, so the identifier is always backtick-quoted.
-- NOTE(review): credentials are hard-coded for a local demo — confirm before reuse elsewhere.
DROP TABLE IF EXISTS `user`;

CREATE TABLE `user` (
    id          INT,
    name        STRING,
    create_time TIMESTAMP(3)
) WITH (
    'connector.type'                  = 'jdbc',
    'connector.url'                   = 'jdbc:mysql://localhost:3306/test',
    'connector.table'                 = 'user',
    'connector.driver'                = 'com.mysql.jdbc.Driver',
    'connector.username'              = 'root',
    'connector.password'              = 'root',
    -- Lookup cache settings: at most 5000 cached rows, entries expire after 10 minutes.
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl'      = '10min'
);

-- Flink table user_info, backed through JDBC by the MySQL table `user_info` in database `test`.
-- Dropped first so the demo can be re-run in the same session without a
-- "table already exists" error (same guard as the `user` table).
DROP TABLE IF EXISTS user_info;

CREATE TABLE user_info (
    id       INT,
    username STRING,
    password STRING
) WITH (
    'connector.type'                  = 'jdbc',
    'connector.url'                   = 'jdbc:mysql://localhost:3306/test',
    'connector.table'                 = 'user_info',
    'connector.driver'                = 'com.mysql.jdbc.Driver',
    'connector.username'              = 'root',
    'connector.password'              = 'root',
    -- Lookup cache settings: at most 5000 cached rows, entries expire after 10 minutes.
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl'      = '10min'
);

4、执行sql,查看结果

-- Every `user` row with its matching user_info row (NULLs when there is no match).
-- ui.id is aliased to bid so the result does not carry two columns named `id`.
SELECT
    u.id,
    u.name,
    u.create_time,
    ui.id AS bid,
    ui.username,
    ui.password
FROM `user` AS u
LEFT JOIN user_info AS ui
    ON u.id = ui.id;

 

=============== 实现DataX效果 ===============

-- Source table `user`: JDBC mapping of the MySQL table `user` in database `test`.
-- `user` is a reserved word, so the identifier is always backtick-quoted.
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
    id          INT,
    name        STRING,
    create_time TIMESTAMP(3)
) WITH (
    'connector.type'                  = 'jdbc',
    'connector.url'                   = 'jdbc:mysql://localhost:3306/test',
    'connector.table'                 = 'user',
    'connector.driver'                = 'com.mysql.jdbc.Driver',
    'connector.username'              = 'root',
    'connector.password'              = 'root',
    -- Lookup cache settings: at most 5000 cached rows, entries expire after 10 minutes.
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl'      = '10min'
);

-- Source table user_info: JDBC mapping of the MySQL table `user_info` in database `test`.
DROP TABLE IF EXISTS `user_info`;
CREATE TABLE user_info (
    id       INT,
    username STRING,
    password STRING
) WITH (
    'connector.type'                  = 'jdbc',
    'connector.url'                   = 'jdbc:mysql://localhost:3306/test',
    'connector.table'                 = 'user_info',
    'connector.driver'                = 'com.mysql.jdbc.Driver',
    'connector.username'              = 'root',
    'connector.password'              = 'root',
    -- Lookup cache settings: at most 5000 cached rows, entries expire after 10 minutes.
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl'      = '10min'
);


-- Target table user_all: JDBC mapping of the MySQL table `user_all` in database `test`.
-- Columns are the `user` columns followed by the user_info columns (user_info.id stored as bid).
-- NOTE(review): the lookup cache options presumably have no effect when this table is
-- only written to — confirm before relying on them.
DROP TABLE IF EXISTS `user_all`;
CREATE TABLE user_all (
    id          INT,
    name        STRING,
    create_time TIMESTAMP(3),
    bid         INT,
    username    STRING,
    password    STRING
) WITH (
    'connector.type'                  = 'jdbc',
    'connector.url'                   = 'jdbc:mysql://localhost:3306/test',
    'connector.table'                 = 'user_all',
    'connector.driver'                = 'com.mysql.jdbc.Driver',
    'connector.username'              = 'root',
    'connector.password'              = 'root',
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl'      = '10min'
);


-- Write the `user` LEFT JOIN user_info result into user_all.
-- Select-list order must match the user_all column order, since no target column list is given.
-- NOTE(review): the GROUP BY over every selected column presumably deduplicates rows and/or
-- gives the JDBC sink a key to upsert on — confirm before removing it.
INSERT INTO user_all
SELECT
    u.id,
    u.name,
    u.create_time,
    ui.id AS bid,
    ui.username,
    ui.password
FROM `user` AS u
LEFT JOIN user_info AS ui
    ON u.id = ui.id
GROUP BY
    u.id,
    u.name,
    u.create_time,
    ui.id,
    ui.username,
    ui.password;
原文地址:https://www.cnblogs.com/ywjfx/p/14276499.html