Kafka和Python-让我们一起学习

根据其官方的Apache页面:“ Kafka用于构建实时数据管道和流应用程序。 它具有水平可伸缩性,容错性,快速的核心性能,并在成千上万的公司中投入生产。

根据正在使用Kafka的Stackshare.io的一瞥

消息被组织为“主题”。 生产者推送或发布消息。 消费者拉信息。 作为消费者,您订阅主题以获取消息。 Kafka在群集中运行,每个节点称为代理。

一个主题可以有多个分区,这些分区分布在多个代理中。 您可以并行化使用者以从不同的主题分区中提取。

每个分区实质上是一个顺序写入的日志文件。 您可以指定存储数据的时间。 每个代理都有许多分区,可以在其他代理之间复制。

每个分区都有一个领导者,这是发送写入的地方。 可以设置一致性和可用性。

当使用者使用来自Kafka的消息时,它将使用消息偏移量来跟踪已使用了哪些消息。 如果它消耗了主题中的前50条消息,则当收到新消息时,它将从第50个偏移键开始,并开始消耗那些未读的消息。 创建使用者时,可以通过应用程序/代码将偏移的处理方式和提交方式从全自动配置为手动。

消费者组由多个消费者组成。 每个使用者都接收一个唯一的数据分区,该分区作为一个整体进行处理,从而可以进行水平扩展。

简而言之ZooKeeper / Zetcd用于确保Kafka集群的所有移动部件无缝地协同工作。 它有助于跨所有代理进行同步,并跟踪代理系统状态(心跳),复制并管理代理主题注册表。

有几个适用于python的不同库(pykafka和confluent-kafka)是C客户端的包装。 为了利用C客户端,还需要其他安装步骤:librdkafka。

confluent-kafka的示例:

pip install confluent-kafka 

制片人

 从confluent_kafka import Producer#更改为指向您的kafka安装
 p =生产者({'bootstrap.servers':'localhost'})

 def delivery_report(err,msg):
     “”“为产生的每条消息调用一次以指示传递结果。
        由poll()或flush()触发。  “”
    如果err不是None:
         print('消息传递失败:{}'。format(err))
    其他:
         print('邮件传递到{} [{}]'。format(msg.topic(),msg.partition()))

用于some_data_source中的数据:
     #触发先前的Produce()调用中所有可用的交货报告回调
     p.poll(0)

     #异步产生一条消息,传递报告回调
    当邮件包含以下内容时,#将从上方的poll()或下方的flush()触发
     #已成功交付或永久失败。
     p.produce('mytopic',data.encode('utf-8'),callback = delivery_report)

 #等待任何未完成的邮件被发送并发送报告
 #要触发​​的回调。
 p.flush() 

消费者

 从confluent_kafka导入使用者,KafkaError


 c =消费者({
     'bootstrap.servers':'mybroker',
     'group.id':'mygroup',
     'default.topic.config':{
         'auto.offset.reset':'最小'
     }
 })

 c.subscribe(['mytopic'])

而True:
    味精= c.poll(1.0)

    如果msg为None:
        继续
    如果msg.error():
        如果msg.error()。code()== KafkaError._PARTITION_EOF:
            继续
        其他:
            打印(msg.error())
            打破

     print('收到的消息:{}'。format(msg.value()。decode('utf-8')))

 c.close()