kafak-python函数使用详解

时间:2019-01-02 22:54:11   收藏:0   阅读:292

 

 

from kafka.structs import TopicPartition,OffsetAndMetadata
configs = {
            bootstrap_servers: 10.57.19.60,
            enable_auto_commit: False,
            group_id: test,
            api_version: (0, 8, 2),
            ssl_check_hostname: False,
            consumer_timeout_ms: 3000,  # 若不指定 consumer_timeout_ms,默认一直循环等待接收,若指定,则超时返回,不再等待
            # ssl_certfile: ssl_certfile,
            # security_protocol: SSL,
            # ssl_cafile: ssl_cafile
        }
topics=(test, )
# 注意指定分区将会失去故障转移/负载均衡的支持,当然也没有了自动分配分区的功能(因为已经人为指定了嘛)
topic_partition = TopicPartition(topic=test,partition=0) 
# 
consumer = KafkaConsumer(**configs)
# 参数必须是列表,表示订阅的topic/partition列表
consumer.assign([topic_partition])
# 获取分给当前用户的topic/partition信息
consumer.assignment()
# 提交偏移量:可以告知服务器当前偏移量,也可以设置偏移量
consumer.commit({TopicPartition(topic=test, partition=0): OffsetAndMetadata(offset=280, metadata=‘‘)})
# 异步提交
consumer.commit_async()
# 获取服务器的最后确认的偏移量,即最新数据开始读取的地方
consumer.committed(TopicPartition(topic=test, partition=0))
# 获取服务器当前最新的偏移量,读到这个偏移量后,所有数据都读取完了
consumer.highwater(TopicPartition(topic=test, partition=0))
# 获取消费的性能
consumer.metrics()
# 获取某个topic的partition信息
consumer.partitions_for_topic(topic)
# 获取下一条数据开始读取的偏移量,即从这个便宜量开始继续读取数据
consumer.position(TopicPartition(topic=test, partition=0))
# 从指定偏移量位置开始读取数据 
consumer.seek(TopicPartition(topic=test, partition=0), 283)
# 从头开始读取数据
consumer.seek_to_beginning()
# 从最后开始读取数据
consumer.seek_to_end()
# 订阅topic,可以订阅多个,可以使用正则表达式匹配多个
consumer.subscribe()
# 获取订阅的信息,无法获取使用assign分配的topic/partition信息
consumer.subscription()
# 获取当前用户授权的topic信息
consumer.topics()
# 取消消息的订阅
consumer.unsubscribe()
# 一起消费多条消息,最多等待时间timeout_ms,最多消费max_records
consumer.poll(self, timeout_ms=0, max_records=None) # 获取指定分区第一个偏移量 consumer.beginning_offsets([topic_partition]) # 获取指定分区最后一个偏移量,最新的偏移量 consumer.end_offsets([topic_partition]) # 关闭连接 consumer.close() # #consumer.seek(topic_partition,
284) for message in consumer: print(message)

 

重复消费是如何产生的?

消费者设置为自动提交偏移量时,需要同时设置自动提交偏移量的时间间隔。如果消费完若干消息后,还没有到自动提交偏移量的时间时,应用挂了,则系统记录的偏移量还是之前的值,那么刚才消费的若干消息,会在应用重连之后重新消费

如何保证不会重复消费?

消费段记录下发送给服务器的偏移量,获取最新数据时再判断这个偏移量是否正确

生产的消息队列长度,会堆积吗?

消费的信息队列长度,会堆积吗?

生产者速度大于消费者速度怎么处理?

kafka 认证与授权机制

Kafka 目前支持SSL、SASL/Kerberos、SASL/PLAIN三种认证机制。目前支持以下安全措施:

 

kafka偏移量的相关配置

enable.auto.commit

true(默认):自动提交偏移量,可以通过配置 auto.commit.interval.ms属性来控制提交偏移量的频率。(基于时间间隔)

false:手动控制偏移量。可以在程序逻辑必要的时候提交偏移量,而不是基于时间隔。此时可以进行同步,异步,同步异步组合(参考相应api)。

auto.offset.reset

无法读取偏移量时候读取消息的设置

latest(默认):从最新记录读取数据。

earliest:从起始位置读取数据

参考:

1、https://zhuanlan.zhihu.com/p/33238750

2、https://help.aliyun.com/document_detail/68331.html

3、https://blog.csdn.net/xiaoguozi0218/article/details/80513849

4、https://zhuanlan.zhihu.com/p/38330574

5、https://blog.csdn.net/ZhongGuoZhiChuang/article/details/79550570

6、https://help.aliyun.com/document_detail/67233.html

原文:https://www.cnblogs.com/shengulong/p/10211681.html

评论(0
© 2014 bubuko.com 版权所有 - 联系我们:wmxa8@hotmail.com
打开技术之扣,分享程序人生!