Kafka在Linux下的安装和使用
Kafka简介
?? Tips:本文主要介绍在Linux系统中安装和使用Lafka的操作步骤。
安装Kafka
访问Kafka官网,下载安装包版本(https://kafka.apache.org/downloads),下载 kafka_2.12-3.3.2.tgz,前面的 2.12 是 Scala的版本号,后面的3.3.2是kafka的版本号。https://downloads.apache.org/kafka/3.3.2/kafka_2.12-3.3.2.tgz
下载后,按照命令进行安装:
cd /home/guozhong wget https://downloads.apache.org/kafka/3.3.2/kafka_2.12-3.3.2.tgz tar -zxvf kafka_2.12-3.3.2.tgz -C /opt/modules/app cd /opt/modules/app mv kafka_2.12-3.3.2 kafka chown -R hadoop ./kafka
启动Kafka
安装完成后,首先要启动Kafka。登录Linux系统(本教程使用已经创建的hadoop用户),打开一个终端,输入命令启动Zookeeper服务:
cd /opt/modules/app/kafka ./bin/zookeeper-server-start.sh config/zookeeper.properties
注意:执行完上面的命令后,终端窗口会返回一堆信息,然后停止不动了,没有回到shell命令提示符状态,这时,不要误以为是程序任务死掉了,而是Zookeeper服务器已经启动了,正处于运行状态下。所以,千万不要关闭该终端窗口,一旦关闭,Zookeeper服务就停止了。
请另打开一个终端,然后输入以下命令启动kafka服务:
cd /opt/modules/app/kafka ./bin/kafka-server-start.sh config/server.properties
同样的,执行上面的命令后,终端窗口也返回一堆信息,然后就会停止不动,没有回到shell命令提示符状态,同样不要以为死机了,而是Kafka服务器已经启动,正处于运行状态。所以不要关闭终端窗口。否则,关闭后Kafka服务就停止了。
当然,要想kafka在后台运行,可以采用在结尾增加"&"的命令:
cd /opt/modules/app/kafka ./bin/kafka-server-start.sh config/server.properties &
这样,Kafka启动后就会在后台运行,即使关闭了终端窗口,服务也一直在运行。
查看当前启动的服务:
[root@hadoop01 kafka]# jps 2711 Kafka 4600 Jps 2255 QuorumPeerMain
创建Topic
再打开第三个终端,然后输入以下命令,创建一个自定义名称为“testsender”的Topic:(这2.12版本之后的创建方式,老版本的创建方式命令不同,可以查看??常见问题 说明)
./bin/kafka-topics.sh --create --topic testsender --bootstrap-server localhost:9092 Created topic testsender.
可执行如下命令,查看"testsender"的Topic是否已成功创建:
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092 testsender
启动生产者和消费者
启动生产者发送消息:
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testsender >hello >123456
启动消费者接收消息:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testsender --from-beginning 接收到消息: hello 123456
??常见问题
解决zookeeper is not a recognized option问题
出现如下错误,这时不要慌
分析这个问题可能有两点:
第一点:当我们使用如下命令创建 topic 时,会报此错误。比如要创建一个名称为“testsender”的topic,当前安装的版本是2.12,这时我们要使用新版本的创建topic命令来执行,老版本是报以上错误的。
老版本的创建topic方式:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testsender
我们采用新版本的创建topic方式:
./bin/kafka-topics.sh --create --topic testsender --bootstrap-server localhost:9092
第二点:如果采用新版本的创建方式还是报错,这时要考虑的问题多数在配置文件上,这个问题多数是因为:kafka 的配置文件和 zookeeper 的配置文件,我们都按照默认,并没有修改,这里可能需要修改就是这两个文件:kafka-server-start.sh 和 zookeeper-server-start.sh中默认的启动内存,初始值有点大,如果服务器的配置比较低的话,会报内存不足的错误,这个问题不太容易发现,这里我们可以把启动内存调小一点。
kafka-server-start.sh文件: Xms1G 调整为 Xms128M
默认:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" fi EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}
修改后:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx1G -Xms128M" fi EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}
zookeeper-server-start.sh文件:Xmx512M 调整为 Xms128M
默认:
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties" fi if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M" fi EXTRA_ARGS=${EXTRA_ARGS-'-name zookeeper -loggc'}
修改为:
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties" fi if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx512M -Xms128M" fi EXTRA_ARGS=${EXTRA_ARGS-'-name zookeeper -loggc'}
接下来重启服务,再试一下:
./bin/kafka-topics.sh --create --topic testsender --bootstrap-server localhost:9092 Created topic testsender.