阅读量:79
要查看Kafka的偏移量,您可以使用Kafka的命令行工具或编程API。以下是两种常用方法:
方法1:使用Kafka命令行工具
Kafka提供了一个名为kafka-consumer-groups.sh的命令行工具,可以用来查看消费者组的偏移量。请按照以下步骤操作:
- 打开命令行终端。
- 使用以下命令格式运行
kafka-consumer-groups.sh工具:
./kafka-consumer-groups.sh --bootstrap-server --describe --group
将替换为您的Kafka代理地址(例如:localhost:9092),将替换为您要查询的消费者组ID。
运行此命令后,您将看到类似以下的输出:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-consumer-group my-topic 0 100 200 100 consumer-1-86753f8a-1234-4567-89ab-cdef12345678 /127.0.0.1 consumer-1
在这个例子中,您可以查看消费者组my-consumer-group在主题my-topic的每个分区的当前偏移量(CURRENT-OFFSET)、日志结束偏移量(LOG-END-OFFSET)和lag(LAG)。
方法2:使用编程API
您还可以使用Kafka客户端库(如Java、Python、Go等)编写程序来查询偏移量。以下是使用Java API的示例:
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaOffsetViewer {
public static void main(String[] args) {
String kafkaBootstrapServer = "" ;
String consumerGroupId = "" ;
Properties props = new Properties();
props.put("bootstrap.servers", kafkaBootstrapServer);
props.put("group.id", consumerGroupId);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("" ));
consumer.seekToEnd(consumer.assignment());
while (true) {
consumer.poll(Duration.ofMillis(1000));
consumer.position(consumer.assignment().iterator().next());
System.out.println("Topic: " + consumer.topic() + ", Partition: " + consumer.partition() + ", Offset: " + consumer.position());
}
}
}
将替换为您的Kafka代理地址,将替换为您要查询的消费者组ID,将替换为您要查询的主题名称。
运行此Java程序后,您将看到类似以下的输出:
Topic: my-topic, Partition: 0, Offset: 200
在这个例子中,您可以查看主题my-topic在分区0的当前偏移量。