浅墨散人 浅墨散人
  • 基础
  • 设计模式
  • JVM
  • Maven
  • SpringBoot
  • 基础
  • Flask
  • Diango
  • Pandas
  • SqlAlchemy
  • Sqoop
  • Flume
  • Flink
  • Hadoop
  • Hbase
  • Hive
  • Kafka
  • Kylin
  • Zookeeper
  • Tez
  • MySQL
  • Doris
  • Chrome
  • Eclipse
  • IDEA
  • iTerm2
  • Markdown
  • SublimeText
  • VirtualBox
  • WebStrom
  • Linux
  • Mac
  • Hexo
  • Git
  • Vue
  • VuePress
  • 区块链
  • 金融
数据仓库
数据治理
读书笔记
关于我
GitHub (opens new window)
  • 基础
  • 设计模式
  • JVM
  • Maven
  • SpringBoot
  • 基础
  • Flask
  • Diango
  • Pandas
  • SqlAlchemy
  • Sqoop
  • Flume
  • Flink
  • Hadoop
  • Hbase
  • Hive
  • Kafka
  • Kylin
  • Zookeeper
  • Tez
  • MySQL
  • Doris
  • Chrome
  • Eclipse
  • IDEA
  • iTerm2
  • Markdown
  • SublimeText
  • VirtualBox
  • WebStrom
  • Linux
  • Mac
  • Hexo
  • Git
  • Vue
  • VuePress
  • 区块链
  • 金融
数据仓库
数据治理
读书笔记
关于我
GitHub (opens new window)
  • Flink

    • 目录
    • 核心概念
    • 快速开始
      • WordCount
        • Flink基本maven项目构建(java版)
        • 批处理WordCount
        • 流处理WordCount
        • 批处理和流处理的区别
    • 部署方式
    • 运行时架构
    • API
    • Flink SQL
    • Flink 相关配置
    • 项目实战
  • BigData
  • Flink
2021-10-05
目录

快速开始

  • WordCount
    • Flink基本maven项目构建(java版)
    • 批处理WordCount
    • 流处理WordCount
    • 批处理和流处理的区别

开发Flink程序有固定的流程。

  1. 获得一个执行环境。
  2. 加载/创建初始化数据。
  3. 指定操作数据的Transaction算子。
  4. 指定计算好的数据的存放位置。
  5. 调用execute()触发执行程序。

# WordCount

# Flink基本maven项目构建(java版)

  1. IDEA中新建Maven项目
  2. 添加pom.xml依赖
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.13.2</flink.version>
</properties>

<dependencies>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>

</dependencies>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
  1. 准备WordCout源数据 在IDEA的resource文件夹下新建一个wordcount.txt文件
hello word
hello java
word count
hello python
python you
1
2
3
4
5

# 批处理WordCount


package tech.fz.batch;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * 批处理的Wordcount
 */
public class WordCount {
    public static void main(String[] args) throws Exception {
        // 1、创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2、创建数据源
        String wc_txt_file = "/Users/fangzheng/IdeaProjects/flink-study/src/main/resources/wordcount.txt";
        DataSet<String> inputDataSource = env.readTextFile(wc_txt_file);

        // 3、对数据集进行WordCount处理
        AggregateOperator<Tuple2<String, Integer>> resultSet = inputDataSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> result) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    result.collect(new Tuple2<>(word, 1));
                }
            }
        })
                .groupBy(0) // 0代表位置(目前是word),代表按照当前的word分组
                .sum(1);    // 1代表位置(目前是每个分割后的数字),代表按照当前的数量求和

        resultSet.print(); // 打印结果

    }
    
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

# 执行结果

# 流处理WordCount

package tech.fz.stream;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * 流的Wordcount
 */
public class WordCount {
    public static void main(String[] args) throws Exception {
        // 1、创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2、创建数据源
        String wc_txt_file = "/Users/fangzheng/IdeaProjects/flink-study/src/main/resources/wordcount.txt";
        DataStream<String> inputDataSource = env.readTextFile(wc_txt_file);

        // 3、对数据集进行WordCount处理
        DataStream<Tuple2<String, Integer>> inputWord = inputDataSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> result) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    result.collect(new Tuple2<>(word, 1));
                }
            }
        });

        DataStream<Tuple2<String, Integer>> result = inputWord.keyBy(0).sum(1);

        result.print();

        // 4、启动流事件触发:注意这里比批处理多一个执行的过程
        env.execute();

    }
    
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

# 批处理和流处理的区别

#Flink
最后更新时间: 2022/7/23 10:17:11
核心概念
部署方式

← 核心概念 部署方式→

最近更新
01
分区分桶
08-21
02
数据模型(重要)
08-21
03
安装和编译
08-21
更多文章>
Theme by Vdoing
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式