kafka_2.11-0.10.1.1安装
# 单机模式安装
# 下载
下载kafka_2.11-0.10.1.1.tgz (opens new window)
# 解压
将压缩包拷贝到/usr/local/hadoop
然后解压
$ cp kafka_2.11-0.10.1.1.tgz /usr/local/hadoop
$ tar -zxvf kafka_2.11-0.10.1.1.tgz
2
# 配置kafka环境变量
编辑/etc/profile
文件,增加如下配置
# kafka_2.11-0.10.1.1
export KAFKA_HOME=/usr/local/hadoop/kafka_2.11-0.10.1.1
export PATH=${PATH}:${KAFKA_HOME}/bin
2
3
使配置生效
$ source /etc/profile
# 配置kafka
编辑/usr/local/hadoop/kafka_2.11-0.10.1.1/config/server.properties
文件,修改如下配置。第21行
21 broker.id=1
22 log.dirs=/usr/local/hadoop/kafka_2.11-0.10.1.1/log-2
2
注意
这里的【broker.id】需要在集群中唯一,log-2目录也分别设置
# 验证安装
# 启动kafka
注意:需要先启动zookeeper集群
启动命令:kafka-server-start.sh -daemon ../config/server.properties
[root@mini1 config]# cd /usr/local/hadoop/kafka_2.11-0.10.1.1/bin/
[root@mini1 bin]# kafka-server-start.sh -daemon ../config/server.properties
2
通过jps,如果能看到kafka进程,表示启动成功。
# 通过zookeeper查看
通过zookeeper客户端查看目录结构
启动zookeeper客户端: ./zkCli.sh
$ [root@mini1 config]# cd /usr/local/hadoop/zookeeper-3.4.10/bin/
$ [root@mini1 bin]# ./zkCli.sh
2
通过ls /brokers/ids
查看当前已经启动的kafka代理节点
[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
[1]
2
可以看到,当前的代理节点只有一个1,也就是刚才我们配置的broker.id。
至此,kafka的单机模式已经安装完毕。
# 集群模式安装
当单机模式安装完成后,可以接着安装集群模式。这里是三台节点。
IP | hostname |
---|---|
192.168.56.121 | mini1 |
192.168.56.122 | mini2 |
192.168.56.123 | mini3 |
集群的安装也比较简单,只需要做一些配置,然后将安装目录拷贝到其他机器上即可。
# kafka的server.properties配置
编辑/usr/local/hadoop/kafka_2.11-0.10.1.1/config/server.properties
文件,增加kafka连接zookeeper集群的配置
在文件的第117行,修改如下配置。
zookeeper.connect=mini1:2181,mini2:2181,mini3:2181
# 拷贝kafka目录至其他从节点
将kafka目录拷贝至mini2和mini3节点上
$ [root@mini1 hadoop]# scp -r kafka_2.11-0.10.1.1/ mini2:/usr/local/hadoop/
$ [root@mini1 hadoop]# scp -r kafka_2.11-0.10.1.1/ mini3:/usr/local/hadoop/
2
将/etc/profile配置文件拷贝至mini2和mini3节点上
$ [root@mini1 config]# scp /etc/profile mini2:/etc/profile
$ [root@mini1 config]# scp /etc/profile mini3:/etc/profile
2
注意
然后分别在mini2和mini3上执行source /etc/profile
使配置文件立即生效
# 修改每个节点的borker.id
修改mini2的kafka的borker.id为2
修改mini3的kafka的borker.id为3
注意:当然也可以改为其他值,只要保证在集群中是唯一的即可。
# 修改每个节点的host.name
在该版本中,如果不设置host.name,则在创建主题及向主题发送消息时发生NOT_LEADER_FOR_PARTITION异常。
分别在mini1、mini2、mini3三台机器上修改server.properties文件,增加host.name=本机IP的配置
编辑/usr/local/hadoop/kafka_2.11-0.10.1.1/config/server.properties
文件,增加host.name=192.168.56.121的配置
在文件的第117行,修改如下配置。
例如:
mini1的server.properties
完整配置如下
broker.id=1
log.dirs=/usr/local/hadoop/kafka_2.11-0.10.1.1/log-1
advertised.host.name=192.168.56.121
2
3
mini2的server.properties
完整配置如下
broker.id=2
log.dirs=/usr/local/hadoop/kafka_2.11-0.10.1.1/log-2
advertised.host.name=192.168.56.122
2
3
mini3的server.properties
完整配置如下
broker.id=3
log.dirs=/usr/local/hadoop/kafka_2.11-0.10.1.1/log-3
advertised.host.name=192.168.56.123
2
3
# 启动kafka集群
# 分别启动
分别在mini2和mini3上执行如下命令启动kafka
[root@mini1 config]# cd /usr/local/hadoop/kafka_2.11-0.10.1.1/bin/
[root@mini1 bin]# kafka-server-start.sh -daemon ../config/server.properties
2
然后在三台节点上,分别通过jps命令查看是否有"kafka"进程
然后在zookeeper客户端上查看kafka的代理数
[zk: localhost:2181(CONNECTED) 0] ls /brokers/ids
[1, 2, 3]
[zk: localhost:2181(CONNECTED) 1]
2
3
可以看到,现在kafka的代理数已经是1,2,3了。
# 集群启动脚本
kafka本身没有提供集群启动脚本,不过可以自己实现。
注意:
brokers为你集群的所有hostname
KAFKA_HOME为你的安装路径
kafka-cluster-start.sh
#!/bin/bash
brokers=("mini1 mini2 mini3")
KAFKA_HOME="/bigdata/kafka/"
echo "INFO:Begin to start kafka cluster..."
for broker in ${brokers[@]}
do
echo " INFO:Start kafka on ${broker}"
ssh ${broker} -C "source /etc/profile; sh ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties"
if [ $? -eq 0 ]; then
echo " INFO:[${broker}] Start successfully "
fi
done
echo "INFO:Kafka cluster starts successfully !"
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
kafka-cluster-stop.sh
#!/bin/bash
brokers=("mini1 mini2 mini3")
KAFKA_HOME="/bigdata/kafka/"
echo "INFO:Begin to stop kafka cluster..."
for broker in ${brokers[@]}
do
echo " INFO:stop kafka on ${broker}"
ssh ${broker} -C "source /etc/profile; sh ${KAFKA_HOME}/bin/kafka-server-stop.sh "
if [ $? -eq 0 ]; then
echo " INFO:[${broker}] stop successfully "
fi
done
echo "INFO:Kafka cluster stop successfully !"
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
至此,kafka的集群模式已经安装成功!
# 常见错误
# No kafka server to stop
在我通过kafka-server-stop.sh脚本关闭kafka时得到如下错误,通过jps能看到kafka进程,但是却stop不掉。
[root@mini1 bin]# jps
1809 SecondaryNameNode
1620 NameNode
3237 Kafka
1960 ResourceManager
3290 Jps
2653 ZooKeeperMain
2238 QuorumPeerMain
[root@mini1 bin]# kafka-server-stop.sh
No kafka server to stop
2
3
4
5
6
7
8
9
10
原因是有一些操作系统无法通过该命令找到pid,从而无法kill掉该进程。
kafka-server-stop.sh脚本内容如下,将17行注释,修改为18行内容即可。
17 #PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')
18 PIDS=$(jps | grep -i 'Kafka' |awk '{print $1}')
2
注意:修改完成后,将kafka-server-stop.sh文件拷贝至其他从节点mini2和mini3上
[root@mini1 bin]# scp /usr/local/hadoop/kafka_2.11-0.10.1.1//bin/kafka-server-stop.sh mini2:/usr/local/hadoop/kafka_2.11-0.10.1.1/bin/
kafka-server-stop.sh 100% 1025 1.0KB/s 00:00
[root@mini1 bin]# scp /usr/local/hadoop/kafka_2.11-0.10.1.1//bin/kafka-server-stop.sh mini3:/usr/local/hadoop/kafka_2.11-0.10.1.1/bin/
kafka-server-stop.sh 100% 1025 1.0KB/s 00:00
2
3
4
# Unable to connect to zookeeper server within timeout: 6000
无法连接到zookeeper,请尝试查看zookeeper的端口是否可用
[2018-10-29 15:41:02,558] FATAL Fatal error during KafkaServerStartable startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 6000
at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1232)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:156)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:130)
at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:76)
at kafka.utils.ZkUtils$.apply(ZkUtils.scala:58)
at kafka.server.KafkaServer.initZk(KafkaServer.scala:327)
at kafka.server.KafkaServer.startup(KafkaServer.scala:200)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:39)
at kafka.Kafka$.main(Kafka.scala:67)
at kafka.Kafka.main(Kafka.scala)
2
3
4
5
6
7
8
9
10
11
12
CentOS开启防火墙及端口的命令:
查看防火墙状态:service iptables status
开启防火墙(重启后永久生效):chkconfig iptables on
关闭防火墙(重启后永久生效):chkconfig iptables off
开启防火墙(即时生效,重启后失效):service iptables start
关闭防火墙(即时生效,重启后失效):service iptables stop
重启防火墙:service iptables restartd
查看打开的端口:/etc/init.d/iptables status
开启6000端口:iptables -A INPUT -p tcp --dport 6000 -j ACCEPT
保存并重启防火墙:
/etc/rc.d/init.d/iptables save
/etc/init.d/iptables restart
打开49152~65534之间的端口
iptables -A INPUT -p tcp --dport 49152:65534 -j ACCEPT
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
警告
例如刚才的错误,先打开6000端口,然后关闭防火墙。分别在其他从节点机器上都关闭防火墙
# Kafka-manager的安装
kafka-manager是雅虎开源的Kafka的web端管理工具
注意:上面下载的是源码,下载后需要按照后面步骤进行编译。如果觉得麻烦,可以直接从下面地址下载编译好的 kafka-manager-1.3.3.7.zip。 链接: http://pan.baidu.com/s/1qY8sGoO (opens new window) 密码: ye7b
# 下载解压
下载编译好的zip文件后,上传至服务器的任意节点。然后解压。例如:我把kafka-manager放到mini1节点上了。
[root@mini1 hadoop]# cd kafka-manager-1.3.3.7/
[root@mini1 kafka-manager-1.3.3.7]# ll
total 40
drwxr-xr-x. 3 root root 4096 Oct 13 19:00 application.home_IS_UNDEFINED
drwxr-xr-x. 2 root root 4096 Oct 13 18:58 bin
drwxr-xr-x. 2 root root 4096 Oct 13 19:00 conf
drwxr-xr-x. 2 root root 12288 Oct 13 18:58 lib
-rw-r--r--. 1 root root 6335 Jun 5 2017 README.md
-rw-r--r--. 1 root root 4 Oct 13 19:03 RUNNING_PID
drwxr-xr-x. 3 root root 4096 Oct 13 18:58 share
[root@mini1 kafka-manager-1.3.3.7]#
2
3
4
5
6
7
8
9
10
11
# 配置kafka-manager
编辑/usr/local/hadoop/kafka-manager-1.3.3.7/conf/application.conf
文件,修改如下内容。
修改kafka-manager连接的zookeeper地址,文件的第23行。
23 kafka-manager.zkhosts="mini1:2181,mini2:2181,mini3:2181"
# 启动kafka-manager
$ cd /usr/local/hadoop/kafka-manager-1.3.3.7
$ bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8090
2
注意端口占用的情况,默认使用9000端口。
如果启动不成功,则可以按提示删除如下文件即可。
[root@mini1 kafka-manager-1.3.3.7]# bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8090
This application is already running (Or delete /usr/local/hadoop/kafka-manager-1.3.3.7/RUNNING_PID file).
[root@mini1 kafka-manager-1.3.3.7]# rm -rf /usr/local/hadoop/kafka-manager-1.3.3.7/RUNNING_PID
2
3
# 关闭kafka-manager
kafka-manager并没有通过stop.sh脚本来停止kafka-manager,需要手工kill掉ProdServerStart
进程
kafka-manager进程:ProdServerStart
然后删除RUNNING_PID文件
[root@mini1 conf]# jps
3664 Kafka
1809 SecondaryNameNode
1620 NameNode
1960 ResourceManager
3736 ZooKeeperMain
4398 Jps
2238 QuorumPeerMain
4287 ProdServerStart
[root@mini1 conf]# kill -9 4287
[root@mini1 conf]# rm -rf /usr/local/hadoop/kafka-manager-1.3.3.7/RUNNING_PID
2
3
4
5
6
7
8
9
10
11
ps -ef | grep ProdServerStart
kill -9
进程kafka-manager的进程IDrm -rf /usr/local/hadoop/kafka-manager-1.3.3.7/RUNNING_PID
# 访问kafka-manager
# 附录
# kafka常用命令
# 启动kafka服务,三台主机分别输入此指令:
kafka-server-start.sh $KAFKA_HOME/config/server.properties &
# 以后台的方式启动
nohup kafka-server-start.sh $KAFKA_HOME/config/server.properties &
# 查看topic名
kafka-topics.sh --list --zookeeper hadoop-001:2181
# 创建topic名
kafka-topics.sh --create --zookeeper hadoop-001:2181 --replication-factor 3 --partitions 1 --topic first
# 创建生产者
kafka-console-producer.sh --broker-list hadoop-001:9092,hadoop-002:9092,hadoop-003:9092 --topic first
# 创建消费者
kafka-console-consumer.sh --zookeeper hadoop-001:2181 --from-beginning --topic first
# 1)查看当前服务器中的所有topic
kafka-topics.sh --list --zookeeper hadoop-001:2181
# 2)创建topic
kafka-topics.sh --create --zookeeper hadoop-001:2181 --replication-factor 3 --partitions 1 --topic first
# 选项说明:
# --topic 定义topic名
# --replication-factor 定义副本数
# --partitions 定义分区数
# 3) 删除topic
kafka-topics.sh --delete --zookeeper hadoop-001:2181 --topic first
# 需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。
# 4)发送消息
kafka-console-producer.sh --broker-list hadoop-001:9092 --topic first
>hello world
>hadoop hadoop
# 5)消费消息
kafka-console-consumer.sh --zookeeper hadoop-001:2181 --from-beginning --topic first
# 6)查看某个Topic的详情
kafka-topics.sh --topic first --describe --zookeeper hadoop-002:2181
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# CentOS防火墙相关命令
# 查看防火墙状态:
service iptables status
# 开启防火墙(重启后永久生效):
chkconfig iptables on
# 关闭防火墙(重启后永久生效):
chkconfig iptables off
# 开启防火墙(即时生效,重启后失效):
service iptables start
# 关闭防火墙(即时生效,重启后失效):
service iptables stop
# 重启防火墙:
service iptables restartd
# 查看打开的端口:
/etc/init.d/iptables status
# 开启6000端口:
iptables -A INPUT -p tcp --dport 6000 -j ACCEPT
# 保存并重启防火墙:
/etc/rc.d/init.d/iptables save
/etc/init.d/iptables restart
# 打开49152~65534之间的端口
iptables -A INPUT -p tcp --dport 49152:65534 -j ACCEPT
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30