尚硅谷2021最新Java版Flink
下面笔记来源(尚硅谷公开资料、网络博客、个人小结)
中间会把自己认为较重要的点做做标记(下划线、加粗等)
Flink的特点
Flink vs Spark Streaming
- 数据模型
- Spark采用RDD模型,spark streaming的DStream实际上也就是一组组小批数据RDD的集合
- flink基本数据模型是数据流,以及事件(Event)序列
- 运行时架构
- spark是批计算,将DAG划分为不同的stage,一个完成后才可以计算下一个
- flink是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节点处理
快速上手
批处理实现WordCount
flink-streaming-java_2.12:1.12.1 => org.apache.flink:flink-runtime_2.12:1.12.1 => com.typesafe.akka:akka-actor_2.12:2.5.21,akka就是用scala实现的。即使这里我们用java语言,还是用到了scala实现的包
pom依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>com.caicai</groupId> <artifactId>Flink</artifactId> <version>1.0-SNAPSHOT</version>
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <flink.version>1.12.0</flink.version> <scala.binary.version>2.14</scala.binary.version> </properties>
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> </dependencies>
</project>
|
准备工作
首先准备一个文件,存放一些简单的数据,以便后续Flink计算分析。在resources
目录下新建一个hello.txt
文件,并存入一些数据
1 2 3 4 5 6 7 8
| hello java hello flink hello scala hello spark hello storm how are you fine thank you and you
|
代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| package com.caicai;
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector;
public class WordCount { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
String inputPath = "E:\\java\\WorkSpace\\Flink\\src\\main\\resources\\hello.txt"; DataSet<String> inputDataSet = env.readTextFile(inputPath);
DataSet<Tuple2<String, Integer>> resultSet = inputDataSet.flatMap(new MyFlatMapper()) .groupBy(0) .sum(1); resultSet.print(); }
public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override public void flatMap(String s, Collector<Tuple2<String, Integer>> out) throws Exception { String[] words = s.split(" "); for (String str : words) { out.collect(new Tuple2<>(str, 1)); } } } }
|
输出
1 2 3 4 5 6 7 8 9 10 11 12
| (thank,1) (spark,1) (and,1) (java,1) (storm,1) (flink,1) (fine,1) (you,3) (scala,1) (are,1) (how,1) (hello,5)
|
解决 Flink 升级1.12 报错 No ExecutorFactory found to execute the application
流处理实现
在2.1批处理的基础上,新建一个类进行改动。
- 批处理=>几组或所有数据到达后才处理;流处理=>有数据来就直接处理,不等数据堆叠到一定数量级
- 这里不像批处理有groupBy => 所有数据统一处理,而是用流处理的keyBy => 每一个数据都对key进行hash计算,进行类似分区的操作,来一个数据就处理一次,所有中间过程都有输出!
- 并行度:开发环境的并行度默认就是计算机的CPU逻辑核数
代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| package com.caicai;
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;
public class StreamWordCount { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String inputPath = "E:\\java\\WorkSpace\\Flink\\src\\main\\resources\\hello.txt"; DataStream<String> inputDataStream = env.readTextFile(inputPath);
DataStream<Tuple2<String, Integer>> resultStream = inputDataStream.flatMap(new MyFlatMapper()) .keyBy(item -> item.f0) .sum(1); resultStream.print();
env.execute(); }
public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override public void flatMap(String s, Collector<Tuple2<String, Integer>> out) throws Exception { String[] words = s.split(" "); for (String str : words) { out.collect(new Tuple2<>(str, 1)); } } } }
|
输出:
这里因为是流处理,所以所有中间过程都会被输出,前面的序号就是并行执行任务的线程编号。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| 9> (how,1) 1> (scala,1) 6> (storm,1) 6> (are,1) 4> (hello,1) 4> (hello,2) 2> (java,1) 4> (hello,3) 1> (spark,1) 7> (you,1) 10> (flink,1) 4> (hello,4) 4> (hello,5) 7> (fine,1) 7> (you,2) 5> (thank,1) 11> (and,1) 7> (you,3)
|
流式数据源测试
- 通过
nc -lk <port>
打开一个socket服务,用于模拟实时的流数据
- 代码修改inputStream的部分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| package com.caicai;
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;
public class StreamWordCount { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> inputDataStream = env.socketTextStream("192.168.200.130", 7777);
DataStream<Tuple2<String, Integer>> resultStream = inputDataStream.flatMap(new MyFlatMapper()) .keyBy(item -> item.f0) .sum(1); resultStream.print();
env.execute(); }
public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override public void flatMap(String s, Collector<Tuple2<String, Integer>> out) throws Exception { String[] words = s.split(" "); for (String str : words) { out.collect(new Tuple2<>(str, 1)); } } } }
|
在本地开启的socket中输入数据,观察IDEA的console输出。
本人测试后发现,同一个字符串,前面输出的编号是一样的,因为key => hashcode,同一个key的hash值固定,分配给相对应的线程处理。
优化修改
上面的代码,我们是把host
和port
写死在代码中的,这样其实不太好,我们可以设置在参数(args
)中,借助parameter tool
工具提取这些配置项
1 2 3 4 5 6 7 8
| import org.apache.flink.api.java.utils.ParameterTool;
ParameterTool parameterTool = ParameterTool.fromArgs(args); String host = parameterTool.get("host"); int port = parameterTool.getInt("port");
DataStream<String> inputDataStream = env.socketTextStream(host, port);
|
将配置项设置在args
参数中
输入
1
| --host 192.168.200.130 --port 7777
|
然后点击右下角的apply
,再次运行程序就可以了
Flink部署
Standalone模式
Flink任务调度原理之TaskManager 与Slots <= 下面内容出自该博文
- Flink 中每一个 TaskManager 都是一个JVM进程,它可能会在独立的线程上执行一个或多个 subtask
- 为了控制一个 TaskManager 能接收多少个 task, TaskManager 通过 task slot 来进行控制(一个 TaskManager 至少有一个 slot)
- 每个task slot表示TaskManager拥有资源的一个固定大小的子集。假如一个TaskManager有三个slot,那么它会将其管理的内存分成三份给各个slot(注:这里不会涉及CPU的隔离,slot仅仅用来隔离task的受管理内存)
- 可以通过调整task slot的数量去自定义subtask之间的隔离方式。如一个TaskManager一个slot时,那么每个task group运行在独立的JVM中。而当一个TaskManager多个slot时,多个subtask可以共同享有一个JVM,而在同一个JVM进程中的task将共享TCP连接和心跳消息,也可能共享数据集和数据结构,从而减少每个task的负载。
- 默认情况下,Flink 允许子任务共享 slot,即使它们是不同任务的子任务(前提是它们来自同一个job)。 这样的结果是,一个 slot 可以保存作业的整个管道。
- Task Slot 是静态的概念,是指 TaskManager 具有的并发执行能力,可以通过参数taskmanager.numberOfTaskSlots进行配置;而并行度parallelism是动态概念,即TaskManager运行程序时实际使用的并发能力,可以通过参数parallelism.default进行配置。 举例:如果总共有3个TaskManager,每一个TaskManager中分配了3个TaskSlot,也就是每个TaskManager可以接收3个task,这样我们总共可以接收9个TaskSot。但是如果我们设置parallelism.default=1,那么当程序运行时9个TaskSlot将只有1个运行,8个都会处于空闲状态,所以要学会合理设置并行度!具体图解如下:
conf/flink-conf.yaml
配置文件中
taskmanager.numberOfTaskSlots
parallelism.default
1 2 3 4 5 6 7
|
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
|
注:Flink存储State用的是堆外内存,所以web UI里JVM Heap Size
和Flink Managed MEM
是两个分开的值。