Flink On Standalone任务提交
Flink On Standalone 即Flink任务运行在Standalone集群中,Standlone集群部署时采用Session模式来构建集群,即:首先构建一个Flink集群,Flink集群资源就固定了,所有提交到该集群的Flink作业都运行在这一个集群中,如果集群中提交的任务多资源不够时,需要手动增加节点,所以Flink 基于Standalone运行任务一般用在开发测试或者企业实时业务较少的场景下。
Flink On Standalone 任务提交支持Session会话模式和Application应用模式,不支持Per-Job单作业模式。下面介绍基于Standalone 的Session会话模式和Application应用模式任务提交命令和原理,演示两类任务提交模式的代码还是以上一章节中读取Socket 数据进行实时WordCount统计代码为例,代码如下:
【资料图】
package com.lanson.flinkjava.code.chapter4;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;/** * 读取Socket数据进行实时WordCount统计 */public class SocketWordCount { public static void main(String[] args) throws Exception { //1.准备环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.读取Socket数据 DataStreamSource ds = env.socketTextStream("node3", 9999); //3.准备K,V格式数据 SingleOutputStreamOperator> tupleDS = ds.flatMap((String line, Collector> out) -> { String[] words = line.split(","); for (String word : words) { out.collect(Tuple2.of(word, 1)); } }).returns(Types.TUPLE(Types.STRING, Types.INT)); //4.聚合打印结果 tupleDS.keyBy(tp -> tp.f0).sum(1).print(); //5.execute触发执行 env.execute(); }}
将以上代码进行打包,名称为"FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar",并在node3节点上启动socket服务(nc -lk 9999)。
一、Standalone Session模式
1、任务提交命令
在Standalone集群搭建完成后,基于Standalone集群提交Flink任务方式就是使用的Session模式,提交任务之前首先启动Standalone集群($FLINK_HOME/bin/start-cluster.sh),然后再提交任务,Standalone Session模式提交任务命令如下:
[root@node1 ~]# cd /software/flink-1.16.0/bin/[root@node1 bin]# ./flink run -m node1:8081 -d -c com.lanson.flinkjava.code.chapter4.SocketWordCount /root/flink-jar-test/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar
以上提交任务的参数解释如下:
参数 | 解释 |
---|---|
-m | --jobmanager,指定提交任务连接的JobManager地址。 |
-c | --class,指定运行的class主类。 |
-d | --detached,任务提交后在后台独立运行,退出客户端,也可不指定。 |
-p | --parallelism,执行程序的并行度。 |
以上任务提交完成后,我们可以登录Flink WebUI(https://node1:8081)查看启动一个任务:
再次按照以上命令提交Flink任务可以看到集群中会有2个任务,说明Standalone Session模式下提交的所有Flink任务共享集群资源,如下:
以上提交Flink流任务的名称默认为"Flink Streaming Job",也可以通过参数"pipeline.name"来自定义指定Job 名称,提交命令如下:
./flink run -m node1:8081 -d -Dpipeline.name=socket-wc1 -c com.lanson.flinkjava.code.chapter4.SocketWordCount /root/flink-jar-test/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar
提交之后,可以看到页面中有三个任务,最后一个任务提交的名称改成了自定义任务名称。
2、任务提交流程
Standalone Session模式提交任务中首先需要创建Flink集群,集群创建启动的同时Dispatcher、JobMaster、ResourceManager对象一并创建、TaskManager也一并启动,TaskManager会向集群ResourceManager汇报Slot信息,Flink集群资源也就确定了。Standalone Session模式提交任务流程如下:
在客户端提交Flink任务,客户端会将任务转换成JobGraph提交给JobManager。Dispatcher将提交任务提交给JobMaster。JobMaster向ResourceManager申请Slot资源。ResourceManager会在对应的TaskManager上划分Slot资源。TaskManager向JobMaster offer Slot资源。JobMaster将任务对应的task发送到TaskManager上执行。二、Standalone Application模式
1、任务提交命令
Standalone Application模式中不会预先创建Flink集群,在提交Flink 任务的同时会创建JobManager,启动Flink集群,然后需要手动启动TaskManager连接该Flink集群,启动的TaskManager会根据$FLINK_HOME/conf/flink-conf.yaml配置文件中的"jobmanager.rpc.address"配置找JobManager,所以这里选择在node1节点上提交任务并启动JobManager,方便后续其他节点启动TaskManager后连接该节点。Standalone Appliction模式提交任务步骤和命令如下:
1.1、准备Flink jar包
在node1节点上将Flink 打好的"FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar"jar包放在 $FLINK_HOME/lib目录下。
1.2、提交任务,在node1 节点上启动 JobManager
cd /software/flink-1.16.0/bin/
#执行如下命令,启动JobManager ./standalone-job.sh start --job-classname com.lanson.flinkjava.code.chapter4.SocketWordCount
执行以上命令后会自动从$FLINK_HOME/lib中扫描所有jar包,执行指定的入口类。命令执行后可以访问对应的Flink WebUI:https://node1:8081,可以看到提交的任务,但是由于还没有执行TaskManager任务无法执行。
1.3、启动TaskManager
在node1、node2、node3任意一台节点上启动taskManager,根据$FLINK_HOME/conf/flink-conf.yaml配置文件中"jobmanager.rpc.address"配置项会找到对应node1 JobManager。
#在node1节点上启动TaskManager[root@node1 ~]# cd /software/flink-1.16.0/bin/[root@node1 bin]# ./taskmanager.sh start#在node2节点上启动TaskManager[root@node2 ~]# cd /software/flink-1.16.0/bin/[root@node2 bin]# ./taskmanager.sh start
启动两个TaskManager后可以看到Flink WebUI中对应的有2个TaskManager,可以根据自己任务使用资源的情况,手动启动多个TaskManager。
1.4、停止集群
#停止启动的JobManager[root@node1 bin]# ./standalone-job.sh stop#停止启动的TaskManager[root@node1 bin]# ./taskmanager.sh stop[root@node2 bin]# ./taskmanager.sh stop
我们可以以同样的方式在其他节点上以Standalone Application模式提交先的Flink任务,但是每次提交都是当前提交任务独享集群资源。
2、任务提交流程
Standalone Application模式提交任务中提交任务的同时会启动JobManager创建Flink集群,但是需要手动启动TaskManager,这样提交的任务才能正常运行,如果提交的任务使用资源多,还可以启动多个TaskManager。Standalone Application模式提交任务流程如下:
在客户端提交Flink任务的同时启动JobManager,客户端会将任务转换成JobGraph提交给JobManager。Dispatcher会启动JobMaster,Dispatcher将提交任务提交给JobMaster。JobMaster向ResourceManager申请Slot资源。手动启动TaskManager,TaskManager会向ResourceManager注册Slot资源ResourceManager会在对应的TaskManager上划分Slot资源。TaskManager向JobMaster offer Slot资源。JobMaster将任务对应的task发送到TaskManager上执行。Standalone Application模式任务提交流程和Standalone Session模式类似,两者区别主要是Standalone Session模式中启动Flink集群时JobManager、TaskManager、JobMaster会预先启动;Standalone Application模式中提交任务时同时启动集群JobManager、JobMaster,需要手动启动TaskManager。