失眠网,内容丰富有趣,生活中的好帮手!
失眠网 > [官方Flink入门笔记 ] 三 开发环境搭建和应用的配置 部署及运行

[官方Flink入门笔记 ] 三 开发环境搭建和应用的配置 部署及运行

时间:2023-03-17 21:20:17

相关推荐

[官方Flink入门笔记 ]  三 开发环境搭建和应用的配置 部署及运行

一、Flink 开发环境部署和配置

Flink 是一个以 Java 及 Scala 作为开发语言的开源大数据项目,代码开源在 GitHub 上,并使用 Maven 来编译和构建项目。对于大部分使用 Flink 的同学来说,Java、Maven 和 Git 这三个工具是必不可少的,另外一个强大的 IDE 有助于我们更快的阅读代码、开发新功能以及修复 Bug。

1.1. 环境准备&代码下载.

我使用的是mac系统.

前置环境下载flink代码

/downloads.html

我下载的是最新的代码

/dyn/closer.lua/flink/flink-1.12.1/flink-1.12.1-src.tgz

1.2. 编译

编译指令

在我们配置好之前的几个工具后,编译 Flink 就非常简单了,执行如下命令即可:

mvn clean install -DskipTests

或者

mvn clean package -DskipTests

如果你需要使用指定 hadoop 的版本,可以通过指定“-Dhadoop.version”来设置,编译命令如下:

mvn clean package -DskipTests -Dhadoop.version=3.2.1

其他参数:

常用编译参数:-Dfast 主要是忽略QA plugins和JavaDocs的编译-Dhadoop.version=2.6.1 指定hadoop版本--settings=${maven_file_path} 显式指定maven settings.xml配置文件

编译成功之后的信息如下:

