Kafka Connect REST API

1. 获取 Connect Worker 信息
curl -s http://127.0.0.1:8083/ | jq

lenmom@M1701:~/workspace/software/kafka_2.11-2.1.0/logs$ curl -s http://127.0.0.1:8083/ | jq
{
  "version": "2.1.0",
  "commit": "809be928f1ae004e",
  "kafka_cluster_id": "NGQRxNZMSY6Q53ktQABHsQ"
}


2.列出 Connect Worker 上所有已安装的 Connector 插件
curl -s http://127.0.0.1:8083/connector-plugins | jq

lenmom@M1701:~/workspace/software/kafka_2.11-2.1.0/logs$ curl -s http://127.0.0.1:8083/connector-plugins | jq
[
  {
    "class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "type": "sink",
    "version": "5.2.1"
  },
  {
    "class": "io.confluent.connect.hdfs.tools.SchemaSourceConnector",
    "type": "source",
    "version": "2.1.0"
  },
  {
    "class": "io.confluent.connect.storage.tools.SchemaSourceConnector",
    "type": "source",
    "version": "2.1.0"
  },
  {
    "class": "io.debezium.connector.mongodb.MongoDbConnector",
    "type": "source",
    "version": "0.9.4.Final"
  },
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "0.9.4.Final"
  },
  {
    "class": "io.debezium.connector.oracle.OracleConnector",
    "type": "source",
    "version": "0.9.4.Final"
  },
  {
    "class": "io.debezium.connector.postgresql.PostgresConnector",
    "type": "source",
    "version": "0.9.4.Final"
  },
  {
    "class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "type": "source",
    "version": "0.9.4.Final"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "2.1.0"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "2.1.0"
  }
]

3.获取 Connector 上 Task 以及相关配置的信息
curl -s http://127.0.0.1:8083/connectors/<Connector名字>/tasks | jq

lenmom@M1701:~/workspace/software/kafka_2.11-2.1.0/logs$ curl -s localhost:8083/connectors/inventory-connector/tasks |jq
[
  {
    "id": {
      "connector": "inventory-connector",
      "task": 0
    },
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.user": "root",
      "database.server.id": "184054",
      "tasks.max": "1",
      "database.history.kafka.bootstrap.servers": "127.0.0.1:9092",
      "database.history.kafka.topic": "dbhistory.inventory",
      "database.server.name": "127.0.0.1",
      "database.port": "3306",
      "task.class": "io.debezium.connector.mysql.MySqlConnectorTask",
      "database.hostname": "127.0.0.1",
      "database.password": "root",
      "name": "inventory-connector",
      "database.whitelist": "inventory"
    }
  }
]


4.获取 Connector 状态信息
curl -s http://127.0.0.1:8083/connectors/<Connector名字>/status | jq

lenmom@M1701:~/workspace/software/kafka_2.11-2.1.0/logs$ curl -s localhost:8083/connectors/inventory-connector/status |jq
{
  "name": "inventory-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.0.1:8083"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "127.0.0.1:8083"
    }
  ],
  "type": "source"
}


5.获取 Connector 配置信息
curl -s http://127.0.0.1:8083/connectors/<Connector名字>/config | jq

lenmom@M1701:~/workspace/software/kafka_2.11-2.1.0/logs$ curl -s localhost:8083/connectors/inventory-connector/config |jq
{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "database.user": "root",
  "database.server.id": "184054",
  "tasks.max": "1",
  "database.history.kafka.bootstrap.servers": "127.0.0.1:9092",
  "database.history.kafka.topic": "dbhistory.inventory",
  "database.server.name": "127.0.0.1",
  "database.port": "3306",
  "database.hostname": "127.0.0.1",
  "database.password": "root",
  "name": "inventory-connector",
  "database.whitelist": "inventory"
}

6.暂停 Connector
curl -s -X PUT http://127.0.0.1:8083/connectors/<Connector名字>/pause

7.恢复已暂停的 Connector
curl -s -X PUT http://127.0.0.1:8083/connectors/<Connector名字>/resume


8.删除 Connector
curl -s -X DELETE http://127.0.0.1:8083/connectors/<Connector名字>

9.创建新 Connector (以HdfsSinkConnector举例)
curl -s -X POST -H "Content-Type: application/json" --data \
'{
"name": "hdfs-hive-sink",
"config": {
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"tasks.max": "1",
"topics": "127.0.0.1.inventory.customers",
"hdfs.url": "hdfs://127.0.0.1:9000/inventory",
"flush.size": "10",
"format.class":"io.confluent.connect.hdfs.string.StringFormat",
"hive.integration": true,
"hive.database": "inventory",
"hive.metastore.uris": "thrift://127.0.0.1:9083",
"schema.compatibility": "BACKWARD"
}
}' \
http://127.0.0.1:8083/connectors | jq

lenmom@M1701:~/workspace/software/kafka_2.11-2.1.0/logs$ curl -H "Accept: application/json"  http://127.0.0.1:8083/connectors/hdfs-hive-sink |jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   371  100   371    0     0  61833      0 --:--:-- --:--:-- --:--:-- 74200
{
  "name": "hdfs-hive-sink",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "format.class": "io.confluent.connect.hdfs.string.StringFormat",
    "flush.size": "10",
    "tasks.max": "1",
    "topics": "127.0.0.1.inventory.customers",
    "hdfs.url": "hdfs://127.0.0.1:9000/inventory",
    "name": "hdfs-hive-sink"
  },
  "tasks": [
    {
      "connector": "hdfs-hive-sink",
      "task": 0
    }
  ],
  "type": "sink"
}


10.更新 Connector配置 (以FileStreamSourceConnector举例)
curl -s -X PUT -H "Content-Type: application/json" --data \
'{"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
"key.converter.schemas.enable":"true",
"file":"demo-file.txt",
"tasks.max":"2",
"value.converter.schemas.enable":"true",
"name":"file-stream-demo-distributed",
"topic":"demo-2-distributed",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter":"org.apache.kafka.connect.json.JsonConverter"}' \
http://127.0.0.1:8083/connectors/file-stream-demo-distributed/config | jq

原文地址:https://www.cnblogs.com/lenmom/p/10768278.html