A5d16cf6ef70c9183745a7831deb70a7
从0开始搭建多节点kafka集群

环境及版本:mac系统,kafka 0.11,java8,zk3.4.10,100%可复现

[TOC]

安装Java

  不论是ZooKeeper还是kakfa都需要提前安装好Java,这里选择的是java8版本。下载地址:https://www.java.com/zh_CN/download/mac_download.jsp

  按提示步骤下载即可,下载完成后,我们需要配置一下java的环境变量。
  首先我们在终端中输入:/usr/libexec/java_home -V,用来查看我们安装的jdk地址

  选择要进行配置的java版本,复制路径。输入open -e .bash_profile,打开配置文件,按照以下格式输入,并将之前复制的路径,粘贴到JAVA_HOME处。保存文件

JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home
PATH=$JAVA_HOME/bin:$PATH
export 
export

  输入source .bash_profile进行保存,接下来我们就可以验证以下配置是否成功。
  输入echo $JAVA_HOME查看JAVA_HOME配置,输入java -version查看java版本。

安装ZooKeeper

  ZooKeeper是安装Kafka集群必要的组件,我们选择要安装的Zk版本是3.4.10。
  下载地址:
  https://archive.apache.org/dist/zookeeper/
  zookeeper的默认配置文件为zookeeper/conf/zoo_sample.cfg,需要将其修改为zoo.cfg,其中存放着zookeeper的配置信息,我们可以根据需要进行调整。最后我们可以在终端中进入zk的安装路径,输入bin/zkServer.sh start启动zk。你将看到如下输出

ZooKeeper JMX enabled by default
Using config: /Users/lijl/kafka/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

  还可以输入:bin/zkServer.sh status来查看当前zk的状态,来验证zk是否启动成功。

ZooKeeper JMX enabled by default
Using config: /Users/lijl/kafka/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: standalone

  从输出可以看出,我们已经成功启动了一个单点zk,我们可以输入bin/zkServer.sh stop来关闭zk。

ZooKeeper JMX enabled by default
Using config: /Users/lijl/kafka/zookeeper-3.4.10/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED

安装kafka

  接下来就是我们这篇文章的重点,安装kafka,我们要安装的是一个多点环境的kafka,因为kafka伪集群(多个kakfa部署在同一个服务器上)只能进行业务开发或者是验证功能。但是不能在生产环境直接运用,否则就会失去kafka的分布式特性,比如负载均衡、故障转移等。
  多点kafka集群,是由一套多节点ZooKeeper集群和一套多节点kafka集群组成。所以我们想要搭建多点的kafka集群,首先要搭建多点的Zookeeper集群。ZooKeeper想要正常服务,要满足一个条件,“半数以上服务器存活”,即一个2n+1个服务器搭建的集群,最多可以容忍n台服务器宕机而依然保持服务状态。(我们最好选取奇数个zk服务器搭建集群,否则在进行leader选举的时候,会出现脑裂现象),本文将搭建3个节点的zk集群,主机名分别是:zk1、zk2、zk3。
  下面我们给出一个经典的多节点环境配置文件zoo.cfg,并对其参数进行解释。

tickTime=2000
dataDir=/Users/lijl/kafka/dataDir
clientPort=2181
initLimit=5
syncLimit=2
server.1=zk1:2888:3888
server.2=zk2:2888:3888
server.3=zk3:2888:3888

  tickTime:Zookeeper最小时间单位,用于丈量心跳时间和超时时间,通常设置成默认值:2秒即可
  dataDir:ZooKeeper会在内存中保存系统快照,并定期写入该路径的指定文件夹中,生产环境下应该转移该文件夹的磁盘占用情况。
  clientPort:Zookeeper监听客户端链接的端口,一般设置成默认的2181
  initLimit:指定follower节点初始时连接leader最大tick次数,假设是5,那么follower必须在5 * tickTime(2s) = 10s内连接上leader,否则超时。
  SyncLimit:设定了follower节点和leader节点进行同步的最大时间,算法和initLimit类似。
  server.X=host:port:port:X必须是一个全局唯一的数字,并且要与myid文件中的数字对应,X一般取1~255,这行的后面配置了2个端口,第一个端口用于follower节点连接leader节点,第二个端口用于leader选举。myid文件位于zoo.cfg中DataDir配置的目录下,内容仅有一个数字ID。
  接下来,我们进入conf目录,创建3个配置文件zoo1.cfg、zoo2.cfg、zoo3.cfg内容如下(注意:这里server.x配置中的zk是我们zk节点所在的主机名):

tickTime=2000
dataDir=/Users/lijl/kafka/dataDir/zk1
clientPort=2181
initLimit=5
syncLimit=2
server.1=zk:2888:3888
server.2=zk:2889:3889
server.3=zk:2890:3890

tickTime=2000
dataDir=/Users/lijl/kafka/dataDir/zk2
clientPort=2182
initLimit=5
syncLimit=2
server.1=zk:2888:3888
server.2=zk:2889:3889
server.3=zk:2890:3890

tickTime=2000
dataDir=/Users/lijl/kafka/dataDir/zk3
clientPort=2183
initLimit=5
syncLimit=2
server.1=zk:2888:3888
server.2=zk:2889:3889
server.3=zk:2890:3890

  这里我们分别取了2181、2182、2183,其实取一个也是可以的,要确保端口没有冲突。文本只是在一台电脑上进行搭建,所以选了不同的端口,防止冲突,主机名也是相同的。
  创建好配置文件,接下来就要创建myid文件了,该文件必须在我们配置的dataDir中,在终端中输入以下内容:
  echo "1" > /Users/lijl/kafka/dataDir/zk1/myid
  echo "2" > /Users/lijl/kafka/dataDir/zk2/myid
  echo "3" > /Users/lijl/kafka/dataDir/zk3/myid
  进行快速的创建文件以及数据的写入,接下来我们来运行一下这3个zk实例。
  bin/zkServer.sh start conf/zoo1.cfg
  bin/zkServer.sh start conf/zoo2.cfg
  bin/zkServer.sh start conf/zoo3.cfg
  并分别查看他们的状态:bin/zkServer.sh status conf/zoo1.cfg(zoo2、zoo3类似)