[INFO] ------------------------------------------------------------------------[INFO] Reactor Summary for Flink : 1.12.1:[INFO][INFO] Flink : Tools : Force Shading ...................... SUCCESS [ 5.278 s][INFO] Flink : ............................................ SUCCESS [16:05 min][INFO] Flink : Annotations ................................ SUCCESS [05:03 min][INFO] Flink : Test utils : ............................... SUCCESS [ 0.216 s][INFO] Flink : Test utils : Junit ......................... SUCCESS [ 2.183 s][INFO] Flink : Metrics : .................................. SUCCESS [ 0.218 s][INFO] Flink : Metrics : Core ............................. SUCCESS [ 7.757 s][INFO] Flink : Core ....................................... SUCCESS [ 57.533 s][INFO] Flink : Java ....................................... SUCCESS [ 15.587 s][INFO] Flink : Queryable state : .......................... SUCCESS [ 0.229 s][INFO] Flink : Queryable state : Client Java .............. SUCCESS [ 6.333 s][INFO] Flink : FileSystems : .............................. SUCCESS [ 0.214 s][INFO] Flink : FileSystems : Hadoop FS .................... SUCCESS [ 10.626 s][INFO] Flink : Runtime .................................... SUCCESS [07:20 min][INFO] Flink : Scala ...................................... SUCCESS [ 55.450 s][INFO] Flink : FileSystems : Mapr FS ...................... SUCCESS [ 1.242 s][INFO] Flink : FileSystems : Hadoop FS shaded ............. SUCCESS [ 5.298 s][INFO] Flink : FileSystems : S3 FS Base ................... SUCCESS [ 5.545 s][INFO] Flink : FileSystems : S3 FS Hadoop ................. SUCCESS [ 12.824 s][INFO] Flink : FileSystems : S3 FS Presto ................. SUCCESS [ 8.112 s][INFO] Flink : FileSystems : Swift FS Hadoop .............. SUCCESS [ 20.853 s][INFO] Flink : FileSystems : OSS FS ....................... SUCCESS [ 5.931 s][INFO] Flink : FileSystems : Azure FS Hadoop .............. SUCCESS [01:13 min][INFO] Flink : Optimizer .................................. SUCCESS [ 7.321 s][INFO] Flink : Connectors : ............................... SUCCESS [ 0.262 s][INFO] Flink : Connectors : File Sink Common .............. SUCCESS [ 0.780 s][INFO] Flink : Streaming Java ............................. SUCCESS [ 26.660 s][INFO] Flink : Clients .................................... SUCCESS [ 4.578 s][INFO] Flink : Test utils : Utils ......................... SUCCESS [ 1.567 s][INFO] Flink : Runtime web ................................ SUCCESS [04:55 min][INFO] Flink : Examples : ................................. SUCCESS [ 0.241 s][INFO] Flink : Examples : Batch ........................... SUCCESS [ 16.848 s][INFO] Flink : Connectors : Hadoop compatibility .......... SUCCESS [ 6.698 s][INFO] Flink : State backends : ........................... SUCCESS [ 0.163 s][INFO] Flink : State backends : RocksDB ................... SUCCESS [ 30.661 s][INFO] Flink : Tests ...................................... SUCCESS [01:06 min][INFO] Flink : Streaming Scala ............................ SUCCESS [ 40.235 s][INFO] Flink : Connectors : HCatalog ...................... SUCCESS [ 9.601 s][INFO] Flink : Test utils : Connectors .................... SUCCESS [ 0.562 s][INFO] Flink : Connectors : Base .......................... SUCCESS [ 1.132 s][INFO] Flink : Connectors : Files ......................... SUCCESS [ 2.113 s][INFO] Flink : Table : .................................... SUCCESS [ 0.576 s][INFO] Flink : Table : Common ............................. SUCCESS [ 22.240 s][INFO] Flink : Table : API Java ........................... SUCCESS [ 4.569 s][INFO] Flink : Table : API Java bridge .................... SUCCESS [ 1.590 s][INFO] Flink : Table : API Scala .......................... SUCCESS [ 14.828 s][INFO] Flink : Table : API Scala bridge ................... SUCCESS [ 12.302 s][INFO] Flink : Table : SQL Parser ......................... SUCCESS [01:49 min][INFO] Flink : Libraries : ................................ SUCCESS [ 0.246 s][INFO] Flink : Libraries : CEP ............................ SUCCESS [ 6.107 s][INFO] Flink : Table : Planner ............................ SUCCESS [03:06 min][INFO] Flink : Table : SQL Parser Hive .................... SUCCESS [ 4.840 s][INFO] Flink : Table : Runtime Blink ...................... SUCCESS [ 12.232 s][INFO] Flink : Table : Planner Blink ...................... SUCCESS [03:58 min][INFO] Flink : Formats : .................................. SUCCESS [ 0.156 s][INFO] Flink : Formats : Json ............................. SUCCESS [ 1.644 s][INFO] Flink : Connectors : Elasticsearch base ............ SUCCESS [ 10.399 s][INFO] Flink : Connectors : Elasticsearch 5 ............... SUCCESS [ 17.862 s][INFO] Flink : Connectors : Elasticsearch 6 ............... SUCCESS [ 8.984 s][INFO] Flink : Connectors : Elasticsearch 7 ............... SUCCESS [ 15.580 s][INFO] Flink : Connectors : HBase base .................... SUCCESS [ 9.920 s][INFO] Flink : Connectors : HBase 1.4 ..................... SUCCESS [ 2.460 s][INFO] Flink : Connectors : HBase 2.2 ..................... SUCCESS [ 17.624 s][INFO] Flink : Formats : Hadoop bulk ...................... SUCCESS [ 0.961 s][INFO] Flink : Formats : Orc .............................. SUCCESS [ 2.646 s][INFO] Flink : Formats : Orc nohive ....................... SUCCESS [ 1.337 s][INFO] Flink : Formats : Avro ............................. SUCCESS [02:34 min][INFO] Flink : Formats : Parquet .......................... SUCCESS [ 18.528 s][INFO] Flink : Formats : Csv .............................. SUCCESS [ 1.416 s][INFO] Flink : Connectors : Hive .......................... SUCCESS [ 36.982 s][INFO] Flink : Connectors : JDBC .......................... SUCCESS [ 48.576 s][INFO] Flink : Connectors : RabbitMQ ...................... SUCCESS [ 2.067 s][INFO] Flink : Connectors : Twitter ....................... SUCCESS [ 2.434 s][INFO] Flink : Connectors : Nifi .......................... SUCCESS [ 2.316 s][INFO] Flink : Connectors : Cassandra ..................... SUCCESS [ 6.142 s][INFO] Flink : Metrics : JMX .............................. SUCCESS [ 0.886 s][INFO] Flink : Connectors : Kafka ......................... SUCCESS [ 47.380 s][INFO] Flink : Connectors : Google PubSub ................. SUCCESS [04:26 min][INFO] Flink : Connectors : Kinesis ....................... SUCCESS [03:05 min][INFO] Flink : Connectors : SQL : Elasticsearch 6 ......... SUCCESS [ 8.505 s][INFO] Flink : Connectors : SQL : Elasticsearch 7 ......... SUCCESS [ 8.725 s][INFO] Flink : Connectors : SQL : HBase 1.4 ............... SUCCESS [ 10.299 s][INFO] Flink : Connectors : SQL : HBase 2.2 ............... SUCCESS [ 14.018 s][INFO] Flink : Connectors : SQL : Hive 1.2.2 .............. SUCCESS [01:11 min][INFO] Flink : Connectors : SQL : Hive 2.2.0 .............. SUCCESS [01:12 min][INFO] Flink : Connectors : SQL : Hive 2.3.6 .............. SUCCESS [01:17 min][INFO] Flink : Connectors : SQL : Hive 3.1.2 .............. SUCCESS [ 9.660 s][INFO] Flink : Connectors : SQL : Kafka ................... SUCCESS [ 1.910 s][INFO] Flink : Connectors : SQL : Kinesis ................. SUCCESS [ 9.730 s][INFO] Flink : Formats : Avro confluent registry .......... SUCCESS [ 57.775 s][INFO] Flink : Formats : Sequence file .................... SUCCESS [ 0.800 s][INFO] Flink : Formats : Compress ......................... SUCCESS [ 0.637 s][INFO] Flink : Formats : SQL Orc .......................... SUCCESS [ 0.389 s][INFO] Flink : Formats : SQL Parquet ...................... SUCCESS [ 0.743 s][INFO] Flink : Formats : SQL Avro ......................... SUCCESS [ 1.113 s][INFO] Flink : Formats : SQL Avro Confluent Registry ...... SUCCESS [ 5.712 s][INFO] Flink : Examples : Streaming ....................... SUCCESS [ 16.690 s][INFO] Flink : Examples : Table ........................... SUCCESS [ 10.546 s][INFO] Flink : Examples : Build Helper : .................. SUCCESS [ 0.137 s][INFO] Flink : Examples : Build Helper : Streaming Twitter SUCCESS [ 0.601 s][INFO] Flink : Examples : Build Helper : Streaming State machine SUCCESS [ 0.649 s][INFO] Flink : Examples : Build Helper : Streaming Google PubSub SUCCESS [ 9.378 s][INFO] Flink : Container .................................. SUCCESS [ 0.395 s][INFO] Flink : Queryable state : Runtime .................. SUCCESS [ 1.123 s][INFO] Flink : Mesos ...................................... SUCCESS [ 26.902 s][INFO] Flink : Kubernetes ................................. SUCCESS [ 14.923 s][INFO] Flink : Yarn ....................................... SUCCESS [ 2.675 s][INFO] Flink : Libraries : Gelly .......................... SUCCESS [ 7.177 s][INFO] Flink : Libraries : Gelly scala .................... SUCCESS [ 17.671 s][INFO] Flink : Libraries : Gelly Examples ................. SUCCESS [ 11.207 s][INFO] Flink : External resources : ....................... SUCCESS [ 0.075 s][INFO] Flink : External resources : GPU ................... SUCCESS [ 0.273 s][INFO] Flink : Metrics : Dropwizard ....................... SUCCESS [ 0.422 s][INFO] Flink : Metrics : Graphite ......................... SUCCESS [ 0.238 s][INFO] Flink : Metrics : InfluxDB ......................... SUCCESS [07:16 min][INFO] Flink : Metrics : Prometheus ....................... SUCCESS [ 3.748 s][INFO] Flink : Metrics : StatsD ........................... SUCCESS [ 0.448 s][INFO] Flink : Metrics : Datadog .......................... SUCCESS [ 0.386 s][INFO] Flink : Metrics : Slf4j ............................ SUCCESS [ 0.383 s][INFO] Flink : Libraries : CEP Scala ...................... SUCCESS [ 14.679 s][INFO] Flink : Table : Uber ............................... SUCCESS [ 6.785 s][INFO] Flink : Table : Uber Blink ......................... SUCCESS [ 7.516 s][INFO] Flink : Python ..................................... SUCCESS [26:23 min][INFO] Flink : Table : SQL Client ......................... SUCCESS [ 3.099 s][INFO] Flink : Libraries : State processor API ............ SUCCESS [ 2.393 s][INFO] Flink : ML : ....................................... SUCCESS [ 0.132 s][INFO] Flink : ML : API ................................... SUCCESS [ 0.599 s][INFO] Flink : ML : Lib ................................... SUCCESS [ 1.354 s][INFO] Flink : ML : Uber .................................. SUCCESS [ 0.233 s][INFO] Flink : Scala shell ................................ SUCCESS [ 15.446 s][INFO] Flink : Dist ....................................... SUCCESS [ 21.568 s][INFO] Flink : Yarn Tests ................................. SUCCESS [ 5.727 s][INFO] Flink : E2E Tests : ................................ SUCCESS [09:56 min][INFO] Flink : E2E Tests : CLI ............................ SUCCESS [ 0.343 s][INFO] Flink : E2E Tests : Parent Child classloading program SUCCESS [ 0.326 s][INFO] Flink : E2E Tests : Parent Child classloading lib-package SUCCESS [ 0.274 s][INFO] Flink : E2E Tests : Dataset allround ............... SUCCESS [ 0.340 s][INFO] Flink : E2E Tests : Dataset Fine-grained recovery .. SUCCESS [ 0.315 s][INFO] Flink : E2E Tests : Datastream allround ............ SUCCESS [ 1.236 s][INFO] Flink : E2E Tests : Batch SQL ...................... SUCCESS [ 0.336 s][INFO] Flink : E2E Tests : Stream SQL ..................... SUCCESS [ 0.330 s][INFO] Flink : E2E Tests : Distributed cache via blob ..... SUCCESS [ 0.246 s][INFO] Flink : E2E Tests : High parallelism iterations .... SUCCESS [ 6.366 s][INFO] Flink : E2E Tests : Stream stateful job upgrade .... SUCCESS [ 0.640 s][INFO] Flink : E2E Tests : Queryable state ................ SUCCESS [ 1.633 s][INFO] Flink : E2E Tests : Local recovery and allocation .. SUCCESS [ 0.316 s][INFO] Flink : E2E Tests : Elasticsearch 5 ................ SUCCESS [ 5.663 s][INFO] Flink : E2E Tests : Elasticsearch 6 ................ SUCCESS [ 2.627 s][INFO] Flink : Quickstart : ............................... SUCCESS [ 1.424 s][INFO] Flink : Quickstart : Java .......................... SUCCESS [ 5.927 s][INFO] Flink : Quickstart : Scala ......................... SUCCESS [ 0.257 s][INFO] Flink : E2E Tests : Quickstart ..................... SUCCESS [ 0.573 s][INFO] Flink : E2E Tests : Confluent schema registry ...... SUCCESS [ 2.140 s][INFO] Flink : E2E Tests : Stream state TTL ............... SUCCESS [ 3.017 s][INFO] Flink : E2E Tests : SQL client ..................... SUCCESS [ 3.099 s][INFO] Flink : E2E Tests : File sink ...................... SUCCESS [ 0.881 s][INFO] Flink : E2E Tests : State evolution ................ SUCCESS [ 0.710 s][INFO] Flink : E2E Tests : RocksDB state memory control ... SUCCESS [ 0.754 s][INFO] Flink : E2E Tests : Common ......................... SUCCESS [ 0.997 s][INFO] Flink : E2E Tests : Metrics availability ........... SUCCESS [ 0.363 s][INFO] Flink : E2E Tests : Metrics reporter prometheus .... SUCCESS [ 0.463 s][INFO] Flink : E2E Tests : Heavy deployment ............... SUCCESS [ 6.358 s][INFO] Flink : E2E Tests : Connectors : Google PubSub ..... SUCCESS [ 4.792 s][INFO] Flink : E2E Tests : Streaming Kafka base ........... SUCCESS [ 0.502 s][INFO] Flink : E2E Tests : Streaming Kafka ................ SUCCESS [ 5.888 s][INFO] Flink : E2E Tests : Plugins : ...................... SUCCESS [ 0.123 s][INFO] Flink : E2E Tests : Plugins : Dummy fs ............. SUCCESS [ 0.230 s][INFO] Flink : E2E Tests : Plugins : Another dummy fs ..... SUCCESS [ 0.199 s][INFO] Flink : E2E Tests : TPCH ........................... SUCCESS [ 1.332 s][INFO] Flink : E2E Tests : Streaming Kinesis .............. SUCCESS [ 10.843 s][INFO] Flink : E2E Tests : Elasticsearch 7 ................ SUCCESS [ 3.079 s][INFO] Flink : E2E Tests : Common Kafka ................... SUCCESS [ 8.186 s][INFO] Flink : E2E Tests : TPCDS .......................... SUCCESS [ 1.140 s][INFO] Flink : E2E Tests : Netty shuffle memory control ... SUCCESS [ 0.273 s][INFO] Flink : E2E Tests : Python ......................... SUCCESS [ 5.884 s][INFO] Flink : E2E Tests : HBase .......................... SUCCESS [ 2.096 s][INFO] Flink : State backends : Heap spillable ............ SUCCESS [ 0.988 s][INFO] Flink : Contrib : .................................. SUCCESS [ 0.081 s][INFO] Flink : Contrib : Connectors : Wikiedits ........... SUCCESS [ 0.427 s][INFO] Flink : FileSystems : Tests ........................ SUCCESS [ 1.078 s][INFO] Flink : Docs ....................................... SUCCESS [ 1.369 s][INFO] Flink : Walkthrough : .............................. SUCCESS [ 0.151 s][INFO] Flink : Walkthrough : Common ....................... SUCCESS [ 1.470 s][INFO] Flink : Walkthrough : Datastream Java .............. SUCCESS [ 0.181 s][INFO] Flink : Walkthrough : Datastream Scala ............. SUCCESS [ 0.168 s][INFO] Flink : Tools : CI : Java .......................... SUCCESS [ 1.588 s][INFO] ------------------------------------------------------------------------[INFO] BUILD SUCCESS[INFO] ------------------------------------------------------------------------[INFO] Total time: 02:01 h[INFO] Finished at: -02-13T15:59:53+08:00[INFO] ------------------------------------------------------------------------

