浅墨散人 浅墨散人
  • 基础
  • 设计模式
  • JVM
  • Maven
  • SpringBoot
  • 基础
  • Flask
  • Diango
  • Pandas
  • SqlAlchemy
  • Sqoop
  • Flume
  • Flink
  • Hadoop
  • Hbase
  • Hive
  • Kafka
  • Kylin
  • Zookeeper
  • Tez
  • MySQL
  • Doris
  • Chrome
  • Eclipse
  • IDEA
  • iTerm2
  • Markdown
  • SublimeText
  • VirtualBox
  • WebStrom
  • Linux
  • Mac
  • Hexo
  • Git
  • Vue
  • VuePress
  • 区块链
  • 金融
数据仓库
数据治理
读书笔记
关于我
GitHub (opens new window)
  • 基础
  • 设计模式
  • JVM
  • Maven
  • SpringBoot
  • 基础
  • Flask
  • Diango
  • Pandas
  • SqlAlchemy
  • Sqoop
  • Flume
  • Flink
  • Hadoop
  • Hbase
  • Hive
  • Kafka
  • Kylin
  • Zookeeper
  • Tez
  • MySQL
  • Doris
  • Chrome
  • Eclipse
  • IDEA
  • iTerm2
  • Markdown
  • SublimeText
  • VirtualBox
  • WebStrom
  • Linux
  • Mac
  • Hexo
  • Git
  • Vue
  • VuePress
  • 区块链
  • 金融
数据仓库
数据治理
读书笔记
关于我
GitHub (opens new window)
  • Kafka

    • Kafka
    • kafka_2.11-0.10.1.1安装
      • 单机模式安装
        • 下载
        • 解压
        • 配置kafka环境变量
        • 配置kafka
        • 验证安装
      • 集群模式安装
        • kafka的server.properties配置
        • 拷贝kafka目录至其他从节点
        • 修改每个节点的borker.id
        • 修改每个节点的host.name
        • 启动kafka集群
      • 常见错误
        • No kafka server to stop
        • Unable to connect to zookeeper server within timeout: 6000
      • Kafka-manager的安装
        • 下载解压
        • 配置kafka-manager
        • 启动kafka-manager
        • 关闭kafka-manager
        • 访问kafka-manager
      • 附录
        • kafka常用命令
        • CentOS防火墙相关命令
  • BigData
  • Kafka
2018-10-13
目录

kafka_2.11-0.10.1.1安装

# 单机模式安装

官方文档 (opens new window)

# 下载

下载kafka_2.11-0.10.1.1.tgz (opens new window)

# 解压

将压缩包拷贝到/usr/local/hadoop然后解压

$ cp kafka_2.11-0.10.1.1.tgz /usr/local/hadoop
$ tar -zxvf kafka_2.11-0.10.1.1.tgz
1
2

# 配置kafka环境变量

编辑/etc/profile文件,增加如下配置

# kafka_2.11-0.10.1.1
export KAFKA_HOME=/usr/local/hadoop/kafka_2.11-0.10.1.1
export PATH=${PATH}:${KAFKA_HOME}/bin
1
2
3

使配置生效

$ source /etc/profile
1

# 配置kafka

编辑/usr/local/hadoop/kafka_2.11-0.10.1.1/config/server.properties文件,修改如下配置。第21行

 21 broker.id=1
 22 log.dirs=/usr/local/hadoop/kafka_2.11-0.10.1.1/log-2
1
2

注意

这里的【broker.id】需要在集群中唯一,log-2目录也分别设置

# 验证安装

# 启动kafka

注意:需要先启动zookeeper集群

启动命令:kafka-server-start.sh -daemon ../config/server.properties

[root@mini1 config]# cd /usr/local/hadoop/kafka_2.11-0.10.1.1/bin/
[root@mini1 bin]# kafka-server-start.sh -daemon ../config/server.properties
1
2

通过jps,如果能看到kafka进程,表示启动成功。

# 通过zookeeper查看

通过zookeeper客户端查看目录结构

启动zookeeper客户端: ./zkCli.sh

$ [root@mini1 config]# cd /usr/local/hadoop/zookeeper-3.4.10/bin/
$ [root@mini1 bin]# ./zkCli.sh
1
2

通过ls /brokers/ids查看当前已经启动的kafka代理节点

