根据其官方的Apache页面:“ Kafka用于构建实时数据管道和流应用程序。 它具有水平可伸缩性,容错性,快速的核心性能,并在成千上万的公司中投入生产。
根据正在使用Kafka的Stackshare.io的一瞥


消息被组织为“主题”。 生产者推送或发布消息。 消费者拉信息。 作为消费者,您订阅主题以获取消息。 Kafka在群集中运行,每个节点称为代理。


一个主题可以有多个分区,这些分区分布在多个代理中。 您可以并行化使用者以从不同的主题分区中提取。
每个分区实质上是一个顺序写入的日志文件。 您可以指定存储数据的时间。 每个代理都有许多分区,可以在其他代理之间复制。
每个分区都有一个领导者,这是发送写入的地方。 可以设置一致性和可用性。
当使用者使用来自Kafka的消息时,它将使用消息偏移量来跟踪已使用了哪些消息。 如果它消耗了主题中的前50条消息,则当收到新消息时,它将从第50个偏移键开始,并开始消耗那些未读的消息。 创建使用者时,可以通过应用程序/代码将偏移的处理方式和提交方式从全自动配置为手动。
消费者组由多个消费者组成。 每个使用者都接收一个唯一的数据分区,该分区作为一个整体进行处理,从而可以进行水平扩展。


简而言之 , ZooKeeper / Zetcd用于确保Kafka集群的所有移动部件无缝地协同工作。 它有助于跨所有代理进行同步,并跟踪代理系统状态(心跳),复制并管理代理主题注册表。
有几个适用于python的不同库(pykafka和confluent-kafka)是C客户端的包装。 为了利用C客户端,还需要其他安装步骤:librdkafka。
confluent-kafka的示例:
pip install confluent-kafka
制片人
从confluent_kafka import Producer#更改为指向您的kafka安装
p =生产者({'bootstrap.servers':'localhost'})
def delivery_report(err,msg):
“”“为产生的每条消息调用一次以指示传递结果。
由poll()或flush()触发。 “”
如果err不是None:
print('消息传递失败:{}'。format(err))
其他:
print('邮件传递到{} [{}]'。format(msg.topic(),msg.partition()))
用于some_data_source中的数据:
#触发先前的Produce()调用中所有可用的交货报告回调
p.poll(0)
#异步产生一条消息,传递报告回调
当邮件包含以下内容时,#将从上方的poll()或下方的flush()触发
#已成功交付或永久失败。
p.produce('mytopic',data.encode('utf-8'),callback = delivery_report)
#等待任何未完成的邮件被发送并发送报告
#要触发的回调。
p.flush()
消费者
从confluent_kafka导入使用者,KafkaError
c =消费者({
'bootstrap.servers':'mybroker',
'group.id':'mygroup',
'default.topic.config':{
'auto.offset.reset':'最小'
}
})
c.subscribe(['mytopic'])
而True:
味精= c.poll(1.0)
如果msg为None:
继续
如果msg.error():
如果msg.error()。code()== KafkaError._PARTITION_EOF:
继续
其他:
打印(msg.error())
打破
print('收到的消息:{}'。format(msg.value()。decode('utf-8')))
c.close()