在 flink 的 code path 的根目录,执行:

cd build-target

查看 flink 的版本

./bin/flink -v

BoYi-Pro:flink-1.12.1 sysadmin$ ./bin/flink -vVersion: 1.12.1, Commit ID: 148bba5BoYi-Pro:flink-1.12.1 sysadmin$

1.3. 配置idea

导入idea
IntelliJ IDEA -> File -> New -> Project from existing sources…,选择 Flink 代码的根路径在“Import project from external model”中选择“Maven”,然后一路点击“next”直到结束IntelliJ IDEA -> File -> Project Structure… -> Project Settings -> Project,检查 ProjectSDK 是否符合预期(因为在之前的步骤中我们已经配置了 JAVA_HOME,所以一般是符合预期的),如果不是就点击“New”,然后选择之前步骤中安装的 JDK home 目录
添加 Java 的 Checkstyle

在 Intellij 中添加 Checkstyle 是很重要的,因为 Flink 在编译时会强制代码风格的检查,如果代码风格不符合规范,可能会直接编译失败。

Intellij 内置对 Checkstyle 的支持,可以检查一下 Checkstyle-IDEA plugin 是否安装(IntelliJ IDEA -> Preferences -> Plugins,搜索“Checkstyle-IDEA”)。

配置 Java Checkstyle:1. IntelliJ IDEA -> Preferences -> Other Settings -> Checkstyle2. 设置 “Scan Scope”为“Only Java sources (including tests)”3. 在“Checkstyle Version”下拉框中选择“8.9”4. 在“Configuration File”中点击“+”新增一个 flink 的配置:a. “Description”填“Flink”b. “Use a local Checkstyle file”选择本代码下的 tools/maven/checkstyle.xml 文件c. 勾选“Store relative to project location”,然后点击“Next”d. 配置“checkstyle.suppressions.file” 的值为"suppressions.xml",然后点击“Next”和“Finish”e. 勾选上“Flink”作为唯一生效的 checkstyle 配置,点击“Apply”和“OK”5. IntelliJ IDEA -> Preferences -> Editor -> Code Style -> Java,点击⚙齿轮按钮,选择“Import Scheme” -> “Checkstyle Configuration”,选择 checkstyle.xml 文件。这样配置后,Intellij 在自动 import 的时候会按照规则,把 import 代码添加到正确的位置。

