阅读量:144
Kafka消费者组的配置主要包括以下几个方面:
基本配置
-
bootstrap.servers
- 指定Kafka集群的broker地址列表,多个broker之间用逗号分隔。
bootstrap.servers=broker1:9092,broker2:9092 -
group.id
- 消费者组的唯一标识符。
group.id=my-consumer-group -
enable.auto.commit
- 是否自动提交偏移量,默认为
true。
enable.auto.commit=true - 是否自动提交偏移量,默认为
-
auto.commit.interval.ms
- 自动提交偏移量的时间间隔。
auto.commit.interval.ms=5000 -
auto.offset.reset
- 当没有初始偏移量或当前偏移量不再存在时(例如数据被删除),新消费者如何处理。
earliest: 从最早的偏移量开始消费。latest: 从最新的偏移量开始消费。none: 如果没有找到合适的偏移量,则抛出异常。
auto.offset.reset=earliest - 当没有初始偏移量或当前偏移量不再存在时(例如数据被删除),新消费者如何处理。
高级配置
-
max.poll.records
- 每次调用
poll()方法返回的最大记录数。
max.poll.records=500 - 每次调用
-
fetch.min.bytes
- 服务器端一次返回给消费者的最小数据量。
fetch.min.bytes=1024 -
fetch.max.wait.ms
- 如果服务器端的数据不足
fetch.min.bytes,则等待的最长时间。
fetch.max.wait.ms=500 - 如果服务器端的数据不足
-
session.timeout.ms
- 消费者与Kafka集群之间的会话超时时间。
session.timeout.ms=30000 -
heartbeat.interval.ms
- 消费者发送心跳到协调者的频率。
heartbeat.interval.ms=3000 -
max.partition.fetch.bytes
- 每个分区返回的最大数据量。
max.partition.fetch.bytes=1048576 -
isolation.level
- 消费者读取数据的隔离级别。
read_committed: 只读取已提交的数据。read_uncommitted: 读取所有数据,包括未提交的数据。
isolation.level=read_committed - 消费者读取数据的隔离级别。
-
security.protocol
- 安全协议,如
SSL,SASL_PLAINTEXT,SASL_SSL等。
security.protocol=SASL_SSL - 安全协议,如
-
sasl.mechanism
- SASL认证机制,如
PLAIN,GSSAPI等。
sasl.mechanism=PLAIN - SASL认证机制,如
-
sasl.jaas.config
- SASL认证配置。
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="pass";
示例配置文件
bootstrap.servers=broker1:9092,broker2:9092
group.id=my-consumer-group
enable.auto.commit=true
auto.commit.interval.ms=5000
auto.offset.reset=earliest
max.poll.records=500
fetch.min.bytes=1024
fetch.max.wait.ms=500
session.timeout.ms=30000
heartbeat.interval.ms=3000
max.partition.fetch.bytes=1048576
isolation.level=read_committed
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="pass";
注意事项
- 配置项应根据实际需求和环境进行调整。
- 在生产环境中,建议关闭自动提交偏移量,并手动控制偏移量的提交,以确保数据的一致性和可靠性。
- 安全相关的配置(如
security.protocol和sasl.*)应根据实际的安全需求进行设置。
通过合理配置这些参数,可以优化Kafka消费者的性能和可靠性。