# Canal + Kafka + HBase + Hive Integration

2019-09-27

## Preface

This post records the process of extracting data from MySQL into Hive in near real time. The overall flow is: MySQL binlog → canal → Kafka → a consumer program → HBase → a Hive external table.

## Prerequisites

### Installing MySQL

Installing MySQL 5.6 on CentOS 6.10
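
canal reads the MySQL binlog, so the source instance must have row-based binary logging enabled and a replication account for canal. The settings below follow canal's quick-start guide (the conventional canal/canal credentials are an assumption; use your own):

```sql
-- my.cnf must contain: log-bin=mysql-bin, binlog-format=ROW, server_id=1
-- then create the replication account that canal connects with:
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
```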

### Installing the Hadoop Cluster

Hadoop 1.1.2 pseudo-distributed installation notes

### Installing the Kafka Cluster

Kafka cluster installation

### Installing the HBase Cluster

HBase introduction and installation

### Installing Hive

Hive introduction and installation

## Installation and Configuration

### canal

canal official site

You can refer to canal's quick-start guide.

Below is the canal.properties configuration I used:

```properties
#################################################
######### 		common argument		#############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

## [ZooKeeper configuration]
canal.zkServers = 192.168.56.111:2181,192.168.56.112:2181,192.168.56.113:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
# [serverMode switched to kafka]
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## memory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecting config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =

#################################################
######### 		destinations		#############
#################################################
# [Kafka topic configuration]
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### 		     MQ 		     #############
##################################################

# [Kafka broker configuration]
canal.mq.servers = 192.168.56.111:9092,192.168.56.112:9092,192.168.56.113:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =

##################################################
#########     Kafka Kerberos Info    #############
##################################################
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"
```
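
Besides canal.properties, each destination has its own conf/example/instance.properties that points canal at the source MySQL. A minimal sketch (the master address and credentials are assumptions for this environment):

```properties
# conf/example/instance.properties -- per-instance source settings (sketch)
# MySQL master whose binlog canal tails (assumed host)
canal.instance.master.address=192.168.56.111:3306
# replication account created during the MySQL setup above
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
# subscribe to every table; narrow this regex in production
canal.instance.filter.regex=.*\\..*
# row changes are published to the "example" topic consumed below
canal.mq.topic=example
canal.mq.partition=0
```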

### Starting canal

In the /canal.deployer-1.1.4/bin directory, run startup.sh to start canal.
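
For reference, starting the deployer and tailing its logs looks like this (log paths assume the default deployer layout):

```sh
cd /canal.deployer-1.1.4/bin
./startup.sh
# server log, then the per-instance log, to confirm binlog parsing started
tail -f ../logs/canal/canal.log
tail -f ../logs/example/example.log
```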

### Consuming from Kafka

For convenience, we consume the Kafka data directly with the console consumer:

```sh
kafka-console-consumer.sh --zookeeper mini1:2181 --from-beginning --topic example
```

With --from-beginning set, consumption starts from the earliest offset, so the messages canal produced can be inspected in the console.

[Figure: console output of the Kafka consumer]
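
Since canal.mq.flatMessage = true in the configuration above, each record is a self-describing JSON object. An INSERT event looks roughly like this (the table name and row values are illustrative, matching the test data used later; several fields are omitted):

```json
{
  "data": [
    {"id": "1", "user_name": "zhangsan", "user_age": "29", "user_addr": "北京市"}
  ],
  "database": "pet_medical",
  "table": "user_info",
  "isDdl": false,
  "type": "INSERT",
  "es": 1569571200000,
  "ts": 1569571200123
}
```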

### Consuming the Kafka Data and Writing It to HBase

Write a program that parses the Kafka messages into table fields and inserts them into HBase; a sketch of such a consumer follows.
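
A minimal sketch using kafka-clients, hbase-client, and fastjson. The class name, consumer group id, and the choice of the "id" column as the HBase rowkey are assumptions of mine; the brokers, topic, table, and column family come from the configuration above and the HBase shell commands below:

```java
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CanalKafkaToHbase {

    public static void main(String[] args) throws Exception {
        // Kafka consumer pointed at the brokers from canal.mq.servers
        Properties props = new Properties();
        props.put("bootstrap.servers",
                "192.168.56.111:9092,192.168.56.112:9092,192.168.56.113:9092");
        props.put("group.id", "canal-to-hbase"); // assumed group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // HBase connection through the same ZooKeeper quorum
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum",
                "192.168.56.111,192.168.56.112,192.168.56.113");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("test_zfang"))) {

            consumer.subscribe(Collections.singletonList("example"));
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // canal.mq.flatMessage = true, so each value is one flat
                    // JSON message: {"data":[{...}],"type":"INSERT",...}
                    JSONObject msg = JSONObject.parseObject(record.value());
                    String type = msg.getString("type");
                    if (!"INSERT".equals(type) && !"UPDATE".equals(type)) {
                        continue; // this sketch only replays row writes
                    }
                    JSONArray rows = msg.getJSONArray("data");
                    if (rows == null) {
                        continue;
                    }
                    for (int i = 0; i < rows.size(); i++) {
                        JSONObject row = rows.getJSONObject(i);
                        // assume the primary key column "id" becomes the rowkey
                        Put put = new Put(Bytes.toBytes(row.getString("id")));
                        for (String col : row.keySet()) {
                            String val = row.getString(col);
                            if (val != null) {
                                put.addColumn(Bytes.toBytes("c1"),
                                        Bytes.toBytes(col), Bytes.toBytes(val));
                            }
                        }
                        table.put(put);
                    }
                }
            }
        }
    }
}
```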

### Creating the Hive External Table

Create a Hive external table that maps the data from HBase. Pay attention to the column-family mapping:

```sql
create external table pet_medical.hbase_hive_zfang(
    key string
   ,user_name string
   ,user_age int
   ,user_addr string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties ("hbase.columns.mapping"=":key,c1:user_name,c1:user_age,c1:user_addr")
tblproperties("hbase.table.name"="test_zfang");
```
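Once the table exists, a quick query confirms the mapping end to end:

```sql
select * from pet_medical.hbase_hive_zfang limit 10;
```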

### Mapping the HBase Data into Hive

In practice this step is completed automatically by the consumer program; the HBase shell commands below show the equivalent manual steps:

```
disable 'test_zfang'
# drop before re-creating; create alone fails if the table already exists
drop 'test_zfang'
create 'test_zfang','c1'
put 'test_zfang', 'row1', 'c1:user_name', 'zhangsan'
put 'test_zfang', 'row1', 'c1:user_age', '29'
put 'test_zfang', 'row1', 'c1:user_addr', '北京市'
```
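
After these puts, the row is visible through the external table defined above:

```sql
select * from pet_medical.hbase_hive_zfang where key = 'row1';
```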