添加 Scala 的 Checkstyle

1. 将“tools/maven/scalastyle-config.xml”文件拷贝到 flink 代码根目录的“.idea”子目录中2. IntelliJ IDEA -> Preferences -> Editor -> Inspections,搜索“Scala style inspections”,勾选这一项

1.4. 在 Intellij 中运行 example

flink 代码编译完成后,直接选择一个 example 即可运行,如:org.apache.flink.streaming.examples.windowing.WindowWordCount.java

异常 : IDEA运行flink 报Error:java: 无效的标记: --add-exports=java.base/sun.ne

原因:

在依赖jdk11的IDEA下,无论是否选择jdk11去运行maven编译flink1.9,IDEA在用maven编译flink的时候都会添加 --add-exports 选项。

具体原因不详,可能是IDEA的bug。

解决方法:

方法1、“Intellij” -> “View” -> “Tool Windows” ->“Maven” -> “Profiles” -> 取消 “java11” -> 重新导入 maven 项目。

方法2、下载with bundled JBR 8版的IDEA。

二 .运行 Flink 应用

2.1. 基本概念

运行 Flink 应用其实非常简单,但是在运行 Flink 应用之前,还是有必要了解 Flink 运行时的各个组件,因为这涉及到 Flink 应用的配置问题。如图所示,这是用户用 DataStream API 写的一个数据处理程序。可以看到,在一个 DAG 图中不能被 Chain 在一起的 Operator 会被分隔到不同的 Task 中,也就是说 Task 是 Flink 中资源调度的最小单位。

2.2. Flink Runtime 架构图

Flink 实际运行时包括两类进程:

JobManager(又称为 JobMaster):协调 Task 的分布式执行,包括调度 Task、协调创 Checkpoint 以及当 Job failover 时协调各个 Task 从 Checkpoint 恢复等。

TaskManager(又称为 Worker):执行 Dataflow 中的 Tasks,包括内存 Buffer 的分配、Data Stream 的传递等。

Task Slot 是一个 TaskManager 中的最小资源分配单位,一个 TaskManager 中有多少个 Task Slot 就意味着能支持多少并发的 Task 处理。

需要注意的是,一个 Task Slot 中可以执行多个 Operator,一般这些 Operator 是能被 Chain 在一起处理的。

三 .运行环境准备

运行环境准备

准备 Flink binary

○ 直接从 Flink [官网]( /downloads.html) 上下载 Flink binary 的压缩包○ 或者从 Flink 源码编译而来

安装 Java,并配置 JAVA_HOME 环境变量

四 .单机 Standalone 的方式运行 Flink

4.1. 基本的启动流程

最简单的运行 flink 应用的方法就是以单机 standalone 的方式运行。

进入 Flink binary 的目录下(一般情况下不需要修改配置文件),执行:

./bin/start-cluster.sh

输出结果如下,表示启动正常 :

BoYi-Pro:flink-1.12.1 sysadmin$ pwd/opt/workspace/apache/flink-1.12.1/flink-dist/target/flink-1.12.1-bin/flink-1.12.1BoYi-Pro:flink-1.12.1 sysadmin$ ./bin/start-cluster.shStarting cluster.Starting standalonesession daemon on host BoYi-Pro.lan.Starting taskexecutor daemon on host BoYi-Pro.lan.