ZooKeeper JMX enabled by default
Using config: conf/zoo1.cfg
Mode: follower

ZooKeeper JMX enabled by default
Using config: conf/zoo2.cfg
Mode: leader

conf/zoo3.cfg
ZooKeeper JMX enabled by default
Using config: conf/zoo3.cfg
Mode: follower

  至此,我们已经搭建了一个3个节点的zookeepr集群。接下来我们就在这个多节点的zk集群的基础上,搭建多节点kafka集群。
  首先我们下载kafka,选取的版本是0.11.3,下载地址:
  http://kafka.apache.org/downloads

  和搭建zk集群类似,我们也需要创建多份配置文件。(conf下创建3份配置文件,注意:这里listeners配置中的kafka1/2/3 代表我们kafka集群的主机名,zookeeper.connect中配置的zk,是我们zk节点所在的主机名)

broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://kafka1:9002
log.dirs=/Users/lijl/kafka/dataDir/kafka1
zookeeper.connect=zk:2181,zk:2182,zk1:2183
unclean.leader.election.enable=false
zookeeper.connection.timeout.ms=6000

broker.id=1
delete.topic.enable=true
listeners=PLAINTEXT://kafka2:9003
log.dirs=/Users/lijl/kafka/dataDir/kafka2
zookeeper.connect=zk:2181,zk:2182,zk1:2183
unclean.leader.election.enable=false
zookeeper.connection.timeout.ms=6000

broker.id=2
delete.topic.enable=true
listeners=PLAINTEXT://kafka3:9004
log.dirs=/Users/lijl/kafka/dataDir/kafka3
zookeeper.connect=zk:2181,zk:2182,zk1:2183
unclean.leader.election.enable=false
zookeeper.connection.timeout.ms=6000

  我们简单的介绍一下这些配置的含义:
  broder.id:broder的唯一标识
  delete.topic.enable:是否可以删除主题
  listener:该参数用于客户端和broker进行连接,用来绑定私网ip,如果想绑定公网ip,应该设置的是:advertised.listeners这个参数。
  log.dirs:kafka消息持久化的路径
  zookeeper.connect:zk节点,这里注意,应该配置全部的zk节点
  unclean.leader.election.enable:消息同步落后太多的副本可不可以进行leader选举。
  zookeeper.connection.timeout.ms:zk连接超时时间。
  接着我们只需要执行命令启动kafka就可以了。
  bin/kafka-server-start.sh -daemon config/server1.properties
  bin/kafka-server-start.sh -daemon config/server2.properties
  bin/kafka-server-start.sh -daemon config/server3.properties
  我们可以查看位于kafka logs目录下的server.log,确认kafka broker已经启动成功。

  也可以通过jps |grep Kafka来确认broker进程是否启动成功(Kafka大写)

  如果没有正确启动,首先应该着重检查kafka配置是否正确,比如listeners少加了一个s,那么在启动kafka的时候,就会启动默认的主机和端口配置,会报端口占用,而不是帮你检查配置文件是否正确

验证

  此时我们的kakfa多点集群环境已经搭建完成,接下来我们来对这个集群做一些简单的验证。

测试topic创建

  首先我们创建一个测试用的topic,名为test-topic,为了充分利用搭建的3台服务器,我们选择创建3个分区,每一个分区分配3个副本。
  bin/kafka-topics.sh --zookeeper zk:2181,zk:2182,zk:2183 --create --topic test-topic --partitions 3 --replication-factor 3
  这里的zk指的是zookeeper节点主机名,执行成功后,会输出:Created topic "test-topic"
  接下来通过一些命令来做一下验证,查看所有的topic列表
  bin/kafka-topics.sh --zookeeper zk:2181,zk:2182,zk:2183 -list,执行后显示我们刚创建的test-topic
  bin/kafka-topics.sh --zookeeper zk:2181,zk:2182,zk:2183 --describe --topic test-topic,执行后将显示test-topic的详细信息,包括分区数,副本数,ISR,分区Leader信息等。

Topic:test-topic    PartitionCount:3    ReplicationFactor:3 Configs:
    Topic: test-topic   Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0
    Topic: test-topic   Partition: 1    Leader: 2   Replicas: 2,0,1 Isr: 2,0,1
    Topic: test-topic   Partition: 2    Leader: 0   Replicas: 0,1,2 Isr: 0,1,2

测试消息读写

  最后我们对消息的发送和消费进行测试,使用kafka提供的kafka-console-producer和kafka-console-consumer脚本进行数据的读写。具体的命令如下:
  bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 --topic test-topic --from-beginning
  bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 --topic test-topic --from-beginning
  我们可以同时开启两个终端进行操作,命令中的kafka1/2/3指的是kafka所在的节点主机名。我们可以观察出来,producer脚本产生的消息,consumer脚本马上就能感知到。

  最后,期待您的订阅和点赞,专栏每周都会更新,希望可以和您一起进步,同时也期待您的批评与指正!
image

© 著作权归作者所有
这个作品真棒,我要支持一下!
一个坚持原创的小专栏。分享编程知识,提升工作效率,致力于通过简单的语言,把编程这点事讲清楚。涵盖内容:java、设...
0条评论
top Created with Sketch.