在CentOS上消费Kafka消息,你需要遵循以下步骤:
-
安装Java: Kafka是基于Java开发的,因此首先需要在CentOS上安装Java。你可以使用以下命令来安装OpenJDK:
sudo yum install java-1.8.0-openjdk-devel安装完成后,可以通过
java -version命令来验证Java是否安装成功。 -
下载并解压Kafka: 你可以从Kafka官方网站下载最新版本的Kafka。下载完成后,使用以下命令解压:
tar -xzf kafka_2.13-*.tgz cd kafka_2.13-*请将
*替换为你下载的具体版本号。 -
启动Zookeeper: Kafka使用Zookeeper来管理集群和消息。在启动Kafka之前,需要先启动Zookeeper。在Kafka目录下,使用以下命令启动Zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.propertiesZookeeper默认监听2181端口。
-
启动Kafka服务器: 在另一个终端窗口中,启动Kafka服务器:
bin/kafka-server-start.sh config/server.propertiesKafka默认监听9092端口。
-
创建Topic(如果尚未创建): 使用以下命令创建一个名为
my-topic的Topic:bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 -
消费消息: 使用以下命令来消费
my-topic中的消息:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning参数
--from-beginning表示从Topic的开始处消费消息。如果不加这个参数,消费者将从最新的消息开始消费。 -
运行消费者应用程序: 如果你想编写自己的消费者应用程序来消费Kafka消息,你可以使用Kafka提供的客户端库。以下是一个简单的Java消费者示例:
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class SimpleConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumerconsumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("my-topic")); try { while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); records.forEach(record -> { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }); } } finally { consumer.close(); } } } 将上述代码保存为
SimpleConsumer.java,然后编译并运行它。确保你的CLASSPATH包含了Kafka客户端库。
以上步骤将帮助你在CentOS系统上设置和消费Kafka消息。记得在实际部署时,你需要根据实际情况调整配置文件中的参数,例如bootstrap.servers、group.id等。
以上就是关于“kafka消息centos如何消费”的相关介绍,筋斗云是国内较早的云主机应用的服务商,拥有10余年行业经验,提供丰富的云服务器、租用服务器等相关产品服务。云服务器资源弹性伸缩,主机vCPU、内存性能强悍、超高I/O速度、故障秒级恢复;电子化备案,提交快速,专业团队7×24小时服务支持!
简单好用、高性价比云服务器租用链接:https://www.jindouyun.cn/product/cvm