使用python怎么对卡夫卡进行操作

  介绍

使用python怎么对卡夫卡进行操作?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

1,先看最简单的场景,生产者生产消息,消费者接收消息,下面是生产者的简单代码。

# !/usr/bin/env  python   #,- *安康;编码:utf-8  - * -   import  json   得到kafka  import  KafkaProducer      时间=producer  KafkaProducer (bootstrap_servers=& # 39; xxxx: x # 39;)      msg_dict =, {   “sleep_time"才能:,,   “db_config"才能:,{   ,,,“database":,“test_1"   ,,,“host":,“xxxx"   ,,,“user":,“root"   ,,,“password":,“root"   ,,},   “table"才能:,“msg"   “msg"才能:,“Hello  World"   }   时间=msg  json.dumps (msg_dict)   producer.send (& # 39; test_rhj& # 39;,,味精、,分区=0)   producer.close ()

下面是消费者的简单代码:

得到kafka  import  KafkaConsumer      时间=consumer  KafkaConsumer (& # 39; test_rhj& # 39;,, bootstrap_servers=[& # 39; xxxx: x # 39;])   for  msg 消费者:拷贝   recv =,才能“% s: % d: % d:,键=% s 值=https://www.yisu.com/zixun/%s”%(味精。话题,味精。分区,味精。抵消,味精。键,msg.value)   打印recv

下面是结果:

使用python怎么对卡夫卡进行操作

2,如果想要完成负载均衡,就需要知道卡夫卡的分区机制,同一个主题,可以为其分区,在生产者不指定分区的情况,卡夫卡会将多个消息分发到不同的分区,消费者订阅时候如果不指定服务组,会收到所有分区的消息,如果指定了服务组,则同一服务组的消费者会消费不同的分区,如果2个分区两个消费者的消费者组消费,则,每个消费者消费一个分区,如果有三个消费者的服务组,则会出现一个消费者消费不到数据,如果想要消费同一分区,则需要用不同的服务组。以此为原理,我们对消费者做如下修改:

得到kafka  import  KafkaConsumer      时间=consumer  KafkaConsumer (& # 39; test_rhj& # 39;,, bootstrap_servers=[& # 39; xxxx: x # 39;])   for  msg 消费者:拷贝   recv =,才能“% s: % d: % d:,键=% s 值=https://www.yisu.com/zixun/%s”%(味精。话题,味精。分区,味精。抵消,味精。键,msg.value)   打印recv

然后我们开两个消费者进行消费,生产者分别往0分区和1分区发消息结果如下,可以看的到,一个消费者只能消费0分区,另一个只能消费1分区:

使用python怎么对卡夫卡进行操作“> <br/> </p> <p> <img src=

3,卡夫卡提供了偏移量的概念,允许消费者根据偏移量消费之前遗漏的内容,这基于卡夫卡名义上的全量存储,可以保留大量的历史数据,历史保存时间是可配置的,一般是7天,如果偏移量定位到了已删除的位置那也会有问题,但是这种情况可能很小,每个保存的数据文件都是以偏移量命名的,当前要查的偏移量减去文件名就是数据在该文件的相对位置。要指定偏移量消费数据,需要指定该消费者要消费的分区,否则代码会找不到分区而无法消费,代码如下:

得到kafka  import  KafkaConsumer   得到kafka.structs  import  TopicPartition      时间=consumer  KafkaConsumer (group_id=& # 39; 123456 & # 39;,, bootstrap_servers=[& # 39; 10.43.35.25:4531& # 39;])   consumer.assign ([TopicPartition(主题=& # 39;test_rhj& # 39;,,分区=0),,TopicPartition(主题=& # 39;test_rhj& # 39;,,分区=1)))   print  consumer.partitions_for_topic (“test_rhj"), #,获取测试主题的分区信息   print  consumer.assignment ()   print  consumer.beginning_offsets (consumer.assignment ())   consumer.seek (TopicPartition(主题=& # 39;test_rhj& # 39;,,分区=0),,0)   for  msg 消费者:拷贝   recv =,才能“% s: % d: % d:,键=% s 值=https://www.yisu.com/zixun/%s”%(味精。话题,味精。分区,味精。抵消,味精。键,msg.value)   打印recv

因为指定的便宜量为0,所以从一开始插入的数据都可以查到,而且因为指定了分区,指定的分区结果都可以消费,结果如下:

使用python怎么对卡夫卡进行操作

使用python怎么对卡夫卡进行操作