Kafka安装
下载kafka
wget https://downloads.apache.org/kafka/2.6.0/kafka_2.13-2.6.0.tgz
解压缩
tar -xf kafka_2.13-2.6.0.tgz -C /data/kafka/
创建kafka用户及用户组
gruopadd kafka
useradd -g kafka kafka
赋予文件用户权限
chown -R kafka.kafka /data/kafka
配置kafka集群信息
#进入目录
cd /data/kafka/kafka_2.13-2.6.0
#编辑配置文件
vim config/server.properties
#broker id 唯一不可重复!,如果不设置会自动设置。
broker.id=1
#监听地址端口,改成对应节点的IP
listeners=PLAINTEXT://node1:9092
#接收发送的网络线程数
num.network.threads=3
#请求的线程数,包括磁盘I/O
num.io.threads=8
#发送的缓冲区大小
socket.send.buffer.bytes=102400
#接收的缓冲区大小
socket.receive.buffer.bytes=102400
#接收的最大请求大小
socket.request.max.bytes=104857600
############# Log Basics #############
#日志存储路径
log.dirs=/data/kafka/kafka_2.13-2.6.0/logs
#默认日志分区
num.partitions=1
#恢复及关闭时刷新的线程数
num.recovery.threads.per.data.dir=1
############# Internal Topic Settings #############
recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############# Log Retention Policy #############
#日志文件删除前保留的时间
log.retention.hours=168
#保留文件的大小
log.retention.bytes=1073741824
#单个日志文件大小
log.segment.bytes=1073741824
#检查日志时间间隔
log.retention.check.interval.ms=300000
############# Zookeeper #############
#zookeeper集群的地址
zookeeper.connect=node1:2181,node2:2181,node3:2181,node4:2181,node5:2181/kafka-2.6.0
#zookeeper的超时时间
zookeeper.connection.timeout.ms=18000
将配置好的kafka文件传到其他节点中
#nodex代表其他节点
scp -r /data/kafka root@nodex:/data
修改配置文件
vim config/server.properties
#broker id 唯一不可重复!,如果不设置会自动设置。
broker.id=x
#监听地址端口,改成对应节点的IP
listeners=PLAINTEXT://nodex:9092
将kafka做成服务
vim /lib/systemd/system/kafka.service
[Unit]
Description=kafka
#因为kafka依靠zookeeper,如果没有将zookeeper做成服务,可以去掉手动启动zookeeper.service 或者是参考博客将zookeeper也做成服务。
After=network.target zookeeper.service
[Service]
#后台启动
Type=forking
#启动的用户
User=kafka
#启动的用户组
Group=kafka
Environment=JAVA_HOME=/usr/java/jdk-11.0.9
Environment=/data/kafka/kafka_2.13-2.6.0
#开启kafka命令
ExecStart=/data/kafka/kafka_2.13-2.6.0/bin/kafka-server-start.sh -daemon /data/kafka/kafka_2.13-2.6.0/config/server.properties
#关闭kafka命令
ExecStop=/data/kafka/kafka_2.13-2.6.0/bin/kafka-server-stop.sh
ExecReload=/bin/kill -s HUP $MAINPID
#是否分配单独空间
PrivateTmp=true
[Install]
WantedBy=multi-user.target
重新加载systemctl服务配置文件
systemctl daemon-reload
开启关闭kafka
#开启kafka
systemctl start kafka.service
#关闭kafka
systemctl stop kafka.service
#查看是否启动
systemctl status kafka.service
Kafka可视化界面安装CMAK(kafka manager)
下载CMAK(在kafka集群任意节点中)
git clone https://github.com/yahoo/CMAK.git
编译安装CMAK
#进入到CMAK目录中
cd CMAK
#sbt是CMAK目录中的内置命令,不需要安装sbt,编译需要java11 由于CMAK是scala编写,所以通过sbt进行编译。
#编译时间需要很久,期间可以撸撸猫。
./sbt clean dist
#进去编译完成的目录
cd target/universal
#解压文件到指定目录
unzip cmak-3.0.0.5.zip -d /data/
配置CMAK
#进入cmak目录
cd /data/cmak-3.0.0.5/
#配置cmak
vim conf/application.conf
#与kafka集群中配置的zk相同
kafka-manager.zkhosts="node1:2181,node2:2181,node3:2181,node4:2181,node5:2181/kafka-2.6.0"
cmak.zkhosts="node1:2181,node2:2181,node3:2181,node4:2181,node5:2181/kafka-2.6.0"
启动
#-Dhttp.port 指定端口号,默认为9000
bin/cmak -Dhttp.port=9000
访问CMAK IP:9000
添加一个需要监控的kafka集群(Add cluster)
这里简单进行一些配置还有一些集群的参数配置,完成后可以监控一个集群。
进去集群中可以看到Topic的数量,Brokers的数量。以及集群、topic的详细信息,创建删除topic的一些操作。