binlog+canal+kafka
# 介绍
本文主要记录了使用canal解析MySQL的binlog日志,然后将数据发送到kafka的过程。
# mysql开启binlog
参考Canal的QuickStart
编辑`/etc/my.cnf`文件,增加如下配置:
```ini
[mysqld]
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=1 #配置mysql replication需要定义,不能和canal的slaveId重复
[mysqld_safe]
log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid
```
修改完成后需要重启MySQL,配置才会生效。
# 1. 验证是否开启binlog
```text
mysql> SHOW VARIABLES LIKE '%log_bin%';
+---------------------------------+--------------------------------+
| Variable_name | Value |
+---------------------------------+--------------------------------+
| log_bin | ON |
| log_bin_basename | /var/lib/mysql/mysql-bin |
| log_bin_index | /var/lib/mysql/mysql-bin.index |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
| sql_log_bin | ON |
+---------------------------------+--------------------------------+
```
> 提示: `log_bin`的值为`ON`表示已成功开启binlog
# 2. 查看master状态
即查看最后(最新)一个binlog日志的文件名(File),以及最后一个操作事件的结束位置(Position)。
```text
mysql> show master status;
+------------------+----------+--------------+------------------+-------------------------------------------------+
| File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------------------------------------+
| mysql-bin.000020 | 56217764 | | | ad9a34e4-bd71-11e9-b5b2-00163e143f99:1-19076813 |
+------------------+----------+--------------+------------------+-------------------------------------------------+
1 row in set (0.03 sec)
```
# canal配置
参考Canal的QuickStart
# 1. 下载canal
下载`canal.deployer-1.1.4.tar.gz`文件,并解压到某个目录(下文以`/usr/local/canal`为例,`conf/...`、`bin/...`、`logs/...`等相对路径均相对于该目录)。
# 2. 创建canal用户
下载好canal后,需要在MySQL中创建一个canal用户,并对canal用户授权(主要是`REPLICATION SLAVE`等slave权限):
-- Dedicated account that canal uses to connect to MySQL as a replica.
-- The host part '%' is spelled out so that CREATE USER and GRANT visibly
-- target the same account; narrow it to canal's host where possible.
-- The password must match canal.instance.dbPassword in
-- conf/example/instance.properties; change both for anything but a demo.
-- IF NOT EXISTS (MySQL 5.7.6+) makes the script safe to re-run; note that it
-- will NOT reset the password of an already existing account.
-- NOTE(review): on MySQL 8.0 the default auth plugin is caching_sha2_password;
-- if canal cannot log in, create the user WITH mysql_native_password. Confirm
-- against the canal version in use.
CREATE USER IF NOT EXISTS 'canal'@'%' IDENTIFIED BY 'canal';
-- Minimal privileges for reading the binlog:
--   REPLICATION SLAVE  - request binlog events like a replica does
--   REPLICATION CLIENT - run SHOW MASTER STATUS / SHOW BINARY LOGS
--   SELECT             - kept from the original setup (presumably for reading
--                        table metadata; verify against canal docs)
-- Do not fall back to GRANT ALL PRIVILEGES: canal only needs read access here.
-- No FLUSH PRIVILEGES is needed: GRANT reloads the grant tables by itself.
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
# 3. canal的基本配置
canal支持多实例,一个canal服务可以同时监控多个MySQL服务,每个MySQL服务对应一个instance。
多实例的配置方式,请参考canal多instance配置方式。
# 修改 conf/example/instance.properties
编辑canal解压目录(即`canal.deployer-1.1.4.tar.gz`解压后的目录)中的`conf/example/instance.properties`文件:
```properties
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex【配置要监控的数据库】
# 常见例子:
# 1. 所有表:.* or .*\\..*
# 2. canal schema下所有表: canal\\..*
# 3. canal下的以canal打头的表:canal\\.canal.*
# 4. canal schema下的一张表:canal.test1
# 5. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
# 注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤) .*\\..*
canal.instance.filter.regex = .*\\..*
```
# 4. canal的kafka配置
参考Canal Kafka/RocketMQ QuickStart
修改instance的配置文件`conf/example/instance.properties`中的MQ相关配置:
```properties
# 配置kafka的topic
canal.mq.topic=example
# 针对库名或者表名发送动态topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#库名.表名: 唯一主键,多个表之间用逗号分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################
```
修改canal的配置文件`/usr/local/canal/conf/canal.properties`:
```properties
# 1、设置为kafka
canal.serverMode = kafka
# 2、kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 10.15.1.17:9092,10.15.1.18:9092,10.15.1.19:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =
```
# 5. 启动canal
启动
```bash
sh bin/startup.sh
```
查看 server 日志
```bash
vi logs/canal/canal.log
```
```text
2013-02-05 22:45:27.967 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
```
查看 instance 的日志
```bash
vi logs/example/example.log
```
```text
2013-02-05 22:50:45.636 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
```
关闭
```bash
sh bin/stop.sh
```
# 验证
# 1. 在MySQL中修改数据
# 2. 在kafka中消费数据
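启动kafka的consumer(`--topic`需要与`canal.mq.topic`的配置一致,本文示例为`example`):
```bash
kafka-console-consumer --bootstrap-server 10.15.1.17:9092,10.15.1.18:9092,10.15.1.19:9092 --from-beginning --topic example
```
较老版本的Kafka(2.0之前)也可以通过ZooKeeper消费:`kafka-console-consumer --zookeeper 10.15.1.23:2181,10.15.1.18:2181,10.15.1.19:2181 --from-beginning --topic example`(`--zookeeper`参数在Kafka 2.0中已被移除)。
在控制台查看数据,由于配置了`canal.mq.flatMessage = true`,消息为json格式,可将其格式化后查看。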
最后更新时间: 2022/7/30 16:50:59