启动成功后,打开 http://127.0.0.1:8081/ 就能看到 Flink 的 web 界面: 提交一个 Word Count 的任务,

命令如下:

./bin/flink run examples/streaming/WordCount.jar

BoYi-Pro:flink-1.12.1 sysadmin$ pwd/opt/workspace/apache/flink-1.12.1/flink-dist/target/flink-1.12.1-bin/flink-1.12.1BoYi-Pro:flink-1.12.1 sysadmin$BoYi-Pro:flink-1.12.1 sysadmin$ ./bin/flink run examples/streaming/WordCount.jarExecuting WordCount example with default input data set.Use --input to specify file input.Printing result to stdout. Use --output to specify output path.Job has been submitted with JobID 38382be084abe507ed181a005c84adb3Program execution finishedJob with JobID 38382be084abe507ed181a005c84adb3 has finished.Job Runtime: 963 ms

可以在 web 界面上看到 job 完成了

点击 job 信息,可以看到 Word Count 的执行图:

点进最下面的 vertex,可以通过 stdout 信息看到 Word Count 的结果:

可以指定自己本地的资源文件路径

我们尝试通过“--input”参数指定我们自己的本地文件作为输入,然后执行:./bin/flink run examples/streaming/WordCount.jar --input ${your_source_file}

4.2. 常用配置介绍

我们在本机上执行 jps 命令,可以看到 Flink 相关的进程主要有两个,

一个是 JobManager 进程,另一个是 TaskManager 进程。

我们可以进一步用 ps 命令看看进程的启动参数:

BoYi-Pro:flink-1.12.1 sysadmin$ jps -ml2356 org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir /opt/workspace/apache/flink-1.12.1/flink-dist/target/flink-1.12.1-bin/flink-1.12.1/conf -D taskmanager.memory.framework.off-heap.size=134217728b -D work.max=134217730b -D work.min=134217730b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=536870920b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.heap.size=402653174b -D taskmanager.memory.task.off-heap.size=0b2122 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint --configDir /opt/workspace/apache/flink-1.12.1/flink-dist/target/flink-1.12.1-bin/flink-1.12.1/conf --executionMode cluster -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=26592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=1073741824b -D jobmanager.memory.jvm-overhead.max=26592b

conf / slaves

conf / slaves 用于配置 TaskManager 的部署,默认配置下只会启动一个 TaskManager 进程,如果想增加一个 TaskManager 进程的,只需要文件中追加一行“localhost”。

也可以直接通过“ ./bin/taskmanager.sh start ”这个命令来追加一个新的 TaskManager:

./bin/taskmanager.sh start|start-foreground|stop|stop-all

结合 Flink binary 目录下的 conf 子目录中的 flink-conf.yaml 文件,常用的配置有下面几个:

# The heap size for the JobManager JVMjobmanager.heap.mb: 1024# The heap size for the TaskManager JVMtaskmanager.heap.mb: 1024# The number of task slots that each TaskManager offers. Each slot runs one parallelpipeline.taskmanager.numberOfTaskSlots: 1# the managed memory size for each task manager.taskmanager.managed.memory.size: 256# The parallelism used for programs that did not specify and other parallelism.parallelism.default: 1

JobManger对应的内存监控图:

Task Manger 监控信息

4.3. 修改配置

我们可以先用下面这个命令停掉 standalone 集群:

./bin/stop-cluster.sh

修改 flink-conf.yaml 中的这几个配置如下:

# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.taskmanager.numberOfTaskSlots: 4

重启 standalone 集群:

./bin/start-cluster.sh

启动成功后,打开 http://127.0.0.1:8081/ 就能看到 Flink 的 web 界面:注意: 不要多次执行start-cluster.sh启动集群, 否则会多出Task Managers

五 .日志的查看和配置

JobManager 和 TaskManager 的启动日志可以在 Flink binary 目录下的 log 子目录中找到:

BoYi-Pro:log sysadmin$ pwd/opt/tools/flink-1.12.1/logBoYi-Pro:log sysadmin$ ls -altotal 696drwxr-xr-x@ 19 sysadmin staff 608 2 14 01:02 .drwxr-xr-x@ 14 sysadmin staff 448 2 13 22:41 ..-rw-r--r-- 1 sysadmin staff 5250 2 13 22:43 flink-sysadmin-client-BoYi-Pro.lan.log-rw-r--r-- 1 sysadmin staff 15518 2 14 01:03 flink-sysadmin-standalonesession-0-BoYi-Pro.lan.log-rw-r--r-- 1 sysadmin staff 16334 2 14 00:52 flink-sysadmin-standalonesession-0-BoYi-Pro.lan.log.1-rw-r--r-- 1 sysadmin staff 15671 2 14 01:02 flink-sysadmin-standalonesession-0-BoYi-Pro.lan.log.2-rw-r--r-- 1 sysadmin staff0 2 14 01:02 flink-sysadmin-standalonesession-0-BoYi-Pro.lan.out-rw-r--r-- 1 sysadmin staff 16834 2 14 00:46 flink-sysadmin-standalonesession-1-BoYi-Pro.lan.log-rw-r--r-- 1 sysadmin staff0 2 14 00:44 flink-sysadmin-standalonesession-1-BoYi-Pro.lan.out-rw-r--r-- 1 sysadmin staff 17713 2 14 01:03 flink-sysadmin-taskexecutor-0-BoYi-Pro.lan.log-rw-r--r-- 1 sysadmin staff 18932 2 14 00:52 flink-sysadmin-taskexecutor-0-BoYi-Pro.lan.log.1-rw-r--r-- 1 sysadmin staff 18260 2 14 01:02 flink-sysadmin-taskexecutor-0-BoYi-Pro.lan.log.2-rw-r--r-- 1 sysadmin staff0 2 14 01:02 flink-sysadmin-taskexecutor-0-BoYi-Pro.lan.out-rw-r--r-- 1 sysadmin staff 71001 2 14 00:46 flink-sysadmin-taskexecutor-1-BoYi-Pro.lan.log-rw-r--r-- 1 sysadmin staff0 2 14 00:41 flink-sysadmin-taskexecutor-1-BoYi-Pro.lan.out-rw-r--r-- 1 sysadmin staff 58360 2 14 00:46 flink-sysadmin-taskexecutor-2-BoYi-Pro.lan.log-rw-r--r-- 1 sysadmin staff0 2 14 00:42 flink-sysadmin-taskexecutor-2-BoYi-Pro.lan.out-rw-r--r-- 1 sysadmin staff 18813 2 14 00:46 flink-sysadmin-taskexecutor-3-BoYi-Pro.lan.log-rw-r--r-- 1 sysadmin staff0 2 14 00:44 flink-sysadmin-taskexecutor-3-BoYi-Pro.lan.outBoYi-Pro:log sysadmin$

