阿里云kafka使用记录(python版本)

kafka端
 
consumer vpc版代码
 
import socket
from kafka import KafkaConsumer
from kafka.errors import KafkaError

# context.check_hostname = True

consumer = KafkaConsumer(bootstrap_servers=['192.168.xx.xx:9092'],
                        group_id='xx',
                        api_version = (0,10)
                        )

print('consumer start to consuming...')
consumer.subscribe(('xx',))
for message in consumer:
    print(message.topic)
    print(message.offset)
    print(message.key)
    print(message.value)
    print(message.partition)

producer vpc版代码

#!/usr/bin/env python
# encoding: utf-8

import socket
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['192.168.xx.xx:9092'],
                        api_version = (0,10),
                        retries=5)

partitions = producer.partitions_for('xx')
print('Topic下分区: %s' % partitions)

try:
    future = producer.send(topic='xx', value=b'hello aliyun-kafka!')
    future.get()
    print('send message succeed.')
except KafkaError as e:
    print('send message failed.')
    print(e)

consumer公网版代码

import ssl
import socket
from kafka import KafkaConsumer
from kafka.errors import KafkaError


context = ssl.create_default_context()
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_REQUIRED
# context.check_hostname = True
context.load_verify_locations("/tmp/ca-cert")

consumer = KafkaConsumer(bootstrap_servers=['kafka-ons-internet.aliyun.com:8080'],
                        group_id='xxx',
                        sasl_mechanism="PLAIN",
                        ssl_context=context,
                        security_protocol='SASL_SSL',
                        api_version = (0,10),
                        sasl_plain_username='xxx',
                        sasl_plain_password='1234567890')

print('consumer start to consuming...')
consumer.subscribe(('xxx', ))
for message in consumer:
    print(message.topic)
    print(message.offset)
    print(message.value)
    break
 
producer 公网版代码
#!/usr/bin/env python
# encoding: utf-8

import ssl
import socket
from kafka import KafkaProducer
from kafka.errors import KafkaError

context = ssl.create_default_context()
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_REQUIRED
# context.check_hostname = True
context.load_verify_locations("/tmp/ca-cert")
#这个文件参考https://github.com/AliwareMQ/aliware-kafka-demos/tree/master/kafka-python-demo

producer = KafkaProducer(bootstrap_servers=['kafka-ons-internet.aliyun.com:8080'],
                        sasl_mechanism="PLAIN",
                        ssl_context=context,
                        security_protocol='SASL_SSL',
                        api_version = (0,10),
                        retries=5,
                        sasl_plain_username='xx',
                        sasl_plain_password='1234567890'#注意是access-key的最后十位)

partitions = producer.partitions_for('xxx')
print ('Topic下分区: %s' % partitions)

try:
    future = producer.send('xxx', b'hello aliyun-kafka!')
    future.get()
    print('send message succeed.')
except KafkaError as e:
    print('send message failed.')
    print(e)

从阿里云控台获得连接信息

原文地址:https://www.cnblogs.com/castlevania/p/10370803.html