kafka在历史版本中,由zookeeper集群来协调选举kafka leader。kafka集群中所有的topic以及partition信息都需要在zookeeper中注册。虽然zookeeper集群内部提供了对数据的强一致性,但是随着topic和partition逐渐增多的情况下,数据一致性问题就显著起来了,延迟会越来越大。维护问题也是一方面,一个消息队列还需要一个外部的集群来做到数据的一致性管理,这里也是一个比较突出的问题。
所以与其将kafka日志等信息存储在外部系统中,不如在本地集群中存储。这样就引入一个非常优秀的算法Raft协议。主要实现两个重要的功能,leader选举、日志复制,日志复制为多个副本提供数据强一致性,并且一个显著的特点是Raft节点是去中心化的架构,不依赖外部的组件,而是作为一个协议簇嵌入到应用中的,即与应用本身是融合为一体的。
集群配置
2.8.0舍弃zookeeper集群,但是也可以配置zookeeper,只是raft集群的测试版本,不推荐在生产环境中使用,预计在3.0以后舍弃zookeeper。raft集群中每个节点有2种角色broker和controller,controller节点就是负责替代zookeeper集群存在的。
node | broker | controller |
node1 | ✔ | ✔ |
node2 | ✔ | ✔ |
node3 | ✔ | ✔ |
node4 | ✔ | |
node5 | ✔ |
下载安装包
wget https://downloads.apache.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -xf kafka_2.13-2.8.0.tgz
cd kafka_2.13-2.8.0
配置节点信息
node1,node2,node3
vim config/kraft/server.properties
process.roles=broker,controller
#id唯一从1开始,其他节点这里需要修改 node2 node.id=2 node3 node.id=3
node.id=1
#这里写的是controller集群节点的信息 id号@节点端口号,id号@节点端口号
controller.quorum.voters=1@node1:9093,2@node2:9093,3@node3:9093
inter.broker.listener.name=PLAINTEXT
controller.listener.names=CONTROLLER
#下面换成对应节点的ip
listeners=PLAINTEXT://node1:9092,CONTROLLER://node1:9093
advertised.listeners=PLAINTEXT://node1:9092
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
#log数据存储位置,存在当前目录了
log.dirs=kraft-combined-logs
node4,node5
vim config/kraft/server.properties
process.roles=broker
#id唯一从1开始,其他节点这里需要修改 node5 node.id=5
node.id=4
#这里写的是controller集群节点的信息 id号@节点端口号,id号@节点端口号
controller.quorum.voters=1@node1:9093,2@node2:9093,3@node3:9093
inter.broker.listener.name=PLAINTEXT
#下面换成对应节点的ip
listeners=PLAINTEXT://node4:9092
advertised.listeners=PLAINTEXT://node4:9092
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
#log数据存储位置,存在当前目录了
log.dirs=kraft-combined-logs
集群启动操作
#node1
获取一个随机的uuid
bin/kafka-storage.sh random-uuid
BaaRquLXSW2MUVtP9QmbAw
#利用生成的uuid初始化每一个节点 node1-node5
bin/kafka-storage.sh format -t BaaRquLXSW2MUVtP9QmbAw -c config/kraft/server.properties
#启动所有的节点 node1-node5 (-daemon 代表后台启动)
bin/kafka-server-start.sh -daemon config/kraft/server.properties
cd kraft-combined-logs
#这里存储的就是当前节点的信息
cat meta.properties
kafka操作
创建一个topic
#这里指定broker的地址或者是controller的地址都可以
bin/kafka-topics.sh --create --topic kraft-test --partitions 3 --replication-factor 3 --bootstrap-server node1:9092
#使用controller地址
bin/kafka-topics.sh --create --topic kraft-test --partitions 3 --replication-factor 3 --bootstrap-server node1:9093
查询topic
#利用borker端口进行查询
bin/kafka-topics.sh --bootstrap-server node1:9092 --list
#当我利用controller端口进行查询的时候返回的是空(不知道是新出的版本问题,还是我操作有问题后续解决了在这更新)
bin/kafka-topics.sh --bootstrap-server node1:9093 --list
查看topic信息
#查询这里也只能使用borker端口来使用,conntroller端口无法使用。
bin/kafka-topics.sh --bootstrap-server node1:9093 --describe --topic kraft-test
producer-consumer测试
#这里也是无法使用controller端口,我理解controller端口可能只做集群的选举,数据的同步等工作,以前觉得连接kafka既可以写broker集群的地址,又可以写zookeeper地址也挺鸡肋的。后续如果解决了我在这里更新,如果有人知道欢迎留言。
#发送端
bin/kafka-console-producer.sh --broker-list node1:9092 --topic kraft-test
#接收端
bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic kraft-test
删除topic
#这里我使用controller的端口直接报错。
bin/kafka-topics.sh --delete --topic kraft-zsj1 --bootstrap-server node1:9092
查询metadata信息
#利用shell查询metadata信息,类似于操作zookeeper
bin/kafka-metadata-shell.sh --snapshot kraft-combined-logs/\@metadata-0/00000000000000000000.log
#可以获取brokers节点的id
ls brokers/
#获取topic的信息
ls topics/
#获取tipic id的信息
ls topicIds/
#获得某个topic的信息
cat topics/kraft/-test/0/data
#获取leader的节点
cat metadateQuorum/leader