log 目录中以“flink-${user}-standalonesession-${id}-${hostname}”为前缀的文件对应的即是JobManager 的输出,其中有三个文件:

● flink-${user}-standalonesession-${id}-${hostname}.log:代码中的日志输出

● flink-${user}-standalonesession-${id}-${hostname}.out:进程执行时的 stdout 输出

● flink-${user}-standalonesession-${id}-${hostname}-gc.log:JVM 的 GC 的日志

log 目录中以“flink-${user}-taskexecutor-${id}-${hostname}”为前缀的文件对应的是 TaskManager的输出,也包括三个文件,和 JobManager 的输出一致。

日志的配置文件在 Flink binary 目录的 conf 子目录下:

BoYi-Pro:conf sysadmin$ pwd/opt/tools/flink-1.12.1/confBoYi-Pro:conf sysadmin$ ls -altotal 120drwxr-xr-x@ 14 sysadmin staff 448 1 10 08:29 .drwxr-xr-x@ 14 sysadmin staff 448 2 13 22:41 ..-rw-r--r--@ 1 sysadmin staff 10343 2 14 01:05 flink-conf.yaml-rw-r--r--@ 1 sysadmin staff 2761 1 9 21:45 log4j-cli.properties-rw-r--r--@ 1 sysadmin staff 2967 1 9 21:45 log4j-console.properties-rw-r--r--@ 1 sysadmin staff 1967 1 9 21:45 log4j-session.properties-rw-r--r--@ 1 sysadmin staff 2620 1 9 21:45 log4j.properties-rw-r--r--@ 1 sysadmin staff 2740 12 17 10:04 logback-console.xml-rw-r--r--@ 1 sysadmin staff 1550 12 17 10:04 logback-session.xml-rw-r--r--@ 1 sysadmin staff 2331 6 30 logback.xml-rw-r--r--@ 1 sysadmin staff15 6 30 masters-rw-r--r--@ 1 sysadmin staff 5441 1 9 21:45 sql-client-defaults.yaml-rw-r--r--@ 1 sysadmin staff10 12 17 10:04 workers-rw-r--r--@ 1 sysadmin staff 1434 6 30 zoo.cfgBoYi-Pro:conf sysadmin$

其中:

● log4j-cli.properties:用 Flink 命令行时用的 log 配置,比如执行“flink run”命令

● log4j-yarn-session.properties:是用 yarn-session.sh 启动时命令行执行时用的 log 配置

● log4j.properties:无论是 standalone 还是 yarn 模式,JobManager 和 TaskManager 上用的 log 配置都是 log4j.properties

这三个“log4j.*properties”文件分别有三个“logback.*xml”文件与之对应,如果想使用 logback 的同学,之需要把与之对应的“log4j.*properties”文件删掉即可,对应关系如下:

● log4j-cli.properties -> logback-console.xml

● log4j-yarn-session.properties -> logback-yarn.xml

● log4j.properties -> logback.xml

需要注意的是,“flink-${user}-standalonesession-${id}-${hostname}”和“flink-${user}-taskexecutor-${id}-${hostname}”都带有“${id}”,“${id}”表示本进程在本机上该角色(JobManager或 TaskManager)的所有进程中的启动顺序,默认从 0 开始。

六 .多机部署 Flink Standalone 集群

6.1. 部署前要注意的要点:

● 每台机器上配置好 java 以及 JAVA_HOME 环境变量● 最好挑选一台机器,和其他机器 ssh 打通● 每台机器上部署的 Flink binary 的目录要保证是同一个目录● 如果需要用 hdfs,需要配置 HADOOP_CONF_DIR 环境变量配置上

6.2. 根据你的集群信息修改 conf/masters 和 conf/slaves 配置。

6.3. 修改 conf/flink-conf.yaml 配置,注意要确保和 Masters 文件中的地址一致:

jobmanager.rpc.address: master01

6.4. 确保所有机器的 Flink binary 目录中 conf 中的配置文件相同,特别是以下三个:

conf/mastersconf/slavesconf/flink-conf.yaml

6.5. 然后启动 Flink 集群:

./bin/start-cluster.sh

6.6. 提交 WordCount 作业:

./bin/flink run examples/streaming/WordCount.jar

6.7. 上传 WordCount 的 Input 文件:

hdfs dfs -copyFromLocal story /test_dir/input_dir/story

6.8. 提交读写 HDFS 的 WordCount 作业:

./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output

6.9. 增加 WordCount 作业的并发度(注意输出文件重名会提交失败):

./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output --parallelism 20

七 .Standalone 模式的 HighAvailability(HA)部署和配置

JobManager 是整个系统中最可能导致系统不可用的角色。如果一个 TaskManager 挂了,在资源足够的情况下,只需要把相关 Task 调度到其他空闲 TaskSlot 上,然后 Job 从 Checkpoint 中恢复即可。而如果当前集群中只配置了一个 JobManager,则一旦 JobManager 挂了,就必须等待这个 JobManager 重新恢复,如果恢复时间过长,就可能导致整个 Job 失败。

因此如果在生产业务使用 Standalone 模式,则需要部署配置 HighAvailability,这样同时可以有多个 JobManager 待命,从而使得 JobManager 能够持续服务。

7.1. (可选)使用 Flink 自带的脚本部署 ZK

