kafka 部署

推荐该文章,亲测可用:

https://www.jianshu.com/p/bacc8eb03c4b

推荐使用 yml:

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2182:2181"
    networks:
      - hbl_net
  kafka:
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    # NOTE(review): the original also had a `links:` entry for zookeeper;
    # `links` is deprecated in Compose v3 and redundant here because both
    # services join the user-defined network `hbl_net`, whose built-in DNS
    # already resolves the `zookeeper` hostname.
    networks:
      - hbl_net
    ports:
      - "9095:9092"
    environment:
      KAFKA_BROKER_ID: 5
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Bind on all interfaces inside the container.
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      # Address advertised to external clients: replace xx.xx.xx.xx with the
      # docker host's IP; 9095 is the host-mapped port.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://xx.xx.xx.xx:9095
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
networks:
  hbl_net:
    # Bridge network for inter-container communication. Note: the actual
    # network name gets prefixed with the folder containing the compose file
    # (e.g. a file under ./hbl yields a network named hbl_hbl_net).
    driver: bridge
  # external: true  # use this instead if the network already exists
  # external: true 如果外部已有网络就用这个配置

update 2020-12-04

另外一个成功的案例:使用bitnami的镜像搭建的,感觉会正规点

version: '3'
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:latest'
    # Fix: start zookeeper before kafka (the first compose example declared
    # this dependency; it was missing here, so kafka could race zookeeper).
    depends_on:
      - zookeeper
    ports:
      - '9092:9092'
      - '29092:29092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      # Two listeners: PLAINTEXT for clients inside the compose network,
      # PLAINTEXT_HOST for clients connecting from the docker host.
      # (Trailing whitespace in the original lines removed.)
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
      # Replace both occurrences of IP with an address clients can reach.
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://IP:9092,PLAINTEXT_HOST://IP:29092
      - ALLOW_PLAINTEXT_LISTENER=yes

附上Python相关的调用代码:

# 生产者:
import json
from kafka import KafkaProducer
from decimal import Decimal
import decimal
def producer():
    """Send one sample JSON message to Kafka via the host-mapped listener.

    Connects to the PLAINTEXT_HOST listener (port 29092 from the bitnami
    compose file), lets the producer's value_serializer JSON-encode the
    payload, then flushes and closes.
    """
    # Broker address — assumes the bitnami compose stack from above is running
    # on this machine; adjust the host for a remote broker.
    host_port = '0.0.0.0:29092'

    # Bug fix: the original used 'handle__price_cost' (double underscore),
    # which the consumer below ('handle_price_cost') never reads.
    topic = 'handle_price_cost'
    producer = KafkaProducer(bootstrap_servers=host_port,
                             # value_serializer JSON-encodes every payload, so
                             # send() below is given the raw dict.
                             value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                             # security_protocol='SASL_PLAINTEXT',
                             # sasl_plain_username="user",
                             # sasl_plain_password="bitnami",
                             # sasl_mechanism="PLAIN",
                             # Fail fast instead of blocking forever when the
                             # broker is unreachable.
                             max_block_ms=2000,
                             )

    msg_dict = {'location_id': 193, 'product_id': 842892}

    # Bug fix: the original did json.dumps(msg_dict, cls=DecimalEncoder) —
    # DecimalEncoder is undefined (NameError), and the resulting string would
    # have been JSON-encoded a second time by value_serializer. Pass the dict
    # directly and let the serializer do the (single) encoding.
    print(msg_dict)
    producer.send(topic, msg_dict)
    # record_metadata = future.get(timeout=10)
    print('send success')
    producer.close()
    print('end')
# 消费者:
from kafka import KafkaConsumer
import json
from pykafka import KafkaClient
def customer2():
    """Consume JSON messages from 'handle_price_cost' and log them.

    Uses pykafka's simple consumer with auto-commit enabled; each message is
    decoded as JSON, written to customer.txt (one record per line), and
    printed.
    """
    # '192.168.1.217:9092', '192.168.1.234:9092', '192.168.1.188:9092'
    # hosts = '39.108.194.209:29092'
    hosts = '0.0.0.0:29092'
    client = KafkaClient(hosts=hosts)
    topic = client.topics['handle_price_cost']
    # topic = client.topics['1111']
    consumer = topic.get_simple_consumer(consumer_group=b'sinfl00d51044-200',
                                         auto_commit_interval_ms=1,
                                         auto_commit_enable=True)
    # NOTE(review): 'wb' truncates customer.txt on every run — switch to 'ab'
    # if records should accumulate across restarts.
    with open('customer.txt', 'wb') as f:
        for msg in consumer:
            # recv = "%s:%d:%d: key=%s value=%s" % (msg.topics, msg.partition, msg.offset, msg.key, msg.value.decode('utf-8'))
            # Bug fix: json.loads(..., encoding='utf-8') — the encoding
            # parameter was deprecated in 3.1 and removed in Python 3.9;
            # decode the raw bytes explicitly instead.
            a = json.loads(msg.value.decode('utf-8'))
            # Bug fix: the original string literal contained a raw line break
            # (a syntax error from copy/paste mangling); use the '\n' escape.
            f.write((str(a) + '\n').encode('utf-8'))
            print(a)
原文地址:https://www.cnblogs.com/qianxunman/p/13744508.html