[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
[1]
1
2

可以看到,当前的代理节点只有一个1,也就是刚才我们配置的broker.id。

至此,kafka的单机模式已经安装完毕。

# 集群模式安装

当单机模式安装完成后,可以接着安装集群模式。这里是三台节点。

IP hostname
192.168.56.121 mini1
192.168.56.122 mini2
192.168.56.123 mini3

集群的安装也比较简单,只需要做一些配置,然后将安装目录拷贝到其他机器上即可。

# kafka的server.properties配置

编辑/usr/local/hadoop/kafka_2.11-0.10.1.1/config/server.properties文件,增加kafka连接zookeeper集群的配置

在文件的第117行,修改如下配置。

zookeeper.connect=mini1:2181,mini2:2181,mini3:2181
1

# 拷贝kafka目录至其他从节点

将kafka目录拷贝至mini2和mini3节点上

$ [root@mini1 hadoop]# scp -r kafka_2.11-0.10.1.1/ mini2:/usr/local/hadoop/
$ [root@mini1 hadoop]# scp -r kafka_2.11-0.10.1.1/ mini3:/usr/local/hadoop/
1
2

将/etc/profile配置文件拷贝至mini2和mini3节点上

$ [root@mini1 config]# scp /etc/profile mini2:/etc/profile
$ [root@mini1 config]# scp /etc/profile mini3:/etc/profile
1
2

注意

然后分别在mini2和mini3上执行source /etc/profile使配置文件立即生效

# 修改每个节点的borker.id

修改mini2的kafka的borker.id为2

修改mini3的kafka的borker.id为3

注意:当然也可以改为其他值,只要保证在集群中是唯一的即可。

# 修改每个节点的host.name

在该版本中,如果不设置host.name,则在创建主题及向主题发送消息时发生NOT_LEADER_FOR_PARTITION异常。

分别在mini1、mini2、mini3三台机器上修改server.properties文件,增加host.name=本机IP的配置

编辑/usr/local/hadoop/kafka_2.11-0.10.1.1/config/server.properties文件,增加host.name=192.168.56.121的配置

在文件的第117行,修改如下配置。

例如:

mini1的server.properties完整配置如下

broker.id=1
log.dirs=/usr/local/hadoop/kafka_2.11-0.10.1.1/log-1
advertised.host.name=192.168.56.121
1
2
3

mini2的server.properties完整配置如下

broker.id=2
log.dirs=/usr/local/hadoop/kafka_2.11-0.10.1.1/log-2
advertised.host.name=192.168.56.122
1
2
3

mini3的server.properties完整配置如下

broker.id=3
log.dirs=/usr/local/hadoop/kafka_2.11-0.10.1.1/log-3
advertised.host.name=192.168.56.123
1
2
3

# 启动kafka集群

# 分别启动

分别在mini2和mini3上执行如下命令启动kafka

[root@mini1 config]# cd /usr/local/hadoop/kafka_2.11-0.10.1.1/bin/
[root@mini1 bin]# kafka-server-start.sh -daemon ../config/server.properties
1
2

然后在三台节点上,分别通过jps命令查看是否有"kafka"进程

然后在zookeeper客户端上查看kafka的代理数

[zk: localhost:2181(CONNECTED) 0] ls /brokers/ids
[1, 2, 3]
[zk: localhost:2181(CONNECTED) 1]
1
2
3

可以看到,现在kafka的代理数已经是1,2,3了。

# 集群启动脚本

kafka本身没有提供集群启动脚本,不过可以自己实现。

注意:

  1. brokers为你集群的所有hostname

  2. KAFKA_HOME为你的安装路径

kafka-cluster-start.sh

#!/bin/bash

brokers=("mini1 mini2 mini3")
KAFKA_HOME="/bigdata/kafka/"

echo "INFO:Begin to start kafka cluster..."

for broker in ${brokers[@]}
do
	echo "  INFO:Start kafka on ${broker}"
	ssh ${broker} -C "source /etc/profile; sh ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties"
	if [ $? -eq 0 ]; then
		echo " INFO:[${broker}] Start successfully "
	fi
done
echo "INFO:Kafka cluster starts successfully !"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

kafka-cluster-stop.sh

#!/bin/bash

brokers=("mini1 mini2 mini3")
KAFKA_HOME="/bigdata/kafka/"

echo "INFO:Begin to stop kafka cluster..."

for broker in ${brokers[@]}
do
	echo "  INFO:stop kafka on ${broker}"
	ssh ${broker} -C "source /etc/profile; sh ${KAFKA_HOME}/bin/kafka-server-stop.sh "
	if [ $? -eq 0 ]; then
		echo " INFO:[${broker}] stop successfully "
	fi
done
echo "INFO:Kafka cluster stop successfully !"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

至此,kafka的集群模式已经安装成功!

# 常见错误

# No kafka server to stop

在我通过kafka-server-stop.sh脚本关闭kafka时得到如下错误,通过jps能看到kafka进程,但是却stop不掉。

[root@mini1 bin]# jps
1809 SecondaryNameNode
1620 NameNode
3237 Kafka
1960 ResourceManager
3290 Jps
2653 ZooKeeperMain
2238 QuorumPeerMain
[root@mini1 bin]# kafka-server-stop.sh
No kafka server to stop
1
2
3
4
5
6
7
8
9
10

原因是有一些操作系统无法通过该命令找到pid,从而无法kill掉该进程。

kafka-server-stop.sh脚本内容如下,将17行注释,修改为18行内容即可。

 17 #PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')
 18 PIDS=$(jps | grep -i 'Kafka' |awk '{print $1}')
1
2

注意:修改完成后,将kafka-server-stop.sh文件拷贝至其他从节点mini2和mini3上

[root@mini1 bin]# scp /usr/local/hadoop/kafka_2.11-0.10.1.1//bin/kafka-server-stop.sh mini2:/usr/local/hadoop/kafka_2.11-0.10.1.1/bin/
kafka-server-stop.sh                                                                                                 100% 1025     1.0KB/s   00:00
[root@mini1 bin]# scp /usr/local/hadoop/kafka_2.11-0.10.1.1//bin/kafka-server-stop.sh mini3:/usr/local/hadoop/kafka_2.11-0.10.1.1/bin/
kafka-server-stop.sh                                                                                                 100% 1025     1.0KB/s   00:00
1
2
3
4

# Unable to connect to zookeeper server within timeout: 6000

无法连接到zookeeper,请尝试查看zookeeper的端口是否可用

[2018-10-29 15:41:02,558] FATAL Fatal error during KafkaServerStartable startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 6000
	at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1232)
	at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:156)
	at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:130)
	at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:76)
	at kafka.utils.ZkUtils$.apply(ZkUtils.scala:58)
	at kafka.server.KafkaServer.initZk(KafkaServer.scala:327)
	at kafka.server.KafkaServer.startup(KafkaServer.scala:200)
	at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:39)
	at kafka.Kafka$.main(Kafka.scala:67)
	at kafka.Kafka.main(Kafka.scala)
