阅读量:0
Kafka 提供了两种方式来手动提交偏移量:
- 使用 commitSync() 方法同步提交偏移量:
import org.apache.kafka.clients.consumer.*;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false"); // 关闭自动提交
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));
try {
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
// 处理消息
}
consumer.commitSync(); // 手动提交偏移量
}
} finally {
consumer.close();
}
- 使用 commitAsync() 方法异步提交偏移量:
import org.apache.kafka.clients.consumer.*;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false"); // 关闭自动提交
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));
try {
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
// 处理消息
}
consumer.commitAsync(); // 异步提交偏移量
}
} finally {
consumer.close();
}
在这两种方式中,commitSync() 方法会一直阻塞直到偏移量提交成功或发生错误。而 commitAsync() 方法则会在提交请求发送后立即返回,不会等待确认。如果发生错误,可以在 commitAsync() 方法的回调函数中处理。
以上就是关于“kafka手动提交偏移量怎么实现”的相关介绍,筋斗云是国内较早的云主机应用的服务商,拥有10余年行业经验,提供丰富的云服务器、租用服务器等相关产品服务。云服务器资源弹性伸缩,主机vCPU、内存性能强悍、超高I/O速度、故障秒级恢复;电子化备案,提交快速,专业团队7×24小时服务支持!
简单好用、高性价比云服务器租用链接:https://www.jindouyun.cn/product/cvm