阅读量:1
要使用 Maven 实现 Kafka 消息消费,需要按照以下步骤进行操作:
- 在 Maven 项目的
pom.xml文件中添加 Kafka 客户端的依赖:
<dependencies>
<dependency>
<groupId>org.apache.kafkagroupId>
<artifactId>kafka-clientsartifactId>
<version>2.8.1version>
dependency>
dependencies>
- 创建一个 Kafka 消费者:
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
String bootstrapServers = "localhost:9092";
String groupId = "my-group";
String topic = "my-topic";
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
Consumer consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singleton(topic));
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
System.out.println("Received message: " + record.value());
}
}
}
}
在上述代码中,需要根据实际情况修改 bootstrapServers、groupId 和 topic 的值。
- 运行上述代码,即可启动 Kafka 消费者并开始消费消息。
以上就是使用 Maven 实现 Kafka 消息消费的步骤。
以上就是关于“maven如何实现kafka消息消费”的相关介绍,筋斗云是国内较早的云主机应用的服务商,拥有10余年行业经验,提供丰富的云服务器、租用服务器等相关产品服务。云服务器资源弹性伸缩,主机vCPU、内存性能强悍、超高I/O速度、故障秒级恢复;电子化备案,提交快速,专业团队7×24小时服务支持!
简单好用、高性价比云服务器租用链接:https://www.jindouyun.cn/product/cvm