安装
pip3 intall kafka-python
脚本使用
kafka-console-producer.sh --broker-list 192.168.40.193:9092 --topic [topic]
kafka-configs.sh:配置管理脚本。
kafka-console-consumer.sh:kafka 消费者控制台。
kafka-console-producer.sh:kafka 生产者控制台。
kafka-consumer-groups.sh:kafka 消费者组相关信息。
kafka-delete-records.sh:删除低水位的日志文件。
kafka-log-dirs.sh:kafka 消息日志目录信息。
kafka-mirror-maker.sh:不同数据中心 kafka 集群复制工具。
kafka-preferred-replica-election.sh:触发 preferred replica 选举。
kafka-producer-perf-test.sh:kafka 生产者性能测试脚本。
kafka-reassign-partitions.sh:分区重分配脚本。
kafka-replica-verification.sh:复制进度验证脚本。
kafka-server-start.sh:启动 kafka 服务。
kafka-server-stop.sh:停止 kafka 服务。
kafka-topics.sh:topic 管理脚本。
kafka-verifiable-consumer.sh:可检验的 kafka 消费者。
kafka-verifiable-producer.sh:可检验的 kafka 生产者。
zookeeper-server-start.sh:启动 zk 服务。
zookeeper-server-stop.sh:停止 zk 服务。
zookeeper-shell.sh:zk 客户端。
消费者
消费
from kafka import KafkaConsumer
consumer = KafkaConsumer('NTC-CONN-RECORD-LOG', auto_offset_reset='earliest',bootstrap_servers=['192.168.10.28:9092'])
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
其中auto_offset_reset
默认为读取latest,最早的消息
重置偏移量
from kafka.structs import TopicPartition
consumer.seek(TopicPartition(topic=u'test', partition=0), 5) #重置偏移量,从第5个偏移量消费
生产者
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['192.168.10.28:9092'])
producer.send('NTC-CONN-RECORD-LOG',test_data)
producer.close()