夜间模式暗黑模式
字体
阴影
滤镜
圆角
主题色
kafka2.8 raft集群搭建

kafka在历史版本中,由zookeeper集群来协调选举kafka leader。kafka集群中所有的topic以及partition信息都需要在zookeeper中注册。虽然zookeeper集群内部提供了对数据的强一致性,但是随着topic和partition逐渐增多的情况下,数据一致性问题就显著起来了,延迟会越来越大。维护问题也是一方面,一个消息队列还需要一个外部的集群来做到数据的一致性管理,这里也是一个比较突出的问题。

所以与其将kafka日志等信息存储在外部系统中,不如在本地集群中存储。这样就引入一个非常优秀的算法Raft协议。主要实现两个重要的功能,leader选举、日志复制,日志复制为多个副本提供数据强一致性,并且一个显著的特点是Raft节点是去中心化的架构,不依赖外部的组件,而是作为一个协议簇嵌入到应用中的,即与应用本身是融合为一体的。

集群配置

2.8.0舍弃zookeeper集群,但是也可以配置zookeeper,只是raft集群的测试版本,不推荐在生产环境中使用,预计在3.0以后舍弃zookeeper。raft集群中每个节点有2种角色broker和controller,controller节点就是负责替代zookeeper集群存在的。

nodebrokercontroller
node1
node2
node3
node4
node5

下载安装包

wget https://downloads.apache.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz

tar -xf kafka_2.13-2.8.0.tgz

cd kafka_2.13-2.8.0

配置节点信息

node1,node2,node3

vim config/kraft/server.properties

process.roles=broker,controller
#id唯一从1开始,其他节点这里需要修改 node2  node.id=2 node3 node.id=3
node.id=1
#这里写的是controller集群节点的信息   id号@节点端口号,id号@节点端口号
controller.quorum.voters=1@node1:9093,2@node2:9093,3@node3:9093
inter.broker.listener.name=PLAINTEXT
controller.listener.names=CONTROLLER
#下面换成对应节点的ip
listeners=PLAINTEXT://node1:9092,CONTROLLER://node1:9093
advertised.listeners=PLAINTEXT://node1:9092
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
#log数据存储位置,存在当前目录了
log.dirs=kraft-combined-logs

node4,node5

vim config/kraft/server.properties

process.roles=broker
#id唯一从1开始,其他节点这里需要修改 node5  node.id=5
node.id=4
#这里写的是controller集群节点的信息   id号@节点端口号,id号@节点端口号
controller.quorum.voters=1@node1:9093,2@node2:9093,3@node3:9093
inter.broker.listener.name=PLAINTEXT
#下面换成对应节点的ip
listeners=PLAINTEXT://node4:9092
advertised.listeners=PLAINTEXT://node4:9092
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
#log数据存储位置,存在当前目录了
log.dirs=kraft-combined-logs

集群启动操作

#node1
获取一个随机的uuid
bin/kafka-storage.sh random-uuid
BaaRquLXSW2MUVtP9QmbAw

#利用生成的uuid初始化每一个节点 node1-node5
bin/kafka-storage.sh format -t BaaRquLXSW2MUVtP9QmbAw -c config/kraft/server.properties

#启动所有的节点 node1-node5   (-daemon 代表后台启动)
bin/kafka-server-start.sh -daemon config/kraft/server.properties
由于我log存储的是当前目录,所以在当前目录生成了kraft-combined-logs文件
cd kraft-combined-logs
#这里存储的就是当前节点的信息
cat meta.properties

kafka操作

创建一个topic

#这里指定broker的地址或者是controller的地址都可以
bin/kafka-topics.sh --create --topic kraft-test --partitions 3 --replication-factor 3 --bootstrap-server node1:9092

#使用controller地址
bin/kafka-topics.sh --create --topic kraft-test --partitions 3 --replication-factor 3 --bootstrap-server node1:9093

查询topic

#利用borker端口进行查询
bin/kafka-topics.sh --bootstrap-server node1:9092 --list
#当我利用controller端口进行查询的时候返回的是空(不知道是新出的版本问题,还是我操作有问题后续解决了在这更新)
bin/kafka-topics.sh --bootstrap-server node1:9093 --list

查看topic信息

#查询这里也只能使用borker端口来使用,conntroller端口无法使用。
bin/kafka-topics.sh --bootstrap-server node1:9093 --describe --topic kraft-test

producer-consumer测试

#这里也是无法使用controller端口,我理解controller端口可能只做集群的选举,数据的同步等工作,以前觉得连接kafka既可以写broker集群的地址,又可以写zookeeper地址也挺鸡肋的。后续如果解决了我在这里更新,如果有人知道欢迎留言。

#发送端
bin/kafka-console-producer.sh --broker-list node1:9092 --topic kraft-test

#接收端
bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic kraft-test

删除topic

#这里我使用controller的端口直接报错。
bin/kafka-topics.sh --delete --topic kraft-zsj1 --bootstrap-server node1:9092

查询metadata信息

#利用shell查询metadata信息,类似于操作zookeeper
bin/kafka-metadata-shell.sh  --snapshot kraft-combined-logs/\@metadata-0/00000000000000000000.log
#可以获取brokers节点的id
ls brokers/
#获取topic的信息
ls topics/
#获取tipic id的信息
ls topicIds/
#获得某个topic的信息
cat topics/kraft/-test/0/data
#获取leader的节点
cat metadateQuorum/leader
暂无评论

发送评论 编辑评论


				
上一篇
下一篇