Big Data: Flink
Preface
History
Official Introduction
Component Stack
Use Cases: all kinds of streaming computation
Flink Installation and Deployment
Local Mode (overview)
Principle
Steps
1. Download the release package
https://archive.apache.org/dist/flink/
2. Upload flink-1.12.0-bin-scala_2.12.tgz to the target directory on node1
3. Extract it
tar -zxvf flink-1.12.0-bin-scala_2.12.tgz
4. If there are permission problems, fix the ownership
chown -R root:root /export/server/flink-1.12.0
5. Rename the directory or create a symlink
mv flink-1.12.0 flink
ln -s /export/server/flink-1.12.0 /export/server/flink
Test
1. Prepare the file /root/words.txt
vim /root/words.txt
hello me you her
hello me you
hello me
hello
2. Start the local Flink "cluster"
/export/server/flink/bin/start-cluster.sh
3. jps should now show the following two processes:
- TaskManagerRunner
- StandaloneSessionClusterEntrypoint
4. Open the Flink Web UI
http://node1:8081/#/overview
In Flink, a slot can be thought of as a resource group: Flink parallelizes a program by splitting it into subtasks and assigning those subtasks to slots for parallel execution.
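As a rough illustration (a minimal self-contained sketch, not from the original notes), the job parallelism decides how many subtasks per operator must be placed into slots:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // with parallelism 2, each operator runs as 2 subtasks,
        // so the job needs at least 2 free slots to be scheduled
        env.setParallelism(2);
        env.fromElements("hello", "flink").print();
        env.execute();
    }
}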
5. Run the official example
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar --input /root/words.txt --output /root/out
6. Stop Flink
/export/server/flink/bin/stop-cluster.sh
Start the interactive Scala shell (at the moment, none of the Scala 2.12 packages support the Scala Shell)
/export/server/flink/bin/start-scala-shell.sh local
Run the following command:
benv.readTextFile("/root/words.txt").flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1).print()
Exit the shell
:quit
Standalone Cluster Mode (overview)
Principle
Steps
1. Cluster plan:
- Server: node1 (Master + Slave): JobManager + TaskManager
- Server: node2 (Slave): TaskManager
- Server: node3 (Slave): TaskManager
2. Edit flink-conf.yaml
vim /export/server/flink/conf/flink-conf.yaml
jobmanager.rpc.address: node1
taskmanager.numberOfTaskSlots: 2
web.submit.enable: true

# History Server
jobmanager.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/
historyserver.web.address: node1
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/
3. Edit masters
vim /export/server/flink/conf/masters
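The contents of masters are not shown in the original notes; for the plan above it would presumably hold the JobManager host and web UI port:

node1:8081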
4. Edit workers (this file was called slaves in older Flink versions)
vim /export/server/flink/conf/workers
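The workers file lists one TaskManager host per line; for this plan it would presumably be:

node1
node2
node3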
5. Add the HADOOP_CONF_DIR environment variable
vim /etc/profile
export HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop
6. Distribute to the other nodes
scp -r /export/server/flink node2:/export/server/flink
scp -r /export/server/flink node3:/export/server/flink
scp /etc/profile node2:/etc/profile
scp /etc/profile node3:/etc/profile
or
for i in {2..3}; do scp -r flink node$i:$PWD; done
7. Reload the profile
source /etc/profile
Test
1. Start the cluster; on node1 run:
/export/server/flink/bin/start-cluster.sh
or start/stop the daemons individually:
/export/server/flink/bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all
/export/server/flink/bin/taskmanager.sh start|start-foreground|stop|stop-all
2. Start the history server
/export/server/flink/bin/historyserver.sh start
3. Open the Flink UIs, or check with jps
http://node1:8081/#/overview
http://node1:8082/#/overview
4. Run the official test job
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
5. Stop the Flink cluster
/export/server/flink/bin/stop-cluster.sh
Standalone HA Cluster Mode (overview)
Principle
Steps
1. Cluster plan
- Server: node1 (Master + Slave): JobManager + TaskManager
- Server: node2 (Master + Slave): JobManager + TaskManager
- Server: node3 (Slave): TaskManager
2. Start ZooKeeper
zkServer.sh status
zkServer.sh stop
zkServer.sh start
3. Start HDFS
/export/server/hadoop/sbin/start-dfs.sh
4. Stop the Flink cluster
/export/server/flink/bin/stop-cluster.sh
5. Edit flink-conf.yaml
vim /export/server/flink/conf/flink-conf.yaml
and add the following:
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://node1:8020/flink-checkpoints
high-availability: zookeeper
high-availability.storageDir: hdfs://node1:8020/flink/ha/
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
6. Edit masters
vim /export/server/flink/conf/masters
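For the HA plan, masters would presumably list both JobManager candidates:

node1:8081
node2:8081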
7. Sync the changes to the other nodes
scp -r /export/server/flink/conf/flink-conf.yaml node2:/export/server/flink/conf/
scp -r /export/server/flink/conf/flink-conf.yaml node3:/export/server/flink/conf/
scp -r /export/server/flink/conf/masters node2:/export/server/flink/conf/
scp -r /export/server/flink/conf/masters node3:/export/server/flink/conf/
8. On node2, edit flink-conf.yaml
vim /export/server/flink/conf/flink-conf.yaml
jobmanager.rpc.address: node2
9. Restart the Flink cluster; on node1 run:
/export/server/flink/bin/stop-cluster.sh
/export/server/flink/bin/start-cluster.sh
10. Check with jps
No Flink processes have started.
11. Check the log
cat /export/server/flink/log/flink-root-standalonesession-0-node1.log
It contains an error: since Flink 1.8, the official Flink release no longer bundles the jar that integrates with HDFS.
12. Download the Hadoop uber jar, put it into Flink's lib directory, and distribute it so that Flink can access Hadoop/HDFS
Download page:
https://flink.apache.org/downloads.html
13. Put it into the lib directory
cd /export/server/flink/lib
# place the downloaded flink-shaded-hadoop-2-uber-2.7.5-10.0.jar here
14. Distribute it
for i in {2..3}; do scp -r flink-shaded-hadoop-2-uber-2.7.5-10.0.jar node$i:$PWD; done
15. Restart the Flink cluster; on node1 run:
/export/server/flink/bin/stop-cluster.sh
/export/server/flink/bin/start-cluster.sh
16. Check with jps; all three machines should now be OK
Test
1. Open the Web UIs
http://node1:8081/#/job-manager/config
http://node2:8081/#/job-manager/config
2. Run WordCount
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
3. Kill one of the masters
4. Run WordCount again; it still executes normally
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
5. Stop the cluster
/export/server/flink/bin/stop-cluster.sh
Flink on YARN (used for development)
Principle
Two modes
Session mode
Per-job mode
Steps
1. Disable YARN's memory checks
vim /export/server/hadoop/etc/hadoop/yarn-site.xml
<!-- disable YARN memory checks -->
<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>
2. Distribute it
scp -r /export/server/hadoop/etc/hadoop/yarn-site.xml node2:/export/server/hadoop/etc/hadoop/yarn-site.xml
scp -r /export/server/hadoop/etc/hadoop/yarn-site.xml node3:/export/server/hadoop/etc/hadoop/yarn-site.xml
3. Restart YARN
/export/server/hadoop/sbin/stop-yarn.sh
/export/server/hadoop/sbin/start-yarn.sh
Test
Session mode
Start a Flink cluster on YARN once and reuse it: all later jobs are submitted to this cluster and its resources stay occupied until the cluster is shut down manually. Suitable for lots of small jobs.
1. Start a Flink cluster/session on YARN; on node1 run:
/export/server/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d
Notes:
This requests 2 CPUs and 1600 MB of memory in total:
# -n: number of containers to request, i.e. the number of TaskManagers
# -tm: memory per TaskManager
# -s: number of slots per TaskManager
# -d: run as a detached background process
Note:
The following warning can be ignored:
WARN org.apache.hadoop.hdfs.DFSClient - Caught exception
java.lang.InterruptedException
2. Check the UI
http://node1:8088/cluster
3. Submit jobs with flink run:
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
Once it finishes, you can keep submitting other small jobs:
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
4. The ApplicationMaster link takes you to the Flink web UI
5. Shut down the yarn-session:
yarn application -kill application_1609508087977_0005
Per-job mode (used more often)
Each Flink job starts its own dedicated Flink cluster on YARN; when the job finishes, the cluster shuts down automatically and releases its resources. Suitable for large jobs.
1. Submit the job directly
/export/server/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 /export/server/flink/examples/batch/WordCount.jar
# -m: the jobmanager address; yarn-cluster selects YARN execution mode
# -yjm 1024: memory for the JobManager
# -ytm 1024: memory for each TaskManager
2. Check the UI
http://node1:8088/cluster
Parameter reference
/export/server/flink/bin/flink --help
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/export/server/flink/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/export/server/hadoop-2.7.5/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

./flink <ACTION> [OPTIONS] [ARGUMENTS]

The following actions are available:

Action "run" compiles and runs a program.
  Syntax: run [OPTIONS] <jar-file> <arguments>
  "run" action options:
    -c,--class <classname>             Class with the program entry point ("main()" method). Only needed if the JAR file does not specify the class in its manifest.
    -C,--classpath <url>               Adds a URL to each user code classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple times for specifying more than one URL. The protocol must be supported by the {@link java.net.URLClassLoader}.
    -d,--detached                      If present, runs the job in detached mode
    -n,--allowNonRestoredState         Allow to skip savepoint state that cannot be restored. You need to allow this if you removed an operator from your program that was part of the program when the savepoint was triggered.
    -p,--parallelism <parallelism>     The parallelism with which to run the program. Optional flag to override the default value specified in the configuration.
    -py,--python <pythonFile>          Python script with the program entry point. The dependent resources can be configured with the `--pyFiles` option.
    -pyarch,--pyArchives <arg>         Add python archive files for job. The archive files will be extracted to the working directory of python UDF worker. Currently only zip-format is supported. For each archive file, a target directory can be specified. If the target directory name is specified, the archive file will be extracted to a directory with the specified name. Otherwise, the archive file will be extracted to a directory with the same name as the archive file. The files uploaded via this option are accessible via relative path. '#' could be used as the separator of the archive file path and the target directory name. Comma (',') could be used as the separator to specify multiple archive files. This option can be used to upload the virtual environment and the data files used in Python UDF (e.g.: --pyArchives file:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutable py37.zip/py37/bin/python). The data files could be accessed in Python UDF, e.g.: f = open('data/data.txt', 'r').
    -pyexec,--pyExecutable <arg>       Specify the path of the python interpreter used to execute the python UDF worker (e.g.: --pyExecutable /usr/local/bin/python3). The python UDF worker depends on Python 3.5+, Apache Beam (version == 2.23.0), Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). Please ensure that the specified environment meets the above requirements.
    -pyfs,--pyFiles <pythonFiles>      Attach custom python files for job. These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. The standard python resource file suffixes such as .py/.egg/.zip or directory are all supported. Comma (',') could be used as the separator to specify multiple files (e.g.: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).
    -pym,--pyModule <pythonModule>     Python module with the program entry point. This option must be used in conjunction with `--pyFiles`.
    -pyreq,--pyRequirements <arg>      Specify a requirements.txt file which defines the third-party dependencies. These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. A directory which contains the installation packages of these dependencies could be specified optionally. Use '#' as the separator if the optional parameter exists (e.g.: --pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir).
    -s,--fromSavepoint <savepointPath> Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).
    -sae,--shutdownOnAttachedExit      If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.

  Options for Generic CLI mode:
    -D <property=value>   Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
    -e,--executor <arg>   DEPRECATED: Please use the -t option instead which is also available with the "Application Mode". The name of the executor to be used for executing the given job, which is equivalent to the "execution.target" config option. The currently available executors are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session".
    -t,--target <arg>     The deployment target for the given application, which is equivalent to the "execution.target" config option. For the "run" action the currently available targets are: "remote", "local", "kubernetes-session", "yarn-per-job", "yarn-session". For the "run-application" action the currently available targets are: "kubernetes-application", "yarn-application".

  Options for yarn-cluster mode:
    -d,--detached                        If present, runs the job in detached mode
    -m,--jobmanager <arg>                Set to yarn-cluster to use YARN execution mode.
    -yat,--yarnapplicationType <arg>     Set a custom application type for the application on YARN
    -yD <property=value>                 use value for given property
    -yd,--yarndetached                   If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
    -yh,--yarnhelp                       Help for the Yarn session CLI.
    -yid,--yarnapplicationId <arg>       Attach to running YARN session
    -yj,--yarnjar <arg>                  Path to Flink jar file
    -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
    -ynl,--yarnnodeLabel <arg>           Specify YARN node label for the YARN application
    -ynm,--yarnname <arg>                Set a custom name for the application on YARN
    -yq,--yarnquery                      Display available YARN resources (memory, cores)
    -yqu,--yarnqueue <arg>               Specify YARN queue.
    -ys,--yarnslots <arg>                Number of slots per TaskManager
    -yt,--yarnship <arg>                 Ship files in the specified directory (t for transfer)
    -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
    -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode
    -z,--zookeeperNamespace <arg>        Namespace to create the Zookeeper sub-paths for high availability mode

  Options for default mode:
    -D <property=value>           Allows specifying multiple generic configuration options. The available options can be found at https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
    -m,--jobmanager <arg>         Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration. Attention: This option is respected only if the high-availability configuration is NONE.
    -z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode

Action "run-application" runs an application in Application Mode.
  Syntax: run-application [OPTIONS] <jar-file> <arguments>
  Options for Generic CLI mode: the same -D, -e and -t options as for "run" above.

Action "info" shows the optimized execution plan of the program (JSON).
  Syntax: info [OPTIONS] <jar-file> <arguments>
  "info" action options:
    -c,--class <classname>         Class with the program entry point ("main()" method). Only needed if the JAR file does not specify the class in its manifest.
    -p,--parallelism <parallelism> The parallelism with which to run the program. Optional flag to override the default value specified in the configuration.

Action "list" lists running and scheduled programs.
  Syntax: list [OPTIONS]
  "list" action options:
    -a,--all       Show all programs and their JobIDs
    -r,--running   Show only running programs and their JobIDs
    -s,--scheduled Show only scheduled programs and their JobIDs
  Options for Generic CLI mode: same as for "run" above.
  Options for yarn-cluster mode:
    -m,--jobmanager <arg>          Set to yarn-cluster to use YARN execution mode.
    -yid,--yarnapplicationId <arg> Attach to running YARN session
    -z,--zookeeperNamespace <arg>  Namespace to create the Zookeeper sub-paths for high availability mode
  Options for default mode: the same -D, -m and -z options as for "run" above.

Action "stop" stops a running program with a savepoint (streaming jobs only).
  Syntax: stop [OPTIONS] <Job ID>
  "stop" action options:
    -d,--drain                         Send MAX_WATERMARK before taking the savepoint and stopping the pipeline.
    -p,--savepointPath <savepointPath> Path to the savepoint (for example hdfs:///flink/savepoint-1537). If no directory is specified, the configured default will be used ("state.savepoints.dir").
  Options for Generic CLI mode, yarn-cluster mode and default mode: same as for "list" above.

Action "cancel" cancels a running program.
  Syntax: cancel [OPTIONS] <Job ID>
  "cancel" action options:
    -s,--withSavepoint <targetDirectory> **DEPRECATION WARNING**: Cancelling a job with savepoint is deprecated. Use "stop" instead. Trigger savepoint and cancel job. The target directory is optional. If no directory is specified, the configured default directory (state.savepoints.dir) is used.
  Options for Generic CLI mode, yarn-cluster mode and default mode: same as for "list" above.

Action "savepoint" triggers savepoints for a running job or disposes existing ones.
  Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]
  "savepoint" action options:
    -d,--dispose <arg>     Path of savepoint to dispose.
    -j,--jarfile <jarfile> Flink program JAR file.
  Options for Generic CLI mode, yarn-cluster mode and default mode: same as for "list" above.
Flink Getting-Started Examples
Preliminary notes
Note: these introductory examples use the DataSet API, which we will not use afterwards; all later code uses the unified batch/stream DataStream API.
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/batch/
Prepare the environment
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.bigdata</groupId>
    <artifactId>flink_study_47</artifactId>
    <version>1.0-SNAPSHOT</version>

    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>apache</id>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <properties>
        <encoding>UTF-8</encoding>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <java.version>1.8</java.version>
        <scala.version>2.12</scala.version>
        <flink.version>1.12.0</flink.version>
    </properties>

    <dependencies>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.12</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka_2.12</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency>
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
            <exclusions>
                <exclusion><artifactId>flink-streaming-java_2.11</artifactId><groupId>org.apache.flink</groupId></exclusion>
                <exclusion><artifactId>flink-runtime_2.11</artifactId><groupId>org.apache.flink</groupId></exclusion>
                <exclusion><artifactId>flink-core</artifactId><groupId>org.apache.flink</groupId></exclusion>
                <exclusion><artifactId>flink-java</artifactId><groupId>org.apache.flink</groupId></exclusion>
            </exclusions>
        </dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hive_2.12</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.hive</groupId><artifactId>hive-metastore</artifactId><version>2.1.0</version></dependency>
        <dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>2.1.0</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2-uber</artifactId><version>2.7.5-10.0</version></dependency>
        <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.1.0</version></dependency>
        <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency>
        <dependency><groupId>io.vertx</groupId><artifactId>vertx-core</artifactId><version>3.9.0</version></dependency>
        <dependency><groupId>io.vertx</groupId><artifactId>vertx-jdbc-client</artifactId><version>3.9.0</version></dependency>
        <dependency><groupId>io.vertx</groupId><artifactId>vertx-redis-client</artifactId><version>3.9.0</version></dependency>
        <dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.7</version><scope>runtime</scope></dependency>
        <dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version><scope>runtime</scope></dependency>
        <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.44</version></dependency>
        <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.2</version><scope>provided</scope></dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Code: DataSet API (overview)

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // 1. prepare the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2. source: a small in-memory data set
        DataSet<String> lines = env.fromElements("flink hadoop spark", "flink hadoop spark", "flink hadoop", "flink");
        // 3. transformations: split each line into words
        DataSet<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] arr = value.split(" ");
                for (String word : arr) {
                    out.collect(word);
                }
            }
        });
        // map each word to (word, 1)
        DataSet<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        // group by the word (tuple field 0) and sum the counts (field 1)
        UnsortedGrouping<Tuple2<String, Integer>> grouped = wordAndOne.groupBy(0);
        AggregateOperator<Tuple2<String, Integer>> result = grouped.sum(1);
        // 4. sink: print() also triggers execution for DataSet programs
        result.print();
    }
}
Code: DataStream API, anonymous inner classes, batch input

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount2 {
    public static void main(String[] args) throws Exception {
        // 1. prepare the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. source: a bounded in-memory collection
        DataStream<String> lines = env.fromElements("flink hadoop spark", "flink hadoop spark", "flink hadoop", "flink");
        // 3. transformations
        DataStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] arr = value.split(" ");
                for (String word : arr) {
                    out.collect(word);
                }
            }
        });
        DataStream<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        // keyBy replaces the DataSet groupBy
        KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = grouped.sum(1);
        // 4. sink
        result.print();
        // 5. DataStream programs must be triggered explicitly
        env.execute();
    }
}
Code: DataStream API, anonymous inner classes, stream input

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount3 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // let Flink choose batch or streaming execution based on the source
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // unbounded source: read lines from a socket (start it with: nc -lk 9999)
        DataStream<String> lines = env.socketTextStream("node1", 9999);
        DataStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] arr = value.split(" ");
                for (String word : arr) {
                    out.collect(word);
                }
            }
        });
        DataStream<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = grouped.sum(1);
        result.print();
        env.execute();
    }
}
Code: DataStream API with lambdas

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;

public class WordCount4 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStream<String> lines = env.fromElements("flink hadoop spark", "flink hadoop spark", "flink hadoop", "flink");
        // lambdas erase generic types, so returns(...) must declare them explicitly
        SingleOutputStreamOperator<String> words = lines.flatMap(
                (String value, Collector<String> out) -> Arrays.stream(value.split(" ")).forEach(out::collect)
        ).returns(Types.STRING);
        DataStream<Tuple2<String, Integer>> wordAndOne = words.map(
                (String value) -> Tuple2.of(value, 1)
        ).returns(Types.TUPLE(Types.STRING, Types.INT));
        KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = grouped.sum(1);
        result.print();
        env.execute();
    }
}
Code: running on YARN (must master)

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;

public class WordCount5_Yarn {
    public static void main(String[] args) throws Exception {
        // read the output path from the command line, with an HDFS default
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String output = "";
        if (parameterTool.has("output")) {
            output = parameterTool.get("output");
            System.out.println("using the specified output path: " + output);
        } else {
            output = "hdfs://node1:8020/wordcount/output47_";
            System.out.println("no --output specified, using the default: " + output);
        }

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.fromElements("flink hadoop spark", "flink hadoop spark", "flink hadoop", "flink");
        SingleOutputStreamOperator<String> words = lines.flatMap(
                (String value, Collector<String> out) -> Arrays.stream(value.split(" ")).forEach(out::collect)
        ).returns(Types.STRING);
        DataStream<Tuple2<String, Integer>> wordAndOne = words.map(
                (String value) -> Tuple2.of(value, 1)
        ).returns(Types.TUPLE(Types.STRING, Types.INT));
        KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = grouped.sum(1);

        // write as the root user so HDFS permissions do not get in the way
        System.setProperty("HADOOP_USER_NAME", "root");
        result.writeAsText(output + System.currentTimeMillis()).setParallelism(1);
        env.execute();
    }
}
Package the jar, rename it, and upload it
Submit
/export/server/flink/bin/flink run -Dexecution.runtime-mode=BATCH -m yarn-cluster -yjm 1024 -ytm 1024 -c cn.flink.hello.WordCount5_Yarn /root/wc.jar --output hdfs://node1:8020/wordcount/output_xx
Note: the available execution modes are
RuntimeExecutionMode.BATCH
RuntimeExecutionMode.STREAMING
RuntimeExecutionMode.AUTOMATIC
In later Flink development, just treat every data source as a stream, or simply use AUTOMATIC.
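A minimal self-contained sketch of setting the mode in code (the same call that appears in the examples above, equivalent to the -Dexecution.runtime-mode flag):

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RuntimeModeDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // AUTOMATIC picks BATCH for bounded sources and STREAMING for unbounded ones
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.fromElements("hello", "flink").print();
        env.execute();
    }
}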
With Flink 1.12, writing to HDFS with StreamingFileSink can fail with a "truncate file fail" error.
Workarounds:
1. Use a CDH build of Hadoop
2. Release the HDFS lease manually (or repair the files when Spark reads them); a sketch follows.
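A minimal sketch of the manual lease release through the HDFS client API; the file path and this particular approach are assumptions for illustration, not from the original notes:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import java.net.URI;

public class RecoverLeaseDemo {
    public static void main(String[] args) throws Exception {
        // hypothetical path of a file left open by a failed StreamingFileSink job
        Path file = new Path("/wordcount/output_xx/part-0-0");
        FileSystem fs = FileSystem.get(URI.create("hdfs://node1:8020"), new Configuration());
        if (fs instanceof DistributedFileSystem) {
            // recoverLease returns true once the file has been closed
            boolean closed = ((DistributedFileSystem) fs).recoverLease(file);
            System.out.println("lease recovered: " + closed);
        }
    }
}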
Flink Core Concepts
Division of roles
Where each role sits in the job execution flow
DataFlow
https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/glossary.html
DataFlow, Operator, Partition, Parallelism, SubTask
Operator chains and Tasks
Task slots and slot sharing
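Chaining and slot sharing can also be influenced per operator from code; a minimal sketch using the standard DataStream API (the operators here are made up for illustration):

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator<String> upper = env.fromElements("a", "b")
                .map(String::toUpperCase)
                .disableChaining()            // do not chain this map with its neighbors
                .slotSharingGroup("group1");  // place its subtasks in a separate slot-sharing group
        upper.print();
        env.execute();
    }
}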
How the execution graphs are generated
Stream processing notes
Unbounded stream: truly streaming data
Operator categories
Source
Collection-based
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;

public class SourceDemo01_Collection {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // collection-based sources
        DataStream<String> ds1 = env.fromElements("hadoop spark flink", "hadoop spark flink");
        DataStream<String> ds2 = env.fromCollection(Arrays.asList("hadoop spark flink", "hadoop spark flink"));
        DataStream<Long> ds3 = env.generateSequence(1, 100); // deprecated in favor of fromSequence
        DataStream<Long> ds4 = env.fromSequence(1, 100);
        ds1.print();
        ds2.print();
        ds3.print();
        ds4.print();
        env.execute();
    }
}
File-based
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceDemo02_File {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // file-based sources: a single file, a directory, and a compressed file
        DataStream<String> ds1 = env.readTextFile("data/input/words.txt");
        DataStream<String> ds2 = env.readTextFile("data/input/dir");
        DataStream<String> ds3 = env.readTextFile("data/input/wordcount.txt.gz");
        ds1.print();
        ds2.print();
        ds3.print();
        env.execute();
    }
}
Socket-based
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SourceDemo03_Socket {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // socket source (start it with: nc -lk 9999)
        DataStream<String> lines = env.socketTextStream("node1", 9999);
        // split each line and emit (word, 1) directly
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] arr = value.split(" ");
                for (String word : arr) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(t -> t.f0).sum(1);
        result.print();
        env.execute();
    }
}
Custom Source: random order data
Note: this example uses Lombok
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.Random;
import java.util.UUID;

public class SourceDemo04_Customer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // run the custom source with parallelism 2
        DataStream<Order> orderDS = env.addSource(new MyOrderSource()).setParallelism(2);
        orderDS.print();
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String id;
        private Integer userId;
        private Integer money;
        private Long createTime;
    }

    public static class MyOrderSource extends RichParallelSourceFunction<Order> {
        private Boolean flag = true;

        @Override
        public void run(SourceContext<Order> ctx) throws Exception {
            Random random = new Random();
            while (flag) {
                // emit one random order per second
                String oid = UUID.randomUUID().toString();
                int userId = random.nextInt(3);
                int money = random.nextInt(101);
                long createTime = System.currentTimeMillis();
                ctx.collect(new Order(oid, userId, money, createTime));
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }
    }
}
Custom Source: MySQL
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class SourceDemo05_Customer_MySQL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStream<Student> studentDS = env.addSource(new MySQLSource()).setParallelism(1);
        studentDS.print();
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }

    public static class MySQLSource extends RichParallelSourceFunction<Student> {
        private boolean flag = true;
        private Connection conn = null;
        private PreparedStatement ps = null;
        private ResultSet rs = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            // open() runs once per subtask: the right place to create the connection
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
            String sql = "select id,name,age from t_student";
            ps = conn.prepareStatement(sql);
        }

        @Override
        public void run(SourceContext<Student> ctx) throws Exception {
            // poll the table every 5 seconds
            while (flag) {
                rs = ps.executeQuery();
                while (rs.next()) {
                    int id = rs.getInt("id");
                    String name = rs.getString("name");
                    int age = rs.getInt("age");
                    ctx.collect(new Student(id, name, age));
                }
                Thread.sleep(5000);
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }

        @Override
        public void close() throws Exception {
            // release resources in reverse order of creation
            if (rs != null) rs.close();
            if (ps != null) ps.close();
            if (conn != null) conn.close();
        }
    }
}
Kafka Consumer/Source parameters
env.addSource(new FlinkKafkaConsumer<>(topic, deserializationSchema, props))
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;

public class KafkaComsumerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // Kafka consumer properties
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        props.setProperty("group.id", "flink");
        // where to start when there are no committed offsets
        props.setProperty("auto.offset.reset", "latest");
        // check for newly added partitions every 5 s
        props.setProperty("flink.partition-discovery.interval-millis", "5000");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "2000");
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("flink_kafka", new SimpleStringSchema(), props);
        DataStream<String> kafkaDS = env.addSource(kafkaSource);
        kafkaDS.print();
        env.execute();
    }
}
Basic operations
Common transformation operators: map / filter / rebalance / flatMap / keyBy / sum / reduce ...
They mean the same as the ones you already know from Scala / Spark.
Requirement
Count the words in the stream, excluding the sensitive word TMD (Theater Missile Defense).
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class TransformationDemo01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStream<String> lines = env.socketTextStream("node1", 9999);
        DataStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] arr = value.split(" ");
                for (String word : arr) {
                    out.collect(word);
                }
            }
        });
        // drop the sensitive word
        DataStream<String> filted = words.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return !value.equals("TMD");
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = filted.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);
        // reduce is the generalized form of sum
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = grouped.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                return Tuple2.of(value1.f0, value1.f1 + value2.f1);
            }
        });
        result.print();
        env.execute();
    }
}
Union and connect
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class TransformationDemo02 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");
        DataStream<String> ds2 = env.fromElements("hadoop", "spark", "flink");
        DataStream<Long> ds3 = env.fromElements(1L, 2L, 3L);
        // union: merges streams of the same type into one stream
        DataStream<String> result1 = ds1.union(ds2);
        // connect: pairs two streams that may have different element types
        ConnectedStreams<String, String> result2 = ds1.connect(ds2);
        ConnectedStreams<String, Long> result3 = ds1.connect(ds3);
        // a connected stream is processed with a Co-function, one method per input
        SingleOutputStreamOperator<String> result = result3.map(new CoMapFunction<String, Long, String>() {
            @Override
            public String map1(String value) throws Exception {
                return "String:" + value;
            }

            @Override
            public String map2(Long value) throws Exception {
                return "Long:" + value;
            }
        });
        result1.print();
        result.print();
        env.execute();
    }
}
Split and select (side outputs)
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class TransformationDemo03 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStreamSource<Integer> ds = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // side-output tags for odd and even numbers
        OutputTag<Integer> oddTag = new OutputTag<>("odd", TypeInformation.of(Integer.class));
        OutputTag<Integer> evenTag = new OutputTag<>("even", TypeInformation.of(Integer.class));
        SingleOutputStreamOperator<Integer> result = ds.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                if (value % 2 == 0) {
                    ctx.output(evenTag, value);
                } else {
                    ctx.output(oddTag, value);
                }
            }
        });
        // select each side output by its tag
        DataStream<Integer> oddResult = result.getSideOutput(oddTag);
        DataStream<Integer> evenResult = result.getSideOutput(evenTag);
        System.out.println(oddTag);
        System.out.println(evenTag);
        oddResult.print("odd:");
        evenResult.print("even:");
        env.execute();
    }
}
rebalance repartitioning: mitigating data skew
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformationDemo04 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStream<Long> longDS = env.fromSequence(0, 100);

        DataStream<Long> filterDS = longDS.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long num) throws Exception {
                return num > 10;
            }
        });

        SingleOutputStreamOperator<Tuple2<Integer, Integer>> result1 = filterDS
                .map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(Long value) throws Exception {
                        int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
                        return Tuple2.of(subTaskId, 1);
                    }
                }).keyBy(t -> t.f0).sum(1);

        SingleOutputStreamOperator<Tuple2<Integer, Integer>> result2 = filterDS.rebalance()
                .map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(Long value) throws Exception {
                        int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
                        return Tuple2.of(subTaskId, 1);
                    }
                }).keyBy(t -> t.f0).sum(1);

        result1.print("result1");
        result2.print("result2");
        env.execute();
    }
}
Other partitioning operations
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class TransformationDemo05 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStream<String> linesDS = env.readTextFile("data/input/words.txt");

        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });

        DataStream<Tuple2<String, Integer>> result1 = tupleDS.global();
        DataStream<Tuple2<String, Integer>> result2 = tupleDS.broadcast();
        DataStream<Tuple2<String, Integer>> result3 = tupleDS.forward();
        DataStream<Tuple2<String, Integer>> result4 = tupleDS.shuffle();
        DataStream<Tuple2<String, Integer>> result5 = tupleDS.rebalance();
        DataStream<Tuple2<String, Integer>> result6 = tupleDS.rescale();
        DataStream<Tuple2<String, Integer>> result7 = tupleDS.partitionCustom(new MyPartitioner(), t -> t.f0);

        result1.print("result1");
        result2.print("result2");
        result3.print("result3");
        result4.print("result4");
        result5.print("result5");
        result6.print("result6");
        result7.print("result7");

        env.execute();
    }

    public static class MyPartitioner implements Partitioner<String> {
        @Override
        public int partition(String key, int numPartitions) {
            return 0;
        }
    }
}
Sink: console and file based
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinkDemo01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStream<String> ds = env.readTextFile("data/input/words.txt");

        ds.print();
        ds.print("输出标识");
        ds.printToErr();
        ds.printToErr("输出标识");
        ds.writeAsText("data/output/result1").setParallelism(1);
        ds.writeAsText("data/output/result2").setParallelism(2);

        env.execute();
    }
}
Custom Sink
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class SinkDemo02 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStream<Student> studentDS = env.fromElements(new Student(null, "tony", 18));
        studentDS.addSink(new MySQLSink());
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }

    public static class MySQLSink extends RichSinkFunction<Student> {
        private Connection conn = null;
        private PreparedStatement ps = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
            String sql = "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?);";
            ps = conn.prepareStatement(sql);
        }

        @Override
        public void invoke(Student value, Context context) throws Exception {
            ps.setString(1, value.getName());
            ps.setInt(2, value.getAge());
            ps.executeUpdate();
        }

        @Override
        public void close() throws Exception {
            // close the statement before the connection (the original order was reversed)
            if (ps != null) ps.close();
            if (conn != null) conn.close();
        }
    }
}
JDBC
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JDBCDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStream<Student> studentDS = env.fromElements(new Student(null, "tony2", 18));

        studentDS.addSink(JdbcSink.sink(
                "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)",
                (ps, value) -> {
                    ps.setString(1, value.getName());
                    ps.setInt(2, value.getAge());
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/bigdata")
                        .withUsername("root")
                        .withPassword("root")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .build()));

        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}
Summary
The built-in JdbcSink is an efficient, easy-to-use solution, but when you need to:
execute complex SQL (such as INSERT ... ON DUPLICATE KEY UPDATE),
manage transactions manually or control batched writes,
or route records dynamically to different databases or tables,
its flexibility falls short. In those cases, a custom RichSinkFunction or a customized JdbcSink is the better fit. A sketch of such a sink follows.
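As a minimal sketch of that escape hatch, here is a custom RichSinkFunction that performs an upsert with manual batching. It reuses the Student POJO and t_student table from the examples above; the batch size of 100 is an arbitrary choice for illustration.

public static class UpsertMySQLSink extends RichSinkFunction<Student> {
    private Connection conn;
    private PreparedStatement ps;
    private int batchCount = 0;

    @Override
    public void open(Configuration parameters) throws Exception {
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
        conn.setAutoCommit(false); // manual transaction control
        ps = conn.prepareStatement(
                "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (?, ?, ?) " +
                "ON DUPLICATE KEY UPDATE `name` = VALUES(`name`), `age` = VALUES(`age`)");
    }

    @Override
    public void invoke(Student value, Context context) throws Exception {
        ps.setObject(1, value.getId()); // setObject tolerates a null id
        ps.setString(2, value.getName());
        ps.setInt(3, value.getAge());
        ps.addBatch();
        if (++batchCount >= 100) { // flush every 100 records (arbitrary)
            ps.executeBatch();
            conn.commit();
            batchCount = 0;
        }
    }

    @Override
    public void close() throws Exception {
        if (ps != null) { ps.executeBatch(); conn.commit(); ps.close(); }
        if (conn != null) conn.close();
    }
}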
Kafka Producer/Sink: console producer -> flink_kafka topic -> Flink -> ETL -> flink_kafka2 topic -> console consumer
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class KafkaSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        props.setProperty("group.id", "flink");
        props.setProperty("auto.offset.reset", "latest");
        props.setProperty("flink.partition-discovery.interval-millis", "5000");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "2000");

        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("flink_kafka", new SimpleStringSchema(), props);
        DataStream<String> kafkaDS = env.addSource(kafkaSource);

        SingleOutputStreamOperator<String> etlDS = kafkaDS.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.contains("success");
            }
        });
        etlDS.print();

        Properties props2 = new Properties();
        props2.setProperty("bootstrap.servers", "node1:9092");
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka2", new SimpleStringSchema(), props2);
        etlDS.addSink(kafkaSink);

        env.execute();
    }
}
Redis https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
Requirement:
Receive a real-time stream from a socket, run WordCount, and write the result to Redis.
Data structure used:
word:count (key: String, value: String)
wcresult: word:count (key: String, value: Hash)
Note: a Redis key is always a String; the value can be a String, Hash, List, Set, or sorted Set.
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;

public class RedisDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStream<String> lines = env.socketTextStream("node1", 9999);

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] arr = value.split(" ");
                for (String word : arr) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        }).keyBy(t -> t.f0).sum(1);

        result.print();

        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
        RedisSink<Tuple2<String, Integer>> redisSink = new RedisSink<Tuple2<String, Integer>>(conf, new MyRedisMapper());
        result.addSink(redisSink);

        env.execute();
    }

    public static class MyRedisMapper implements RedisMapper<Tuple2<String, Integer>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "wcresult");
        }

        @Override
        public String getKeyFromData(Tuple2<String, Integer> t) {
            return t.f0;
        }

        @Override
        public String getValueFromData(Tuple2<String, Integer> t) {
            return t.f1.toString();
        }
    }
}
The four cornerstones of Flink
Window: classification of windows
Time-based sliding windows (must know)
Time-based tumbling windows (must know)
Count-based sliding windows (good to know)
Count-based tumbling windows (good to know)
API
Time-based tumbling and sliding windows (must know)
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowDemo_1_2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStream<String> lines = env.socketTextStream("node1", 9999);

        SingleOutputStreamOperator<CartInfo> carDS = lines.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] arr = value.split(",");
                return new CartInfo(arr[0], Integer.parseInt(arr[1]));
            }
        });

        KeyedStream<CartInfo, String> keyedDS = carDS.keyBy(CartInfo::getSensorId);

        SingleOutputStreamOperator<CartInfo> result1 = keyedDS
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum("count");

        SingleOutputStreamOperator<CartInfo> result2 = keyedDS
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .sum("count");

        result2.print();
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;
        private Integer count;
    }
}
Count-based tumbling and sliding windows
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WindowDemo_3_4 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStream<String> lines = env.socketTextStream("node1", 9999);

        SingleOutputStreamOperator<CartInfo> carDS = lines.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] arr = value.split(",");
                return new CartInfo(arr[0], Integer.parseInt(arr[1]));
            }
        });

        KeyedStream<CartInfo, String> keyedDS = carDS.keyBy(CartInfo::getSensorId);

        SingleOutputStreamOperator<CartInfo> result1 = keyedDS
                .countWindow(5)
                .sum("count");

        SingleOutputStreamOperator<CartInfo> result2 = keyedDS
                .countWindow(5, 3)
                .sum("count");

        result2.print();
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;
        private Integer count;
    }
}
Session windows
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowDemo_5 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStream<String> lines = env.socketTextStream("node1", 9999);

        SingleOutputStreamOperator<CartInfo> carDS = lines.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] arr = value.split(",");
                return new CartInfo(arr[0], Integer.parseInt(arr[1]));
            }
        });

        KeyedStream<CartInfo, String> keyedDS = carDS.keyBy(CartInfo::getSensorId);

        SingleOutputStreamOperator<CartInfo> result = keyedDS
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
                .sum("count");

        result.print();
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;
        private Integer count;
    }
}
Session window: elements within one continuous period of activity form a window; elements separated by more than the gap go into another window. Think of logging in to a website in a browser: you are assigned a session that stays valid as long as you keep visiting, and once it expires you have to log in again. That session is analogous to a window.
Time/Watermark: classifying time
Why EventTime matters, and why Watermarks were introduced
Watermarks in detail:
1. A watermark is essentially a timestamp.
2. Watermark = the maximum event time of the data seen so far - ==the maximum allowed lateness or out-of-orderness==.
3. By shifting the moment a window triggers its computation, watermarks tolerate a certain degree of out-of-order or delayed data.
==4. The window computation triggers when Watermark >= window end time.==
5. That is, it triggers when (current maximum event time - maximum allowed lateness) >= window end time.
6. Equivalently, when current maximum event time >= window end time + maximum allowed lateness.
7. Provided there is data in [window_start_time, window_end_time); the window is left-closed, right-open.
In the illustrated example, record D is not computed in the 10:00:00-10:10:00 window.
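Rules 4-6 can be expressed with the built-in bounded-out-of-orderness strategy. A minimal sketch, assuming the Order POJO from the demo below and an allowed lateness of 2 seconds (an arbitrary choice): a [10:00:00, 10:10:00) window fires once the watermark reaches 10:10:00, i.e. once some event with event time >= 10:10:02 has arrived.

// Watermark = max event time seen so far - allowed out-of-orderness
DataStream<Order> withWm = orderDS.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner((order, ts) -> order.getEventTime()));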
Code demo (verification version; good to know)
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;

public class WatermakerDemo02_Check {
    public static void main(String[] args) throws Exception {
        FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {
            private boolean flag = true;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (flag) {
                    String orderId = UUID.randomUUID().toString();
                    int userId = random.nextInt(3);
                    int money = random.nextInt(100);
                    long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;
                    System.out.println("发送的数据为: " + userId + " : " + df.format(eventTime));
                    ctx.collect(new Order(orderId, userId, money, eventTime));
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        });

        DataStream<Order> watermakerDS = orderDS
                .assignTimestampsAndWatermarks(
                        new WatermarkStrategy<Order>() {
                            @Override
                            public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                                return new WatermarkGenerator<Order>() {
                                    private int userId = 0;
                                    private long eventTime = 0L;
                                    private final long outOfOrdernessMillis = 3000;
                                    private long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;

                                    @Override
                                    public void onEvent(Order event, long eventTimestamp, WatermarkOutput output) {
                                        userId = event.userId;
                                        eventTime = event.eventTime;
                                        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
                                    }

                                    @Override
                                    public void onPeriodicEmit(WatermarkOutput output) {
                                        Watermark watermark = new Watermark(maxTimestamp - outOfOrdernessMillis - 1);
                                        System.out.println("key:" + userId + ",系统时间:" + df.format(System.currentTimeMillis())
                                                + ",事件时间:" + df.format(eventTime) + ",水印时间:" + df.format(watermark.getTimestamp()));
                                        output.emitWatermark(watermark);
                                    }
                                };
                            }
                        }.withTimestampAssigner((order, timestamp) -> order.getEventTime())
                );

        SingleOutputStreamOperator<String> result = watermakerDS
                .keyBy(Order::getUserId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new WindowFunction<Order, String, Integer, TimeWindow>() {
                    @Override
                    public void apply(Integer key, TimeWindow window, Iterable<Order> orders, Collector<String> out) throws Exception {
                        List<String> list = new ArrayList<>();
                        for (Order order : orders) {
                            Long eventTime = order.eventTime;
                            String formatEventTime = df.format(eventTime);
                            list.add(formatEventTime);
                        }
                        String start = df.format(window.getStart());
                        String end = df.format(window.getEnd());
                        String outStr = String.format("key:%s,窗口开始结束:[%s~%s),属于该窗口的事件时间:%s",
                                key.toString(), start, end, list.toString());
                        out.collect(outStr);
                    }
                });

        result.print();
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long eventTime;
    }
}
Issues to watch out for when using watermarks:
1. If one partition never receives data, its low watermark never advances and windows never trigger. Official solution: the idleness mechanism added in version 1.11; see the sketch below. (Structured Streaming fires on a fixed-interval trigger, and Flink can achieve the same effect with a custom trigger.)
2. If one partition generates watermarks too fast, output of the other partitions' data lags behind. Official solution: watermark alignment, supported since version 1.15.
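A minimal sketch of the idleness mechanism, assuming an Order POJO with an event-time getter as in the demos nearby; the 30-second idle timeout is an arbitrary choice:

WatermarkStrategy<Order> strategy = WatermarkStrategy
        .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
        .withTimestampAssigner((order, ts) -> order.getEventTime())
        // a partition that sends nothing for 30s is marked idle and no longer
        // holds back the overall low watermark (available since Flink 1.11)
        .withIdleness(Duration.ofSeconds(30));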
Custom triggers. In Flink, the Trigger and the Window are configured independently: the window defines how large a chunk of the stream is processed together, while the trigger decides when that processing starts.
If the trigger is set to 5 seconds and the window to 10 seconds, then in principle the trigger checks its firing condition every 5 seconds; if the condition is met (for example, an element-count threshold), the data currently in the window is computed. Whether or not the trigger condition is met, the window still closes every 10 seconds and its contents are computed then.
So the computation may fire at the 5-second mark, at the 10-second mark, or both, depending on the trigger condition; if the condition is purely time-based, it fires at both the 5-second and the 10-second mark. A built-in trigger with this behavior is sketched below.
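One built-in trigger with exactly this early-firing behavior is ContinuousEventTimeTrigger (org.apache.flink.streaming.api.windowing.triggers). A sketch, assuming a keyed stream keyedDS of CartInfo as in the window demos above:

keyedDS
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        // emit an early result every 5 seconds of event time; the window
        // still fires and is cleaned up at its 10-second boundary
        .trigger(ContinuousEventTimeTrigger.of(Time.seconds(5)))
        .sum("count");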
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeoutTrigger;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.Properties;

public class WaterMaker {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        ParameterTool param = ParameterTool.fromArgs(args);
        String topic = param.get("topic", "KafkaWordCount");
        String group_id = param.get("group_id", "flink_wordcount");
        boolean isWriteKafka = param.getBoolean("isWriteKafka", false);
        boolean isWriteHdfs = param.getBoolean("isWriteHdfs", false);
        boolean isWriteMysql = param.getBoolean("isWriteMysql", false);

        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        prop.setProperty("group.id", group_id);
        prop.setProperty("auto.offset.reset", "latest");
        prop.setProperty("enable.auto.commit", "true");
        prop.setProperty("key.deserializer", "StringDeserializer");
        prop.setProperty("value.deserializer", "StringDeserializer");

        FlinkKafkaConsumer<String> kafka = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);
        DataStreamSource<String> source = env.addSource(kafka);

        SingleOutputStreamOperator<Order> result = source.rebalance().map(new RichMapFunction<String, Order>() {
                    @Override
                    public Order map(String value) throws Exception {
                        // input format: "yyyy-MM-dd HH:mm:ss,word"
                        String[] timeAndWord = value.split(",");
                        Timestamp timestamp = Timestamp.valueOf(timeAndWord[0]);
                        Order order = new Order();
                        order.setOrderId(timeAndWord[1]);
                        order.setMoney(1);
                        order.setEvenTime(timestamp.getTime());
                        return order;
                    }
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((element, recordTimestamp) -> element.getEvenTime()))
                .keyBy(t -> t.getOrderId())
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .trigger(ProcessingTimeoutTrigger.of(EventTimeTrigger.create(), Duration.ofSeconds(5), false, true))
                .sum("money");

        result.print("结果是:");
        env.execute();
    }

    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long evenTime;

        public Order(String orderId, Integer userId, Integer money, Long evenTime) {
            this.orderId = orderId;
            this.userId = userId;
            this.money = money;
            this.evenTime = evenTime;
        }

        public Order() {
        }

        public String getOrderId() { return orderId; }
        public void setOrderId(String orderId) { this.orderId = orderId; }
        public Integer getUserId() { return userId; }
        public void setUserId(Integer userId) { this.userId = userId; }
        public Integer getMoney() { return money; }
        public void setMoney(Integer money) { this.money = money; }
        public Long getEvenTime() { return evenTime; }
        public void setEvenTime(Long evenTime) { this.evenTime = evenTime; }

        @Override
        public String toString() {
            FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
            return "Order{" + "orderId='" + orderId + '\'' + ", userId=" + userId
                    + ", money=" + money + ", evenTime=" + df.format(evenTime) + '}';
        }
    }
}
A hand-written trigger that fires periodically and prints diagnostics
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Timestamp;
import java.time.Duration;

public class EventAndProcessingTimeOutTrigger<T, W extends Window> extends Trigger<T, W> {
    private static final Logger LOG = LoggerFactory.getLogger(EventAndProcessingTimeOutTrigger.class);
    private static final long serialVersionUID = 1L;

    private final long interval;
    private final boolean resetTimerOnNewRecord;
    private final ValueStateDescriptor<Long> timeoutStateDesc =
            new ValueStateDescriptor<>("timeout", LongSerializer.INSTANCE);

    private EventAndProcessingTimeOutTrigger(long interval, boolean resetTimerOnNewRecord) {
        this.interval = interval;
        this.resetTimerOnNewRecord = resetTimerOnNewRecord;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        } else {
            ValueState<Long> timeoutState = ctx.getPartitionedState(this.timeoutStateDesc);
            long nextFireTimestamp = ctx.getCurrentProcessingTime() + this.interval;
            Long timeoutTimestamp = timeoutState.value();
            if (timeoutTimestamp != null && resetTimerOnNewRecord) {
                ctx.deleteProcessingTimeTimer(timeoutTimestamp);
                timeoutState.clear();
                timeoutTimestamp = null;
            }
            if (timeoutTimestamp == null) {
                timeoutState.update(nextFireTimestamp);
                ctx.registerProcessingTimeTimer(nextFireTimestamp);
            }
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        long maxTimestamp = window.maxTimestamp();
        this.clear(window, ctx);
        System.out.println("maxTimestamp = " + new Timestamp(maxTimestamp)
                + " CurrentWatermark = " + new Timestamp(ctx.getCurrentWatermark()));
        LOG.warn("LOG: maxTimestamp = " + new Timestamp(maxTimestamp)
                + " CurrentWatermark = " + new Timestamp(ctx.getCurrentWatermark()));
        ctx.registerEventTimeTimer(maxTimestamp + this.interval);
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ValueState<Long> timeoutTimestampState = ctx.getPartitionedState(this.timeoutStateDesc);
        Long timeoutTimestamp = timeoutTimestampState.value();
        if (timeoutTimestamp != null) {
            ctx.deleteProcessingTimeTimer(timeoutTimestamp);
            timeoutTimestampState.clear();
        }
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    public static <T, W extends Window> EventAndProcessingTimeOutTrigger<T, W> of(Duration timeout) {
        return new EventAndProcessingTimeOutTrigger<>(timeout.toMillis(), true);
    }

    public static <T, W extends Window> EventAndProcessingTimeOutTrigger<T, W> of(Duration timeout, boolean resetTimerOnNewRecord) {
        return new EventAndProcessingTimeOutTrigger<>(timeout.toMillis(), resetTimerOnNewRecord);
    }
}
Side outputs to prevent losing late data (must know)
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.sql.Timestamp;
import java.time.Duration;

public class WaterMaker {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> source = env.socketTextStream("node1", 9999);

        OutputTag<Order> seriousLateOutputTag = new OutputTag<>("seriousLateOutputTag", TypeInformation.of(Order.class));

        SingleOutputStreamOperator<Order> result = source.rebalance().map(new RichMapFunction<String, Order>() {
                    @Override
                    public Order map(String value) throws Exception {
                        String[] timeAndWord = value.split(",");
                        Timestamp timestamp = Timestamp.valueOf(timeAndWord[0]);
                        Order order = new Order();
                        order.setOrderId(timeAndWord[1]);
                        order.setMoney(1);
                        order.setEvenTime(timestamp.getTime());
                        return order;
                    }
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((element, recordTimestamp) -> element.getEvenTime()))
                .keyBy(t -> t.getOrderId())
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .trigger(EventAndProcessingTimeOutTrigger.of(Duration.ofSeconds(5), false))
                .allowedLateness(Time.seconds(15))
                .sideOutputLateData(seriousLateOutputTag)
                .sum("money");

        result.print("结果是:");
        result.getSideOutput(seriousLateOutputTag).print("严重迟到丢失的数据:");
        env.execute();
    }

    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long evenTime;

        public Order(String orderId, Integer userId, Integer money, Long evenTime) {
            this.orderId = orderId;
            this.userId = userId;
            this.money = money;
            this.evenTime = evenTime;
        }

        public Order() {
        }

        public String getOrderId() { return orderId; }
        public void setOrderId(String orderId) { this.orderId = orderId; }
        public Integer getUserId() { return userId; }
        public void setUserId(Integer userId) { this.userId = userId; }
        public Integer getMoney() { return money; }
        public void setMoney(Integer money) { this.money = money; }
        public Long getEvenTime() { return evenTime; }
        public void setEvenTime(Long evenTime) { this.evenTime = evenTime; }

        @Override
        public String toString() {
            FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
            return "Order{" + "orderId='" + orderId + '\'' + ", userId=" + userId
                    + ", money=" + money + ", evenTime=" + df.format(evenTime) + '}';
        }
    }
}
State: stateless vs. stateful computation
Stateless computation: hello -> (hello,1); hello -> (hello,1). Each record is processed in isolation, so the count never accumulates.
Stateful computation: hello -> (hello,1); hello -> (hello,2). The second result builds on the state accumulated from the first.
Classification of state
State
Managed State (recommended in development): automatically managed and optimized by Flink, supports many data structures
KeyedState: usable only on a KeyedStream, supports many data structures
OperatorState: typically used in Sources, supports ListState
RawState: managed entirely by the user, supports only byte[], usable only in custom operators
Detailed classification diagram:
ManagedState: KeyedState https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateDemo01_KeyState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStream<Tuple2<String, Long>> tupleDS = env.fromElements(
                Tuple2.of("北京", 1L),
                Tuple2.of("上海", 2L),
                Tuple2.of("北京", 6L),
                Tuple2.of("上海", 8L),
                Tuple2.of("北京", 3L),
                Tuple2.of("上海", 4L)
        );

        // the built-in maxBy uses managed keyed state internally
        DataStream<Tuple2<String, Long>> result1 = tupleDS.keyBy(t -> t.f0).maxBy(1);

        // hand-rolled equivalent using ValueState
        DataStream<Tuple3<String, Long, Long>> result2 = tupleDS.keyBy(t -> t.f0)
                .map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
                    private ValueState<Long> maxValueState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        ValueStateDescriptor<Long> stateDescriptor = new ValueStateDescriptor<>("maxValueState", Long.class);
                        maxValueState = getRuntimeContext().getState(stateDescriptor);
                    }

                    @Override
                    public Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {
                        Long currentValue = value.f1;
                        Long historyValue = maxValueState.value();
                        if (historyValue == null || currentValue > historyValue) {
                            historyValue = currentValue;
                            maxValueState.update(historyValue);
                        }
                        return Tuple3.of(value.f0, currentValue, historyValue);
                    }
                });

        result2.print();
        env.execute();
    }
}
ManagedState: OperatorState https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Iterator;

public class StateDemo02_OperatorState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);
        env.enableCheckpointing(1000);
        env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 3000));

        DataStreamSource<String> ds = env.addSource(new MyKafkaSource()).setParallelism(1);
        ds.print();
        env.execute();
    }

    public static class MyKafkaSource extends RichParallelSourceFunction<String> implements CheckpointedFunction {
        private boolean flag = true;
        private ListState<Long> offsetState = null;
        private Long offset = 0L;

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Long> stateDescriptor = new ListStateDescriptor<>("offsetState", Long.class);
            offsetState = context.getOperatorStateStore().getListState(stateDescriptor);
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            Iterator<Long> iterator = offsetState.get().iterator();
            if (iterator.hasNext()) {
                offset = iterator.next();
            }
            while (flag) {
                offset += 1;
                int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
                ctx.collect("subTaskId:" + subTaskId + ",当前的offset值为:" + offset);
                Thread.sleep(1000);
                if (offset % 5 == 0) {
                    throw new Exception("bug出现了.....");
                }
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            offsetState.clear();
            offsetState.add(offset);
        }

        @Override
        public void cancel() {
            flag = false;
        }
    }
}
State timers and TTL (compare Redis key expiry): register a timer with ctx.timerService().registerProcessingTimeTimer(long time); when that time arrives, onTimer is invoked. A minimal timer sketch follows.
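A minimal timer sketch, assuming a keyed stream of strings; the 10-second delay and the output message are made up for illustration:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class TimerDemoFunction extends KeyedProcessFunction<String, String, String> {
    private ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) {
        timerState = getRuntimeContext().getState(new ValueStateDescriptor<>("timer", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // register a processing-time timer 10 seconds from now and remember it in state
        long fireAt = ctx.timerService().currentProcessingTime() + 10_000L;
        ctx.timerService().registerProcessingTimeTimer(fireAt);
        timerState.update(fireAt);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // fires when the registered time is reached, much like a Redis key expiring
        out.collect("key " + ctx.getCurrentKey() + " timer fired at " + timestamp);
        timerState.clear();
    }
}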
When doing real-time computation with Flink, you will run into state that keeps accumulating and grows ever larger.
For example: a job defines extremely long time windows, applies an unbounded GROUP BY on a dynamic table, or runs a dual-stream JOIN with no time-window constraint.
These situations often end in heap OOM, or in off-heap (RocksDB) memory growing past the container quota and crashing the job repeatedly. Starting with Flink 1.6, the State TTL feature allows keyed state defined in a job to be cleaned up automatically on expiry; for the Table API and SQL modules, Idle State Retention Time plays the same role. The details follow.
The official Flink documentation shows the basic usage of State TTL; an example follows:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(1))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
As the example shows, to use State TTL you first define a StateTtlConfig object:
1. Create it via the builder pattern, passing a Time object as the TTL;
2. then set the update type (UpdateType);
3. and the state visibility (StateVisibility);
4. finally enable State TTL on the state descriptor declared afterwards.
StateTtlConfig parameter reference
- TTL: the state's time-to-live, an org.apache.flink.api.common.time.Time object. Once a TTL is set, state whose last-access timestamp + TTL is earlier than the current time counts as expired (a simplification; for the precise definition see the expired logic in org.apache.flink.runtime.state.ttl.TtlUtils).
- UpdateType: when the state's timestamp is refreshed, an enum. Disabled means the timestamp is never updated; OnCreateAndWrite updates it on state creation and on every write; OnReadAndWrite additionally updates it on every read.
- StateVisibility: how state that has expired but not yet been cleaned up is treated, also an enum. With ReturnExpiredIfNotCleanedUp, a value whose timestamp says it has expired is still returned to the caller as long as it has not actually been removed; with NeverReturnExpired, expired state is never returned, only an empty state, which shields callers from expired values.
- TimeCharacteristic and TtlTimeCharacteristic: the time semantics that State TTL applies to, again an enum. The former is marked Deprecated; new code should use the newer TtlTimeCharacteristic. As of Flink 1.8, only ProcessingTime is supported; EventTime support for State TTL is still under development.
- CleanupStrategies: how expired state is removed.
1. The default is passive cleanup: an expired entry is removed when its key is next read (if memory pressure is a concern, consider switching to RocksDB).
2. Active cleanup can be configured, as follows.
The cleanup strategy currently has three enum values. FULL_STATE_SCAN_SNAPSHOT corresponds to the EmptyCleanupStrategy class: expired state is not proactively cleaned; taking a full snapshot (Snapshot/Checkpoint) produces a smaller state file, but local state does not shrink.
Local state only actually shrinks once the job restarts and restores from the last snapshot, so this may still not relieve memory pressure. To address that, Flink also provides incremental cleanup: INCREMENTAL_CLEANUP (the IncrementalCleanupStrategy class) for the heap state backend, and ROCKSDB_COMPACTION_FILTER (the RocksdbCompactFilterCleanupStrategy class) for the RocksDB state backend.
With incremental cleanup, Flink can be configured to run one cleanup pass for every N records read, and you can specify how many stale entries each pass removes; RocksDB state cleanup instead goes through JNI to the C++ FlinkCompactionFilter, filtering out stale state during RocksDB's background compaction. A configuration sketch follows.
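A configuration sketch of the cleanup strategies on the builder; the numbers are arbitrary, and in practice you would pick the strategy matching your state backend:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.hours(1))
        // drop expired entries whenever a full snapshot is taken
        .cleanupFullSnapshot()
        // heap backend: check 10 entries per state access
        // (second flag: also run cleanup on every processed record)
        .cleanupIncrementally(10, false)
        // RocksDB backend: filter expired entries during compaction,
        // re-querying the current timestamp every 1000 processed entries
        .cleanupInRocksdbCompactFilter(1000)
        .build();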
Among the options below, the newBuilder method of StateTtlConfig is mandatory: it sets the time-to-live value.
TTL refresh policy (default: OnCreateAndWrite)
- StateTtlConfig.UpdateType.Disabled: TTL disabled; state never expires
- StateTtlConfig.UpdateType.OnCreateAndWrite: every write updates the state's last-access time
- StateTtlConfig.UpdateType.OnReadAndWrite: every read and every write update the state's last-access time
State visibility (default: NeverReturnExpired)
- StateTtlConfig.StateVisibility.NeverReturnExpired: expired state is never returned
- StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp: expired state may be returned while it has not yet been cleaned up
Checkpoint: the difference between Checkpoint and State
Checkpoint execution flow
0. Flink's JobManager creates the CheckpointCoordinator.
1. The Coordinator sends a Barrier to every SourceOperator (think of it as the signal to perform a checkpoint).
2. On receiving the Barrier, a SourceOperator briefly pauses its work (very briefly, since writing the snapshot is asynchronous), snapshots its state, saves the snapshot to the configured storage (such as HDFS), reports success to the Coordinator, and forwards the Barrier to the downstream operators.
3. Each TransformationOperator that receives the Barrier repeats step 2 and finally forwards the Barrier to the Sink.
4. The Sink receives the Barrier and repeats step 2.
5. Once the Coordinator has received success reports from all operators, this snapshot is considered complete.
Chandy-Lamport distributed snapshot algorithm. Under the hood, Flink's Checkpoint uses the Chandy-Lamport distributed snapshot algorithm, which guarantees data consistency in a distributed environment!
https://zhuanlan.zhihu.com/p/53482103
The author of the Chandy-Lamport algorithm is also the author of Paxos, the consensus algorithm behind ZooKeeper.
https://www.cnblogs.com/shenguanpu/p/4048660.html
Flink's success with the Chandy-Lamport snapshot algorithm was later borrowed by Spark Structured Streaming.
State backends / storage media
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
Checkpoint code demo
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/checkpointing.html
import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class CheckpointDemo01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        env.enableCheckpointing(1000);
        if (SystemUtils.IS_OS_WINDOWS) {
            env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
        } else {
            env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink-checkpoint/checkpoint"));
        }
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);

        DataStream<String> linesDS = env.socketTextStream("node1", 9999);

        DataStream<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });

        KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOneDS.keyBy(t -> t.f0);
        DataStream<Tuple2<String, Integer>> aggResult = groupedDS.sum(1);

        SingleOutputStreamOperator<String> result = aggResult.map(new RichMapFunction<Tuple2<String, Integer>, String>() {
            @Override
            public String map(Tuple2<String, Integer> value) throws Exception {
                return value.f0 + ":::" + value.f1;
            }
        });

        result.print();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka", new SimpleStringSchema(), props);
        result.addSink(kafkaSink);

        env.execute();
    }
}
State recovery: automatic restart (fully automatic)
import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class CheckpointDemo02_Restart {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        env.enableCheckpointing(1000);
        if (SystemUtils.IS_OS_WINDOWS) {
            env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
        } else {
            env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink-checkpoint/checkpoint"));
        }
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3,
                Time.of(5, TimeUnit.SECONDS)
        ));

        DataStream<String> linesDS = env.socketTextStream("node1", 9999);

        DataStream<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    if (word.equals("bug")) {
                        System.out.println("bug.....");
                        throw new Exception("bug.....");
                    }
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });

        KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOneDS.keyBy(t -> t.f0);
        DataStream<Tuple2<String, Integer>> aggResult = groupedDS.sum(1);

        SingleOutputStreamOperator<String> result = aggResult.map(new RichMapFunction<Tuple2<String, Integer>, String>() {
            @Override
            public String map(Tuple2<String, Integer> value) throws Exception {
                return value.f0 + ":::" + value.f1;
            }
        });

        result.print();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka", new SimpleStringSchema(), props);
        result.addSink(kafkaSink);

        env.execute();
    }
}
State recovery: manual restart (semi-automatic)
1. Package the job (it uses Kafka).
2. Start the Flink cluster.
3. Upload the jar, configure, and submit:
http://node1:8081/#/submit
4. Send some words and watch the HDFS directory.
5. Cancel the job.
6. Resubmit the job, telling it to resume from a specific checkpoint directory:
hdfs://node1:8020/flink-checkpoint/checkpoint/acb9071752276e86552a30fda41e021c/chk-100
7. Keep sending data and observe that computation resumes from the previous state.
Savepoint: fully manual
Demo
# start a yarn session
/export/server/flink/bin/yarn-session.sh -n 2 -tm 800 -s 2 -d
# run the job (checkpoints run automatically)
/export/server/flink/bin/flink run --class cn.checkpoint.CheckpointDemo01 /root/ckp.jar
# manually create a savepoint (essentially a manually triggered checkpoint)
/export/server/flink/bin/flink savepoint 0e921a10eb31bb0983b637929ec87a8a hdfs://node1:8020/flink-checkpoint/savepoint/
# stop the job
/export/server/flink/bin/flink cancel 0e921a10eb31bb0983b637929ec87a8a
# restart the job, manually loading the savepoint data
/export/server/flink/bin/flink run -s hdfs://node1:8020/flink-checkpoint/savepoint/savepoint-0e921a-1cac737bff7a --class cn.checkpoint.CheckpointDemo01 /root/ckp.jar
# stop the yarn session
yarn application -kill application_1607782486484_0014
BroadcastState: dynamically updating rule configuration (held in memory). Flink broadcasts data in two ways:
1. Static broadcast:
Distributed cache: Flink provides a distributed cache, much like Hadoop's, that lets parallel user functions conveniently read a local file placed on each TaskManager node, so tasks do not fetch it repeatedly. It works like this: the program registers a file or directory (on a local or remote filesystem such as HDFS or S3) through the ExecutionEnvironment under a chosen name; when the program runs, Flink automatically copies it to every TaskManager's local filesystem, exactly once. User code can then look the file or directory up by that name and access it on the TaskManager's local filesystem.
env.registerCachedFile("D:/wxgz-local/resources_ceshi/too.properties", "too")

val file: File = getRuntimeContext.getDistributedCache.getFile("too")
val prop = new Properties
prop.load(new FileInputStream(file))
val value = prop.getProperty("cycle")
2. Dynamic broadcast updates:
Differences from the distributed cache: 1. broadcast variables are memory-based, distributing the variable into each worker's memory (avoiding repeated copies and saving memory); 2. the distributed cache is disk-based, copying the file to each node so a running function can find it on the local filesystem (avoiding repeated copies and improving execution efficiency).
Requirement
Caveats:
Broadcast State is a Map, i.e., key-value pairs.
Broadcast State can only be modified on the broadcast side, i.e., in the processBroadcastElement method of BroadcastProcessFunction or KeyedBroadcastProcessFunction; on the non-broadcast side, i.e., in processElement, it is read-only.
The order of elements in Broadcast State may differ between tasks; be careful with order-dependent processing.
At checkpoint time, every task checkpoints its own copy of the Broadcast State.
At runtime, Broadcast State is kept in memory; it cannot currently be stored in the RocksDB state backend.
Note: the broadcast stream's source must stay in a running state, or checkpoints will fail. So either keep the MySQL reload interval shorter than the checkpoint interval, push the MySQL data into Kafka, or use CDC to watch the changelog.
There is an event stream (user behavior logs) that contains a userId but no detailed user information.
There is a config/rule stream (the user-info stream) that contains the detailed user information.
We now want to join the event stream with the config stream to enrich each log record with the user's details, e.g., (userId, details, action).
Since the config/rule stream (the user-info stream) is small, we can broadcast it as state.
Food for thought:
1. Shouldn't the config stream report only the configs that changed, update the broadcast state, and let the event stream read from there?
Data (simulated sources)
public static class MySource implements SourceFunction<Tuple4<String, String, String, Integer>> {
    private boolean isRunning = true;

    @Override
    public void run(SourceContext<Tuple4<String, String, String, Integer>> ctx) throws Exception {
        Random random = new Random();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        while (isRunning) {
            int id = random.nextInt(4) + 1;
            String user_id = "user_" + id;
            String eventTime = df.format(new Date());
            String eventType = "type_" + random.nextInt(3);
            int productId = random.nextInt(4);
            ctx.collect(Tuple4.of(user_id, eventTime, eventType, productId));
            Thread.sleep(500);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

public static class MySQLSource extends RichSourceFunction<Map<String, Tuple2<String, Integer>>> {
    private boolean flag = true;
    private Connection conn = null;
    private PreparedStatement ps = null;
    private ResultSet rs = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
        String sql = "select `userID`, `userName`, `userAge` from `user_info`";
        ps = conn.prepareStatement(sql);
    }

    @Override
    public void run(SourceContext<Map<String, Tuple2<String, Integer>>> ctx) throws Exception {
        while (flag) {
            Map<String, Tuple2<String, Integer>> map = new HashMap<>();
            rs = ps.executeQuery();
            while (rs.next()) {
                String userID = rs.getString("userID");
                String userName = rs.getString("userName");
                int userAge = rs.getInt("userAge");
                map.put(userID, Tuple2.of(userName, userAge));
            }
            ctx.collect(map);
            Thread.sleep(5000); // reload the config table every 5 seconds
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }

    @Override
    public void close() throws Exception {
        // close resources in reverse order of creation (the original closed conn first)
        if (rs != null) rs.close();
        if (ps != null) ps.close();
        if (conn != null) conn.close();
    }
}
Code steps
1. env
2. source
   -1. build the real-time event stream - custom random source <userID, eventTime, eventType, productID>
   -2. build the config stream - from MySQL <userId, <name, age>>
3. transformation
   -1. define the state descriptor
       MapStateDescriptor<Void, Map<String, Tuple2<String, Integer>>> descriptor =
           new MapStateDescriptor<>("config", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));
   -2. broadcast the config stream
       BroadcastStream<Map<String, Tuple2<String, Integer>>> broadcastDS = configDS.broadcast(descriptor);
   -3. connect the event stream with the broadcast stream
       BroadcastConnectedStream<Tuple4<String, String, String, Integer>, Map<String, Tuple2<String, Integer>>> connectDS = eventDS.connect(broadcastDS);
   -4. process the connected stream - enrich the event stream's user info from the config stream
4. sink
5. execute
Code implementation
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 import org.apache.flink.api.common.RuntimeExecutionMode;import org.apache.flink.api.common.state.BroadcastState;import org.apache.flink.api.common.state.MapStateDescriptor;import org.apache.flink.api.common.state.ReadOnlyBroadcastState;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.api.java.tuple.Tuple4;import org.apache.flink.api.java.tuple.Tuple6;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;import org.apache.flink.streaming.api.datastream.BroadcastStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.apache.flink.util.Collector;import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.text.SimpleDateFormat;import java.util.Date;import java.util.HashMap;import java.util.Map;import java.util.Random;public class BroadcastStateDemo { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); env.setParallelism(1 ); DataStreamSource<Tuple4<String, String, String, Integer>> eventDS = env.addSource(new MySource ()); DataStreamSource<Map<String, Tuple2<String, Integer>>> userDS = env.addSource(new MySQLSource ()); MapStateDescriptor<Void, Map<String, Tuple2<String, Integer>>> descriptor = new MapStateDescriptor <>("info" , Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT))); BroadcastStream<Map<String, Tuple2<String, Integer>>> broadcastDS = userDS.broadcast(descriptor); BroadcastConnectedStream<Tuple4<String, String, String, Integer>, Map<String, Tuple2<String, Integer>>> connectDS = eventDS.connect(broadcastDS); SingleOutputStreamOperator<Tuple6<String, String, String, Integer, String, Integer>> result = connectDS.process(new BroadcastProcessFunction < Tuple4<String, String, String, Integer>, Map<String, Tuple2<String, Integer>>, Tuple6<String, String, String, Integer, String, Integer> >() { @Override public void processElement (Tuple4<String, String, String, Integer> value, ReadOnlyContext ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception { ReadOnlyBroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = 
ctx.getBroadcastState(descriptor); Map<String, Tuple2<String, Integer>> map = broadcastState.get(null ); if (map != null ) { String userId = value.f0; Tuple2<String, Integer> tuple2 = map.get(userId); String username = tuple2.f0; Integer age = tuple2.f1; out.collect(Tuple6.of(userId, value.f1, value.f2, value.f3, username, age)); } } @Override public void processBroadcastElement (Map<String, Tuple2<String, Integer>> value, Context ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception { BroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(descriptor); broadcastState.clear(); broadcastState.put(null , value); } }); result.print(); env.execute(); } public static class MySource implements SourceFunction <Tuple4<String, String, String, Integer>> { private boolean isRunning = true ; @Override public void run (SourceContext<Tuple4<String, String, String, Integer>> ctx) throws Exception { Random random = new Random (); SimpleDateFormat df = new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss" ); while (isRunning) { int id = random.nextInt(4 ) + 1 ; String user_id = "user_" + id; String eventTime = df.format(new Date ()); String eventType = "type_" + random.nextInt(3 ); int productId = random.nextInt(4 ); ctx.collect(Tuple4.of(user_id, eventTime, eventType, productId)); Thread.sleep(500 ); } } @Override public void cancel () { isRunning = false ; } } public static class MySQLSource extends RichSourceFunction <Map<String, Tuple2<String, Integer>>> { private boolean flag = true ; private Connection conn = null ; private PreparedStatement ps = null ; private ResultSet rs = null ; @Override public void open (Configuration parameters) throws Exception { conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata" , "root" , "root" ); String sql = "select `userID`, `userName`, `userAge` from `user_info`" ; ps = conn.prepareStatement(sql); } @Override public void run (SourceContext<Map<String, Tuple2<String, Integer>>> ctx) throws Exception { while (flag) { Map<String, Tuple2<String, Integer>> map = new HashMap <>(); ResultSet rs = ps.executeQuery(); while (rs.next()) { String userID = rs.getString("userID" ); String userName = rs.getString("userName" ); int userAge = rs.getInt("userAge" ); map.put(userID, Tuple2.of(userName, userAge)); } ctx.collect(map); Thread.sleep(5000 ); } } @Override public void cancel () { flag = false ; } @Override public void close () throws Exception { if (conn != null ) conn.close(); if (ps != null ) ps.close(); if (rs != null ) rs.close(); } } }
1. Reloading: re-broadcast or preload? Periodically persist a copy locally, so that on restart, when not recovering from a checkpoint/savepoint, the job can reload the previous snapshot itself.
2. Right after the job starts, records may fail to join: the broadcast stream may arrive first (possibly carrying "future" attribute versions), or the event stream may arrive first. A SQL temporal join can buffer both streams in state and emit once they match, but that state lives in memory; with large volumes and many versions it bloats, so configure idle state retention.
Summary:
1. If the dimension table is small and its size is under control, use broadcast + preloading.
(A dedicated thread can reload it from local disk or from the distributed cache; a local reload pulls one copy per task while the distributed cache pulls one copy per node, so from a network-I/O standpoint the distributed cache wins.)
2. If the dimension table is large, or you are joining two event streams to stitch behavior together, use hot storage: write the stream that usually arrives first to an external system, then interval-join the other stream against the matching version (async I/O plus a Guava cache reduce network latency and load).
3. The approaches above see dimension updates with some delay. If freshness matters and the two streams never wait on each other for long, consider a Flink SQL temporal join (sketched below); the code is simple, but state management is the catch: misconfigured retention can run you out of memory.
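A minimal sketch of such a temporal join in Flink SQL (1.12 syntax); the table and column names here are hypothetical:

-- Orders is the probe side, Users a versioned dimension table (both hypothetical)
SELECT o.order_id, o.user_id, u.user_name
FROM Orders AS o
JOIN Users FOR SYSTEM_TIME AS OF o.order_time AS u
    ON o.user_id = u.user_id;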
Whenever low watermarks are involved, beware of one issue: if a partition has no data, the low watermark stops advancing. The simple fix is source idleness detection (see the snippet below); a more robust fix is a custom periodically firing trigger that advances the watermark.
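A minimal sketch of the idleness mechanism, assuming the Order type and watermark strategy from the examples above; the durations are placeholders:

// partitions that emit no data for 1 minute are marked idle and
// no longer hold back the low watermark
WatermarkStrategy<Order> strategy = WatermarkStrategy
        .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withIdleness(Duration.ofMinutes(1));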
Flink - Dual-Stream Join. Types of join:
The join() operator provides "window join" semantics: an inner join on the specified keys within a (tumbling/sliding/session) window, supporting both processing time and event time.
coGroup(): an inner join alone is clearly not enough — how do we get left/right outer joins? With the coGroup() operator.
It is invoked much like join() and also requires a window, but CoGroupFunction is more flexible than JoinFunction: it can match and emit records from the left and/or right stream according to arbitrary user logic.
dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new CoGroupFunction() {...});
Both join() and coGroup() correlate data within windows. In some cases, though, the two streams are out of step. For example, an order record may be written long after the corresponding click's purchase action; if we bound the match with a window, joins easily miss.
Flink therefore also provides "interval join" semantics: records are matched on the specified keys within a time interval of the right stream relative to the left, i.e. right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound].
An interval join is also an inner join. It needs no window, but the user must specify the lower and upper bounds of the offset interval, and only event time is supported. Before running, apply assignTimestampsAndWatermarks() to both streams to extract event timestamps and watermarks.
if (timeBehaviour != TimeBehaviour.EventTime) {
    throw new UnsupportedTimeCharacteristicException(
        "Time-bounded stream joins are only supported in event time");
}
Why do we say the difference between window join and interval join lies in interval join's state-cleanup mechanism?
A dual-stream join essentially buffers both streams in state and matches them with a nested loop at compute time. Window state holds only one window's worth of data; an interval join, if it never cleaned up, would keep every historical record for a key and the state would grow too large — so records outside the interval are cleared before each join pass.
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining.html
Code demo - WindowJoin
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 import com.alibaba.fastjson.JSON;import lombok.Data;import org.apache.flink.api.common.RuntimeExecutionMode;import org.apache.flink.api.common.eventtime.*;import org.apache.flink.api.common.functions.JoinFunction;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;import java.math.BigDecimal;import java.util.ArrayList;import java.util.List;import java.util.Random;import java.util.UUID;import java.util.concurrent.TimeUnit;public class JoinDemo01_WindowJoin { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); DataStreamSource<Goods> goodsDS = env.addSource(new GoodsSource ()); DataStreamSource<OrderItem> OrderItemDS = env.addSource(new OrderItemSource ()); SingleOutputStreamOperator<Goods> goodsDSWithWatermark = goodsDS.assignTimestampsAndWatermarks(new GoodsWatermark ()); SingleOutputStreamOperator<OrderItem> OrderItemDSWithWatermark = OrderItemDS.assignTimestampsAndWatermarks(new OrderItemWatermark ()); DataStream<FactOrderItem> resultDS = goodsDSWithWatermark.join(OrderItemDSWithWatermark) .where(Goods::getGoodsId) .equalTo(OrderItem::getGoodsId) .window(TumblingEventTimeWindows.of(Time.seconds(5 ))) .apply(new JoinFunction <Goods, OrderItem, FactOrderItem>() { @Override public FactOrderItem join (Goods first, OrderItem second) throws Exception { FactOrderItem result = new FactOrderItem (); result.setGoodsId(first.getGoodsId()); result.setGoodsName(first.getGoodsName()); result.setCount(new BigDecimal (second.getCount())); result.setTotalMoney(new BigDecimal (second.getCount()).multiply(first.getGoodsPrice())); return result; } }); resultDS.print(); env.execute(); } @Data public static class Goods { private String goodsId; private String goodsName; private BigDecimal goodsPrice; public static List<Goods> GOODS_LIST; public static Random r; static { r = new Random (); GOODS_LIST = new ArrayList <>(); GOODS_LIST.add(new Goods ("1" , "小米12" , new BigDecimal (4890 ))); GOODS_LIST.add(new Goods ("2" , "iphone12" , new BigDecimal (12000 ))); GOODS_LIST.add(new Goods ("3" , "MacBookPro" , new BigDecimal (15000 ))); GOODS_LIST.add(new Goods ("4" , "Thinkpad 
X1" , new BigDecimal (9800 ))); GOODS_LIST.add(new Goods ("5" , "MeiZu One" , new BigDecimal (3200 ))); GOODS_LIST.add(new Goods ("6" , "Mate 40" , new BigDecimal (6500 ))); } public static Goods randomGoods () { int rIndex = r.nextInt(GOODS_LIST.size()); return GOODS_LIST.get(rIndex); } public Goods () { } public Goods (String goodsId, String goodsName, BigDecimal goodsPrice) { this .goodsId = goodsId; this .goodsName = goodsName; this .goodsPrice = goodsPrice; } @Override public String toString () { return JSON.toJSONString(this ); } } @Data public static class OrderItem { private String itemId; private String goodsId; private Integer count; @Override public String toString () { return JSON.toJSONString(this ); } } @Data public static class FactOrderItem { private String goodsId; private String goodsName; private BigDecimal count; private BigDecimal totalMoney; @Override public String toString () { return JSON.toJSONString(this ); } } public static class GoodsSource extends RichSourceFunction <Goods> { private Boolean isCancel; @Override public void open (Configuration parameters) throws Exception { isCancel = false ; } @Override public void run (SourceContext sourceContext) throws Exception { while (!isCancel) { Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods)); TimeUnit.SECONDS.sleep(1 ); } } @Override public void cancel () { isCancel = true ; } } public static class OrderItemSource extends RichSourceFunction <OrderItem> { private Boolean isCancel; private Random r; @Override public void open (Configuration parameters) throws Exception { isCancel = false ; r = new Random (); } @Override public void run (SourceContext sourceContext) throws Exception { while (!isCancel) { Goods goods = Goods.randomGoods(); OrderItem orderItem = new OrderItem (); orderItem.setGoodsId(goods.getGoodsId()); orderItem.setCount(r.nextInt(10 ) + 1 ); orderItem.setItemId(UUID.randomUUID().toString()); sourceContext.collect(orderItem); orderItem.setGoodsId("111" ); sourceContext.collect(orderItem); TimeUnit.SECONDS.sleep(1 ); } } @Override public void cancel () { isCancel = true ; } } public static class GoodsWatermark implements WatermarkStrategy <Goods> { @Override public TimestampAssigner<Goods> createTimestampAssigner (TimestampAssignerSupplier.Context context) { return (element, recordTimestamp) -> System.currentTimeMillis(); } @Override public WatermarkGenerator<Goods> createWatermarkGenerator (WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator <Goods>() { @Override public void onEvent (Goods event, long eventTimestamp, WatermarkOutput output) { output.emitWatermark(new Watermark (System.currentTimeMillis())); } @Override public void onPeriodicEmit (WatermarkOutput output) { output.emitWatermark(new Watermark (System.currentTimeMillis())); } }; } } public static class OrderItemWatermark implements WatermarkStrategy <OrderItem> { @Override public TimestampAssigner<OrderItem> createTimestampAssigner (TimestampAssignerSupplier.Context context) { return (element, recordTimestamp) -> System.currentTimeMillis(); } @Override public WatermarkGenerator<OrderItem> createWatermarkGenerator (WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator <OrderItem>() { @Override public void onEvent (OrderItem event, long eventTimestamp, WatermarkOutput output) { output.emitWatermark(new Watermark (System.currentTimeMillis())); } @Override public void onPeriodicEmit (WatermarkOutput output) { output.emitWatermark(new Watermark (System.currentTimeMillis())); } 
}; } } }
Code demo - IntervalJoin
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 import com.alibaba.fastjson.JSON;import lombok.Data;import org.apache.flink.api.common.RuntimeExecutionMode;import org.apache.flink.api.common.eventtime.*;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.util.Collector;import java.math.BigDecimal;import java.util.ArrayList;import java.util.List;import java.util.Random;import java.util.UUID;import java.util.concurrent.TimeUnit;public class JoinDemo02_IntervalJoin { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); DataStreamSource<Goods> goodsDS = env.addSource(new GoodsSource ()); DataStreamSource<OrderItem> OrderItemDS = env.addSource(new OrderItemSource ()); SingleOutputStreamOperator<Goods> goodsDSWithWatermark = goodsDS.assignTimestampsAndWatermarks(new GoodsWatermark ()); SingleOutputStreamOperator<OrderItem> OrderItemDSWithWatermark = OrderItemDS.assignTimestampsAndWatermarks(new OrderItemWatermark ()); SingleOutputStreamOperator<FactOrderItem> resultDS = goodsDSWithWatermark.keyBy(Goods::getGoodsId) .intervalJoin(OrderItemDSWithWatermark.keyBy(OrderItem::getGoodsId)) .between(Time.seconds(-2 ), Time.seconds(1 )) .process(new ProcessJoinFunction <Goods, OrderItem, FactOrderItem>() { @Override public void processElement (Goods left, OrderItem right, Context ctx, Collector<FactOrderItem> out) throws Exception { FactOrderItem result = new FactOrderItem (); result.setGoodsId(left.getGoodsId()); result.setGoodsName(left.getGoodsName()); result.setCount(new BigDecimal (right.getCount())); result.setTotalMoney(new BigDecimal (right.getCount()).multiply(left.getGoodsPrice())); out.collect(result); } }); resultDS.print(); env.execute(); } @Data public static class Goods { private String goodsId; private String goodsName; private BigDecimal goodsPrice; public static List<Goods> GOODS_LIST; public static Random r; static { r = new Random (); GOODS_LIST = new ArrayList <>(); GOODS_LIST.add(new Goods ("1" , "小米12" , new BigDecimal (4890 ))); GOODS_LIST.add(new Goods ("2" , "iphone12" , new BigDecimal (12000 ))); GOODS_LIST.add(new Goods ("3" , "MacBookPro" , new BigDecimal (15000 ))); GOODS_LIST.add(new Goods ("4" , "Thinkpad X1" , new BigDecimal (9800 ))); 
GOODS_LIST.add(new Goods ("5" , "MeiZu One" , new BigDecimal (3200 ))); GOODS_LIST.add(new Goods ("6" , "Mate 40" , new BigDecimal (6500 ))); } public static Goods randomGoods () { int rIndex = r.nextInt(GOODS_LIST.size()); return GOODS_LIST.get(rIndex); } public Goods () { } public Goods (String goodsId, String goodsName, BigDecimal goodsPrice) { this .goodsId = goodsId; this .goodsName = goodsName; this .goodsPrice = goodsPrice; } @Override public String toString () { return JSON.toJSONString(this ); } } @Data public static class OrderItem { private String itemId; private String goodsId; private Integer count; @Override public String toString () { return JSON.toJSONString(this ); } } @Data public static class FactOrderItem { private String goodsId; private String goodsName; private BigDecimal count; private BigDecimal totalMoney; @Override public String toString () { return JSON.toJSONString(this ); } } public static class GoodsSource extends RichSourceFunction <Goods> { private Boolean isCancel; @Override public void open (Configuration parameters) throws Exception { isCancel = false ; } @Override public void run (SourceContext sourceContext) throws Exception { while (!isCancel) { Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods)); TimeUnit.SECONDS.sleep(1 ); } } @Override public void cancel () { isCancel = true ; } } public static class OrderItemSource extends RichSourceFunction <OrderItem> { private Boolean isCancel; private Random r; @Override public void open (Configuration parameters) throws Exception { isCancel = false ; r = new Random (); } @Override public void run (SourceContext sourceContext) throws Exception { while (!isCancel) { Goods goods = Goods.randomGoods(); OrderItem orderItem = new OrderItem (); orderItem.setGoodsId(goods.getGoodsId()); orderItem.setCount(r.nextInt(10 ) + 1 ); orderItem.setItemId(UUID.randomUUID().toString()); sourceContext.collect(orderItem); orderItem.setGoodsId("111" ); sourceContext.collect(orderItem); TimeUnit.SECONDS.sleep(1 ); } } @Override public void cancel () { isCancel = true ; } } public static class GoodsWatermark implements WatermarkStrategy <Goods> { @Override public TimestampAssigner<Goods> createTimestampAssigner (TimestampAssignerSupplier.Context context) { return (element, recordTimestamp) -> System.currentTimeMillis(); } @Override public WatermarkGenerator<Goods> createWatermarkGenerator (WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator <Goods>() { @Override public void onEvent (Goods event, long eventTimestamp, WatermarkOutput output) { output.emitWatermark(new Watermark (System.currentTimeMillis())); } @Override public void onPeriodicEmit (WatermarkOutput output) { output.emitWatermark(new Watermark (System.currentTimeMillis())); } }; } } public static class OrderItemWatermark implements WatermarkStrategy <OrderItem> { @Override public TimestampAssigner<OrderItem> createTimestampAssigner (TimestampAssignerSupplier.Context context) { return (element, recordTimestamp) -> System.currentTimeMillis(); } @Override public WatermarkGenerator<OrderItem> createWatermarkGenerator (WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator <OrderItem>() { @Override public void onEvent (OrderItem event, long eventTimestamp, WatermarkOutput output) { output.emitWatermark(new Watermark (System.currentTimeMillis())); } @Override public void onPeriodicEmit (WatermarkOutput output) { output.emitWatermark(new Watermark (System.currentTimeMillis())); } }; } } }
Flink - End-to-End Exactly-Once. Classifying data-consistency semantics
Data-consistency semantics in detail
Note:
The accurate reading of Exactly-Once is:
the data takes effect, correctly, exactly once —
not that the data is physically processed only once. It may be processed several times, but only the final, successful pass counts!
Local (within-the-job) Exactly-Once can be achieved with:
1. Deduplication
2. Idempotence
INSERT INTO t_student (id, `name`, age) VALUES (9, 'Gordon', 18)
> 1062 - Duplicate entry '9' for key 'PRIMARY'
> Time: 0.001s
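Building on the t_student example, the write can be sketched as an idempotent upsert with MySQL's duplicate-key handling, so a replayed record overwrites instead of failing:

INSERT INTO t_student (id, `name`, age) VALUES (9, 'Gordon', 18)
ON DUPLICATE KEY UPDATE `name` = VALUES(`name`), age = VALUES(age);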
3. Distributed snapshots / Checkpoints — this is what Flink uses
How to achieve end-to-end Exactly-Once
Source: e.g. Kafka offsets, which let data be replayed/re-transmitted
Transformation: via Checkpoints
Sink: Checkpoint + two-phase transactional commit
The source operator consumes messages/data from Kafka and records the offsets
The transformation operators process/transform the data and take checkpoints
The sink operator writes the results to Kafka
Note: the sink performs a two-phase commit:
1. Open a transaction
2. Each operator checkpoints on the barrier; on success, it pre-commits
3. Once every operator has finished pre-committing, the real commit happens
4. If any pre-commit fails, roll back to the latest checkpoint
Code demo: Kafka topic flink_kafka1 —>
Flink source –>
Flink transformation (WordCount) –>
results written to Kafka topic flink_kafka2
/export/server/kafka/bin/kafka-topics.sh --zookeeper node1:2181 --create --replication-factor 2 --partitions 3 --topic flink_kafka1
/export/server/kafka/bin/kafka-topics.sh --zookeeper node1:2181 --create --replication-factor 2 --partitions 3 --topic flink_kafka2
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka1
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 import org.apache.commons.lang3.SystemUtils;import org.apache.flink.api.common.RuntimeExecutionMode;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.restartstrategy.RestartStrategies;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.common.time.Time;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.runtime.state.filesystem.FsStateBackend;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.CheckpointConfig;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;import org.apache.flink.util.Collector;import java.util.Properties;import java.util.Random;import java.util.concurrent.TimeUnit;public class Flink_Kafka_EndToEnd_Exactly_Once { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); env.enableCheckpointing(1000 ); if (SystemUtils.IS_OS_WINDOWS) { env.setStateBackend(new FsStateBackend ("file:///D:/ckp" )); } else { env.setStateBackend(new FsStateBackend ("hdfs://node1:8020/flink-checkpoint/checkpoint" )); } env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500 ); env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10 ); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointTimeout(60000 ); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1 ); env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3 , Time.of(5 , TimeUnit.SECONDS) )); Properties props1 = new Properties (); props1.setProperty("bootstrap.servers" , "node1:9092" ); props1.setProperty("group.id" , "flink" ); props1.setProperty("auto.offset.reset" , "latest" ); props1.setProperty("flink.partition-discovery.interval-millis" , "5000" ); FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer <String>("flink_kafka1" , new SimpleStringSchema (), props1); kafkaSource.setCommitOffsetsOnCheckpoints(true ); DataStream<String> kafkaDS = env.addSource(kafkaSource); SingleOutputStreamOperator<String> result = kafkaDS.flatMap(new FlatMapFunction <String, Tuple2<String, Integer>>() { private Random ran = new Random (); @Override public void flatMap (String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] arr = value.split(" " ); for (String word : arr) { int num = ran.nextInt(5 ); if 
(num > 3 ){ System.out.println("随机异常产生了" ); throw new Exception ("随机异常产生了" ); } out.collect(Tuple2.of(word, 1 )); } } }).keyBy(t -> t.f0) .sum(1 ) .map(new MapFunction <Tuple2<String, Integer>, String>() { @Override public String map (Tuple2<String, Integer> value) throws Exception { return value.f0 + ":" + value.f1; } }); Properties props2 = new Properties (); props2.setProperty("bootstrap.servers" , "node1:9092" ); props2.setProperty("transaction.timeout.ms" , "5000" ); FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer <>( "flink_kafka2" , new KeyedSerializationSchemaWrapper (new SimpleStringSchema ()), props2, FlinkKafkaProducer.Semantic.EXACTLY_ONCE); result.addSink(kafkaSink); env.execute(); } }
Flink - Async I/O (overview). How it works
API
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/asyncio.html
Note: to use async I/O, the client has certain requirements:
1. It must support issuing asynchronous requests, e.g. Vert.x
2. If the client does not, a thread pool can simulate asynchronous requests
Code demo
DROP TABLE IF EXISTS `t_category`;
CREATE TABLE `t_category` (
  `id` int(11) NOT NULL,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of t_category
-- ----------------------------
INSERT INTO `t_category` VALUES ('1', '手机');
INSERT INTO `t_category` VALUES ('2', '电脑');
INSERT INTO `t_category` VALUES ('3', '服装');
INSERT INTO `t_category` VALUES ('4', '化妆品');
INSERT INTO `t_category` VALUES ('5', '食品');
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 import io.vertx.core.AsyncResult;import io.vertx.core.Handler;import io.vertx.core.Vertx;import io.vertx.core.VertxOptions;import io.vertx.core.json.JsonObject;import io.vertx.ext.jdbc.JDBCClient;import io.vertx.ext.sql.SQLClient;import io.vertx.ext.sql.SQLConnection;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.datastream.AsyncDataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.async.ResultFuture;import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import java.sql.*;import java.util.Collections;import java.util.List;import java.util.concurrent.ExecutorService;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class ASyncIODemo { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<CategoryInfo> categoryDS = env.addSource(new RichSourceFunction <CategoryInfo>() { private Boolean flag = true ; @Override public void run (SourceContext<CategoryInfo> ctx) throws Exception { Integer[] ids = {1 , 2 , 3 , 4 , 5 }; for (Integer id : ids) { ctx.collect(new CategoryInfo (id, null )); } } @Override public void cancel () { this .flag = false ; } }); SingleOutputStreamOperator<CategoryInfo> result1 = AsyncDataStream .unorderedWait(categoryDS, new ASyncIOFunction1 (), 1000 , TimeUnit.SECONDS, 10 ); SingleOutputStreamOperator<CategoryInfo> result2 = AsyncDataStream .unorderedWait(categoryDS, new ASyncIOFunction2 (), 1000 , TimeUnit.SECONDS, 10 ); result1.print("方式一:Java-vertx中提供的异步client实现异步IO \n" ); result2.print("方式二:MySQL中同步client+线程池模拟异步IO \n" ); env.execute(); } } @Data @NoArgsConstructor @AllArgsConstructor class CategoryInfo { private Integer id; private String name; } class MysqlSyncClient { private static transient Connection connection; private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver" ; private static final String URL = "jdbc:mysql://localhost:3306/bigdata" ; private static final String USER = "root" ; private static final String PASSWORD = "root" ; static { init(); } private static void init () { try { Class.forName(JDBC_DRIVER); } catch (ClassNotFoundException e) { System.out.println("Driver not found!" 
+ e.getMessage()); } try { connection = DriverManager.getConnection(URL, USER, PASSWORD); } catch (SQLException e) { System.out.println("init connection failed!" + e.getMessage()); } } public void close () { try { if (connection != null ) { connection.close(); } } catch (SQLException e) { System.out.println("close connection failed!" + e.getMessage()); } } public CategoryInfo query (CategoryInfo category) { try { String sql = "select id,name from t_category where id = " + category.getId(); Statement statement = connection.createStatement(); ResultSet rs = statement.executeQuery(sql); if (rs != null && rs.next()) { category.setName(rs.getString("name" )); } } catch (SQLException e) { System.out.println("query failed!" + e.getMessage()); } return category; } } class ASyncIOFunction1 extends RichAsyncFunction <CategoryInfo, CategoryInfo> { private transient SQLClient mySQLClient; @Override public void open (Configuration parameters) throws Exception { JsonObject mySQLClientConfig = new JsonObject (); mySQLClientConfig .put("driver_class" , "com.mysql.jdbc.Driver" ) .put("url" , "jdbc:mysql://localhost:3306/bigdata" ) .put("user" , "root" ) .put("password" , "root" ) .put("max_pool_size" , 20 ); VertxOptions options = new VertxOptions (); options.setEventLoopPoolSize(10 ); options.setWorkerPoolSize(20 ); Vertx vertx = Vertx.vertx(options); mySQLClient = JDBCClient.createNonShared(vertx, mySQLClientConfig); } @Override public void asyncInvoke (CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception { mySQLClient.getConnection(new Handler <AsyncResult<SQLConnection>>() { @Override public void handle (AsyncResult<SQLConnection> sqlConnectionAsyncResult) { if (sqlConnectionAsyncResult.failed()) { return ; } SQLConnection connection = sqlConnectionAsyncResult.result(); connection.query("select id,name from t_category where id = " +input.getId(), new Handler <AsyncResult<io.vertx.ext.sql.ResultSet>>() { @Override public void handle (AsyncResult<io.vertx.ext.sql.ResultSet> resultSetAsyncResult) { if (resultSetAsyncResult.succeeded()) { List<JsonObject> rows = resultSetAsyncResult.result().getRows(); for (JsonObject jsonObject : rows) { CategoryInfo categoryInfo = new CategoryInfo (jsonObject.getInteger("id" ), jsonObject.getString("name" )); resultFuture.complete(Collections.singletonList(categoryInfo)); } } } }); } }); } @Override public void close () throws Exception { mySQLClient.close(); } @Override public void timeout (CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception { System.out.println("async call time out!" 
); input.setName("未知" ); resultFuture.complete(Collections.singleton(input)); } } class ASyncIOFunction2 extends RichAsyncFunction <CategoryInfo, CategoryInfo> { private transient MysqlSyncClient client; private ExecutorService executorService; @Override public void open (Configuration parameters) throws Exception { super .open(parameters); client = new MysqlSyncClient (); executorService = new ThreadPoolExecutor (10 , 10 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>()); } @Override public void asyncInvoke (CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception { executorService.execute(new Runnable () { @Override public void run () { resultFuture.complete(Collections.singletonList((CategoryInfo) client.query(input))); } }); } @Override public void close () throws Exception { } @Override public void timeout (CategoryInfo input, ResultFuture<CategoryInfo> resultFuture) throws Exception { System.out.println("async call time out!" ); input.setName("未知" ); resultFuture.complete(Collections.singleton(input)); } }
Flink - StreamingFileSink (deprecated in newer versions, folded into FileSink). Introduction
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/streamfile_sink.html
https://blog.csdn.net/u013220482/article/details/100901471
代码演示 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 import org.apache.commons.lang3.SystemUtils;import org.apache.flink.api.common.RuntimeExecutionMode;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.serialization.SimpleStringEncoder;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.core.fs.Path;import org.apache.flink.runtime.state.filesystem.FsStateBackend;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.CheckpointConfig;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;import org.apache.flink.util.Collector;import java.util.concurrent.TimeUnit;public class StreamingFileSinkDemo { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); env.enableCheckpointing(1000 ); if (SystemUtils.IS_OS_WINDOWS) { env.setStateBackend(new FsStateBackend ("file:///D:/ckp" )); } else { env.setStateBackend(new FsStateBackend ("hdfs://node1:8020/flink-checkpoint/checkpoint" )); } env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500 ); env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10 ); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointTimeout(60000 ); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1 ); DataStream<String> lines = env.socketTextStream("node1" , 9999 ); SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction <String, Tuple2<String, Integer>>() { @Override public void flatMap (String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] arr = value.split(" " ); for (String word : arr) { out.collect(Tuple2.of(word, 1 )); } } }); SingleOutputStreamOperator<String> result = wordAndOne.keyBy(t -> t.f0).sum(1 ) .map(new MapFunction <Tuple2<String, Integer>, String>() { @Override public String map (Tuple2<String, Integer> value) throws Exception { return value.f0 + ":" + value.f1; } }); result.print(); OutputFileConfig config = OutputFileConfig .builder() .withPartPrefix("prefix" ) .withPartSuffix(".txt" ) .build(); StreamingFileSink<String> streamingFileSink = StreamingFileSink. 
forRowFormat(new Path ("hdfs://node1:8020/FlinkStreamFileSink/parquet" ), new SimpleStringEncoder <String>("UTF-8" )) .withRollingPolicy( DefaultRollingPolicy.builder() .withRolloverInterval(TimeUnit.MINUTES.toMillis(15 )) .withInactivityInterval(TimeUnit.MINUTES.toMillis(5 )) .withMaxPartSize(1024 * 1024 * 1024 ) .build()) .withOutputFileConfig(config) .build(); result.addSink(streamingFileSink); env.execute(); } }
Flink - Advanced Features - FileSink
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 import org.apache.commons.lang3.SystemUtils;import org.apache.flink.api.common.RuntimeExecutionMode;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.serialization.SimpleStringEncoder;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.connector.file.sink.FileSink;import org.apache.flink.core.fs.Path;import org.apache.flink.runtime.state.filesystem.FsStateBackend;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.CheckpointConfig;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;import org.apache.flink.util.Collector;import java.util.concurrent.TimeUnit;public class FileSinkDemo { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); env.enableCheckpointing(1000 ); if (SystemUtils.IS_OS_WINDOWS) { env.setStateBackend(new FsStateBackend ("file:///D:/ckp" )); } else { env.setStateBackend(new FsStateBackend ("hdfs://node1:8020/flink-checkpoint/checkpoint" )); } env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500 ); env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10 ); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointTimeout(60000 ); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1 ); DataStream<String> lines = env.socketTextStream("node1" , 9999 ); SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction <String, Tuple2<String, Integer>>() { @Override public void flatMap (String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] arr = value.split(" " ); for (String word : arr) { out.collect(Tuple2.of(word, 1 )); } } }); SingleOutputStreamOperator<String> result = wordAndOne.keyBy(t -> t.f0).sum(1 ) .map(new MapFunction <Tuple2<String, Integer>, String>() { @Override public String map (Tuple2<String, Integer> value) throws Exception { return value.f0 + ":" + value.f1; } }); result.print(); OutputFileConfig config = OutputFileConfig .builder() .withPartPrefix("prefix" ) .withPartSuffix(".txt" ) .build(); FileSink<String> sink = FileSink .forRowFormat(new Path ("hdfs://node1:8020/FlinkFileSink/parquet" ), new SimpleStringEncoder <String>("UTF-8" )) .withRollingPolicy( DefaultRollingPolicy.builder() .withRolloverInterval(TimeUnit.MINUTES.toMillis(15 )) 
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5 )) .withMaxPartSize(1024 * 1024 * 1024 ) .build()) .withOutputFileConfig(config) .withBucketAssigner(new DateTimeBucketAssigner ("yyyy-MM-dd--HH" )) .build(); result.sinkTo(sink); env.execute(); } }
Flink Monitoring https://blog.lovedata.net/8156c1e1.html
What are Metrics?
Metric categories
代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 import org.apache.flink.api.common.RuntimeExecutionMode;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.common.functions.RichMapFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.configuration.Configuration;import org.apache.flink.metrics.Counter;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;public class MetricsDemo { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); DataStream<String> lines = env.socketTextStream("node1" , 9999 ); SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction <String, String>() { @Override public void flatMap (String value, Collector<String> out) throws Exception { String[] arr = value.split(" " ); for (String word : arr) { out.collect(word); } } }); SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words .map(new RichMapFunction <String, Tuple2<String, Integer>>() { Counter myCounter; @Override public void open (Configuration parameters) throws Exception { myCounter = getRuntimeContext().getMetricGroup().addGroup("myGroup" ).counter("myCounter" ); } @Override public Tuple2<String, Integer> map (String value) throws Exception { myCounter.inc(); return Tuple2.of(value, 1 ); } }); SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(t -> t.f0).sum(1 ); result.print(); env.execute(); } }
Submit and observe in the UI
1. Package the jar
2. Submit it to run on YARN
3. Inspect the monitoring metrics
4. You can also find the REST URL via the browser's F12 dev tools and fetch the monitoring info by request
5. Or fetch the monitoring info programmatically
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 import java.io.BufferedReader;import java.io.InputStreamReader;import java.net.URL;import java.net.URLConnection;public class MetricsTest { public static void main (String[] args) { String result = sendGet("http://node1:8088/proxy/application_1609508087977_0010/jobs/558a5a3016661f1d732228330ebfaad5" ); System.out.println(result); } public static String sendGet (String url) { String result = "" ; BufferedReader in = null ; try { String urlNameString = url; URL realUrl = new URL (urlNameString); URLConnection connection = realUrl.openConnection(); connection.setRequestProperty("accept" , "*/*" ); connection.setRequestProperty("connection" , "Keep-Alive" ); connection.setRequestProperty("user-agent" , "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)" ); connection.connect(); in = new BufferedReader (new InputStreamReader (connection.getInputStream())); String line; while ((line = in.readLine()) != null ) { result += line; } } catch (Exception e) { System.out.println("发送GET请求出现异常!" + e); e.printStackTrace(); } finally { try { if (in != null ) { in.close(); } } catch (Exception e2) { e2.printStackTrace(); } } return result; } }
6. Or integrate third-party tools to monitor Flink (a reporter config sketch follows the link below)
https://blog.lovedata.net/8156c1e1.html
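For example, a hedged flink-conf.yaml sketch for the Prometheus reporter that ships with Flink (the flink-metrics-prometheus jar must be placed in lib/; the port range is an assumption):

metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249-9260  # one port per TaskManager on the same host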
Flink Memory Management
# --- submit parameters ---
/export/server/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 -p 6 -ys 2 -c dictionary_utils.OneDictForExecutor streaming.jar
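The per-job -yjm/-ytm flags above map onto cluster-wide memory options; a sketch of the corresponding flink-conf.yaml settings under the Flink 1.10+ memory model (the sizes are illustrative, not recommendations):

jobmanager.memory.process.size: 1024m     # total JobManager process memory
taskmanager.memory.process.size: 1024m    # total TaskManager process memory
taskmanager.memory.managed.fraction: 0.4  # share reserved for managed memory (RocksDB, batch)
taskmanager.memory.network.fraction: 0.1  # share reserved for network buffers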
Flink Performance Tuning. Troubleshooting mnemonic: first backpressure, second error checks, third metrics — latency and throughput are the core.
Keep a constant eye on resource usage; when troubleshooting, look at GC first.
The mnemonic explained
Common performance problems: 1. Serialization and deserialization (see the sketch after this list)
2. Data skew
3. Frequent GC
4. External systems
5. Large windows
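For problem 1, a common first step is to keep Flink on its fast serializers and reuse objects between chained operators; a minimal sketch (enableObjectReuse is safe only if your functions do not cache input records):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// fail fast if a type falls back to the slow generic Kryo serializer
env.getConfig().disableGenericTypes();
// pass records between chained operators without defensive copies
env.getConfig().enableObjectReuse();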
Classic tuning scenarios: deduplication
Data skew
Memory tuning
Common issues in production: 1. Data latency (Flink has credit-based backpressure built in, but when the business cannot tolerate the delay you still intervene manually, e.g. restart.)
① First check the job's own processing time; if the job itself is fast but latency is high, the problem is upstream;
② If the job itself is slow:
Is traffic higher than usual? If so, increase the parallelism;
Is it calling an external system, with hot keys slowing some threads? — use a thread pool and a cache
2. Data skew:
① rebalance
② Skewed window aggregations: add a random prefix and aggregate in two phases (sketched below);
③ Watermarks that never fire also cause data to pile up
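A sketch of the two-phase (salted) aggregation mentioned in ② above, assuming a simple <key, count> stream; the salt range, demo data, and window size are made up, and with a bounded demo source the last processing-time window may not fire:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.concurrent.ThreadLocalRandom;

public class TwoPhaseAggSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // hot key "user_1" dominates the stream
        SingleOutputStreamOperator<Tuple2<String, Long>> input = env
                .fromElements("user_1", "user_1", "user_1", "user_1", "user_2")
                .map(k -> Tuple2.of(k, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // phase 1: salt the key with a random prefix so the hot key spreads over up to 10 subtasks
        SingleOutputStreamOperator<Tuple2<String, Long>> partial = input
                .map(t -> Tuple2.of(ThreadLocalRandom.current().nextInt(10) + "#" + t.f0, t.f1))
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(t -> t.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1);

        // phase 2: strip the salt and combine the partial counts per original key
        partial.map(t -> Tuple2.of(t.f0.substring(t.f0.indexOf('#') + 1), t.f1))
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(t -> t.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1)
                .print();

        env.execute();
    }
}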
3. Out-of-memory errors
Switch to the G1 garbage collector and tune the JVM. If windows overflow at traffic peaks, also reconsider the data structures, storage layout, and compression.
RocksDB tuning: block size and block cache size can be increased, the compaction target (merge) size can be increased, and the background flush/compaction thread counts can be raised; watch the performance impact, and compression can be turned off (a config sketch follows).
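A hedged flink-conf.yaml sketch covering both points; the option names are Flink 1.12's advanced RocksDB options, and the values are illustrative:

env.java.opts: -XX:+UseG1GC                           # switch the JVMs to G1
state.backend.rocksdb.block.blocksize: 128kb          # larger data blocks
state.backend.rocksdb.block.cache-size: 256mb         # larger block cache
state.backend.rocksdb.thread.num: 4                   # background flush/compaction threads
state.backend.rocksdb.compaction.level.target-file-size-base: 256mb  # larger compaction targets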
1. Reuse objects
stream
    .apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
        @Override
        public void apply(String userName, TimeWindow timeWindow,
                          Iterable<WikipediaEditEvent> iterable,
                          Collector<Tuple2<String, Long>> collector) throws Exception {
            long changesCount = ...
            // a fresh Tuple2 is allocated on every window firing
            collector.collect(new Tuple2<>(userName, changesCount));
        }
    });
The code above can be optimized into the version below,
which avoids creating a new Tuple2 on every call:
stream
    .apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
        // reuse a single Tuple2 instance across invocations
        private Tuple2<String, Long> result = new Tuple2<>();

        @Override
        public void apply(String userName, TimeWindow timeWindow,
                          Iterable<WikipediaEditEvent> iterable,
                          Collector<Tuple2<String, Long>> collector) throws Exception {
            long changesCount = ...
            result.f0 = userName;
            result.f1 = changesCount;
            collector.collect(result);
        }
    });
Flink Table & SQL. Why Table & SQL?
History
Two table planners: 1. The old planner (useOldPlanner() when creating the environment)
2. The Blink planner (useBlinkPlanner() when creating the environment; the default in newer versions)
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html
Core concepts
Dynamic tables / unbounded tables
Continuous queries / rely on State
Flink Table & SQL basics. Example 1
Convert DataStream data into a Table and a View, then run aggregate queries with SQL
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.util.Arrays;import static org.apache.flink.table.api.Expressions.$;public class Demo01 { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings); DataStream<Order> orderA = env.fromCollection(Arrays.asList( new Order (1L , "beer" , 3 ), new Order (1L , "diaper" , 4 ), new Order (3L , "rubber" , 2 ))); DataStream<Order> orderB = env.fromCollection(Arrays.asList( new Order (2L , "pen" , 3 ), new Order (2L , "rubber" , 3 ), new Order (4L , "beer" , 1 ))); Table tableA = tenv.fromDataStream(orderA, $("user" ), $("product" ), $("amount" )); tableA.printSchema(); System.out.println(tableA); tenv.createTemporaryView("tableB" , orderB, $("user" ), $("product" ), $("amount" )); String sql = "select * from " +tableA+" where amount > 2 \n" + "union \n" + " select * from tableB where amount > 1" ; Table resultTable = tenv.sqlQuery(sql); resultTable.printSchema(); System.out.println(resultTable); DataStream<Tuple2<Boolean, Order>> resultDS = tenv.toRetractStream(resultTable, Order.class); resultDS.print(); env.execute(); } @Data @NoArgsConstructor @AllArgsConstructor public static class Order { public Long user; public String product; public int amount; } }
Example 2
Implement WordCount in both Table/DSL style and SQL style
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import static org.apache.flink.table.api.Expressions.$;public class Demo02 { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings); DataStream<WC> wordsDS = env.fromElements( new WC ("Hello" , 1 ), new WC ("World" , 1 ), new WC ("Hello" , 1 ) ); tenv.createTemporaryView("t_words" , wordsDS,$("word" ), $("frequency" )); String sql = "select word,sum(frequency) as frequency\n " + "from t_words\n " + "group by word" ; Table resultTable = tenv.sqlQuery(sql); DataStream<Tuple2<Boolean, WC>> resultDS = tenv.toRetractStream(resultTable, WC.class); resultDS.print(); env.execute(); } @Data @NoArgsConstructor @AllArgsConstructor public static class WC { public String word; public long frequency; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import static org.apache.flink.table.api.Expressions.$;public class Demo02_2 { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings); DataStream<WC> wordsDS = env.fromElements( new WC ("Hello" , 1 ), new WC ("World" , 1 ), new WC ("Hello" , 1 ) ); Table table = tenv.fromDataStream(wordsDS); Table resultTable = table .groupBy($("word" )) .select($("word" ), $("frequency" ).sum().as("frequency" )) .filter($("frequency" ).isEqual(2 )); DataStream<Tuple2<Boolean, WC>> resultDS = tenv.toRetractStream(resultTable, WC.class); resultDS.print(); env.execute(); } @Data @NoArgsConstructor @AllArgsConstructor public static class WC { public String word; public long frequency; } }
Example 3
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.types.Row;import java.time.Duration;import java.util.Random;import java.util.UUID;import java.util.concurrent.TimeUnit;import static org.apache.flink.table.api.Expressions.$;public class Demo03 { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings); DataStreamSource<Order> orderDS = env.addSource(new RichSourceFunction <Order>() { private Boolean isRunning = true ; @Override public void run (SourceContext<Order> ctx) throws Exception { Random random = new Random (); while (isRunning) { Order order = new Order (UUID.randomUUID().toString(), random.nextInt(3 ), random.nextInt(101 ), System.currentTimeMillis()); TimeUnit.SECONDS.sleep(1 ); ctx.collect(order); } } @Override public void cancel () { isRunning = false ; } }); DataStream<Order> orderDSWithWatermark = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5 )) .withTimestampAssigner((order, recordTimestamp) -> order.getCreateTime()) ); tenv.createTemporaryView("t_order" ,orderDSWithWatermark,$("orderId" ), $("userId" ), $("money" ), $("createTime" ).rowtime()); String sql = "select userId, count(orderId) as orderCount, max(money) as maxMoney,min(money) as minMoney\n " + "from t_order\n " + "group by userId,\n " + "tumble(createTime, INTERVAL '5' SECOND)" ; Table resultTable = tenv.sqlQuery(sql); DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(resultTable, Row.class); resultDS.print(); env.execute(); } @Data @AllArgsConstructor @NoArgsConstructor public static class Order { private String orderId; private Integer userId; private Integer money; private Long createTime; } }
```java
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

public class Demo03_2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // source: generate one random order per second
        DataStreamSource<Order> orderDS = env.addSource(new RichSourceFunction<Order>() {
            private Boolean isRunning = true;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (isRunning) {
                    Order order = new Order(UUID.randomUUID().toString(),
                            random.nextInt(3), random.nextInt(101), System.currentTimeMillis());
                    TimeUnit.SECONDS.sleep(1);
                    ctx.collect(order);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        // event time + 5s bounded-out-of-orderness watermark
        DataStream<Order> orderDSWithWatermark = orderDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((order, recordTimestamp) -> order.getCreateTime())
        );

        tenv.createTemporaryView("t_order", orderDSWithWatermark,
                $("orderId"), $("userId"), $("money"), $("createTime").rowtime());

        // Table API equivalent of the SQL in Demo03: a 5s tumbling event-time window
        Table resultTable = tenv.from("t_order")
                .window(Tumble.over(lit(5).second())
                        .on($("createTime"))
                        .as("tumbleWindow"))   // the alias must match the name used in groupBy
                .groupBy($("tumbleWindow"), $("userId"))
                .select(
                        $("userId"),
                        $("orderId").count().as("orderCount"),
                        $("money").max().as("maxMoney"),
                        $("money").min().as("minMoney")
                );

        DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(resultTable, Row.class);
        resultDS.print();

        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long createTime;
    }
}
```
Flink SQL idle state retention time One of the mistakes Flink SQL newcomers tend to make, in the author's view, is forgetting to set the idle state retention time, which lets state grow without bound ("state explosion").
Why set it When we run grouped queries on a data stream, the results the grouping produces (not only the aggregates) are kept as intermediate state. As the number of distinct grouping keys grows, the state keeps expanding with it. Most of this state is time-bounded and does not need to be retained forever. For example, when deduplicating with the Top-N syntax, duplicates generally arrive within a bounded interval (say an hour or a day); once that interval has passed, the corresponding state is no longer needed. Flink SQL's idle state retention time feature guarantees that once a key's state has gone unmodified for the configured threshold, it is cleaned up automatically. It is configured like this:
```java
tbEnv.getConfig().setIdleStateRetention(Duration.ofDays(1));
```
Note that the older setIdleStateRetentionTime() method takes two parameters: a minimum retention time minRetentionTime and a maximum retention time maxRetentionTime (chosen according to the business), and the two must differ by at least 5 minutes. If the gap between minRetentionTime and maxRetentionTime is set too small, timers are created and ValueState is updated too frequently, and the cost of maintaining the timers grows.
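For reference, a sketch of that older two-parameter call, assuming the Flink 1.12-era TableConfig API (it is deprecated in favor of the single-Duration setIdleStateRetention shown above):

```java
import org.apache.flink.api.common.time.Time;

// state idle for at least 12 hours becomes eligible for cleanup and is
// guaranteed to be cleared once it has been idle for 24 hours
tbEnv.getConfig().setIdleStateRetentionTime(Time.hours(12), Time.hours(24));
```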
Flink - Advanced Features - New Features - FlinkSQL Integration with Hive 1. Introduction
Supported versions
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/
Add the dependencies, jars, and configuration
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-metastore</artifactId>
    <version>2.1.0</version>
    <exclusions>
        <exclusion>
            <artifactId>hadoop-hdfs</artifactId>
            <groupId>org.apache.hadoop</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>2.1.0</version>
</dependency>
```
Upload the Hive-related jars from the course materials into flink/lib
FlinkSQL Integration with Hive - CLI integration
1. Modify hive-site.xml
```xml
<property>
    <name>hive.metastore.uris</name>
    <value>thrift://node3:9083</value>
</property>
```
```xml
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>123456</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://node3:3306/hive?createDatabaseIfNotExist=true&amp;useSSL=false</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
    </property>
    <property>
        <name>hive.metastore.schema.verification</name>
        <value>false</value>
    </property>
    <property>
        <name>datanucleus.schema.autoCreateAll</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.server2.thrift.bind.host</name>
        <value>node3</value>
    </property>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://node3:9083</value>
    </property>
</configuration>
```
2. Start the metastore service
nohup /export/server/hive/bin/hive --service metastore &
3. Modify flink/conf/sql-client-defaults.yaml
```yaml
catalogs:
   - name: myhive
     type: hive
     hive-conf-dir: /export/server/hive/conf
     default-database: default
```
4. Distribute the configuration to the other nodes
5. Start the Flink cluster
/export/server/flink/bin/start-cluster.sh
6. Start the Flink SQL client (start it on the node where Hive is deployed)
/export/server/flink/bin/sql-client.sh embedded
7. Run SQL:
show catalogs;
use catalog myhive;
show tables;
select * from person;
Code-based integration; see the Hive connector docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/
```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveDemo {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // register a HiveCatalog backed by the hive-site.xml under ./conf
        String name = "myhive";
        String defaultDatabase = "default";
        String hiveConfDir = "./conf";
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tableEnv.registerCatalog("myhive", hive);

        // make the Hive catalog the current catalog
        tableEnv.useCatalog("myhive");

        // read from and write back to the Hive table
        String insertSQL = "insert into person select * from person";
        TableResult result = tableEnv.executeSql(insertSQL);
        System.out.println(result.getJobClient().get().getJobStatus());
    }
}
```
Integrating Kafka and Hive - Case 4 (master converting a DataStream to SQL and SQL back to a DataStream)
```java
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

import java.io.Serializable;
import java.sql.Timestamp;

public class KafkaToHive {
    public static void main(String[] args) throws Exception {
        // bind the local Web UI to a free port in the 8081-8089 range
        Configuration configuration = new Configuration();
        configuration.setString(RestOptions.BIND_PORT, "8081-8089");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env, settings);
        env.enableCheckpointing(10 * 60 * 1000, CheckpointingMode.EXACTLY_ONCE);
        env.setParallelism(3);

        // source: parse "time,word" lines from a socket into Order objects
        SingleOutputStreamOperator<Order> socket = env.socketTextStream("node1", 9999)
                .rebalance()
                .map(new RichMapFunction<String, Order>() {
                    @Override
                    public Order map(String value) throws Exception {
                        String[] timeAndWord = value.split(",");
                        Timestamp timestamp = Timestamp.valueOf(timeAndWord[0]);
                        Order order = new Order();
                        order.setOrderId(timeAndWord[1]);
                        order.setMoney(1L);
                        order.setEvenTime(timestamp.getTime());
                        return order;
                    }
                });

        // DataStream -> SQL: register the stream as a view
        tbEnv.createTemporaryView("socket", socket);

        // ODS: Kafka table used as a sink
        tbEnv.executeSql("DROP TABLE IF EXISTS ODS");
        TableResult odsResult = tbEnv.executeSql("CREATE TABLE ODS (\n" +
                "  `orderId` STRING,\n" +
                "  `userId` INT,\n" +
                "  `money` BIGINT,\n" +
                "  `evenTime` BIGINT\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'KafkaWordCount',\n" +
                "  'properties.bootstrap.servers' = 'node1:9092',\n" +
                "  'format' = 'json',\n" +
                "  'sink.partitioner' = 'round-robin'\n" +
                ")");
        // write the socket stream into Kafka
        String sql = "insert into ODS select orderId,userId,money,evenTime from socket";
        TableResult result = tbEnv.executeSql(sql);

        // DW: Kafka table reading the same topic back as a source
        tbEnv.executeSql("DROP TABLE IF EXISTS DW");
        TableResult dwResult = tbEnv.executeSql("CREATE TABLE DW (\n" +
                "  `orderId` STRING,\n" +
                "  `userId` INT,\n" +
                "  `money` BIGINT,\n" +
                "  `evenTime` BIGINT\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'KafkaWordCount',\n" +
                "  'properties.bootstrap.servers' = 'node1:9092',\n" +
                "  'properties.group.id' = 'sink_to_hive',\n" +
                "  'scan.startup.mode' = 'group-offsets',\n" +
                "  'format' = 'json'\n" +
                ")");

        // register the Hive catalog (hive-site.xml is read from hiveConfDir)
        String name = "myhive";
        String defaultDatabase = "default";
        String hiveConfDir = "E:\\BigData\\workspace\\Flink\\Flink_study\\src\\main\\resources";
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tbEnv.registerCatalog("myhive", hive);
        tbEnv.useCatalog("myhive");

        // Hive table; 'is_generic'='false' marks it as a native Hive table
        tbEnv.executeSql("DROP TABLE IF EXISTS OrderTb");
        TableResult OrderTb = tbEnv.executeSql("create table if not EXISTS OrderTb(" +
                "  `orderId` STRING,\n" +
                "  `userId` INT,\n" +
                "  `money` BIGINT,\n" +
                "  `evenTime` BIGINT\n" +
                ") WITH (\n" +
                "  'is_generic'='false')");

        // copy the Kafka data (DW lives in the default catalog) into Hive
        tbEnv.executeSql("insert into OrderTb select orderId,userId,money,evenTime from default_catalog.default_database.DW");

        // SQL -> DataStream: print the DW stream for debugging
        Table table = tbEnv.sqlQuery("select * from default_catalog.default_database.DW");
        tbEnv.toRetractStream(table, TypeInformation.of(Row.class)).print();
        env.execute();
    }

    public static class Order implements Serializable {
        private String orderId;
        private Integer userId;
        private Long money;
        private Long evenTime;

        public Order(String orderId, Integer userId, Long money, Long evenTime) {
            this.orderId = orderId;
            this.userId = userId;
            this.money = money;
            this.evenTime = evenTime;
        }

        public Order() {
        }

        public String getOrderId() {
            return orderId;
        }

        public void setOrderId(String orderId) {
            this.orderId = orderId;
        }

        public Integer getUserId() {
            return userId;
        }

        public void setUserId(Integer userId) {
            this.userId = userId;
        }

        public Long getMoney() {
            return money;
        }

        public void setMoney(Long money) {
            this.money = money;
        }

        public Long getEvenTime() {
            return evenTime;
        }

        public void setEvenTime(Long evenTime) {
            this.evenTime = evenTime;
        }

        @Override
        public String toString() {
            FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
            return "Order{" +
                    "orderId='" + orderId + '\'' +
                    ", userId=" + userId +
                    ", money=" + money +
                    ", evenTime=" + df.format(evenTime) +
                    '}';
        }
    }
}
```
Flink - Exercise - Double Eleven Real-Time Transaction Dashboard (Master) Requirements
Data
```java
public static class MySource implements SourceFunction<Tuple2<String, Double>> {
    private boolean flag = true;
    private String[] categorys = {"女装", "男装", "图书", "家电", "洗护", "美妆", "运动", "游戏", "户外", "家具", "乐器", "办公"};
    private Random random = new Random();

    @Override
    public void run(SourceContext<Tuple2<String, Double>> ctx) throws Exception {
        while (flag) {
            // emit a random (category, price) pair every 20ms
            int index = random.nextInt(categorys.length);
            String category = categorys[index];
            double price = random.nextDouble() * 100;
            ctx.collect(Tuple2.of(category, price));
            Thread.sleep(20);
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }
}
```
Implementation steps
```
1. env
2. source
3. transformation -- pre-aggregation
   3.1 Define a window of one day; the second argument offsets the window for
       China's UTC+08:00 time zone, which is 8 hours ahead of UTC:
       keyBy(t -> t.f0)
       window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
   3.2 Define a 1s trigger:
       .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
   3.3 Aggregate: .aggregate(new PriceAggregate(), new WindowResult());
   3.4 A sample of the aggregated result:
       CategoryPojo(category=男装, totalPrice=17225.26, dateTime=2020-10-20 08:04:12)
4. sink - use the pre-aggregated results above to implement the business requirements:
   tempAggResult.keyBy(CategoryPojo::getDateTime)
       // refresh the statistics once per second
       .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
       // implement the complex business logic in a ProcessWindowFunction
       .process(new FinalResultWindowProcess());
   4.1 Compute, in real time, the total sales from midnight up to the current moment
   4.2 Compute the top-3 categories by sales
   4.3 Refresh the statistics once per second
5. execute
```
Code implementation
```java
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Random;
import java.util.stream.Collectors;

public class DoubleElevenBigScreem {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        // 2. source
        DataStream<Tuple2<String, Double>> orderDS = env.addSource(new MySource());

        // 3. transformation: pre-aggregate per category over a one-day window,
        // firing every second via a continuous processing-time trigger
        DataStream<CategoryPojo> tempAggResult = orderDS
                .keyBy(t -> t.f0)
                // the -8h offset aligns the daily window with UTC+08:00
                .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
                .aggregate(new PriceAggregate(), new WindowResult());
        tempAggResult.print("初步聚合的各个分类的销售总额");

        // 4. sink: collect all categories of the same second and compute
        // the grand total and the top-3 categories
        tempAggResult.keyBy(CategoryPojo::getDateTime)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
                .process(new FinalResultWindowProcess());

        // 5. execute
        env.execute();
    }

    public static class MySource implements SourceFunction<Tuple2<String, Double>> {
        private boolean flag = true;
        private String[] categorys = {"女装", "男装", "图书", "家电", "洗护", "美妆", "运动", "游戏", "户外", "家具", "乐器", "办公"};
        private Random random = new Random();

        @Override
        public void run(SourceContext<Tuple2<String, Double>> ctx) throws Exception {
            while (flag) {
                int index = random.nextInt(categorys.length);
                String category = categorys[index];
                double price = random.nextDouble() * 100;
                ctx.collect(Tuple2.of(category, price));
                Thread.sleep(20);
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }
    }

    /** incrementally sums the prices inside a window */
    private static class PriceAggregate implements AggregateFunction<Tuple2<String, Double>, Double, Double> {
        @Override
        public Double createAccumulator() {
            return 0D;
        }

        @Override
        public Double add(Tuple2<String, Double> value, Double accumulator) {
            return value.f1 + accumulator;
        }

        @Override
        public Double getResult(Double accumulator) {
            return accumulator;
        }

        @Override
        public Double merge(Double a, Double b) {
            return a + b;
        }
    }

    /** wraps the aggregated total into a CategoryPojo stamped with the current time */
    private static class WindowResult implements WindowFunction<Double, CategoryPojo, String, TimeWindow> {
        private FastDateFormat df = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");

        @Override
        public void apply(String category, TimeWindow window, Iterable<Double> input, Collector<CategoryPojo> out) throws Exception {
            long currentTimeMillis = System.currentTimeMillis();
            String dateTime = df.format(currentTimeMillis);
            Double totalPrice = input.iterator().next();
            out.collect(new CategoryPojo(category, totalPrice, dateTime));
        }
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CategoryPojo {
        private String category;
        private double totalPrice;
        private String dateTime;
    }

    private static class FinalResultWindowProcess extends ProcessWindowFunction<CategoryPojo, Object, String, TimeWindow> {
        @Override
        public void process(String dateTime, Context context, Iterable<CategoryPojo> elements, Collector<Object> out) throws Exception {
            double total = 0D;
            // min-heap of size 3 keeps the three largest totals seen so far
            Queue<CategoryPojo> queue = new PriorityQueue<>(3,
                    (c1, c2) -> c1.getTotalPrice() >= c2.getTotalPrice() ? 1 : -1);
            for (CategoryPojo element : elements) {
                double price = element.getTotalPrice();
                total += price;
                if (queue.size() < 3) {
                    queue.add(element);
                } else {
                    if (price >= queue.peek().getTotalPrice()) {
                        queue.poll();
                        queue.add(element);
                    }
                }
            }
            // sort the heap contents descending for display
            List<String> top3List = queue.stream()
                    .sorted((c1, c2) -> c1.getTotalPrice() >= c2.getTotalPrice() ? -1 : 1)
                    .map(c -> "分类:" + c.getCategory() + " 金额:" + c.getTotalPrice())
                    .collect(Collectors.toList());
            double roundResult = new BigDecimal(total).setScale(2, RoundingMode.HALF_UP).doubleValue();
            System.out.println("时间: " + dateTime + " 总金额 :" + roundResult);
            System.out.println("top3: \n" + StringUtils.join(top3List, "\n"));
        }
    }
}
```
Flink - Exercise - Automatic Good Reviews for Orders (Master) Requirements
Data
Thinking: can this be done with windows?
A window collects and computes data according to a fixed rule; that rule applies to all records and has nothing to do with any individual record.
Here every order's countdown starts at a different time, so the computation cannot be expressed with a window.
The triggering and completion of the computation are decided by each record/key itself, so windows do not apply; we have to maintain our own state.
Further thinking:
1. Should we judge whether an order was reviewed as soon as each record arrives, or first keep everything in state with a countdown and judge uniformly once the time is up? It depends on the workload: if most reviews time out, judge later, but orders that were already reviewed will then sit in state and take up space (trading space for time); if most users review promptly, judge each record first (trading time for space).
2. Should the state be ValueState or MapState? ValueState is enough here: there is no need to look up the order completion time separately, since the countdown timer already records it.
3. In reality the review window spans many days (say 15), so the stored state would be large; there is no need to check in real time, and the state can instead be checked and updated hourly or daily.
4. Expired-state handling.
(A sketch combining points 2 and 3 follows below.)
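A minimal, hypothetical sketch of the ValueState variant from point 2 that also rounds timers up to whole hours, as suggested in point 3. It assumes the stream is keyed by orderId (Tuple3 of userId, orderId, completion time) rather than by userId as in the reference code further down; the class and state names are made up for illustration:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// hypothetical sketch: keyed by orderId (Tuple3 = userId, orderId, createTime)
public class ValueStateTimerFunction extends KeyedProcessFunction<String, Tuple3<String, String, Long>, String> {
    private final long interval;                       // review timeout in ms
    private transient ValueState<Long> createTimeState;

    public ValueStateTimerFunction(long interval) {
        this.interval = interval;
    }

    @Override
    public void open(Configuration parameters) {
        createTimeState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("createTime", Long.class));
    }

    @Override
    public void processElement(Tuple3<String, String, Long> order, Context ctx, Collector<String> out) throws Exception {
        createTimeState.update(order.f2);
        // point 3: round the target up to the next full hour, so checks happen
        // in hourly batches instead of one wake-up per order deadline
        long oneHour = 60 * 60 * 1000L;
        long target = ((order.f2 + interval) / oneHour + 1) * oneHour;
        ctx.timerService().registerProcessingTimeTimer(target);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        Long createTime = createTimeState.value();
        // a real job would first query the review system for this order here
        if (createTime != null && timestamp - createTime >= interval) {
            out.collect("orderId:" + ctx.getCurrentKey() + " timed out, auto good review");
            createTimeState.clear();   // drop the per-key state once handled
        }
    }
}
```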
```java
public static class MySource implements SourceFunction<Tuple3<String, String, Long>> {
    private boolean flag = true;

    @Override
    public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception {
        Random random = new Random();
        while (flag) {
            // emit (userId, orderId, orderCompletionTime) every 500ms
            String userId = random.nextInt(5) + "";
            String orderId = UUID.randomUUID().toString();
            long currentTimeMillis = System.currentTimeMillis();
            ctx.collect(Tuple3.of(userId, orderId, currentTimeMillis));
            Thread.sleep(500);
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }
}
```
Implementation steps
```
1. env
2. source
3. transformation
   If the user has not reviewed an order after interval milliseconds, give a good
   review automatically. For demo purposes the timeout is set to 5s:
   long interval = 5000L;
   After keying the stream, a custom KeyedProcessFunction detects timed-out
   orders on a timer and reviews them automatically:
   orderDS.keyBy(t -> t.f0).process(new TimerProcessFunction(interval));
   3.1 Define MapState-typed state: key = order id, value = order completion time
   3.2 Create the MapState:
       MapStateDescriptor<String, Long> mapStateDesc =
           new MapStateDescriptor<>("mapStateDesc", String.class, Long.class);
       mapState = getRuntimeContext().getMapState(mapStateDesc);
   3.3 Register a timer:
       mapState.put(value.f1, value.f2);
       ctx.timerService().registerProcessingTimeTimer(value.f2 + interval);
   3.4 When the timer fires, evaluate each order and emit the result
4. sink
5. execute
```
Code implementation
```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

public class OrderAutomaticFavorableComments {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        DataStream<Tuple3<String, String, Long>> orderDS = env.addSource(new MySource());

        // orders not reviewed within interval ms get an automatic good review
        long interval = 5000L;
        orderDS.keyBy(t -> t.f0)
                .process(new TimerProcessFunction(interval));
        env.execute();
    }

    public static class MySource implements SourceFunction<Tuple3<String, String, Long>> {
        private boolean flag = true;

        @Override
        public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception {
            Random random = new Random();
            while (flag) {
                String userId = random.nextInt(5) + "";
                String orderId = UUID.randomUUID().toString();
                long currentTimeMillis = System.currentTimeMillis();
                ctx.collect(Tuple3.of(userId, orderId, currentTimeMillis));
                Thread.sleep(500);
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }
    }

    private static class TimerProcessFunction extends KeyedProcessFunction<String, Tuple3<String, String, Long>, Object> {
        private long interval;

        public TimerProcessFunction(long interval) {
            this.interval = interval;
        }

        // key = order id, value = order completion time
        private MapState<String, Long> mapState = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            MapStateDescriptor<String, Long> mapStateDescriptor =
                    new MapStateDescriptor<>("mapState", String.class, Long.class);
            mapState = getRuntimeContext().getMapState(mapStateDescriptor);
        }

        @Override
        public void processElement(Tuple3<String, String, Long> value, Context ctx, Collector<Object> out) throws Exception {
            // remember the order and schedule a check at completionTime + interval
            mapState.put(value.f1, value.f2);
            ctx.timerService().registerProcessingTimeTimer(value.f2 + interval);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
            Iterator<Map.Entry<String, Long>> iterator = mapState.iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, Long> map = iterator.next();
                String orderId = map.getKey();
                Long orderTime = map.getValue();
                if (!isFavorable(orderId)) {
                    if (System.currentTimeMillis() - orderTime >= interval) {
                        System.out.println("orderId:" + orderId + "该订单已经超时未评价,系统自动给与好评!....");
                        // iterator.remove() already deletes the entry from MapState
                        iterator.remove();
                    }
                } else {
                    System.out.println("orderId:" + orderId + "该订单已经评价....");
                    iterator.remove();
                }
            }
        }

        // simulate querying the review system: pretend even hash codes were reviewed
        public boolean isFavorable(String orderId) {
            return orderId.hashCode() % 2 == 0;
        }
    }
}
```