浅墨散人 浅墨散人
  • 基础
  • 设计模式
  • JVM
  • Maven
  • SpringBoot
  • 基础
  • Flask
  • Diango
  • Pandas
  • SqlAlchemy
  • Sqoop
  • Flume
  • Flink
  • Hadoop
  • Hbase
  • Hive
  • Kafka
  • Kylin
  • Zookeeper
  • Tez
  • MySQL
  • Doris
  • Chrome
  • Eclipse
  • IDEA
  • iTerm2
  • Markdown
  • SublimeText
  • VirtualBox
  • WebStrom
  • Linux
  • Mac
  • Hexo
  • Git
  • Vue
  • VuePress
  • 区块链
  • 金融
数据仓库
数据治理
读书笔记
关于我
GitHub (opens new window)
  • 基础
  • 设计模式
  • JVM
  • Maven
  • SpringBoot
  • 基础
  • Flask
  • Diango
  • Pandas
  • SqlAlchemy
  • Sqoop
  • Flume
  • Flink
  • Hadoop
  • Hbase
  • Hive
  • Kafka
  • Kylin
  • Zookeeper
  • Tez
  • MySQL
  • Doris
  • Chrome
  • Eclipse
  • IDEA
  • iTerm2
  • Markdown
  • SublimeText
  • VirtualBox
  • WebStrom
  • Linux
  • Mac
  • Hexo
  • Git
  • Vue
  • VuePress
  • 区块链
  • 金融
数据仓库
数据治理
读书笔记
关于我
GitHub (opens new window)
  • MySQL

    • README
    • Centos6.10安装mysql5.6
    • MySQL的binlog日志
    • binlog+canal+kakfa
      • 介绍
      • mysql开启binlog
        • 1. 验证是否开启binlog
        • 2. 查看master状态
      • canal配置
        • 1. 下载canal
        • 2. 创建canal用户
        • 3. canal的基本配置
        • 4. canal的kafka配置
        • 5. 启动canal
      • 验证
        • 1. 在MySQL中修改数据
        • 2. 在kafka中消费数据
    • MySQL执行计划
  • DataBase
  • MySQL
2019-09-26
目录

binlog+canal+kakfa

# 介绍

本文主要记录了使用canal解析MySQL的binlog日志,然后将数据发送到kafka的过程。

canal官网 (opens new window)

# mysql开启binlog

参考Canal的QuickStart (opens new window)

编辑/etc/my.cnf文件,增加如下配置

[mysqld]
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock

log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复

[mysqld_safe]
log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid
1
2
3
4
5
6
7
8
9
10
11

# 1. 验证是否开启binlog

mysql> SHOW VARIABLES LIKE '%log_bin%';
+---------------------------------+--------------------------------+
| Variable_name                   | Value                          |
+---------------------------------+--------------------------------+
| log_bin                         | ON                             |
| log_bin_basename                | /var/lib/mysql/mysql-bin       |
| log_bin_index                   | /var/lib/mysql/mysql-bin.index |
| log_bin_trust_function_creators | OFF                            |
| log_bin_use_v1_row_events       | OFF                            |
| sql_log_bin                     | ON                             |
+---------------------------------+--------------------------------+
1
2
3
4
5
6
7
8
9
10
11

提示

提示: log_bin等于on表示成功开启了binlog

# 2. 查看master状态

即最后(最新)一个binlog日志的编号名称,及其最后一个操作事件pos结束点(Position)值。

mysql> show master status;
+------------------+----------+--------------+------------------+-------------------------------------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set                               |
+------------------+----------+--------------+------------------+-------------------------------------------------+
| mysql-bin.000020 | 56217764 |              |                  | ad9a34e4-bd71-11e9-b5b2-00163e143f99:1-19076813 |
+------------------+----------+--------------+------------------+-------------------------------------------------+
1 row in set (0.03 sec)
1
2
3
4
5
6
7

# canal配置

参考Canal的QuickStart (opens new window)

# 1. 下载canal

canal下载地址 (opens new window)

下载canal.deployer-1.1.4.tar.gz文件

解压到某个目录

# 2. 创建canal用户

下载好canal后,需要在MySQL中创建个canal用户,然后对canal用户授权(主要是slave权限)

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
1
2
3
4

# 3. canal的基本配置

canal是多实例的特性,所以一个canal是可以监控多个MySQL服务的,每个MySQL服务代表一个instance 多实例配置,请参考 canal多instance配置方式 (opens new window)

# vi conf/example/instance.properties

编辑canal.deployer-1.1.4.tar.gz中的vi conf/example/instance.properties文件

## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 

#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8

#table regex【配置要监控的数据库】
# 常见例子:
# 1. 所有表:.* or .*\\..*
# 2. canal schema下所有表: canal\\..*
# 3. canal下的以canal打头的表:canal\\.canal.*
# 4. canal schema下的一张表:canal.test1
# 5. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
# 注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)  .*\\..*
canal.instance.filter.regex = .\*\\\\..\*
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 4. canal的kafka配置

参考Canal Kafka/RocketMQ QuickStart (opens new window)

修改instance配置文件vi conf/example/instance.properties


# 配置kafka的topic
canal.mq.topic=example
# 针对库名或者表名发送动态topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#库名.表名: 唯一主键,多个表之间用逗号分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################
1
2
3
4
5
6
7
8
9
10
11

修改canal 配置文件vi /usr/local/canal/conf/canal.properties

# 1、设置为kafka
canal.serverMode = kafka

# 2、kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 10.15.1.17:9092,10.15.1.18:9092,10.15.1.19:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 5. 启动canal

启动

sh bin/startup.sh
1

查看 server 日志

vi logs/canal/canal.log</pre>
2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
1
2
3
4

查看 instance 的日志

vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
1
2
3
4
5

关闭

sh bin/stop.sh
1

# 验证

# 1. 在MySQL中修改数据

# 2. 在kafka中消费数据

启动kafka的consumer

kafka-console-consumer --zookeeper 10.15.1.23:2181,10.15.1.18:2181,10.15.1.19:2181 --from-beginning --topic petmedical
1

在控制台查看数据

格式化json数据

#MySQL#binlog#canal
最后更新时间: 2022/7/30 16:50:59
MySQL的binlog日志
MySQL执行计划

← MySQL的binlog日志 MySQL执行计划→

最近更新
01
分区分桶
08-21
02
数据模型(重要)
08-21
03
安装和编译
08-21
更多文章>
Theme by Vdoing
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式