Flink 目前支持基于 Zookeeper 的 HA。如果你的集群中没有部署 ZK,Flink 提供了启动 Zookeeper 集群的脚本。首先修改配置文件“conf/zoo.cfg”,根据你要部署的 Zookeeper Server 的机器数来配置“server.X=addressX:peerPort:leaderPort”,其中“X”是一个 Zookeeper Server的唯一 ID,且必须是数字。

修改conf/zoo.cfg

# The port at which the clients will connectclientPort=2181server.1=z05f06378.sqa.:4888:5888server.2=z05c19426.sqa.:4888:5888server.3=z05f10219.sqa.:4888:5888

然后启动 Zookeeper:

./bin/start-zookeeper-quorum.sh

jps 命令看到 ZK 进程已经启动:

jps -ml

停掉 Zookeeper 集群的命令:

./bin/stop-zookeeper-quorum.sh

7.2. 修改 Flink Standalone 集群的配置

停掉之前启动到 standalone 集群:

./bin/stop-cluster.sh

修改conf/master 配置

$vi conf/mastersmaster01:8081master02:8081

修改conf/slave配置

$vi conf/slavesslave01slave02slave03

修改 conf/flink-conf.yaml 文件:

# 配置high-availability modehigh-availability: zookeeper# 配置zookeeper quorum(hostname和端口需要依据对应zk的实际配置)high-availability.zookeeper.quorum: zk01:2181,zk02:2181,zk03:2181# (可选)设置zookeeper的root目录high-availability.zookeeper.path.root: /test_dir/test_standalone2_root# (可选)相当于是这个standalone集群中创建的zk node的namespacehigh-availability.cluster-id: /test_dir/test_standalone2# JobManager的meta信息放在dfs,在zk上主要会保存一个指向dfs路径的指针high-availability.storageDir: hdfs:///test_dir/recovery2/

需要注意的是,在 HA 模式下 conf/flink-conf.yaml 中的这两个配置都失效了

jobmanager.rpc.addressjobmanager.rpc.port

修改完成后,再把这几个文件同步到不同机器到相同 conf 目录下。

7.3. 启动&验证

启动服务

启动 zookeeper 集群:./bin/start-zookeeper-quorum.sh

再启动 standalone 集群: ./bin/start-cluster.sh

打开web页面

分别打开两个 Master 节点上的 JobManager Web 页面:

http://master01:8081

http://master02:8081

可以看到两个页面最后都转到了同一个地址上,这个地址就是当前主 JobManager 所在机器,另一个就是 Standby JobManager。

以上我们就完成了 Standalone 模式下 HA 的配置。

启动jobmanager

启动: ./bin/jobmanager.sh start master02 8081

八. 使用 Yarn 模式跑 Flink job

相对于 standalone 模式,yarn 模式允许 flink job 的好处有:

资源按需使用,提高集群的资源利用率任务有优先级,根据优先级运行作业基于 YARN 调度系统,能够自动化地处理各个角色的 failover

○ JobManager 进程和 TaskManager 进程都由 Yarn NodeManager 监控

○ 如果 JobManager 进程异常退出,则 Yarn ResourceManager 会重新调度JobManager 到其他机器

○ 如果 TaskManager 进程异常退出,JobManager 会收到消息并重新向 YarnResourceManager 申请资源,重新启动 TaskManager

8.1.在 YARN 上启动 long running 的 flink 集群(yarn session)

查看命令参数:

./bin/yarn-session.sh -h

创建一个 Yarn 模式的 Flink 集群:

./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m

其中用到的参数是:

-n,–container Number of TaskManagers-jm,–jobManagerMemory Memory for JobManager Container with optional unit (default: MB)-tm,–taskManagerMemory Memory per TaskManager Container with optional unit (default: MB)-qu,–queue Specify YARN queue.-s,–slots Number of slots per TaskManager-t,–ship Ship files in the specified directory (t for transfer)

提交一个 Flink job 到 Flink 集群:

./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output

这次提交 Flink job,虽然没有指定对应 Yarn application 的信息,却可以提交到对应的 Flink 集群,原因在于“/tmp/.yarn-properties-${user}”文件中保存了上一次创建 Yarn session 的集群信息。所以如果同一用户在同一机器上再次创建一个 Yarn session,则这个文件会被覆盖掉。

如果删掉“/tmp/.yarn-properties-${user}”或者在另一个机器上提交作业能否提交到预期到yarn session中呢?

可以配置了“high-availability.cluster-id”参数,据此从 Zookeeper 上获取到 JobManager 的地址和端口,从而提交作业。

如果 Yarn session 没有配置 HA,又该如何提交呢?

这个时候就必须要在提交 Flink job 的命令中指明 Yarn 上的 Application ID,通过“-yid”参数传入:

/bin/flink run -yid application_1548056325049_0048 examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output

我们可以发现,每次跑完任务不久,TaskManager 就被释放了,下次在提交任务的时候,TaskManager 又会重新拉起来。如果希望延长空闲 TaskManager 的超时时间,可以在 conf/flink-conf.yaml 文件中配置下面这个参数,单位是 milliseconds:

slotmanager.taskmanager-timeout: 30000L # deprecated, used in release-1.5

resourcemanager.taskmanager-timeout: 30000L

8.2.在 Yarn 上运行单个 Flink job(Job Cluster 模式)

如果你只想运行单个 Flink Job 后就退出,那么可以用下面这个命令:

./bin/flink run -m yarn-cluster -yn 2 examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output

常用的配置有:

-yn,–yarncontainer Number of Task Managers-yqu,–yarnqueue Specify YARN queue.-ys,–yarnslots Number of slots per TaskManager-yqu,–yarnqueue Specify YARN queue.

可以通过 Help 命令查看 Run 的可用参数:

./bin/flink run -h

