姜鹏辉的个人博客 GreyNius

【Python】Python生产和消费kafka

2021-03-09

安装

pip3 intall kafka-python

脚本使用

kafka-console-producer.sh --broker-list 192.168.40.193:9092 --topic [topic]


kafka-configs.sh:配置管理脚本。

kafka-console-consumer.sh:kafka 消费者控制台。

kafka-console-producer.sh:kafka 生产者控制台。

kafka-consumer-groups.sh:kafka 消费者组相关信息。

kafka-delete-records.sh:删除低水位的日志文件。

kafka-log-dirs.sh:kafka 消息日志目录信息。

kafka-mirror-maker.sh:不同数据中心 kafka 集群复制工具。

kafka-preferred-replica-election.sh:触发 preferred replica 选举。

kafka-producer-perf-test.sh:kafka 生产者性能测试脚本。

kafka-reassign-partitions.sh:分区重分配脚本。

kafka-replica-verification.sh:复制进度验证脚本。

kafka-server-start.sh:启动 kafka 服务。

kafka-server-stop.sh:停止 kafka 服务。

kafka-topics.sh:topic 管理脚本。

kafka-verifiable-consumer.sh:可检验的 kafka 消费者。

kafka-verifiable-producer.sh:可检验的 kafka 生产者。

zookeeper-server-start.sh:启动 zk 服务。

zookeeper-server-stop.sh:停止 zk 服务。

zookeeper-shell.sh:zk 客户端。

消费者

消费

from kafka import KafkaConsumer
consumer = KafkaConsumer('NTC-CONN-RECORD-LOG', auto_offset_reset='earliest',bootstrap_servers=['192.168.10.28:9092'])                   
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

其中auto_offset_reset默认为读取latest,最早的消息

重置偏移量

from kafka.structs import TopicPartition
consumer.seek(TopicPartition(topic=u'test', partition=0), 5)  #重置偏移量,从第5个偏移量消费

生产者

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['192.168.10.28:9092']) 
producer.send('NTC-CONN-RECORD-LOG',test_data)
producer.close()

Comments

Content