1
2
3
4
5
6
7
8
9
10
11
12

CentOS开启防火墙及端口的命令:

查看防火墙状态:service iptables status

开启防火墙(重启后永久生效):chkconfig iptables on

关闭防火墙(重启后永久生效):chkconfig iptables off

开启防火墙(即时生效,重启后失效):service iptables start

关闭防火墙(即时生效,重启后失效):service iptables stop

重启防火墙:service iptables restartd

查看打开的端口:/etc/init.d/iptables status

开启6000端口:iptables -A INPUT -p tcp --dport 6000 -j ACCEPT 

保存并重启防火墙:
/etc/rc.d/init.d/iptables save
/etc/init.d/iptables restart

打开49152~65534之间的端口
iptables -A INPUT -p tcp --dport 49152:65534 -j ACCEPT  
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

警告

例如刚才的错误,先打开6000端口,然后关闭防火墙。分别在其他从节点机器上都关闭防火墙

# Kafka-manager的安装

kafka-manager是雅虎开源的Kafka的web端管理工具

官网地址 (opens new window)

注意:上面下载的是源码,下载后需要按照后面步骤进行编译。如果觉得麻烦,可以直接从下面地址下载编译好的 kafka-manager-1.3.3.7.zip。 链接: http://pan.baidu.com/s/1qY8sGoO (opens new window) 密码: ye7b

# 下载解压

下载编译好的zip文件后,上传至服务器的任意节点。然后解压。例如:我把kafka-manager放到mini1节点上了。

[root@mini1 hadoop]# cd kafka-manager-1.3.3.7/
[root@mini1 kafka-manager-1.3.3.7]# ll
total 40
drwxr-xr-x. 3 root root  4096 Oct 13 19:00 application.home_IS_UNDEFINED
drwxr-xr-x. 2 root root  4096 Oct 13 18:58 bin
drwxr-xr-x. 2 root root  4096 Oct 13 19:00 conf
drwxr-xr-x. 2 root root 12288 Oct 13 18:58 lib
-rw-r--r--. 1 root root  6335 Jun  5  2017 README.md
-rw-r--r--. 1 root root     4 Oct 13 19:03 RUNNING_PID
drwxr-xr-x. 3 root root  4096 Oct 13 18:58 share
[root@mini1 kafka-manager-1.3.3.7]#
1
2
3
4
5
6
7
8
9
10
11