BoYi-Pro:flink-1.12.1 sysadmin$ ./bin/flink run -hAction "run" compiles and runs a program.Syntax: run [OPTIONS] <jar-file> <arguments>"run" action options:-c,--class <classname>Class with the program entry point("main()" method). Only needed if theJAR file does not specify the class inits manifest.-C,--classpath <url> Adds a URL to each user codeclassloader on all nodes in thecluster. The paths must specify aprotocol (e.g. file://) and beaccessible on all nodes (e.g. by meansof a NFS share). You can use thisoption multiple times for specifyingmore than one URL. The protocol mustbe supported by the {@.URLClassLoader}.-d,--detached If present, runs the job in detachedmode-n,--allowNonRestoredState Allow to skip savepoint state thatcannot be restored. You need to allowthis if you removed an operator fromyour program that was part of theprogram when the savepoint wastriggered.-p,--parallelism <parallelism> The parallelism with which to run theprogram. Optional flag to override thedefault value specified in theconfiguration.-py,--python <pythonFile> Python script with the program entrypoint. The dependent resources can beconfigured with the `--pyFiles`option.-pyarch,--pyArchives <arg> Add python archive files for job. Thearchive files will be extracted to theworking directory of python UDFworker. Currently only zip-format issupported. For each archive file, atarget directory be specified. If thetarget directory name is specified,the archive file will be extracted toa name can directory with thespecified name. Otherwise, the archivefile will be extracted to a directorywith the same name of the archivefile. The files uploaded via thisoption are accessible via relativepath. '#' could be used as theseparator of the archive file path andthe target directory name. Comma (',')could be used as the separator tospecify multiple archive files. Thisoption can be used to upload thevirtual environment, the data filesused in Python UDF (e.g.: --pyArchivesfile:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutablepy37.zip/py37/bin/python). The datafiles could be accessed in Python UDF,e.g.: f = open('data/data.txt', 'r').-pyexec,--pyExecutable <arg> Specify the path of the pythoninterpreter used to execute the pythonUDF worker (e.g.: --pyExecutable/usr/local/bin/python3). The pythonUDF worker depends on Python 3.5+,Apache Beam (version == 2.23.0), Pip(version >= 7.1.0) and SetupTools(version >= 37.0.0). Please ensurethat the specified environment meetsthe above requirements.-pyfs,--pyFiles <pythonFiles> Attach custom python files for job.These files will be added to thePYTHONPATH of both the local clientand the remote python UDF worker. Thestandard python resource file suffixessuch as .py/.egg/.zip or directory areall supported. Comma (',') could beused as the separator to specifymultiple files (e.g.: --pyFilesfile:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).-pym,--pyModule <pythonModule> Python module with the program entrypoint. This option must be used inconjunction with `--pyFiles`.-pyreq,--pyRequirements <arg> Specify a requirements.txt file whichdefines the third-party dependencies.These dependencies will be installedand added to the PYTHONPATH of thepython UDF worker. A directory whichcontains the installation packages ofthese dependencies could be specifiedoptionally. Use '#' as the separatorif the optional parameter exists(e.g.: --pyRequirementsfile:///tmp/requirements.txt#file:///tmp/cached_dir).-s,--fromSavepoint <savepointPath> Path to a savepoint to restore the jobfrom (for examplehdfs:///flink/savepoint-1537).-sae,--shutdownOnAttachedExit If the job is submitted in attachedmode, perform a best-effort clustershutdown when the CLI is terminatedabruptly, e.g., in response to a userinterrupt, such as typing Ctrl + C.Options for Generic CLI mode:-D <property=value> Allows specifying multiple generic configurationoptions. The available options can be found at/projects/flink/flink-docs-stable/ops/config.html-e,--executor <arg> DEPRECATED: Please use the -t option instead which isalso available with the "Application Mode".The name of the executor to be used for executing thegiven job, which is equivalent to the"execution.target" config option. The currentlyavailable executors are: "remote", "local","kubernetes-session", "yarn-per-job", "yarn-session".-t,--target <arg>The deployment target for the given application,which is equivalent to the "execution.target" configoption. For the "run" action the currently availabletargets are: "remote", "local", "kubernetes-session","yarn-per-job", "yarn-session". For the"run-application" action the currently availabletargets are: "kubernetes-application".Options for yarn-cluster mode:-m,--jobmanager <arg> Set to yarn-cluster to use YARN executionmode.-yid,--yarnapplicationId <arg> Attach to running YARN session-z,--zookeeperNamespace <arg> Namespace to create the Zookeepersub-paths for high availability modeOptions for default mode:-D <property=value> Allows specifying multiple genericconfiguration options. The availableoptions can be found at/projects/flink/flink-docs-stable/ops/config.html-m,--jobmanager <arg> Address of the JobManager to which toconnect. Use this flag to connect to adifferent JobManager than the one specifiedin the configuration. Attention: Thisoption is respected only if thehigh-availability configuration is NONE.-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-pathsfor high availability mode

8.3.Yarn 模式下的 HighAvailability 配置

首先要确保启动 Yarn 集群用的“yarn-site.xml”文件中的这个配置,这个是 Yarn 集群级别 AM 重启的上限。

<property><name>yarn.resourcemanager.am.max-attempts</name><value>100</value></property>

然后在 conf/flink-conf.yaml 文件中配置这个 Flink job 的 JobManager 能够重启的次数。

yarn.application-attempts: 10 # 1+ 9 retries

最后再在 conf/flink-conf.yaml 文件中配置上 ZK 相关配置,这几个配置的配置方法和 Standalone 的 HA 配置方法基本一致,如下所示。

# 配置high-availability modehigh-availability: zookeeper# 配置zookeeper quorum(hostname和端口需要依据对应zk的实际配置)high-availability.zookeeper.quorum: zk01:2181,zk02:2181# (可选)设置zookeeper的root目录high-availability.zookeeper.path.root: /test_dir/test_standalone2_root# 删除这个配置# high-availability.cluster-id: /test_dir/test_standalone2# JobManager的meta信息放在dfs,在zk上主要会保存一个指向dfs路径的指针high-availability.storageDir: hdfs:///test_dir/recovery2/

需要特别注意的是:“high-availability.cluster-id”这个配置最好去掉,因为在 Yarn(以及Mesos)模式下,cluster-id 如果不配置的话,会配置成 Yarn 上的 Application ID ,从而可以保证唯一性。

官方原文: /developers/flink-basic-tutorial-1-environmental-construction/

如果觉得《[官方Flink入门笔记 ] 三 开发环境搭建和应用的配置 部署及运行》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。