# 配置kafka-manager

编辑/usr/local/hadoop/kafka-manager-1.3.3.7/conf/application.conf文件,修改如下内容。

修改kafka-manager连接的zookeeper地址,文件的第23行。

23 kafka-manager.zkhosts="mini1:2181,mini2:2181,mini3:2181"
1

# 启动kafka-manager

$ cd /usr/local/hadoop/kafka-manager-1.3.3.7
$ bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8090
1
2

注意端口占用的情况,默认使用9000端口。

如果启动不成功,则可以按提示删除如下文件即可。

[root@mini1 kafka-manager-1.3.3.7]# bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8090
This application is already running (Or delete /usr/local/hadoop/kafka-manager-1.3.3.7/RUNNING_PID file).
[root@mini1 kafka-manager-1.3.3.7]# rm -rf /usr/local/hadoop/kafka-manager-1.3.3.7/RUNNING_PID
1
2
3

# 关闭kafka-manager

kafka-manager并没有通过stop.sh脚本来停止kafka-manager,需要手工kill掉ProdServerStart进程

kafka-manager进程:ProdServerStart

然后删除RUNNING_PID文件

[root@mini1 conf]# jps
3664 Kafka
1809 SecondaryNameNode
1620 NameNode
1960 ResourceManager
3736 ZooKeeperMain
4398 Jps
2238 QuorumPeerMain
4287 ProdServerStart
[root@mini1 conf]# kill -9 4287
[root@mini1 conf]# rm -rf /usr/local/hadoop/kafka-manager-1.3.3.7/RUNNING_PID
1
2
3
4
5
6
7
8
9
10
11
  1. ps -ef | grep ProdServerStart
  2. kill -9进程kafka-manager的进程ID
  3. rm -rf /usr/local/hadoop/kafka-manager-1.3.3.7/RUNNING_PID

# 访问kafka-manager

# 附录

# kafka常用命令


# 启动kafka服务,三台主机分别输入此指令:
kafka-server-start.sh $KAFKA_HOME/config/server.properties &

# 以后台的方式启动
nohup kafka-server-start.sh $KAFKA_HOME/config/server.properties &

# 查看topic名
kafka-topics.sh --list --zookeeper hadoop-001:2181

# 创建topic名
kafka-topics.sh --create --zookeeper hadoop-001:2181 --replication-factor 3 --partitions 1 --topic first

# 创建生产者
kafka-console-producer.sh --broker-list hadoop-001:9092,hadoop-002:9092,hadoop-003:9092 --topic first

# 创建消费者
kafka-console-consumer.sh --zookeeper hadoop-001:2181 --from-beginning --topic first

# 1)查看当前服务器中的所有topic
kafka-topics.sh --list --zookeeper hadoop-001:2181

# 2)创建topic
kafka-topics.sh --create --zookeeper hadoop-001:2181 --replication-factor 3 --partitions 1 --topic first

# 选项说明:
# --topic 定义topic名
# --replication-factor  定义副本数
# --partitions  定义分区数

# 3)  删除topic
kafka-topics.sh --delete --zookeeper hadoop-001:2181 --topic first
# 需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。

# 4)发送消息
kafka-console-producer.sh --broker-list hadoop-001:9092 --topic first
>hello world

>hadoop  hadoop

# 5)消费消息
kafka-console-consumer.sh --zookeeper hadoop-001:2181 --from-beginning --topic first

# 6)查看某个Topic的详情
kafka-topics.sh --topic first --describe --zookeeper hadoop-002:2181
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

# CentOS防火墙相关命令

# 查看防火墙状态:
service iptables status

# 开启防火墙(重启后永久生效):
chkconfig iptables on

# 关闭防火墙(重启后永久生效):
chkconfig iptables off

# 开启防火墙(即时生效,重启后失效):
service iptables start

# 关闭防火墙(即时生效,重启后失效):
service iptables stop

# 重启防火墙:
service iptables restartd

# 查看打开的端口:
/etc/init.d/iptables status

# 开启6000端口:
iptables -A INPUT -p tcp --dport 6000 -j ACCEPT 

# 保存并重启防火墙:
/etc/rc.d/init.d/iptables save
/etc/init.d/iptables restart

# 打开49152~65534之间的端口
iptables -A INPUT -p tcp --dport 49152:65534 -j ACCEPT  
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#kafka
最后更新时间: 2022/7/23 10:17:11
Kafka

← Kafka

最近更新
01
分区分桶
08-21
02
数据模型(重要)
08-21
03
安装和编译
08-21
更多文章>
Theme by Vdoing
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式