spark

Spark入门

四代计算引擎

  • 第一代引擎:MR

  • 第二代引擎:Hive(MR,Spark,Tez) 部分支持DAG(有向无环图)

  • 第三代引擎:Spark和Impala(完全支持DAG)

  • 第四代计算引擎:批流统一FLink(完全支持DAG)

  • 技术发展:

  • spark.assets/image-20210327102855537.png
  • spark.assets/image-20210327103436785.png
  • spark.assets/image-20210327103519977.png
  • spark.assets/image-20210327103617027.png
  • spark.assets/image-20210327103902124.png
  • 面试题:Hadoop的基于进程的计算和Spark基于线程方式优缺点?

    • 只需要回答进程和线程的区别

    • 线程基本概念

      l 线程是CPU的基本调度单位

      l 一个进程一般包含多个线程, 一个进程下的多个线程共享进程的资源

      l 不同进程之间的线程相互不可见

      l 线程不能独立执行

      l 一个线程可以创建和撤销另外一个线程

Spark的部署

  • spark.assets/image-20210327105704323.png

local模式

  • spark.assets/image-20210327105933324.png
  • 如何安装?

    • local模式,开箱即用模式,使用测试模式下
    • spark.assets/image-20210327110806723.png
  • 如何spark-shell应用?

    • bin/spark-shell –master local[3]
    • spark.assets/image-20210327111138411.png
    • spark.assets/image-20210327111507583.png
    • spark.assets/image-20210327111806641.png
    • wordcount
    • spark.assets/image-20210327112250810.png
    • 本地文件执行wordcount
    • spark.assets/image-20210327113020760.png
    • hdfs文件执行wordcount
    • spark.assets/image-20210327113147099.png
    • 了解Spark的任务流程
    • spark.assets/image-20210327113000682.png
    • 深入原理:了解Shuffle
    • spark.assets/image-20210327114319204.png
    • 理解:为什么会生成两个文件夹?
    • spark.assets/image-20210327114644040.png
  • 如何提交任务?

    • spark.assets/image-20210327120055333.png
1
2
3
4
5
6
# Spark-local模式提交任务
bin/spark-submit \
--master local[3] \
--class org.apache.spark.examples.SparkPi \
/export/server/spark/examples/jars/spark-examples_2.11-2.4.5.jar \
10

Standalone

  • 2-Standalone模式学会查看

    • spark.assets/image-20210327110202972.png
  • 如何安装?

    • 根据架构安装
    • spark.assets/image-20210327120358367.png
    • 配置文件中需要更改:
      • 1-指定谁是Master,谁是Worker
      • 2-指定Master的通信地址,7077,指定Master的WebUi地址,8080
      • 3-可选项指定WOrker通信地址和Worker的WebUi地址
      • 4-配置Spark的历史日志服务器
        • 为什么配置?因为如果执行Spark-Shell启动4040在关闭当前应用窗口之后无法查看UI
        • 如何配置?需要将Spark的历史日志服务器的日志写入到HDFS分布式文件系统
    • 配置:
    • spark.assets/image-20210327121309024.png
    • spark.assets/image-20210327120815023.png
    • 查看WebUi
    • spark.assets/image-20210327121628357.png
  • 如何spark-shell?

    • bin/spark-shell –master spark://node1:7077
    • spark.assets/image-20210327122102781.png
  • 如何提交Spark任务?

    • spark.assets/image-20210327121829846.png
1
2
3
4
5
bin/spark-submit \
--master spark://node1:7077 \
--class org.apache.spark.examples.SparkPi \
/export/server/spark/examples/jars/spark-examples_2.11-2.4.5.jar \
10
  • spark.assets/image-20210327121933226.png

StandaloneHA

  • 3-standaloneHA模式

    • 1-架构

    • spark.assets/image-20210327143454035.png
    • 2-如何搭建

      • 1-首先注释spark-env.sh的master节点的部分(因为zk选举)
      • 2-增加zk选举的配置项
      • 3-进一步分发即可
    • spark.assets/image-20210327143733819.png
    • 3-如何执行spark-shell(测试)

    • bin/spark-shell –master spark://node1:7077,node2:7077

    • 启动遇到的问题

    • spark.assets/image-20210327144140349.png
    • 解决:在node2上启动master

    • spark.assets/image-20210327144240038.png
    • 正常启动

    • spark.assets/image-20210327144340319.png
    • 可以执行简单的rdd创建

    • spark.assets/image-20210327144507857.png
    • 4-如何提交任务(实际生产环境)

1
2
3
4
5
bin/spark-submit \
--master spark://node1:7077,node2:7077 \
--class org.apache.spark.examples.SparkPi \
/export/server/spark/examples/jars/spark-examples_2.11-2.4.5.jar \
10
  • spark.assets/image-20210327144639336.png
  • 观察现象:

  • spark.assets/image-20210327144912641.png
  • spark.assets/image-20210327145319768.png

SparkOnYarn

  • 使用的Yarn负责资源管理和调度

  • Driver申请资源—–AppMaster应用管理器—-ResourceManager—--NodeManager—-Container—-Task任务

  • (1)搭建Yarn环境

    • 1-首先需要在spark-env.sh中配置HADOOP_CONF和Yarn_CONF
    • spark.assets/image-20210327162321701.png2-需要关闭内存检查
    • spark.assets/image-20210327162502738.png
    • 3-需要整合Yarn的历史日志服务器和Spark的历史日志服务器
      • 为什么整合?
        • 答案:因为Yarn的历史日志服务器中的历史Job无法查看具体spark的任务使用了多少executor和内存
    • spark.assets/image-20210327162647036.png
    • spark.assets/image-20210327163011193.png
  • (2)提交Yarn任务

1
2
3
4
5
bin/spark-submit \
--master yarn \
--class org.apache.spark.examples.SparkPi \
/export/server/spark/examples/jars/spark-examples_2.11-2.4.5.jar \
10
  • spark.assets/image-20210327163343546.png
  • (3)查看监控—8088,历史日志服务器19888

  • spark.assets/image-20210327163515924.png
  • spark.assets/image-20210327163617881.png

重难点知识

  • Spark部署模式
  • SparkOnYarn的两种deploymode模式原理
  • 掌握基于Scala的WOrdcount的IDEA编码
  • 学会提交Jar包集群跑
  • 需要形成属于自己的脚本

Spark的两种Mode模式(集群模式均可用)

  • 集群模式:standalone和HA和Yarn

  • 本质区别:Cluster和Client模式最最本质的区别是:Driver程序运行在哪里。

  • spark.assets/image-20210327164001715.png
  • 第一种Client:driver启动在本地

  • spark.assets/image-20210327164611022.png
1
2
3
4
5
6
bin/spark-submit \
--master spark://node1:7077 \
--deploy-mode client \
--class org.apache.spark.examples.SparkPi \
/export/server/spark-2.4.5-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.5.jar \
10
  • spark.assets/image-20210327165606268.png
  • 第二种Cluster:driver启动在由Master指定的worker节点上

  • spark.assets/image-20210327164914937.png
1
2
3
4
5
6
7
# Spark-standalone模式提交任务-cluster
bin/spark-submit \
--master spark://node1:7077 \
--deploy-mode cluster \
--class org.apache.spark.examples.SparkPi \
/export/server/spark-2.4.5-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.5.jar \
10
  • spark.assets/image-20210327165656647.png
  • spark.assets/image-20210327165819301.png
  • spark.assets/image-20210327165853693.png
  • spark.assets/image-20210327165907321.png
  • 总结:

    • 本质:driver启动在哪里,如果启动在client端客户端直接看到结果,否则需要worker节点日志查看

Spark的OnYarn原理详解

  • Client
  • 1-首先Driver启动在Client端的

  • 2-向ResourceManager申请启动AppMaster

  • 3-由AppMaster向ResourceManager申请资源

  • 4-由ResourceManager(返回资源列表到AppMaster在)指定NodeManager启动Executor进程

  • 5-Executor就是资源反向注册到Driver端

  • 6-Driver端继续执行计算任务—–DAGScheduler和TaskScheduler

  • 注意:执行到Action算子时,触发一个Job,并根据宽依赖开始划分Stage,每个Stage生成对应的TaskSet,之后将Task分发到各个Executor上执行

  • spark.assets/image-20210327173511877.png
  • 命令

1
2
3
4
5
6
bin/spark-submit \
--master yarn \
--deploy-mode client \
--class org.apache.spark.examples.SparkPi \
/export/server/spark/examples/jars/spark-examples_2.11-2.4.5.jar \
10
  • spark.assets/image-20210327175342898.png
  • Cluster原理
  • spark.assets/image-20210327173910704.png
  • spark.assets/image-20210327174352261.png
  • spark.assets/image-20210327175118488.png
1
2
3
4
5
6
bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--class org.apache.spark.examples.SparkPi \
/export/server/spark/examples/jars/spark-examples_2.11-2.4.5.jar \
10
  • 如何查询结果(conf 底下的log4j.properties.tmplete需复制一份log4j.properties,否则看不到stdout的东西):

  • spark.assets/image-20210327175614690.png
  • spark.assets/image-20210327175655139.png
  • spark.assets/image-20210327175714557.png
  • 查看Container容器和Executor的关系

  • 答案:Executor是运行在Continer里面

  • Executor是进程,Task是线程,最终执行计算的是一个线程执行一个分区的Task任务

  • spark.assets/image-20210327175929005.png
  • spark.assets/image-20210327180106135.png

Spark的IDEA编程指南

创建项目
  • 步骤:

  • 1-首先明确项目名称和包名

    • bigdata-spark_2.11
    • spark-chapter01_2.11
    • spark.assets/image-20210327150541284.png
  • 2-加载pom文件,记得修改本地maven库一直使用的库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
<properties>
<encoding>UTF-8</encoding>
<scala.version>2.11.12</scala.version>
<scala.binary.version>2.11</scala.binary.version>
<spark.version>2.4.5</spark.version>
<hadoop.version>2.7.5</hadoop.version>
</properties>

<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<!-- 后续使用 -->
<!--SparkSQL+ Hive依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- spark-streaming-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- SparkMlLib机器学习模块,里面有ALS推荐算法-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!--spark-streaming+Kafka依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!--StructuredStreaming+Kafka依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>com.hankcs</groupId>
<artifactId>hanlp</artifactId>
<version>portable-1.7.7</version>
</dependency>
<!-- Redis客户端工具-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
</dependencies>

<build>
<outputDirectory>target/classes</outputDirectory>
<testOutputDirectory>target/test-classes</testOutputDirectory>
<resources>
<resource>
<directory>${project.basedir}/src/main/resources</directory>
</resource>
</resources>
<!-- Maven 编译的插件 -->
<plugins>
<!-- 指定编译java的插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- 指定编译scala的插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
Scala编程指南[掌握]
  • 3-构建基础Scala的WordCount版本

  • 需求:使用IDEA实现wordcount案例

  • 步骤:

    • 1-首先创建SparkContent上下文环境
    • 2-从外部文件数据源读取数据
    • 3-执行flatmap执行扁平化操作
    • 4-执行map转化操作,(word,1)
    • 5-reduceByKey(预聚合)
    • 6-输出到文件系统或打印即可
  • 结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package cn.spark.sparkbase

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
* DESC:
* 1-首先创建SparkContent上下文环境
* 2-从外部文件数据源读取数据
* 3-执行flatmap执行扁平化操作
* 4-执行map转化操作,(word,1)
* 5-reduceByKey(预聚合)
* 6-输出到文件系统或打印即可
*/
object _01SparkWordCount {
def main(args: Array[String]): Unit = {
//* 1-首先创建SparkContext上下文环境
val conf: SparkConf = new SparkConf().setAppName("_01SparkWordCount").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
//sc.setLogLevel("WARN")
//* 2-从外部文件数据源读取数据
val fileRDD: RDD[String] = sc.textFile("data/baseinput/words.txt")
//println(s"count value is:${fileRDD.count()}")//count value is:2
//* 3-执行flatmap执行扁平化操作
val valueRDD: RDD[String] = fileRDD.flatMap(x => x.split("\\s+"))
//* 4-执行map转化操作,(word,1)
val mapRDD: RDD[(String, Int)] = valueRDD.map(x => (x, 1))
//* 5-reduceByKey(预聚合)
val resultRDD: RDD[(String, Int)] = mapRDD.reduceByKey((a, b) => a + b)
//* 6-输出到文件系统或打印即可
resultRDD.foreach(x=>println(x)) //这里是直接使用foreach对RDD进行打印
println("=============================")
resultRDD.collect().foreach(println(_))//能否转换为集合在打印
println("=============================")
resultRDD.saveAsTextFile("data/baseoutput/output-1")
sc.stop()
}
}

Java编程指南[了解]

  • 4-构建基础Java的WordCount版本[了解]

    • 1-需求:使用Java语言完成Wordcount代码

    • 2-步骤:

      • 1-首先创建SparkContent上下文环境
      • 2-从外部文件数据源读取数据
      • 3-执行flatmap执行扁平化操作
      • 4-执行map转化操作,(word,1)
      • 5-reduceByKey(预聚合)
      • 6-输出到文件系统或打印即可
    • 3-结果或代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package cn.spark.sparkbase;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

/**
* DESCRIPTION:
* 1-首先创建SparkContent上下文环境
* 2-从外部文件数据源读取数据
* 3-执行flatmap执行扁平化操作
* 4-执行map转化操作,(word,1)
* 5-reduceByKey(预聚合)
* 6-输出到文件系统或打印即可
*/
public class _01SparkFirst {
public static void main(String[] args) {
//1-首先创建SparkContent上下文环境
SparkConf conf = new SparkConf().setAppName("_01SparkFirst").setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(conf);
//2-从外部文件数据源读取数据
JavaRDD<String> fileRDD = jsc.textFile("data/baseinput/words.txt");
//3-执行flatmap执行扁平化操作(学习看源码)
JavaRDD<String> flatMapRDD = fileRDD.flatMap(x -> Arrays.asList(x.split("\\s+")).iterator());
//4-执行map转化操作,(word,1)(学习看源码)
JavaPairRDD<String, Integer> mapRDD = flatMapRDD.mapToPair(x -> new Tuple2<>(x, 1));
//5-reduceByKey(预聚合)
JavaPairRDD<String, Integer> resultRDD = mapRDD.reduceByKey((a, b) -> a + b);
//6-输出到文件系统或打印即可
List<Tuple2<String, Integer>> collectResult = resultRDD.collect();
collectResult.forEach(System.out::println);
//关闭连接
jsc.stop();
}
}
  • 总结1:

    • setAppName撰写this.getClass.getSimpleName.stripSuffix(“$”)方式
  • 总结2:

1
2
//这里设置线程休眠为了能够看到WebUI的展示的DAG等监控界面
Thread.sleep(100*1000)
  • spark.assets/image-20210327155412197.png
  • 总结3:

    • Spark划分为两大角色:Driver和Executor
    • spark.assets/image-20210327160052790.png
    • Driver是负责应用管理者,申请资源和执行dag的计算
    • Executor利用线程执行对应分区的计算,一个task需要一个线程执行
    • spark.assets/image-20210327160453052.png
    • 传统意义上的几核几线程,实质上指的是PC机器的线程数,比如我的机器6和12线程,线程个数最多可以使用12个
    • spark.assets/image-20210327160757406.png
    • Job和DAG的关系
    • spark.assets/image-20210327160950818.png
    • 代码中那些是在Driver端进行的,那些是Executor端进行的
    • spark.assets/image-20210327161207745.png
    • spark.assets/image-20210327161215209.png
打包上传后提交任务
  • 后续再讲解打包上传

  • 需求:需要实现代码并提交到集群上运行

  • 步骤;

    • 1-首先pom实现package打包

      • 直接package
    • 2-上传虚拟机中

      • 直接拖拽
    • 3-执行任务

1
2
3
4
5
6
7
8
# 自己实现的代码放在集群上跑任务,尝试使用Spark Yarn-client模式
bin/spark-submit \
--master yarn \
--deploy-mode client \
--class cn.spark.sparkbase._01SparkWordCountTar \
/export/data/spark-base_2.11-1.0.0.jar \
hdfs://node1.spark.cn:8020/wordcount/input \
hdfs://node1.spark.cn:8020/wordcount/output-8
  • 查看结果

  • spark.assets/image-20210327181358437.png
  • 该任务需要掌握

SparkSubmit提交任务的参数
  • 下面的重点掌握
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
YARN:
--master MASTER_URL集群资源管理器 spark://host:port,host1:port, mesos://host:port, yarn
k8s://https://host:port, or local (Default: local[*]).
--deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or
on one of the worker machines inside the cluster ("cluster")
(Default: client).
--class CLASS_NAME Your application's main class (for Java / Scala apps).
--jars JARS Comma-separated list of jars to include on the driverand executor classpaths.
--conf PROP=VALUE Arbitrary Spark configuration property.
# 关键部分,Driver端和Executor端的配置
# Driver申请资源执行计算任务
--driver-memory MEM Driver端的内存默认1G Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
--driver-cores NUM Number of cores used by the driver, only in cluster mode(Default: 1).
这里注意,为什么local不需要使用driver-cores,因为使用local[*]模拟本机多线程
# Executor是真正执行资源和计算任务的
--num-executors NUM Number of executors to launch (Default: 2).启动多少个executors,默认2个
If dynamic allocation is enabled, the initial number of
executors will be at least NUM.还可以动态开启
--executor-memory MEM 每个Executor的内存,默认1G Memory per executor (e.g. 1000M, 2G) (Default: 1G).
--executor-cores NUM 每个executor有多少cores,yarn默认为1
Number of cores per executor. (Default: 1 in YARN mode,
or all available cores on the worker in standalone mode)
--queue QUEUE_NAME The YARN queue to submit to (Default: "default").

如果有一个需求的数据量,需要满足Executor端的内存一定超越给定的数据量,
cpu-cores越多越好(cpu-cores模拟的线程,每个线程执行1个分区的数据,如果业务数据分区越多,开启cpucores越多)
bin/spark-submit \
--master yarn \
--deploy_mode cluster \
--driver-memory 2g \
--driver-cores 2 \
--num-executors 10 \
--executor-memory 2g \
--executor-cores 3 \
--class cn.spark.apple.mainclass \
jar包路径 \
程序需要参数
  • spark.assets/image-20210327183419885.png
  • spark.assets/image-20210327183443992.png
  • spark.assets/image-20210327183528338.png
IDEA直接读取HDFS文件
  • 需求:在IDEA中直接读取HDFS文件,如何实现

  • 步骤:

    • 1-远程连接Hadoop集群
    • 2-直接拷贝core-site.xml和hdfs-site.xml配置文件(为了使用相对路径)
    • 3-拷贝到对应module模块的resource目录下
    • 4-直接写相对路径即可读取
  • 操作连接服务器的步骤

  • spark.assets/image-20210329100755508.png
  • spark.assets/image-20210329100814540.png
  • spark.assets/image-20210329100956361.png
  • spark.assets/image-20210329101111673.png
  • 用groupbykey的方法实现wordcount案例

  • spark.assets/image-20210329102340493.png
  • 排序的操作,这里截图使用的是top方法

  • spark.assets/image-20210329103236824.png
  • 所有代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    package cn.spark.sparkbase

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    /**
    * DESC:
    * 1-首先需要创建SparkContext,引入SparkConf
    * 2-读取外部数据文件到RDD
    * 3-RDD的转换
    * 4-将结果数据保存在HDFS中
    */
    object _01SparkWordCount {
    def main(args: Array[String]): Unit = {
    //1-首先需要创建SparkContext,引入SparkConf
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    //2-读取外部数据文件到RDD
    val fileRDD: RDD[String] = sc.textFile("hdfs://node1:8020/wordcount/input/")
    //fileRDD.foreach(println(_))
    //3-RDD的转换
    val flatMapRDD: RDD[String] = fileRDD
    .filter(line => line != null && line.trim.length > 0)
    .flatMap(_.split("\\s+"))
    val resultRDD: RDD[(String, Int)] = flatMapRDD.map((_, 1)).groupByKey().map(x =>(x._1,x._2.sum) )
    //将结果数据收集到Driver端-all the data is loaded into the driver's memory.
    val resultArray: Array[(String, Int)] = resultRDD.collect()
    println(resultArray.toBuffer) //打印方法
    //Buffer((hello,4), (me,3), (you,2), (her,1))
    //4-排序操作,因为take是action算子,如果返回值是rdd的化验证操作是Transformation操作
    println("sotyby oprations is:================")
    resultRDD.sortBy(_._2,true).take(3).foreach(println(_))
    println("sotybykey oprations is:================")
    //Take the first num elements of the RDD.
    resultRDD.map(_.swap).sortByKey(true).take(3).foreach(println(_))
    println("top oprations is:================")
    //Returns the top k (largest) elements from this RDD
    resultRDD.top(3)(Ordering.by(x=>x._2)).foreach(println(_))
    //4-将结果数据保存在HDFS中
    //flatMapRDD.foreach(println(_))
    sc.stop()
    }
    }

SparkCore–RDD

重难点知识

  • 使用IDEA完成HDFS文件读取

  • RDD的引入

  • RDD的特性–面试必问[重点]

  • RDD的创建

  • RDD的转换算子

  • RDD的行动算子

  • RDD的案例实战[重点]

RDD是什么

RDD*(Resilient Distributed Dataset)*是弹性分布式数据集,是不可变,可分区,可并行计算的集合。

  • 什么是弹性:数据可以存储在磁盘也可以存储在内存中

  • 什么是分布式:分布式计算和分布式存储

  • 什么是数据集:数据构成的集合

  • 不可变:RDD一旦创建就不能改变,不能转换

  • 可分区:RDD是可以划分为不同分区partition

  • 可并行计算的集合:基于内存的可并行计算集合

  • spark.assets/image-20210329104805381.png
  • 源码分析

  • spark.assets/image-20210329105132237.png
  • spark.assets/image-20210329105322406.png
  • spark.assets/image-20210329105804738.png

RDD的五大属性

  • 1-分区列表,每一个RDD都是不同分区构成的
  • 2-计算函数:每个RDD的分区都有计算函数作用
  • 3-依赖关系:每个RDD有一定依赖关系
  • 4-可选分区器:也就是RDD有分区器,默认是Hash-Partitioner
  • 5-可选位置优先性:移动计算不要移动存储
  • spark.assets/image-20210329114735118.png
  • 稍后以WordCount案例梳理五大属性
  • 1-如何查看分区个数
  • spark.assets/image-20210329114358375.png
  • 2-如何查看分区器
  • spark.assets/image-20210329114611523.png
  • spark.assets/image-20210329114647245.png
  • 补充图示
  • spark.assets/image-20210329114846354.png

RDD创建的方法

  • 三种方法的创建

  • spark.assets/image-20210329115334221.png
  • 增加分区的理解

  • spark.assets/image-20210329115712928.png
  • spark.assets/image-20210329120459797.png
  • IDEA中查看

  • spark.assets/image-20210329120813069.png
  • spark.assets/image-20210329121120383.png
  • 点击源码查看的过程,得到结论是makerdd或parallise都是根据totalcpucores和2比较最大值

  • 如果直接覆盖makerdd或parallise的第二个分区个数的参数,改变该取值

  • spark.assets/image-20210329121141838.png
  • spark.assets/image-20210329121155114.png
  • spark.assets/image-20210329121233674.png
  • spark.assets/image-20210329121305638.png
  • spark.assets/image-20210329121325894.png

并行度

  • 思考:spark.default.parallelism啥东西
  • spark.assets/image-20210329121553014.png
  • spark.assets/image-20210329121821898.png
  • 如何在代码中设置上述参数
  • spark.assets/image-20210329122144187.png
  • 查看textFile如何控制并行度?
  • spark.assets/image-20210329144455731.png
  • spark.assets/image-20210329144553963.png
  • spark.assets/image-20210329144700382.png
  • 如何使用Spark的rdd读取很多小文件?
  • 1-数据集含义:userid-moviesid-rating-timestamp,一个用户在什么时间给什么电影评分
  • spark.assets/image-20210329145620263.png
  • 2-sc.textFile遇到小文件没有办法很好合并小文件的,即便重写第二个参数也没有作用
  • Spark中提供了小文件的处理方案sc.wholetextFile的方式,不会根据文件多少得到多少分区
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package cn.spark.sparkbase.rddopration

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
* DESC:
*/
object _02SparkRDDFirst {
def main(args: Array[String]): Unit = {
//1-准备环境
val conf: SparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.set("spark.default.parallelism", "4")
val sc = new SparkContext(conf)
//2-准备读取外部文件系统
val fileRDD: RDD[String] = sc.textFile("data/baseinput/words.txt")
//sc.parallelize(1 to 10)
//3-查看分区个数
println("partitons length:" + fileRDD.getNumPartitions) //2
println(s"partiiton length is:${fileRDD.partitions.length}") //2
//读取一个文件夹下的多个文件
//textFile在读取小文件的时候,会参考小文件的个数,文件个数越多,分区个数越多
val filesRDD1: RDD[String] = sc.textFile("data/baseinput/ratings100/")
println("partitons length:" + filesRDD1.getNumPartitions) //100
//Spark读取小文件的方法
val filesRDD2: RDD[(String, String)] = sc.wholeTextFiles("data/baseinput/ratings100/")
//println("partitons length:" + filesRDD2.getNumPartitions) //2
filesRDD2.take(3).foreach(println(_))
//23106,5214,3.0,1043445719

sc.stop()
}
}
  • 补充案例

  • spark.assets/image-20210329150938698.png
  • spark.assets/image-20210329151026005.png
  • 用textFile时,它的partition的数量是与文件夹下的文件数量(实例中用3个xxx.log文件)相关,一个文件就是一个partition(既然3个文件就是:partition=3),文件大小均匀的情况下,可以这么估算,具体按照hadoop的文件切片规则(goalsize->splitsize->1.1split每个大文件会进行切片)。
  • textFile分片源码(日志打开info,搜索input split)
spark.assets/image-20210401101124646.png spark.assets/image-20210401125812303.png

spark wholeTextFile合并小文件的过程较为简单,mapreduce的CombineTextInputFormat合并过程较为复杂。

https://blog.csdn.net/qq_35241080/article/details/106065442

spark.assets/image-20230223212739929.png

RDD的算子分类

  • Transformation转化算子
  • Action行动算子,使用action算子不会返回rdd,且会将结果返回给driver端。
RDD的Transformation算子
  • 第一种:RDD的单Value类型RDD

  • 第二种:RDD的双Value的RDD

  • 第三种:RDD的Key和value类型的RDD

RDD单Value类型算子

map算子

  • 1-含义:经过fun的操作得到新的RDD
  • 2-操作
  • spark.assets/image-20210329153047465.png

filter算子

  • 1-含义:经过fun的操作得到新的RDD,元素需要满足条件
  • 2-操作
  • spark.assets/image-20210329153254866.png

flatMap算子

  • 1-含义:扁平化
  • 2-操作
  • spark.assets/image-20210329153446260.png

mapPartitions算子

  • 1-含义
  • spark.assets/image-20210329153707322.png
  • 2-操作

mapParittionWithIndex算子

  • 1-含义
  • spark.assets/image-20210329153736589.png
  • 2-操作

sample算子

  • 1-含义
  • spark.assets/image-20210329153839101.png
  • seed:保证每次随机切分的数据的可重复性
  • 2-操作
  • spark.assets/image-20210329154153144.png

glom算子

  • 1-含义:查看每个分区的内容
  • 2-操作
  • spark.assets/image-20210329154322630.png

sortBy算子

  • 1-含义:根据key或者value进行排序
  • 2-操作:
  • spark.assets/image-20210329154534322.png

coalese算子(合并)

  • 如果缩减分区直接给定缩减到的数量,扩分区需要需要开启Shuffle为true
  • 1-含义
  • spark.assets/image-20210329160552847.png
  • 2-操作
  • spark.assets/image-20210329161053037.png

repartition算子

  • 1-含义:重分区,调用coalesce(numPartitions,shuffle=true),因此缩减分区尽量用coalesce。

  • 2-操作

  • spark.assets/image-20210329161337772.png
  • 源码:

  • spark.assets/image-20210329161453997.png
  • spark.assets/image-20210329161611016.png
  • repartitionAndSortWithinPartitions重分区的时候就排序,比分完区再排序高效

RDD的双Value类型算子

集合的交集并集补集

union并集

intersection交集

distinct去重

subtract差集

spark.assets/image-20210329162256595.png

zip拉链

需要保持元素和分区个数一致

spark.assets/image-20210329162543660.png spark.assets/image-20210329162617585.png

RDD的Key和Value的算子

partitionBy:

  • 1-含义:
  • spark.assets/image-20210329163033895.png
  • 2-操作:
  • spark.assets/image-20210329163525679.png
  • spark.assets/image-20210329163812119.png
  • ****注意:Spark采用的分区有三种****:第一、水平分区,也就是sc.makerdd按照下标元素划分,第二、Hash划分根据数据确定性划分到某个分区,一般只给定分区数。第三、Range分区该方法一般按照元素大小进行划分不同区域,每个分区表示一个数据区域,如数组中每个数是[0,100]之间的随机数,Range划分首先将区域划分为10份,然后将数组中每个数字分发到不同的分区,比如将18分到(10,20]的分区,最后对每个分区进行排序。

reduceByKey

  • 1-含义:在shuffle前进行预聚合,减少shuffle拉取的数据量
  • spark.assets/image-20210329164120195.png
  • 2-操作:
  • spark.assets/image-20210329164239002.png

groupByKey

  • 1-含义:

  • spark.assets/image-20210407204927753.png
  • 2-操作:

  • spark.assets/image-20210329164600142.png
  • 了解区别

    1. reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v].
    2. groupByKey:按照key进行分组,直接进行shuffle。
    3. 开发指导:reduceByKey比groupByKey,建议使用。但是需要注意是否会影响业务逻辑。
  • 需求:使用GroupByKey的源码—使用底层源码实现GroupByKey

  • spark.assets/image-20210329165407910.png
  • spark.assets/image-20210329165624219.png
  • spark.assets/image-20210329170006705.png
  • spark.assets/image-20210329170137771.png
  • spark.assets/image-20210329170509329.png
  • spark.assets/image-20210329171103876.png
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package cn.spark.sparkbase.base

import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.{Aggregator, SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

/**
* DESC:
*/
object _01groupByKeyOperation {
def main(args: Array[String]): Unit = {
//* 1-首先创建SparkContext上下文环境
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
//* 2创建数据,生成RDD
val filerdd: RDD[String] = sc.makeRDD(Array("hello you", "hello me hello she", "hello spark"))
val flatmapRDD: RDD[String] = filerdd.flatMap(_.split("\\s+"))
val mapRDD: RDD[(String, Int)] = flatmapRDD.map(x => (x, 1))
//* 3使用groupBy完成分组
val groupRDD1: RDD[(String, Iterable[Int])] = mapRDD.groupByKey()
groupRDD1.collect().foreach(println(_))
//(me,CompactBuffer(1))
//(spark,CompactBuffer(1))
//(you,CompactBuffer(1))
//(she,CompactBuffer(1))
//(hello,CompactBuffer(1, 1, 1, 1))
//CompactBuffer是spark模仿arraybuffer构建缓冲容器
//换一种方法模拟gropuByKey实现相同key的value的分组,使用源码的方法
//1-首先CompactBuffer存放分组的结果可以防止的ArrayBuffer
//val createCombiner = (v: V) => CompactBuffer(v)
//val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
//val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
/*
key value 存放value的arraybuffer
*/
val createCombiner = (v: Int) => ArrayBuffer(v)
val mergeValue = (buf: ArrayBuffer[Int], v: Int) => buf += v
val mergeCombiners = (c1: ArrayBuffer[Int], c2: ArrayBuffer[Int]) => c1 ++= c2
val valueRDD: ShuffledRDD[String, Int, ArrayBuffer[Int]] = new ShuffledRDD[String, Int, ArrayBuffer[Int]](mapRDD, new org.apache.spark.HashPartitioner(4))
.setAggregator(new Aggregator(
createCombiner,
mergeValue,
mergeCombiners))
.setMapSideCombine(false)
println("source code create groupBykey")
valueRDD.foreach(println(_))
sc.stop()
}
}

reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别?

这四个算子底层调用的都是同一个方法combineByKeyWithClassTag只不过他们的参数传值不同:

reduceByKey: 相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同
FoldByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同
AggregateByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同
CombineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则可以不相同。

1
2
3
4
5
CombineByKey与combineByKeyWithClassTag是一样的,为了兼容所以保留着
This method is here for backward compatibility. It does not provide combiner classtag information to the shuffle.
jvm的类型擦除问题,泛型T,K,U,V的类型信息,类Java语言只能在编译的阶段获取到类型参数,一旦代码被送入JVM运行,就会被擦除.
1.只要在定义泛型函数或泛型类的时候,在泛型标识后加上:ClassTag关键字,并且导入 scala.reflect.ClassTag 包即可
2.或者使用:Manifest关键字,这个不需要导包

用法举例:
FoldByKey(同一分区同一key与初始值计算一次,n个分区同一key与初始值计算n次)

spark.assets/image-20210329175439719.png

aggreateByKey 不同分区的最大值相加

为避免reduceByKey内存问题,可用aggregateByKey。

1
To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U.
spark.assets/image-20210329175221167.png

combineByKey(可用来求平均值)

  • spark.assets/image-20210329174316632.png

sortByKey

思考:sortBy是全局排序吗?是。
rdd. sortBy(_._2,false). foreach(println) //虽然sortBy是全局排序,但由于不止一个分区,foreach输出的时候分区的先后顺序随机,又把全局排序后的数据打乱了。

按照RangePartitioner shuffle会有倾斜的问题

spark.assets/image-20230228183833184.png
  • 1-含义:
  • spark.assets/image-20210329175618915.png
  • 2-操作:
  • spark.assets/image-20210329175728523.png

join

  • 1-含义:
  • spark.assets/image-20210329175822048.png
  • 2-操作:
  • spark.assets/image-20210329175908622.png

cogroup

  • 1-含义:
  • spark.assets/image-20210329175934847.png
  • 2-操作:
  • spark.assets/image-20210329180035130.png

cartisian

  • 1-含义:
  • spark.assets/image-20210329180049060.png
  • 2-操作:
  • spark.assets/image-20210329180141685.png

mapvalue

  • 1-含义:
  • 对Value进行操作
  • 2-操作:
  • spark.assets/image-20210329180245294.png

RDD的Action算子

  • reduce

  • collect(拉取的分区结果数据很大的情况下,会造成driver端的内存溢出,可以foreach打印出来,或者saveasTextFile保存到硬盘查看)

  • count

  • first

  • take(从第一个分区开始,满足要求就不再遍历获取,collect)

  • spark.assets/image-20210329180609115.png
  • takeSample

  • spark.assets/image-20210329180739703.png
  • takeorder

  • aggreate

  • fold

  • spark.assets/image-20210329180904341.png
  • countByKey

  • spark.assets/image-20210329181008334.png
  • foreach

  • rdd.foreach方法在执行的过程中的打印方法是在Executor中执行的,每个Executor在执行完自己的逻辑之后就执行foreach进行打印,因此在本地多线程执行的时候,可能List(3,4)是有可能先执行完成,所以会存在顺序错乱的情况。多线程,多分区才会有这种情况。而value.collect().foreach(println)这种写法的print是在数据采集到Driver之后,在Driver端打印的。所以顺序不会乱

  • 常见的RDD统计操作

  • spark.assets/image-20210329181148676.png

RDD的关键算子练习

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package cn.spark.sparkbase.base

import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import org.junit.Test

/**
* DESC:
*/
class _02RDDTest {

private val sc = new SparkContext(new SparkConf().setAppName("_02RDDTest").setMaster("local[*]"))

@Test
def test01(): Unit = {
sc.parallelize(Seq(1, 2, 3, 4, 5))
.map(x => x * 2)
.foreach(println(_))

sc.parallelize(Seq(1, 2, 3, 4, 5))
.filter(x => x > 3)
.foreach(println(_))

}

@Test
def test02: Unit = {
sc.parallelize(Array(1, 2, 3, 4, 5, 6), 2)
.map(x => (x * 2))
.foreach(println(_))
//如果直接使用foreach是无法作为iteratale返回
sc.parallelize(Array(1, 2, 3, 4, 5, 6), 2)
//f: Iterator[T] => Iterator[U],
.mapPartitions(iter => {
iter.foreach(println(_))
iter
}).collect()
//执行每个分区的元素乘以2
sc.parallelize(Array(1, 2, 3, 4, 5, 6), 2)
//f: Iterator[T] => Iterator[U],
.mapPartitions(iter => {
val iterator: Iterator[Int] = iter.map(item => item * 2)
iterator
}).collect().foreach(println(_))

println("上面的等价写法")
sc.parallelize(Array(1, 2, 3, 4, 5, 6), 2)
//f: Iterator[T] => Iterator[U],
.mapPartitions(iter => {
iter.map(item => item * 2)
}).collect().foreach(println(_))
}

@Test
def test03: Unit = {
sc.parallelize(Array(1, 2, 3, 4, 5, 6), 2)
// f: (Int几号分区, Iterator[T]) => Iterator[U]
.mapPartitionsWithIndex((index, iter) => {
println("index:" + index)
iter.map(_ * 2)
}).collect().foreach(println(_))
println("改进的方法")
sc.parallelize(Array(1, 2, 3, 4, 5, 6), 2)
// f: (Int几号分区, Iterator[T]) => Iterator[U]
.mapPartitionsWithIndex((index, iter) => {
iter.map(x => "index is:" + index + "\tvalue is:" + x * 2)
}).collect().foreach(println(_))
println("想用mapPartition的方法实现分区有哪些元素")
sc.parallelize(Array(1, 2, 3, 4, 5, 6), 2)
.mapPartitions(iter => {
println("partitionID:", TaskContext.getPartitionId())
iter.map(x => x * 2)
}).collect().foreach(println(_))

println("想用mapPartition的方法实现分区有哪些元素,改进方法")
sc.parallelize(Array(1, 2, 3, 4, 5, 6), 2)
.mapPartitions(iter => {
iter.map(x => "partitionID:" + TaskContext.getPartitionId() + "\tvalue is:" + x * 2)
}).collect().foreach(println(_))
//想用mapPartition的方法实现分区有哪些元素,改进方法
//partitionID:0 value is:2
//partitionID:0 value is:4
//partitionID:0 value is:6
//partitionID:1 value is:8
//partitionID:1 value is:10
//partitionID:1 value is:12
}

}

案例1:groupByKey(要求和需要再sum)

  • spark.assets/image-20210330091415920.png
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Test
def test01(): Unit = {
val value: RDD[(String, Int)] = sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)))
value
.groupByKey()
.collect()
.foreach(println(_))
//(a,CompactBuffer(1, 1))
//(b,CompactBuffer(1))
value
.groupByKey()
.map(x => (x._1, x._2.sum))
.collect()
.foreach(println(_))
}

案例2:combineBykey(平均值)

  • spark.assets/image-20210330092050781.png
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Test
def test02: Unit = {
val rdd: RDD[(String, Double)] = sc.parallelize(Seq(
("zhangsan", 99.0),
("zhangsan", 96.0),
("lisi", 97.0),
("lisi", 98.0),
("zhangsan", 97.0))
)
// createCombiner: Value 元素=> C 对元素的操作,
// mergeValue: (C, Value) => C,需要将上一个初始值和新的value进行合并
// mergeCombiners: (C, C) => C,
val createCombiner = (curr: Double) => (curr, 1)
val mergeValue = (curr: (Double, Int), nextValue: Double) => (curr._1 + nextValue, curr._2 + 1)
val mergeCombiners = (curr1: (Double, Int), agg1: (Double, Int)) => (curr1._1 + agg1._1, curr1._2 + agg1._2)
val valueRDD: RDD[(String, (Double, Int))] = rdd.combineByKey(createCombiner, mergeValue, mergeCombiners)

valueRDD.foreach(println(_))
// x._1 x._2._1
//(zhangsan,(292.0,3))
//(lisi,(195.0,2))
valueRDD.map(x => (x._1, x._2._1 / x._2._2)).foreach(println(_))//平均值
//(zhangsan,(292.0,3))
//(zhangsan,97.33333333333333)
//(lisi,97.5)
}

案例3:foldByKey

  • spark.assets/image-20210330093143876.png
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Test
def test03: Unit = {
//
sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)))
.aggregateByKey(0)(_ + _, _ + _)
.collect()
.foreach(println(_))

sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)))
.foldByKey(0)(_ + _)
.collect()
.foreach(println(_))

sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)))
.foldByKey(0)((a, b) => a + b)
.collect()
.foreach(println(_))
}
  • collectAsMap
1
2
3
4
5
@Test
def test04(): Unit ={
val rdd = sc.parallelize(List(("a", 1), ("a", 3), ("b", 2)))
rdd.collectAsMap().foreach(println(_))
}
  • randomSplit
1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
def test04(): Unit ={
val rdd = sc.parallelize(List(("a", 1), ("a", 3), ("b", 2)))
rdd.collectAsMap().foreach(println(_))
//Randomly splits this RDD with the provided weights.
//weights: Array[Double],
//seed: Long = Utils.random.nextLong
val array: Array[RDD[(String, Int)]] = rdd.randomSplit(Array(0.6, 0.4), 123L)
val traingSet: RDD[(String, Int)] = array(0)
val testSet: RDD[(String, Int)] = array(1)
println("=======================")
traingSet.collect().foreach(println(_))
}
  • join
1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
def testJoin(): Unit ={
// 模拟数据集
val empRDD: RDD[(Int, String)] = sc.parallelize(
Seq((1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu"))
)
val deptRDD: RDD[(Int, String)] = sc.parallelize(
Seq((1001, "sales"), (1002, "tech"))
)
empRDD.join(deptRDD).foreach(println(_))
println("============")
empRDD.leftOuterJoin(deptRDD).foreach(println(_))
}

实战案例

实战1:Spark实战PV、UV、访问来源TOPN
  • 需求:读取日志文件,实现PV,UV,TOPN
1
2
3
4
5
日志格式:
127.0.0.1 - - [28/Nov/2019:08:37:25 +0800] "GET / HTTP/1.1" 200 57621 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36"
1、pv的全称是page view,译为页面浏览量或点击量,通常是衡量一个网站甚至一条网络新闻的指标。PV反映的是浏览某网站的页面数,所以每刷新一次也算一次。就是说PV与来访者的数量成正比,但PV并不是页面的来访者数量,而是网站被访问的页面数量。

2、uv的全称是unique view,译为通过互联网访问、浏览这个网页的自然人,访问网站的一台电脑客户端被视为一个访客,在同一天内相同的客户端只被计算一次。

*

  • 步骤:

    • 实现PV-PageView:
    • 1-首先读取数据
    • 2-通过map转换函数对于每一行数据都统计为一个PV
    • 3-需要通过reduceByKey累加计算(AggerateByKey,FoldByKey)
    • 4-实现PV排序操作
    • 5-打印输出或保存起来
    • 实现UV-UserView
    • 1-读取数据
    • 2-通过map转换函数将IP地址选择出来
    • 3-使用distinct去重,并实现统计
    • 4-使用reduceByKey实现相同key的value的统计
    • 5-实现UV的排序输出
    • 6-saveAsTextFile
    • 实现TopK-排名前几位
    • 1-读取数据
    • 2-通过map选择出用户点击的URL或APP等,X(10)
    • 3-进一步实现过滤统计
    • 4-实现TopK输出
    • 5-saveAsTextFile
  • 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package cn.spark.sparkbase.pro

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
* DESC:
* 实现PV-PageView:
* 1-首先读取数据
* 2-通过map转换函数对于每一行数据都统计为一个PV
* 3-需要通过reduceByKey累加计算(AggerateByKey,FoldByKey)
* 4-实现PV排序操作
* 5-打印输出或保存起来
* 实现UV-UserView
*/
object _01SparkPvUvTopK {
def main(args: Array[String]): Unit = {
// 实现PV-PageView:
val sc: SparkContext = {
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
val sc = new SparkContext(conf)
sc
}
// 1-首先读取数据
val fileRDD: RDD[String] = sc.textFile("data/baseinput/access.log")
//fileRDD.take(5).foreach(println(_))
//194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
// 2-通过map转换函数对于每一行数据都统计为一个PV
val mapRDD: RDD[(String, Int)] = fileRDD.map(x => ("PV", 1))
//mapRDD.take(3).foreach(println(_))
// 3-需要通过reduceByKey累加计算(AggerateByKey,FoldByKey)
val resuleRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
// 5-打印输出或保存起来
resuleRDD.foreach(println(_)) //(PV,14619)
// 实现UV-UserView
// 1-读取数据 -已经读取
// 2-通过map转换函数将IP地址选择出来
val ipValue: RDD[String] = fileRDD.map(x => x.split("\\s+")).map(x => x(0))
// 3-使用distinct去重,并实现统计
val uvValue: RDD[(String, Int)] = ipValue.distinct().map(x => ("UV", 1))
// 4-使用reduceByKey实现相同key的value的统计
val uvResult: RDD[(String, Int)] = uvValue.reduceByKey(_ + _)
uvResult.foreach(println(_)) //(UV,1050)
// 5-实现UV的排序输出
//实现TopK-排名前几位
//1-读取数据--已经读取
//2-通过map选择出用户点击的URL或APP等,X(10)
val value1RDD: RDD[(String, Int)] = fileRDD.map(_.split("\\s+")).filter(x => x.length > 10).map(x => (x(10), 1))
//value1RDD.take(20).foreach(println(_))
//3-进一步实现过滤统计
val result1RDD: RDD[(String, Int)] = value1RDD.reduceByKey(_ + _).sortBy(x => x._2, false).filter(x => x._1 != "\"-\"")
//如果直接执行sortBy或sortBykey全局排序的,打印在客户端是随机的分区,这时候可以增加collect收集到driver端,但是数据量大不建议,collect影响性能。
result1RDD.take(5).foreach(println(_))
//("http://blog.fens.me/category/hadoop-action/",547)
//("http://blog.fens.me/",377)
//("http://blog.fens.me/wp-admin/post.php?post=2445&action=edit&message=10",360)
//("http://blog.fens.me/r-json-rjson/",274)
//("http://blog.fens.me/angularjs-webstorm-ide/",271)
//4-实现TopK输出
sc.stop()
}
}
  • 总结

    • 学会使用对应RDD实战
实战2:Spark实战区域热点查询
  • 需求:

    • 需要通过日志信息(运行商或者网站自己生成)和城市ip段信息来判断用户的ip段,统计热点经纬
  • 数据集:

    • spark.assets/image-20210330120312064.png
    • 用户IP日志信息:记录的是用户IP

1
2
20090121000132095572000|125.213.100.12|show.51.com|/
关注的就是第二个字段
  • 城市IP端信息:需要对某一个区域根据IP划分
1
1.0.1.0|1.0.3.255|16777472|16778239|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
  • 125.213.100.12 对比 1.0.1.0|1.0.3.255 无法比较大小

  • 方法:1.0.1.0|1.0.3.255部分已经转化为Long类型数据16777472|16778239

    • 如果125.213.100.12 转化为Long类型数据,可以拿到该数据和IP.txt数据进行对比
    • 如何对比?一个数字在一个连续数字区域中?这里选择的是二分查找法
    • spark.assets/image-20210330121337381.png
  • 统计:如果一个用户IP位于城市IP端之内,可以实现统计分析

  • spark.assets/image-20210330122430639.png
  • spark.assets/image-20210330122438073.png
  • 步骤分析:

    • 1-创建SparkContext环境
    • 2-读取两个文件,ip和用户地址
    • 3-将用户IP对比IP信息段
      • 3-1首先将用户ip转化为long类型
      • 3-2将long类型ip放在ip地址信息段中进行查询(二分查找法)
      • 3-3根据查找的索引下标,得到经度和维度
    • 4-根据经度和维度统计区域热度
  • 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package cn.spark.sparkbase.pro

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
* DESC:
* 1-创建SparkContext环境
* 2-读取两个文件,ip和用户地址
* 3-将用户IP对比IP信息段
* 3-1首先将用户ip转化为long类型
* 3-2将long类型ip放在ip地址信息段中进行查询(二分查找法)
* 3-3根据查找的索引下标,得到经度和维度
* 4-根据经度和维度统计区域热度
*/
object _05IPCheck {
/
def binarySearch(ipLong: Long, boradcastValue: Array[(String, String, String, String)]): Int = {
var start = 0
var end = boradcastValue.length - 1

while (start <= end) {
var middle = (start + end) / 2
//查找出结果--中间位置的startip
if (ipLong >= boradcastValue(middle)._1.toLong && ipLong <= boradcastValue(middle)._2.toLong) {
return middle
}
//在右边查找
if (ipLong > boradcastValue(middle)._2.toLong) {
start = middle + 1
}
//在左边查找
if (ipLong < boradcastValue(middle)._1.toLong) {
end = middle - 1
}
}
-1
}

def ipToLong(ip: String): Long = {
//注意:IP个原始面貌:
//10111111.10111010.11110000.11110000
val ipArr: Array[Int] = ip.split("[.]").map(s => Integer.parseInt(s))
var ipnum = 0L
for (i <- ipArr) {
//<<表示位运算左移 ,0L左移之后还是0L,二进制形式:00000000.00000000.00000000.00000000
//其他数,左移之后,后面补0
//|表示位运算或,或的特点是,与0进行或,返回本身
//第一次:
//00000000.00000000.00000000.10111111
//00000000.00000000.00000000.00000000
//00000000.00000000.00000000.10111111
//第二次:
//00000000.00000000.00000000.10111010
//00000000.00000000.10111111.00000000
//00000000.00000000.10111111.10111010
//第三次:
//00000000.00000000.00000000.11110000
//00000000.10111111.10111010.00000000
//00000000.10111111.10111010.11110000
//第四次:
//0000
0.00000000.00000000.11110000
//10111111.10111010.11110000.00000000
//10111111.10111010.11110000.11110000
ipnum = i | (ipnum << 8)
}
ipnum
}

def main(args: Array[String]): Unit = {
//1-创建SparkContext环境
val sc: SparkContext = {
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
val sc = new SparkContext(conf)
sc
}
//2-读取两个文件,ip和用户地址
val userIPRDD: RDD[String] = sc.textFile("data/baseinput/ip/20190121000132.394251.http.format")
val userSplitRDD: RDD[String] = userIPRDD.map(_.split("\\|")).map(x => x(1))
//userSplitRDD.take(5).foreach(println(_))//125.213.100.123

val ipRangeRDD: RDD[String] = sc.textFile("data/baseinput/ip/ip.txt")
//223.247.0.0|223.247.7.255|3757506560|3757508607|亚洲|中国|安徽|池州||电信|341700|China|CN|117.489157|30.656037
//开始ip的起始long类型地址,结束ip的long类型地址,经度,维度
val ipRDD: RDD[(String, String, String, String)] = ipRangeRDD.map(_.split("\\|")).map(x => (x(2), x(3), x(x.length - 2), x(x.length - 1)))
//ipRDD.take(5).foreach(println(_))//(16777472,16778239,119.306239,26.075302)
val broadcastIpValue: Broadcast[Array[(String, String, String, String)]] = sc.broadcast(ipRDD.collect())
//broadcastIpValue.value//直接引用他的值
//3-将用户IP对比IP信息段
val resultRDD: RDD[((String, String), Int)] = userSplitRDD.mapPartitions(iter => {
val boradcastValue: Array[(String, String, String, String)] = broadcastIpValue.value
iter.map(ip => {
//* 3-1首先将用户ip转化为long类型
val ipLong: Long = ipToLong(ip)
//* 3-2将long类型ip放在ip地址信息段中进行查询(二分查找法)
val index: Int = binarySearch(ipLong, boradcastValue)
//* 3-3根据查找的索引下标,得到经度和维度
((boradcastValue(index)._3, boradcastValue(index)._4), 1)
})
}) //end mapParttion
//4-根据经度和维度统计区域热度
resultRDD.reduceByKey(_ + _).sortBy(_._2, false).take(5).foreach(println(_))
//((108.948024,34.263161),1824)
//((116.405285,39.904989),1535)
//((106.504962,29.533155),400)
//((114.502461,38.045474),383)
//((106.57434,29.60658),177)
sc.stop()
}
}
  • 总结:

    • 直接利用ip转化为long类型的额工具类
    • 注意使用二分查找方法
    • 需要使用广播变量
实战3:Spark实战搜狗分词统计查询
1
2
3
4
5
<dependency>
<groupId>com.hankcs</groupId>
<artifactId>hanlp</artifactId>
<version>portable-1.7.7</version>
</dependency>
  • 读取数据

  • 搜索关键词统计

  • 用户搜索点击统计

  • 搜索时间段统计

  • spark.assets/image-20210330105419488.png
  • 步骤

    • 读取数据:sc.textFile,数据处理,这里可以使用map或mappartition,map(x=>x.split(“\\s+”)
    • 搜索关键词统计:首先拿到查询词,对查询词进行分词,根据分词进行统计
    • 用户搜索点击统计:需要拿到用户ID和用户查询词,然后根据(用户ID,用户查询词)进行统计
    • 搜索时间段统计:获取用户的搜索的时间段,根据时间段统计排序
  • 代码

  • 1-分词代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package cn.spark.sparkbase.pro

import java.util

import com.hankcs.hanlp.HanLP
import com.hankcs.hanlp.seg.common.Term
import com.hankcs.hanlp.tokenizer.StandardTokenizer

/**
* DESC:
*/
import scala.collection.JavaConverters._

object _02HanLp {
def main(args: Array[String]): Unit = {

val terms: util.List[Term] = HanLP.segment("我是黑马程序员的一匹黑马")
println(terms) //[我/r, 是/v, 黑马/n, 程序员/n, 的/uj, 一/m, 匹/q, 黑马/n]
println(terms.asScala.map(_.word.trim)) //ArrayBuffer(我, 是, 黑马, 程序员, 的, 一, 匹, 黑马)

val terms1: util.List[Term] = StandardTokenizer.segment("我是黑马程序员的一匹黑马")
println(terms1.asScala.map(_.word.trim))

//00:00:00 2982199073774412 [360安全卫士] 8 3 download.it.com.cn/softweb/software/firewall/antivirus/20067/17938.html
val arr: Array[String] = """00:00:00 2982199073774412 [360安全卫士] 8 3 download.it.com.cn/softweb/software/firewall/antivirus/20067/17938.html""".split("\\s+")
println(arr(2).replaceAll("\\[|\\]", ""))//360安全卫士
}
}
  • 搜索关键词统计
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package cn.spark.sparkbase.pro

import java.util

import com.hankcs.hanlp.HanLP
import com.hankcs.hanlp.seg.common.Term
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.JavaConverters._
import scala.collection.mutable

/**
* DESC:
* 首先完成搜狗词库的数据读取
* 1-创建SparkContext
* 2-读取数据
*/
object _04SougouCount {
def main(args: Array[String]): Unit = {
//1-创建SparkContext
val sc: SparkContext = {
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
val sc = new SparkContext(conf)
sc
}
//2-读取数据
val sougouRDD: RDD[String] = sc.textFile("data/baseinput/sougu/SogouQ.reduced")
println(s"sougouRDD count value is:${sougouRDD.count()}") //1724264
//3-引入样例类,可以基于样例类更好解析各个搜狗的字段
val recordRDD: RDD[SogouRecord] = sougouRDD
//这里的过滤需要处理,需要对有缺失字段的需要使用length判断长度
.filter(line => line != null && line.trim.split("\\s+").length == 6)
.mapPartitions(iter => {
iter.map(record => {
val arr: Array[String] = record.split("\\s+")
SogouRecord(arr(0), arr(1), arr(2).replaceAll("\\[|\\]", ""), arr(3).toInt, arr(4).toInt, arr(5))
})
})
//打印信息
recordRDD.take(5).foreach(println(_))
//SogouRecord(00:00:00,2982199073774412,360安全卫士,8,3,download.it.com.cn/softweb/software/firewall/antivirus/20067/17938.html)
//SogouRecord(00:00:00,07594220010824798,哄抢救灾物资,1,1,news.21cn.com/social/daqian/2008/05/29/4777194_1.shtml)
//3-搜索关键词统计
val valueRDD: RDD[String] = recordRDD.mapPartitions(iter => {
iter.flatMap(record => {
val terms: util.List[Term] = HanLP.segment(record.queryWords)
terms.asScala.map(_.word.trim)
})
}) //end mappartition
//valueRDD.take(5).foreach(println(_))
val keyWordCount: RDD[(String, Int)] = valueRDD.map(x => (x, 1)).reduceByKey(_ + _).sortBy(_._2, false)
println("========搜索关键词统计============")
keyWordCount.take(5).foreach(println(_))
//(+,193939)
//(的,93246)
//(.,90985)
//(地震,88451)
//(救灾,69662)


sc.stop()
}
}

RDD的DAG

  • spark.assets/image-20230226004638330.png
  • spark.assets/image-20210330155143975.png
  • 为什么需要有DAG?

    • Spark作为第三代计算引擎,通过有向无环图构建Spark的任务。
  • 什么是DAG?

    • 有向无环图,Spark或Impala或Flink都是基于DAG构建任务
    • 通过4040端口查看DAG有向无环图
  • DAG如何划分Stage

    • 划分Stages的依据是发生宽依赖,从当前job的最后一个算子往前推,遇到宽依赖,那么当前在这个批次中的所有算子操作都划分成一个stage
  • DAG划分Stage优势是什么?

    • 同一个Stage内可以并行计算,这样能够加快计算速度

Spark调度流程分析

spark的调度分为资源申请和任务调度。

如下图 110为资源申请,1219为任务调度。

12其实在driver启动后初始化就创建了DAGSchedluer和TaskScheduler,TaskScheduler通过后台进程去向ResourceManager申请资源。

  • spark.assets/b6812fd9592e46c483f8f64fc50e6ad2.jpg

粗粒度资源申请(Spark)

Application执行之前,将所有的资源申请完毕,当资源申请成功后,才会进行任务的调度,当所有的task执行完成后,才会释放这部分资源。

优点:在Application执行之前,所有的资源都申请完毕,每一个task运行时直接使用资源就可以了,不需要task运行时在执行前自己去申请资源,task启动就快了,task执行快了,stage执行就快了,job就快了,application执行就快了。

缺点:直到最后一个task执行完成才会释放资源,集群的资源无法充分利用。当数据倾斜时更严重。

细粒度资源申请(MapReduce)

Application执行之前不需要先去申请资源,而是直接执行,让job中的每一个task在执行前自己去申请资源,task执行完成就释放资源。

优点:集群的资源可以充分利用。

缺点:task自己去申请资源,task启动变慢,Application的运行就相应的变慢了。

RDD依赖关系

案例

  • spark.assets/image-20210330152344108.png
  • spark.assets/image-20210330152626009.png
  • spark.assets/image-20210330152727589.png
  • spark.assets/image-20210330152828890.png

为什么有依赖

  • 原因有哪些?

    • 1-依赖关系可以进行RDD的容错,某一个RDD出错,可以顺着依赖链重建RDD
    • 2-RDD的依赖可以构建RDD的血缘关系,也是方便容错
    • 3-依赖关系是为了RDD实现并行计算的,只有shuffle依赖无法实现并行计算

    相邻两个RDD的关系称之为依赖关系,多个连续的RDD的依赖关系,称之为血缘

  • 依赖关系是Spark借助于DAG有向无环图实现的RDD的容错方法,基于依赖关系构建RDD或Spark的血缘关系

依赖关系分为几类?

  • spark.assets/image-20210330153013442.png
  • spark.assets/image-20210330153819152.png
  • 窄依赖:

    • 一个子RDD依赖于一个父RDD,就是窄依赖(错误)
    • 一个父RDD对应一个子RDD,就是窄依赖
  • 宽依赖:

    • 一个子RDD依赖于多个父RDD,就是宽依赖(错误)
    • 一个父RDD对应多个子RDD,就是宽依赖
  • shuffle

    • 发生宽依赖,重新分桶,一定会伴随shuffle,
  • 总结:

    • 区分宽依赖还是窄依赖,看父RDD会不会被子RDD分享,被分享就是宽依赖,否则就是窄依赖。
    • 区分是否发生shuffle,父RDD的数据会不会被重新分桶(分到一个或多个桶,可能shuffle前后分区数一样.)

RDD缓存

  • 什么是缓存
    • Spark速度非常快的原因之一,就是在不同操作中可以在内存中持久化或者缓存数据集。
  • 为什么需要缓存?
    • 对于某些比较昂贵的算子可以将计算结果缓存起来,重复利用加快计算速度
    • 如果将数据缓存起来可以进行容错,因为缓存可以将数据保存在内存或磁盘中
  • 缓存有几种分类?
    • 两种:Cache和Persist
    • 缓存的级别有哪些?
      • spark.assets/image-20210330160907938.png
      • spark.assets/image-20210330161116768.png
      • 1-内存
      • 2-磁盘
      • 3-堆外内存(off_heap,不受限于jvm管理)
      • 4-序列化—网络传输
      • 5-副本—容错
  • 缓存如何选择?
    • 尽量选择内存,如果内存放不下可以尝试序列化,除非算子昂贵可以放在磁盘,如果容错恢复增加副本机制
    • spark.assets/image-20210330163924968.png
  • 缓存怎么使用?
    • cache
    • persist
    • unpersist
    • 经过缓存的数据明显加快计算速度,一般用于昂贵算子,如shuffle算子的缓存
    • spark.assets/image-20210330164755248.png
  • 缓存有什么问题?
    • 缓存的数据可能存在丢失的情况,考虑使用非易失介质如HDFS分布式文件系统
    • 引出checkpoint检查点机制

RDD的CheckPoint

  • 什么是checkpoint机制?

    • checkpoint是Spark的重要容错机制
    • 因为cache或persist会存在丢失缓存数据的情况,所以提出检查点机制,将数据放在HDFS中,之后就可以从checkpoint所保存的HDFS路径中读取
  • 检查点机制的基本原理?

    • Checkpoint会斩断依赖链,因为Checkpoint会把结果保存在HDFS这类存储中,更加的安全可靠,一般不需要回溯依赖链,而cache和persist不会斩断依赖链.
  • 怎么使用checkpoint检查点机制?

    • sc.setCheckpontDir(“hdfs:///”)设置将RDD的依赖关系保存在HDFS的那个路径中
    • rdd1.checkpoint() 将rdd保存在上述设置的HDFS中
  • checkpoint和缓存的区别和联系?

  • spark.assets/image-20210330165819518.png
  • 实验:

  • spark.assets/image-20210330170522925.png
  • spark.assets/image-20210330170710829.png
  • spark.assets/image-20210330171115309.png
  • spark.assets/image-20210330170926310.png
  • 如果删除checkpoint的一个hdfs文件,会报错,文件不存在

  • spark.assets/image-20210330171438109.png
  • Spark如何实现容错机制的?思考

    • 结果:
    • 1-如果Spark中将数据缓存在内存中,也就是cache和persist,首先从内存中寻找这部分数据
    • 2-如果内存没有数据的话,再从checkpoint所保存的HDFS中寻找
    • 3-如果上述都没有做,直接利用RDD的血缘关系实现容错,也就是重复从rdd1计算得到rdd2在得到rdd3
    • 分析:
    • spark.assets/image-20210330172853037.png
    • 源码
    • spark.assets/image-20210330173603330.png
    • spark.assets/image-20210330173716229.png
    • spark.assets/image-20210330173653892.png
    • spark.assets/image-20210330173333510.png
    • spark.assets/image-20210330173515971.png

共享变量

广播变量

  • spark.assets/image-20210331095314132.png
  • spark.assets/image-20210331095257535.png
  • 关键点:

  • 1-广播变量,是在driver端定义的,executor端拥有副本,在executor端是不能改变广播变量的值

  • 2-广播变量获取的时候是从BlockManager中获取数据,如果本地没有从Driver端获取变量副本

  • 3-如何使用:sc.broadcast(map.collect)

  • 案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package cn.spark.sparkbase.base

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
* DESC:
*/
object _04broadcast {
def main(args: Array[String]): Unit = {

//申请资源
val sc:SparkContext={
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
val sc = new SparkContext(conf)
sc
}
//创建RDD
val kvFruit: RDD[(Int, String)] = sc.parallelize(List((1,"apple"),(2,"orange"),(3,"banana"),(4,"grape")))
val fruitMap: collection.Map[Int, String] =kvFruit.collectAsMap
//fruitMap.foreach(println(_))
//需求:根据水果的编号查找水果的名称
val fruitsIds: RDD[Int] = sc.parallelize(Array(2, 4, 1, 3))
fruitsIds.map(x=>fruitMap(x)).collect().foreach(println(_))
//改进:如果水果很多,那么每个水果都需要拉取fruitMap变量进行对比得到水果名称
val broadMap: Broadcast[collection.Map[Int, String]] = sc.broadcast(fruitMap)
fruitsIds.map(x=>broadMap.value(x)).collect().foreach(println(_))
//orange
//grape
//apple
//banana
sc.stop()
}
}

累加器

  • 共享变量-累加器:

Accumulator只提供了累加的功能,只能累加,不能减少。

累加器只能在Driver端构建,并只能从Driver端读取结果,在Task端只能进行累加。

  • scala的累加
  • rdd的累加问题
  • 累加器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package cn.spark.sparkbase.base

import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{Accumulator, SparkConf, SparkContext}

/**
* DESC:
*/
object _05accumulate {
def main(args: Array[String]): Unit = {
//申请资源
val sc: SparkContext = {
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
val sc = new SparkContext(conf)
sc
}
//scala的加法
var counter1 = 0
val seq = Seq(1, 2, 3)
seq.map(x => counter1 += x)
println("counter result is:" + counter1)
//rdd的加法-0--为什么会出现现象?因为变量在driver端定义,将数据发送到executor执行累加,但是执行完累加后结果并没返回driver
var counter2 = 0
val rdd1: RDD[Int] = sc.parallelize(seq)
rdd1.foreach(x => counter2 += x)
println(counter2)
//提出了在driver端和executor端共享当前变量
//累加器也是在action操作的时候触发
val acc: Accumulator[Int] = sc.accumulator(0)
rdd1.foreach(x=>acc+=x)
println(acc)
//sc.accumulator该方法2.0已经弃用,可以直接使用sc.longAccumulator
val acc_count: LongAccumulator = sc.longAccumulator("acc_count")
rdd1.foreach(x=>acc_count.add(x))
println(acc_count)//LongAccumulator(id: 51, name: Some(acc_count), value: 6)
println(acc_count.value)
}
}

累加器重复累加问题

Spark中的一系列transform操作都会构造成一长串的任务链,此时就需要通过一个action操作来触发(lazy的特性),accumulator也是如此。

因此在一个action操作之后,调用value方法查看,是没有任何变化

第一次action操作之后,调用value方法查看,变成了5

第二次action操作之后,调用value方法查看,变成了10

原因就在于第二次action操作的时候,又执行了一次累加器的操作,同个累加器,在原有的基础上又加了5,从而变成了10

解决方案具体来说,在以下场景中累加器的行为会表现出重复累加:

  1. 没有缓存的情况下:如果没有使用cachepersist,每次调用action(例如countcollect等),RDD会重新执行所有依赖的transform操作,累加器的值会在每次action时重新计算并增加。
  2. 有缓存的情况下:通过使用cachepersist,可以将RDD的中间结果缓存到内存或磁盘中。这样,后续的action操作会直接从缓存中读取结果,而不再重新执行整个任务链,这样累加器的值也不会再次增加。

共享变量案例

  • 需求:包括非单词组合,统计数据词频时过滤非单词的符合并且统计总的格式。

  • 分析:首先过滤单词,词频统计、

  • 步骤:

    • 1-读取数据
    • 2-使用广播变量,定义的一组非单词组合的list或map广播到executor执行
    • 2-过滤出来,使用累加器统计非单词的组合
    • 3-使用wordcout统计单词综合
  • 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package cn.spark.sparkbase.base

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

/**
* DESC:
* * 1-读取数据
* * 2-使用广播变量,定义的一组非单词组合的list或map广播到executor执行
* * 2-过滤出来,使用累加器统计非单词的组合
* * 3-使用wordcout统计单词综合
*/
object _06acc_broadcast {
def main(args: Array[String]): Unit = {
//申请资源
val sc: SparkContext = {
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
val sc = new SparkContext(conf)
sc
}
// 1-读取数据
//定义广播变量,因为每个单词都需要查看是否是费单词序列
val list: List[String] = List(",", ".", "!", "#", "$", "%")
val broadcastList: Broadcast[List[String]] = sc.broadcast(list)
//定义累加器
val acc_count: LongAccumulator = sc.longAccumulator("acc_count")

val fileRDD: RDD[String] = sc.textFile("data/baseinput/words1.txt")
val wordscount: RDD[String] = fileRDD
.filter(line => line != null && line.trim.length > 0)
.flatMap(line => line.split("\\s+"))
.filter(word => {
val listValue: List[String] = broadcastList.value
val isFlag: Boolean = listValue.contains(word)
if (isFlag) {
acc_count.add(1L)
}
!isFlag
})
// 2-使用广播变量,定义的一组非单词组合的list或map广播到executor执行
// 2-过滤出来,使用累加器统计非单词的组合
// 3-使用wordcout统计单词综合
println("wordcount的结果")
wordscount.map((_, 1)).reduceByKey(_ + _).collect().foreach(println(_))
println("非单词的组合:", acc_count.value)
}
}
  • spark.assets/image-20210331103425594.png
  • 通过睡眠Thread.sleep睡眠查看WebUi

Kryo序列化

  • spark.assets/image-20210331112703791.png
  • spark.assets/image-20210331112802475.png
  • 什么kryo序列化?

  • spark.assets/image-20210331113025537.png
  • spark.assets/image-20210331113420186.png
  • 如何使用kryo序列化?

  • spark.assets/image-20210331113605232.png
  • kryo序列化是java序列化的10x,kryo序列化并不是支持所有的类型,对于一部分类型需要注册,其他基础类直接设置

  • spark.assets/20200430105422254.png
  • 配置说明:

1
2
3
4
5
6
7
8
1. spark.serializer:序列化时用的类,需要申明为org.apache.spark.serializer.KryoSerializer。这个设置不仅控制各个worker节点之间的混洗数据序列化格式,同时还控制RDD存到磁盘上的序列化格式及广播变量的序列化格式。
2. spark.kryoserializer.buffer:每个Executor中的每个core对应着一个序列化buffer。如果你的对象很大,可能需要增大该配置项。其值不能超过spark.kryoserializer.buffer.max
3. spark.kryoserializer.buffer.max:允许使用序列化buffer的最大值
4. spark.kryo.classesToRegister:向Kryo注册自定义的的类型,类名间用逗号分隔
5. spark.kryo.referenceTracking:跟踪对同一个对象的引用情况,这对发现有循环引用或同一对象有多个副本的情况是很有用的。设置为false可以提高性能
6. spark.kryo.registrationRequired:是否需要在Kryo登记注册?如果为true,则序列化一个未注册的类时会抛出异常
7. spark.kryo.registrator:为Kryo设置这个类去注册你自定义的类。最后,如果你不注册需要序列化的自定义类型,Kryo也能工作,不过每一个对象实例的序列化结果都会包含一份完整的类名,这有点浪费空间
8. spark.kryo.unsafe:如果想更加提升性能,可以使用Kryo unsafe方式

kryo的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//刚开始的时候一直序列化不成功,应该是找不到类,java+scala混编,把scala设置成src了,去掉pom的这个配置就可以了。
//第一种方式
//实现一个KryoRegistrator注册类,在该类里面对自定义的序列化类进行注册,然后在conf里面配置该类
public class MyKryoRegistrator implements KryoRegistrator {
@Override
public void registerClasses( Kryo kryo) {
kryo.register(User.class);
kryo.register(User[].class);
kryo.register(scala.collection.convert.Wrappers$.class);
}
}
@Data
public class User implements Serializable {
int id;
String name;
String city;
List<String> hobby;
}

// 在conf配置如下
val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
conf
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrationRequired", "true")
.set("spark.kryo.registrator",classOf[MyKryoRegistrator].getName)


val sc = new SparkContext(conf)

//第二种方式
conf
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(Array(classOf[User]))

声明序列化为kryo

spark.assets/image-20230227033247178.png spark.assets/image-20230227032331056.png spark.assets/image-20230226164309634.png spark.assets/image-20230227042540983.png

扩展2:groupBy什么时候是窄依赖或宽依赖?

  • spark.assets/image-20210331114250206.png
  • spark.assets/image-20210331114434517.png

内存模型

  • spark.assets/image-20210331114727741.png
  • 问题1:
  • spark.assets/image-20210331114934684.png
  • Spark和Flink不一样,Flink的内存对用户无法操作,Spark的内存用户可以配置
  • SparkCore的内存模型
  • spark.assets/image-20210331115357701.png
  • 问题2:
  • spark.assets/image-20210331115548422.png
  • spark.assets/image-20210331115616734.png
  • spark.assets/image-20210331115812793.png
  • spark.assets/image-20210331120038457.png
  • spark.assets/image-20210331120334511.png

Spark的shuffle

  • Spark不同版本的shuffle

    • 1.2之前HashShuffleManager
    • 1.2之后SortShuffleManager
  • Shuffle阶段

    • shuffle write:mapper阶段,上一个stage得到最后的结果写出
      shuffle read :reduce阶段,下一个stage拉取上一个stage进行合并
  • HashShuffleManager

    • 未经优化的hashShuffleManager
    • spark.assets/image-20210331121139637.png
    • 优化的hashShuffleManager
      • shuffleFileGroup
    • spark.assets/image-20210331121322477-1622994367239.png
spark.assets/image-20210331121322477.png spark.assets/image-20210331121405820.png
  • SortShuffleManager

    • spark.assets/image-20210331121523379.png
    • bypassMerge机制触发条件如下

      • shuffle write task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(*默认为200*)

      • 不能是聚合类的算子,比如reduceByKey,本质是map side combine是否为true

      • spark.assets/image-20230328233608398.png

        此时task会为每个reduce端的task都创建一个临时磁盘文件,并将数据按key进行hash,然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

        该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。

    • 普通机制

      • spark.assets/image-20210331122648578.png
      • (1)该模式下,数据会先写入一个内存数据结构中(默认5M),此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。

        (2) 接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

        注意:

        shuffle中的定时器:定时器会检查内存数据结构的大小,如果内存数据结构空间不够,那么会申请额外的内存,申请的大小满足如下公式:

        applyMemory=nowMenory*2-oldMemory

        申请的内存=当前的数据内存情况*2-上一次的内嵌情况

        意思就是说内存数据结构的大小的动态变化,如果存储的数据超出内存数据结构的大小,将申请内存数据结构存储的数据*2-内存数据结构的设定值的内存大小空间。申请到了,内存数据结构的大小变大,内存不够,申请不到,则发生溢写。

        由于Spark是一个内存计算框架,没有办法严格控制Executor内存,只能采用监控方式监控内存,内存初始值为5M,当超出的时候,如5.02M,监控内存数据的对象会去申请5.022-5=5.04M内存,*如果申请到了就不需要溢写,否则会发生溢写。

        区别:

        (a)Spark内存数据初始值为5M,他可以申请扩大,而MR固定的Buffer为100M

        (b)溢写磁盘文件还带有索引文件,索引文件是对磁盘文件的描述,还记录每个分区的起始位置start offset和终止位置end offset

        (3)排序

        在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。

        (4)溢写

        排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。

        (5)merge

        一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并成1个磁盘文件,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为Reduce端的stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。

        SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。比如第一个stage有50个task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每个task最终只有一个磁盘文件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。

bypass和普通机制的区别

第一,磁盘写机制不同;

第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

两个最终都是会产生2M(map task number)个磁盘小文件。

总结:SortShuffle分为普通机制和bypass机制,普通机制在内存数据结构(默认为5M)完成排序,而当shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。算子不是聚合类的shuffle算子(比如reduceByKey)的时候会触发SortShuffle的bypass机制,SortShuffle的bypass机制不会进行排序,极大的提高了其性能。

  • 页面监控:

    • spark.assets/image-20210331123737378.png
  • 补充:Shuffle类算子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
下面列出Shuffle类不同分类算子
去重
def distinct()
def distinct(numPartitions: Int)
聚合
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
def groupBy[K](f: T => K, p: Partitioner):RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner):RDD[(K, Iterable[V])]
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int): RDD[(K, U)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
排序
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
重分区
def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null)
集合或者表操作
def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

  • *spark.shuffle.file.buffer* 默认32K,调整64K

  • *spark.reducer.maxSizeInFlight:* 默认48M

  • *spark.shuffle.io.maxRetries* :重试次数

  • ****spark.shuffle.io.retryWait:****:重试时间

  • 内存结构:Array和Map,5M

  • spark.assets/image-20210331145131584.png
  • spark.assets/image-20210331145515423.png
  • spark.assets/image-20210331145701303.png
  • spark.assets/image-20210331145919396.png
  • spark.assets/image-20210331150250050.png
  • spark.assets/image-20210331150509308.png
  • spark.assets/image-20210331151039694.png
  • spark.assets/image-20210331151711346.png
  • 补充源码

  • spark.assets/image-20210331152346503.png
  • spark.assets/image-20210331152525171.png
  • spark.assets/image-20210331152809557.png
  • spark.assets/image-20210331153110113.png
  • spark.assets/image-20210331153933489.png

SparkSQL引入

  • SparkCore撰写代码非常复杂,引入SparkSQL处理结构化数据
  • SparkSQL基于Hive之上做了改进

什么是sparksql

  • spark.assets/image-20210331160531652.png
  • spark.assets/image-20210331160809716.png

SparkSQL和Hive的关系(发展历程)

  • SparkSQL引入Hive的发展历程:

    • Hive

    • spark.assets/image-20210331161008605.png
    • Shark

    • spark.assets/image-20210331161309661.png

      HQL on MapReduce -> HQL on Spark

    • SparkSQL

    • spark.assets/image-20210331161546924.png
    • 历史发展

SparkSQL的数据结构

  • SparkCore数据结构:RDD

  • SparkSQL数据结构:DataFrame和DataSet

  • 思考:三种数据结构关系

  • spark.assets/image-20210331162730736.png

    在2.0中DataFrame = DataSet[Row]

    DataFrame是在运行时候进行类型的检查,而Dataset是在编译的时候进行类型检查,DataSet安全性更好

SparkSQL的DataFrame创建的四种方法

  • SparkSession=sqlcontext+hivecontext+sparkcontext
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package cn.spark.sparksql

import org.apache.spark.SparkContext
import org.apache.spark.sql.{Dataset, SparkSession}

/**
* DESC:
* 使用SparkSession应用程序入口
*/
object _01SparkSession {
def main(args: Array[String]): Unit = {
//现在我们使用SparkSession
val spark: SparkSession = SparkSession
.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[*]")
.getOrCreate()
//读取文件
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//
val valueDS: Dataset[String] = spark.read.textFile("data/baseinput/words.txt")
println("counts:" + valueDS.count())
}
}
  • RDD转DF的方式

  • 1-RDD配合样例类实现转化为DF

  • spark.assets/image-20210331165229535.png
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package cn.spark.sparksql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
* DESC:
*/
case class People(id: Int, name: String, age: Int)

object _02toDFWay1 {
def main(args: Array[String]): Unit = {
//申请资源
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[*]")
.getOrCreate()
//For implicit conversions from RDDs to DataFrames
import spark.implicits._
//读取数据
val sc: SparkContext = spark.sparkContext
val fileRDD: RDD[String] = sc.textFile("data/baseinput/sql/people1.txt")
//切分后套入person类
val peopleRDD: RDD[People] = fileRDD.map(_.split("\\s+")).map(x => People(x(0).toInt, x(1), x(2).toInt))
//引入隐式反馈转化为df
val peopleDF: DataFrame = peopleRDD.toDF()
//打印输出
peopleDF.show()
//+---+--------+---+
//| id| name|age|
//+---+--------+---+
//| 1|zhangsan| 20|
//| 2| lisi| 29|
//| 3| wangwu| 25|
//如何打印scheme
peopleDF.printSchema()
//root
// |-- id: integer (nullable = false)
// |-- name: string (nullable = true)
// |-- age: integer (nullable = false)
//关闭资源
spark.stop()
}
}
  • 2-spark直接读取数据文件转化为DF(了解)

    • spark.assets/image-20210331173050877.png
    • spark.assets/image-20210331173500656.png
    • spark.assets/image-20210331173714098.png
  • 4-RDD配合Row+StructType转化为DF

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package cn.spark.sparksql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
* DESC:
*/

object _04toDFWay3 {
def main(args: Array[String]): Unit = {
//申请资源
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[*]")
.getOrCreate()
//For implicit conversions from RDDs to DataFrames
import spark.implicits._
//读取数据
val sc: SparkContext = spark.sparkContext
val fileRDD: RDD[String] = sc.textFile("data/baseinput/sql/people1.txt")
//一个row对象就是一行数据
val peopleRDD: RDD[Row] = fileRDD.map(_.split("\\s+")).map(x => Row(x(0).toInt, x(1), x(2).toInt))
//需要引入structedFiled
val schema: StructType = new StructType()
.add("id", DataTypes.IntegerType, true)
.add("name", "string", true)
.add("age", "int", true)
//打印输出
val peopleDF: DataFrame = spark.createDataFrame(peopleRDD, schema)
peopleDF.show(2,false)
//+---+--------+---+
//| id| name|age|
//+---+--------+---+
//| 1|zhangsan| 20|
//| 2| lisi| 29|
//| 3| wangwu| 25|
//如何打印scheme
peopleDF.printSchema()
//root
// |-- id: integer (nullable = false)
// |-- name: string (nullable = true)
// |-- age: integer (nullable = false)
//关闭资源
spark.stop()
}
}
  • 方式2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package cn.spark.sparksql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataTypes, IntegerType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
* DESC:
*/

object _04toDFWay4 {
def main(args: Array[String]): Unit = {
//申请资源
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[*]")
.getOrCreate()
//For implicit conversions from RDDs to DataFrames
//读取数据
val sc: SparkContext = spark.sparkContext
val fileRDD: RDD[String] = sc.textFile("data/baseinput/sql/people1.txt")
//一个row对象就是一行数据
val peopleRDD: RDD[Row] = fileRDD.map(_.split("\\s+")).map(x => Row(x(0).toInt, x(1), x(2).toInt))
//需要引入structedFiled

/* val schema: StructType = StructType(Array(
StructField("id", DataTypes.IntegerType, true),
StructField("name", DataTypes.StringType, true),
StructField("age", DataTypes.IntegerType, true)
))*/

val schema: StructType = StructType(
StructField("f1", IntegerType, true) ::
StructField("f2", StringType, false) ::
StructField("f3", IntegerType, false) ::
Nil)
//打印输出
val peopleDF: DataFrame = spark.createDataFrame(peopleRDD, schema)
peopleDF.show(2, false)
//+---+--------+---+
//| id| name|age|
//+---+--------+---+
//| 1|zhangsan| 20|
//| 2| lisi| 29|
//| 3| wangwu| 25|
//如何打印scheme
peopleDF.printSchema()
//root
// |-- id: integer (nullable = false)
// |-- name: string (nullable = true)
// |-- age: integer (nullable = false)
//关闭资源
spark.stop()
}
}
  • 5-直接toDF
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package cn.spark.sparksql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
* DESC:
*/

object _03toDFWay2 {
def main(args: Array[String]): Unit = {
//申请资源
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[*]")
.getOrCreate()
//For implicit conversions from RDDs to DataFrames
import spark.implicits._
//读取数据
val sc: SparkContext = spark.sparkContext
val fileRDD: RDD[String] = sc.textFile("data/baseinput/sql/people1.txt")
//切分后套入person类
val peopleRDD = fileRDD.map(_.split("\\s+")).map(x => (x(0).toInt, x(1), x(2).toInt))
//引入隐式反馈转化为df
val peopleDF: DataFrame = peopleRDD.toDF("id","name","age")
//打印输出
peopleDF.show()
//+---+--------+---+
//| id| name|age|
//+---+--------+---+
//| 1|zhangsan| 20|
//| 2| lisi| 29|
//| 3| wangwu| 25|
//如何打印scheme
peopleDF.printSchema()
//root
// |-- id: integer (nullable = false)
// |-- name: string (nullable = true)
// |-- age: integer (nullable = false)
//关闭资源
spark.stop()
}
}
  • 补充:
  • spark.assets/image-20210331172635814.png
  • 样例类的rdd到df使用比较多的,特别是字段多的时候
  • 如果需要动态增加字段,可以使用strucedType的方式

SparkSQL的DataFrame的花式操作

DSL案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package cn.spark.sparksql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

/**
* DESC:
*/
case class People1(id: Int, name: String, age: Int)

object _06DataFrameOpration {
def main(args: Array[String]): Unit = {
//申请资源
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[*]")
.getOrCreate()
//For implicit conversions from RDDs to DataFrames
import spark.implicits._
//读取数据
val sc: SparkContext = spark.sparkContext
val fileRDD: RDD[String] = sc.textFile("data/baseinput/sql/people1.txt")
//切分后套入person类
val peopleRDD: RDD[People1] = fileRDD.map(_.split("\\s+")).map(x => People1(x(0).toInt, x(1), x(2).toInt))
//引入隐式反馈转化为df
val peopleDF: DataFrame = peopleRDD.toDF()
//打印输出
peopleDF.show()
//+---+--------+---+
//| id| name|age|
//+---+--------+---+
//| 1|zhangsan| 20|
//| 2| lisi| 29|
//| 3| wangwu| 25|
//如何打印scheme
peopleDF.printSchema()
//root
// |-- id: integer (nullable = false)
// |-- name: string (nullable = true)
// |-- age: integer (nullable = false)
//Spark中提供了查询的方式
//DSL基于领域查询语言
//需求1-能否将name字段筛选出来
peopleDF.select("name").show()
peopleDF.select(col("name")).show()
peopleDF.select(column("name")).show()
peopleDF.select('name).show()
//需求2-能否将name,age字段筛选出来
peopleDF.select("name", "age").show()
peopleDF.select(col("name"), col("age")).show()
peopleDF.select(column("name"), column("age")).show()
peopleDF.select('name, 'age).show()
//组合写法
peopleDF.select(col("name"), column("age")).show()
peopleDF.select(col("name"), 'age).show()
//peopleDF.select("name",'age).show()
//需求3-能否将age字段筛选出来+1
//peopleDF.select('name,'age+1).show()
//peopleDF.select("name","age"+1).show()
peopleDF.select(col("name"), col("age") + 1).show()
peopleDF.select(col("name"), column("age") + 1).show()
peopleDF.select('name, 'age + 1).show()
//需求4:过滤age大于等于25的,使用filter方法过滤
peopleDF.filter(col("age") > 25).show()
peopleDF.filter('age > 25).show()
println(peopleDF.filter('age > 25).count())
//需求5:按年龄进行分组并统计相同年龄的人数
val re1: DataFrame = peopleDF.groupBy("age").count() //这里通过count操作之后,schem中会增加count的名称
re1.show()
re1.printSchema()
re1.orderBy("count").show()
re1.orderBy('count).show()
//完整的方案
peopleDF.groupBy("age").count().orderBy("count").show()
//SQL结构化查询语言
//关闭资源
spark.stop()
}
}

SQL案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package cn.spark.sparksql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

/**
* DESC:
*/

object _06_1DataFrameOpration {
def main(args: Array[String]): Unit = {
//申请资源
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[*]")
.getOrCreate()
//For implicit conversions from RDDs to DataFrames
import spark.implicits._
//读取数据
val sc: SparkContext = spark.sparkContext
val fileRDD: RDD[String] = sc.textFile("data/baseinput/sql/people1.txt")
//切分后套入person类
val peopleRDD: RDD[People1] = fileRDD.map(_.split("\\s+")).map(x => People1(x(0).toInt, x(1), x(2).toInt))
//引入隐式反馈转化为df
val peopleDF: DataFrame = peopleRDD.toDF()
//打印输出
//SQL
peopleDF.createOrReplaceTempView("peopleTable")
//需求1-能否将name字段筛选出来
spark.sql("select name from peopleTable").show()
//需求2-能否将name,age字段筛选出来
spark.sql("select name,age from peopleTable").show()
//需求3-查询年龄最大的前两名
spark.sql("select * from peopleTable order by age desc limit 2").show()
spark.sql(
"""
|select *
|from peopleTable
|order by age desc
|limit 2
|""".stripMargin).show()
//SQL结构化查询语言
//关闭资源
spark.stop()
}
}

SparkSQL的DataSet

  • wordcount

  • DSL:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package cn.spark.sparksql.wordcount

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
* DESC:
*/
object _01DSL {
def main(args: Array[String]): Unit = {
//现在我们使用SparkSession
val spark: SparkSession = SparkSession
.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[*]")
.getOrCreate()
import spark.implicits._
//读取数据
val df1: DataFrame = spark.read.text("data/baseinput/words.txt")
val ds1: Dataset[String] = spark.read.textFile("data/baseinput/words.txt")
//这里只能使用ds
//df1.as[String].flatMap(_.split("\\s+"))
val value: Dataset[String] = ds1.flatMap(_.split("\\s+"))
value.show()
//+----------+
//| value|
//+----------+
//| hello|
//| spark|
//| hello|
//| flink|
//| hello|
//dsl
val result: Dataset[Row] = value.groupBy("value").count().orderBy('count.desc)
result.show()
spark.stop()
}
}
  • SQL:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package cn.spark.sparksql.wordcount

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
* DESC:
*/
object _02SQL {
def main(args: Array[String]): Unit = {
//现在我们使用SparkSession
val spark: SparkSession = SparkSession
.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[*]")
.getOrCreate()
import spark.implicits._
//读取数据
val df1: DataFrame = spark.read.text("data/baseinput/words.txt")
val ds1: Dataset[String] = spark.read.textFile("data/baseinput/words.txt")
//这里只能使用ds
//df1.as[String].flatMap(_.split("\\s+"))
val value: Dataset[String] = ds1.flatMap(_.split("\\s+"))
value.show()
//+----------+
//| value|
//+----------+
//| hello|
//| spark|
//| hello|
//| flink|
//| hello|
//sql
value.createOrReplaceTempView("table")
val result: DataFrame = spark.sql(
"""
|select value,count(value) as counts
|from table
|group by value
|order by counts desc
|""".stripMargin)
//+----------+-----+
//| value|count|
//+----------+-----+
//| hello| 6|
//| sqoop| 1|
//| flink| 1|
//| pulsar| 1|
//| doris| 1|
//|clickhouse| 1|
//| spark| 1|
//+----------+-----+
result.show()
spark.stop()
}
}

SparkSQL的数据结构之间转换(掌握)

  • spark.assets/image-20210331182824551.png
  • RDD和DF和DS区别和练习
  • RDD通过scheme转化为df,df增加泛型转化ds
  • df是在运行的时候检查类型的,rdd和dataset是在编译时候进行类型检查
  • dataset在spark2.0之后基本dataframe统一,dataSet[ROW]=dataframe

电影评分案例(掌握)

  • 电影数据集统计需求分析

  • 需求:对电影评分数据进行统计分析

    • 获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)
  • 数据集的认知:

    • spark.assets/image-20210402102333287.png
  • 步骤

    • 1-首先读取数据集
    • 2-数据集的解析,使用RDD转DF(case class)
    • 3-SQL操作的实现
    • 4-使用DSL实现
    • 5-将结果存入到csv结果文件中
    • 6-将结果数据存入到MySQL
  • 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package cn.spark.sparksql.moviesPro


import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.sql.functions._

/**
* DESC:
* 1-首先读取数据集
* 2-数据集的解析,使用RDD转DF(case class)
* 3-SQL操作的实现
* 4-使用DSL实现
* 5-将结果存入到csv结果文件中
* 6-将结果数据存入到MySQL中
*/
case class MoviesRatings(userId: String, moviesId: String, ratings: Double, timestamp: String)

object _01MoviesRatingLoader {
def main(args: Array[String]): Unit = {
val spark: SparkSession = {
val conf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[*]")
.set("spark.sql.shuffle.partitions", "4")//默认200
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
spark
}
import spark.implicits._
//1-首先读取数据集
val fileRDD: RDD[String] = spark.sparkContext.textFile("data/baseinput/ml-1m/ratings.dat")
//2-数据集的解析,使用RDD转DF(case class)
val moviesDF: DataFrame = fileRDD
.filter(line => line != null && line.trim.split("::").length == 4)
.mapPartitions(iter => {
iter.map(line => {
val arr: Array[String] = line.split("::")
MoviesRatings(arr(0), arr(1), arr(2).toDouble, arr(3))
})
}).toDF
moviesDF.show(3, false)
//+------+--------+-------+---------+
//|userId|moviesId|ratings|timestamp|
//+------+--------+-------+---------+
//|1 |1193 |5.0 |978300760|
//|1 |661 |3.0 |978302109|
//|1 |914 |3.0 |978301968|
//+------+--------+-------+---------+
moviesDF.printSchema()
//root
// |-- userId: string (nullable = true)
// |-- moviesId: string (nullable = true)
// |-- ratings: double (nullable = false)
// |-- timestamp: string (nullable = true)
//3-SQL操作的实现
moviesDF.createOrReplaceTempView("movies_table")
//需求:获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)
val sql =
"""
|select moviesId,round(avg(ratings),2) as avg_rating,count(moviesId) as rnt_rating
|from movies_table
|group by moviesId
|having rnt_rating>2000
|order by avg_rating desc,rnt_rating desc
|limit 10
|""".stripMargin
println("sql funtions way is:...........")
//spark.sql(sql).show()
//4-使用DSL实现
val resultDF: Dataset[Row] = moviesDF
.select("moviesId", "ratings")
.groupBy("moviesId")
.agg(
round(avg("ratings"), 2).as("avg_rating"),
count("moviesId").as("rnt_rating")
)
.filter('rnt_rating > 2000)
//.filter($"rnt_rating" >2000)
//.filter(col("rnt_rating") >2000)
//.filter(column("rnt_rating") >2000)
.orderBy($"avg_rating".desc, 'rnt_rating.desc)
.limit(10)
println("dsl funtions way is:...........")
resultDF.show()
//5-将结果存入到csv结果文件中
//+--------+----------+----------+
//|moviesId|avg_rating|rnt_rating|
//+--------+----------+----------+
//| 318| 4.55| 2227|
//resultDF
// .coalesce(1)
// .write
// .mode(SaveMode.Overwrite)
// .csv("data/baseoutput/output-2/")
// Thread.sleep(100 * 1000)
//6-将结果数据存入到MySQL中
//创建数据库和表
/* CREATE DATABASE bigdata CHARACTER SET utf8;
CREATE TABLE `tb_top10_movies` (
`movieId` INT(11) NOT NULL,
`avg_rating` FLOAT(10) DEFAULT NULL,
`cnt_rating` INT(11) DEFAULT NULL,
PRIMARY KEY (`movieId`)
) ENGINE=INNODB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;
SELECT * FROM tb_top10_movies*/
//写入url,方法1
//val prop = new Properties()
//prop.setProperty("driver", "com.mysql.jdbc.Driver")
//prop.setProperty("user", "root")
//prop.setProperty("password", "root")
//resultDF
// .coalesce(1)
// .write
// .mode(SaveMode.Overwrite)
// .jdbc("jdbc:mysql://localhost:3306/bigdata", "tb_top10_movie", prop)
//写法2,https://spark.apache.org/docs/3.1.1/sql-data-sources-jdbc.html
// Saving data to a JDBC source
resultDF.write
.format("jdbc")
.mode(SaveMode.Overwrite)
.option("url", "jdbc:mysql://localhost:3306/bigdata")
.option("dbtable", "tb_top10_movie")
.option("user", "root")
.option("password", "root")
.save()
println("data write finished!")
//读取数据方法1
//val jdbcDF = spark.read
// .format("jdbc")
// .option("url", "jdbc:mysql://localhost:3306/bigdata")
// .option("dbtable", "tb_top10_movie")
// .option("user", "root")
// .option("password", "root")
// .load()
println("data reader finished!")
//读取方法2
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "root")
val jdbcDF2 = spark.read
.jdbc("jdbc:mysql://localhost:3306/bigdata", "tb_top10_movie", connectionProperties)
jdbcDF2.show()

spark.stop()
}
}
  • 总结:
  • spark.assets/image-20210402104901864.png
  • spark.assets/image-20210402105217031.png
  • 寻找多种数据源官网
  • spark.assets/image-20210402105609245.png
  • spark.assets/image-20210402105837056.png

Spark SQL整合Hive

SparkSQL的和Hive本地集成(为了测试)

  • SparkSQL和Hive本地测试步骤

    • 0-数据

    • 1,zhangsan,30

      2,lisi,40

      3,wangwu,50

    • 1-引入maven的依赖,spark_hive

1
2
3
4
5
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.4.5</version>
</dependency>
  • 2-在sparkconf下面引用ennableHiveSupprt支持Hive

  • 3-直接写hive的语句,使用spark.sql(hivesql)

  • 4-打印结果

  • 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package cn.spark.sparksql.toHive

import org.apache.spark.sql.SparkSession

/**
* DESC:
*
*/
object _01SparkToHive {
def main(args: Array[String]): Unit = {
//现在我们使用SparkSession
val spark: SparkSession = SparkSession
.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[*]")
.enableHiveSupport()//to a persistent Hive metastore
.getOrCreate()
//hive默认的元数据信息在derby中存储,后面配置了mysql

spark.sql("show databases").show()
spark.sql("create table if not exists student2(id int,name String,age int) row format delimited fields terminated by ','")
spark.sql("load data local inpath 'data/baseinput/sql/hive/students.csv' overwrite into table student2")
spark.sql("select * from student2 where age >30").show()


}
}
Spark整合集群环境的Hive
  • SParkOnHive

    • Hive—HiveOnMR
    • Shark—-HiveOnSpark—大部分用的都是Hive
    • SparkSQL—-独立实现的一套引擎
    • SparkSQL+HIve整合—-SparkOnHive–除了使用Hive元数据其他均使用SparkSQL的东西
  • 如何搭建SparkOnHive环境?

    • 1-首先让Spark知道Hive元数据的信息在哪里,需要将hive-site.xml放入saprk的conf目录下

    • spark.assets/image-20210402114644744.png
    • spark.assets/image-20210402115311922.png
    • 2-需要将hive-site.xml分发到其他两台上

    • 3-需要将mysql的jar包放在spark的jars目录下

    • 启动metastore

1
nohup /export/server/hive/bin/hive --service metastore 2>&1 >> /var/log.log &
  • 4-通过spark-sql或spark-shell测试是否整合成功

  • spark.assets/image-20210402115843898.png
  • spark.assets/image-20210402115928368.png
  • 4-通过IDEA远程连接集群环境的Hive进行相关操作

  • spark.assets/image-20210402121329177.png
  • spark.assets/image-20210402121252479.png
  • 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package cn.spark.sparkbase

import org.apache.spark.sql.SparkSession

/**
* DESC:
*/
object _02SparkToHive {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[*]")
.config("spark.sql.shuffle.partitions", "4")
//在2.0.0之前hive.sql.warehouse.dir,2.0之后spark.sql.warehouse.dir
//warehouse的hive数据存放地址
.config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse")
//metastore服务地址,thrift
.config("hive.metastore.uris", "thrift://node3:9083")
.enableHiveSupport() //to a persistent Hive metastore
.getOrCreate()

spark.sql("show databases").show()
spark.sql("use sparkhive3")
spark.sql("create table if not exists student2(id int,name String,age int) row format delimited fields terminated by ','")
spark.sql("show tables").show()
spark.sql("load data local inpath 'data/baseinput/sql/hive/students.csv' overwrite into table student2")
spark.sql(
"""
|select *
|from student2
|where age >30
|""".stripMargin).show()
//+---+------+---+
//| id| name|age|
//+---+------+---+
//| 2| lisi| 40|
//| 3|wangwu| 50|
//+---+------+---+
spark.stop()
}
}
  • spark.assets/image-20210402121855213.png
两种启动方式
  • spark.assets/image-20210402122223537.png
  • Spark的CLI方式

  • spark.assets/image-20210402122319835.png
  • Spark的Beeline方式,thrift

1
Spark的ThriftServer服务(类似于Hive的HiveServer2),在通过Beeline连接执行SQL
  • spark.assets/image-20210402122421815.png
1
2
3
4
5
SPARK_HOME=/export/server/spark
$SPARK_HOME/sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=10001 \\同时连hive可换一个port
--hiveconf hive.server2.thrift.bind.host=node3 \
--master local[2]
  • spark.assets/image-20210402122542530.png
  • 可以在其他机器上访问sparkonhive

  • spark.assets/image-20210402122821677.png
  • 可以通过thrift访问sparkonhive的数据

SparkSQL的UDF函数(必须掌握)

  • UDF:一对一,Spark使用最多

  • UDAF:多对一,Spark基本实现

  • UDTF:一对多,Hive实现了UDTF,Spark没有实现

  • spark.assets/image-20210402144310799.png
  • spark.assets/image-20210402144358985.png

编程模式sparksession使用UDF 需要先注册

spark.udf.register() 方式

  • 1.通过匿名函数注册udf
1
2
3
4
//注册udf, 返回字符串长度
spark.udf.register("strLen", (str: String) => str.length())
//spark sql中使用udf
spark.sql("select name,strLen(name) as name_len from user").show(false)
  • 2.通过实名函数注册udf
1
2
3
4
5
6
7
8
9
//定义实名函数
def getStrLen(str: String): Int = {
str.length
}

//注册udf,要在实名函数后面加 _(注意前面有个空格)
spark.udf.register("strLen", getStrLen _)
//spark sql中使用udf
spark.sql("select name,strLen(name) as name_len from user").show(false)
  • SparkSQL中使用UDF(如果是打包单个类注意指定META-INF/MANIFEST.MF在src下,不要放在resouces,不然会找不到主类,可带一个空main函数类,注册时就不会报类路径错误)
  • 方式一:在启动spark-sql时通过–jars指定
1
2
3
4
cd $SPARK_HOME/bin
spark-sql --jars /home/hadoop/lib/udf.jar
CREATE TEMPORARY FUNCTION hello AS 'com.luogankun.udf.HelloUDF';
select hello(url) from page_views limit 1;
  • 方式二:

1)需要先将udf.jar的路径配置到spark-env.sh的SPARK_CLASSPATH中,形如:

1
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/software/mysql-connector-java-5.1.27-bin.jar:/home/hadoop/lib/udf.jar

2)再启动spark-sql,直接CREATE TEMPORARY FUNCTION即可;

1
2
3
4
cd $SPARK_HOME/bin
spark-sql
CREATE TEMPORARY FUNCTION hello AS 'com.luogankun.udf.HelloUDF';
select hello(url) from page_views limit 1;
  • 方式三:Thrift JDBC Server中使用UDF

在beeline命令行中执行:

1
2
3
4
5
6
create function base_analizer as 'com.zhengkw.udf.BaseFieldUDF' using jar 'hdfs://hadoop102:9000/user/hive/jars/hivefunction-1.0-SNAPSHOT.jar';//hdfs路径

add jar /home/hadoop/lib/udf.jar;//本地机器路径
CREATE TEMPORARY FUNCTION hello AS 'com.luogankun.udf.HelloUDF';
select hello(url) from page_views limit 1;

  • spark/hive共用自定义函数(见hive udf)
1
2
3
/**
基于DataFrame(或者DataSet) 的Java(或Python、Scale) 可以轻松的定义注册UDF,但是想在SQL(SparkSQL、Hive) 中自定义或者想共用就遇到困难。这时,可以先按照一定规约自定义函数,再向Spark(或Hive)注册为永久函数,实现在Spark和Hive共享UDF的目的。
**/
  • UDF函数案例1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package cn.spark.sparksql.udf

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.api.java.UDF1
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

/**
* DESC:
* 1-申请资源
* 2-读取数据
* 3-执行转化为大写,适合于UDF函数
*/
case class Smaller(line: String)

object _01wordsToBigger {
def main(args: Array[String]): Unit = {
//1-申请资源
val spark: SparkSession = SparkSession
.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[*]")
.getOrCreate()
import spark.implicits._
//2-读取数据
val fileRDD: RDD[String] = spark.sparkContext.textFile("data\\baseinput\\sql\\udf\\udf.txt")
//3-执行转化为大写,适合于UDF函数
val wordsDF: DataFrame = fileRDD.map(x => Smaller(x)).toDF
wordsDF.show()
wordsDF.printSchema() // |-- line: string (nullable = true)
//user-defined function
//spark.udf.register("wordToBigger", new UDF1[String, String] {
// override def call(t1: String): String = {
// t1.toUpperCase()
// }
//}, StringType)
spark.udf.register("wordToBigger",(line:String)=>{
line.toUpperCase()
})
//SQL
wordsDF.createOrReplaceTempView("word_view")
val result1: DataFrame = spark.sql("select line,wordToBigger(line) as bigger from word_view")
result1.show()
//DSL
val result2: DataFrame = wordsDF.select('line,
callUDF("wordToBigger", 'line).as("bigger"))
result2.show()
}
}
  • 更多参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package cn.spark.sparksql.udf

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.api.java.UDF1
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

/**
* DESC:
* 1-申请资源
* 2-读取数据
* 3-执行转化为大写,适合于UDF函数
*/

object _02udfDemo {
def main(args: Array[String]): Unit = {
//1-申请资源
val spark: SparkSession = SparkSession
.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[*]")
.getOrCreate()
import spark.implicits._

val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value")
spark.udf.register("simpleUDF", (n: Int) => n * n)
df.select($"id", callUDF("simpleUDF", $"value").as("pow(x,2)")).show()
val df1 = Seq(("id1", 1,6), ("id2", 4,7), ("id3", 5,8)).toDF("id", "value1","value2")
spark.udf.register("simpleUDF2",(v1:Int,v2:Int)=>{
v1*v2
})
df1.createOrReplaceTempView("table_df")
spark.sql(
"""
|select id,simpleUDF2(value1,value2) as simple
|from table_df
|""".stripMargin).show()

df1.select($"id",
callUDF("simpleUDF2",'value1,'value2).as("bigger")).show()
}
}
  • UDAF了解
  • spark.assets/image-20210402151001095.png
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package cn.spark.sparksql.udf
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class SparkFunctionUDAF extends UserDefinedAggregateFunction{
//输入的数据类型的schema
override def inputSchema: StructType = {
StructType(StructField("input",LongType)::Nil)
}
//缓冲区数据类型schema,说白了就是转换之后的数据的schema
//求解平均值“总金额,总人数”
override def bufferSchema: StructType = {
StructType(StructField("sum",LongType)::StructField("total",LongType)::Nil)
}
//返回值的数据类型---总金额/总人数 可能会有小数,返回doubleType
override def dataType: DataType = {
DoubleType
}
//确定是否相同的输入会有相同的输出
override def deterministic: Boolean = {
true
}
//初始化内部数据结构
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0L //总金额都累加到Buffer0中,初始值为0
buffer(1) = 0L //总人数都累加到buffer1参数中
}
//更新数据内部结构
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
//所有的金额相加
buffer(0) = buffer.getLong(0) + input.getLong(0)
//一共有多少条数据
buffer(1) = buffer.getLong(1) + 1 //能否累加10?--不能,+!为了求解人数
}
//来自不同分区的数据进行合并
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) =buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
//计算输出数据值
override def evaluate(buffer: Row): Any = {
buffer.getLong(0).toDouble / buffer.getLong(1)
}
}

object SparkFunctionUDAF {
def main(args: Array[String]): Unit = {
//获取sparkSession
val sparkSession: SparkSession = SparkSession.builder().appName("sparkUDAF").master("local[2]").getOrCreate()
//通过sparkSession读取json文件得到DataFrame
val employeeDF: DataFrame = sparkSession.read.json("data\\baseinput\\sql\\udf\\udaf.txt")
//通过DataFrame创建临时表
employeeDF.createOrReplaceTempView("employee_table")
//注册我们的自定义UDAF函数
sparkSession.udf.register("avgSal",new SparkFunctionUDAF)
//调用我们的自定义UDAF函数
sparkSession.sql("select avgSal(salary) from employee_table").show()
sparkSession.close()
}
}

SparkSQL函数

  • Hive中的开窗函数

  • 聚合类开窗函数

    • count() over(order by partition by)
  • 排序类开窗函数

    • row_number() over(partition by order by) 12345
    • rank() over(partition by order by) 446
    • dense_rank() over(partition by order by) 445
  • 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package cn.spark.sparksql.func

import org.apache.spark.sql.SparkSession

/**
* DESC:
*/
case class Score(name: String, clazz: Int, score: Int)

object _01class {
def main(args: Array[String]): Unit = {
//1-申请资源
val spark: SparkSession = SparkSession
.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[*]")
.getOrCreate()
import spark.implicits._
val scoreDF = spark.sparkContext.makeRDD(Array(
Score("a1", 1, 80),
Score("a2", 1, 78),
Score("a3", 1, 95),
Score("a4", 2, 74),
Score("a5", 2, 92),
Score("a6", 3, 99),
Score("a7", 3, 99),
Score("a8", 3, 45),
Score("a9", 3, 55),
Score("a10", 3, 78),
Score("a11", 3, 100))
).toDF("name", "class", "score")
scoreDF.createOrReplaceTempView("scores")
scoreDF.printSchema()
//聚合开窗函数
spark.sql("select count(name) from scores").show()
spark.sql("select name,class,score,count(name) over() ccc from scores").show()
spark.sql("select name,class,score,count(name) over(partition by class) ccc from scores").show()
//排序开窗函数 123
spark.sql("select name,class,score,row_number() over(order by class) sss from scores").show()
spark.sql("select name,class,score,row_number() over(partition by class order by score) sss from scores").show()
//446
spark.sql("select name,class,score,rank() over(order by class) sss from scores").show()
spark.sql("select name,class,score,rank() over(partition by class order by score) sss from scores").show()
//445
spark.sql("select name,class,score,dense_rank() over(order by class) sss from scores").show()
spark.sql("select name,class,score,dense_rank() over(partition by class order by score) sss from scores").show()
//ntile
spark.sql("select name,class,score,ntile(6) over(order by class) sss from scores").show()
spark.sql("select name,class,score,ntile(6) over(partition by class order by score) sss from scores").show()
}
}

SparkSQL底层如何执行解析成RDD

  • spark.assets/image-20210402155225251.png
  • 步骤

  • spark.assets/image-20210402155752267.png
  • spark.assets/image-20210402155849640.png
  • spark.assets/image-20210402160151359.png
  • spark.assets/image-20210402160250191.png
  • 如何查看逻辑计划和物理计划

  • (1)通过WebUI查看

  • spark.assets/image-20210402160629237.png
  • (2)通过spark-shell交互式命令行,启动4040端口

  • spark.assets/image-20210402161146763.png
  • spark.assets/image-20210402161214076.png
  • 面试时候回答的SparkSQL底层解析原理?

  • ![](spark.assets/image-20210402161545086.png)

  • spark.assets/image-20210402161622186.png

SparkStreaming引入–RDD–DStream

流数据的处理模式

  • spark.assets/image-20210402162641358.png
  • 两种处理方式

  • spark.assets/image-20210402162855266.png
  • SparkStreaming官网

  • spark.assets/image-20210402163103866.png
  • spark.assets/image-20210402163419342.png

SparkStreaming数据结构

SparkStreaming原理架构

  • spark.assets/image-20210402171712399.png
  • spark.assets/image-20210402172037127.png
  • spark.assets/image-20210402172258524.png
  • spark.assets/image-20210402172527740.png
  • spark.assets/image-20210402172713126.png

SparkStreaming原理

基础原理

  • spark.assets/image-20210403102341971.png
  • spark.assets/image-20210403102446613.png

SparkStreaming原理深入

DStream两种算子

  • 底层RDD@Time时间序列构成
  • RDD分为Transormation算子和Action算子,那个DStream如何划分呢?
  • 答案:Transormation和OutPutOpration操作
  • spark.assets/image-20210402172907207.png
  • spark.assets/image-20210402172942855.png
  • 问题:对于SparkSTreaming的DStream的操作支持的算子并没有那么多,比如排序算子
  • SparkStreaming解决方案利用transform的方法将DStream转化为RDD,因为RDD的transform很多的
  • spark.assets/image-20210402173248662.png

SparkStreaming初体验-无状态

  • 需求:数据按照每5s处理一次,输入数据wordcount

  • 数据源:nc -lk 9999/9998 模拟socket数据源

  • 步骤:

    • 1-准备上下文环境
    • 2-读取数据
    • 3-flatMap
    • 4-map
    • 5-reduceBykey
    • 6-start
    • 7-awaitTermination
  • 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package cn.spark.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* DESC:
*1-准备上下文环境
*2-读取数据
*3-flatMap
*4-map
*5-reduceBykey
*6-start
*7-awaitTermination
*/
object _01baseStreaming {
def main(args: Array[String]): Unit = {
// 1-准备上下文环境
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
//batchDuration the time interval at which streaming data will be divided into batches
val ssc = new StreamingContext(conf, Seconds(5))
// 2-读取数据--从socket读数据,需要在linux或windows中安装nc,如何安装nc;yum install -y nc 执行nc -lk 9999
val receiveRDD: ReceiverInputDStream[String] = ssc.socketTextStream("node1.spark.cn", 9999)
// 3-flatMap
val flatRDD: DStream[String] = receiveRDD.flatMap(_.split("\\s+"))
// 4-map
val mapRDD: DStream[(String, Int)] = flatRDD.map(x => (x, 1))
// 5-reduceBykey
val resultRDD: DStream[(String, Int)] = mapRDD.reduceByKey((a: Int, b: Int) => a + b)
resultRDD.print()
// 6-start
//Start the execution of the streams.
ssc.start()
// 7-awaitTermination--Wait for the execution to stop
ssc.awaitTermination()
}
}
  • 截图

    • spark.assets/image-20210402164943001.png

状态计算

updateStateByKey

  • 首先实现有状态的统计,然后我们进行transform排序

  • 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package cn.spark.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* DESC:
* 1-准备上下文环境
* 2-读取数据
* 3-flatMap
* 4-map
* 5-reduceBykey
* 6-start
* 7-awaitTermination
*/
object _02baseStreaming {
/**
* @param currentValue 当前的值
* @param historyValue 历史的值
* @return Option-none some
*/
def updateFunc(currentValue: Seq[Int], historyValue: Option[Int]): Option[Int] = {
val sumV: Int = currentValue.sum + historyValue.getOrElse(0)//如果有值就给值否则给0
//Some(sumV)
Option(sumV)
}

def main(args: Array[String]): Unit = {
// 1-准备上下文环境
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
//batchDuration the time interval at which streaming data will be divided into batches
val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("data/baseoutput/output-4")
// 2-读取数据--从socket读数据,需要在linux或windows中安装nc,如何安装nc;yum install -y nc 执行nc -lk 9999
val receiveRDD: ReceiverInputDStream[String] = ssc.socketTextStream("node1.spark.cn", 9999)
// 3-flatMap
val flatRDD: DStream[String] = receiveRDD.flatMap(_.split("\\s+"))
// 4-map
val mapRDD: DStream[(String, Int)] = flatRDD.map(x => (x, 1))
// 5-reduceBykey
//在Scala中,它和Java一样也是拥有方法和函数。Scala的方法是类的一部分,而函数是一个对象可以赋值给一个变量。换句话来说,在类中定义的函数即是方法。

//Scala 中可以使用 def语句和val 语句定义函数,而定义方法只能使用def 语句。
//需要函数自动转换,不会自动转换的可以通过方法名+空格+_转化成函数
val resultRDD: DStream[(String, Int)] = mapRDD.updateStateByKey(updateFunc)
//增加排序的方法--tranform中使用rdd
val sortResultDS: DStream[(String, Int)] = resultRDD.transform(rdd => {
rdd.sortBy(_._2, false)
})
sortResultDS.print()

// 启动新的线程,希望在特殊的场合关闭SparkStreaming
new Thread(new Runnable {
override def run(): Unit = {

while ( true ) {
try {
Thread.sleep(5000)
} catch {
case ex : Exception => println(ex)
}

// 监控HDFS文件的变化
val fs: FileSystem = FileSystem.get(new URI("hdfs://node1:8020"), new Configuration(), "root")

val state: StreamingContextState = ssc.getState()
// 如果环境对象处于活动状态,可以进行关闭操作
if ( state == StreamingContextState.ACTIVE ) {

// 判断路径是否存在
val flg: Boolean = fs.exists(new Path("hdfs://node1:8020/spark/stopSparkHistory/"+appName+"@"+ssc.sparkContext.getConf.getAppId))
if ( flg ) {
// 关闭采集器和Driver:优雅的关闭
ssc.stop(true, true)
System.exit(0)
}

}
}

}
}).start()
// 6-start
//Start the execution of the streams.
ssc.start()


// 7-awaitTermination--Wait for the execution to stop调用awaitTermination(),driver将阻塞在这里,直到流式应用意外退出。
ssc.awaitTermination()
}
}
  • spark.assets/image-20210402174214391.png

双流合并

  • 有两个流的合并union
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package cn.spark.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* DESC:
* 1-准备上下文环境
* 2-读取数据
* 3-flatMap
* 4-map
* 5-reduceBykey
* 6-start
* 7-awaitTermination
*/
object _06twoSourceStreaming {

def updateFunc(curentValue: Seq[Int], histouryValue: Option[Int]): Option[Int] = {
val sum: Int = curentValue.sum + histouryValue.getOrElse(0)
Option(sum)
}

def main(args: Array[String]): Unit = {
// 1-准备上下文环境
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[3]")
//batchDuration the time interval at which streaming data will be divided into batches
val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("data/baseoutput/output-6")
// 2-读取数据--从socket读数据,需要在linux或windows中安装nc,如何安装nc;yum install -y nc 执行nc -lk 9999
val receiveRDD1: ReceiverInputDStream[String] = ssc.socketTextStream("node1.spark.cn", 9999)
val receiveRDD2: ReceiverInputDStream[String] = ssc.socketTextStream("node1.spark.cn", 9998)
val receiveRDD: DStream[String] = receiveRDD1.union(receiveRDD2)
// 3-flatMap
val flatRDD: DStream[String] = receiveRDD.flatMap(_.split("\\s+"))
// 4-map
val mapRDD: DStream[(String, Int)] = flatRDD.map(x => (x, 1))
// 5-reduceBykey
val resultRDD: DStream[(String, Int)] = mapRDD.updateStateByKey(updateFunc)
resultRDD.print()
// 6-start
//Start the execution of the streams.
ssc.start()
// 7-awaitTermination--Wait for the execution to stop
ssc.awaitTermination()
}
}

mapWithState

  • updateStateByKey的所有计算结果经过计算之后都会返回Driver端

  • mapWithState只会返回driver端更新的数据,而不是全部数据

  • spark.assets/image-20210403093630206.png
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package cn.spark.sparkstreaming

import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{MapWithStateDStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

/**
* DESC:
* 1-首先加载StreamingContext上下文对象,获取资源,内部调用sparkconf
* 2-读取外部数据源,如socket数据源
* 3-flatMap
* 4-map
* 5-mapwithstate
* 6-print
* 7-ssc.start
* 8-ssc.awaitTermination等待程序异常退出
*/
object _04mapWithState {
def main(args: Array[String]): Unit = {
//1-首先加载StreamingContext上下文对象,获取资源,内部调用sparkconf
val ssc: StreamingContext = {
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5))
ssc
}
ssc.checkpoint("data/baseoutput/output-5")
//2-读取外部数据源,如socket数据源
val receiveStream: ReceiverInputDStream[String] = ssc.socketTextStream("node1.spark.cn", 9999)
//3-flatMap
//4-map
//5-mapwithstate
val result: MapWithStateDStream[String, Int, Int, Any] = receiveStream
.filter(line => StringUtils.isNotBlank(line))
.flatMap(_.split("\\s+"))
.map((_, 1))
.mapWithState(StateSpec.function(mappingFunction))
//6-print
result.print()
//7-ssc.start
ssc.start()
//8-ssc.awaitTermination等待程序异常退出
ssc.awaitTermination()
}

/**
* word-统计单词
* option-
* state-
*/
val mappingFunction = (word: String, option: Option[Int], state: State[Int]) => {
//1-首先查看状态的时间是否超时
if (state.isTimingOut()) {
println(word + "is time out..")
//2-没有超时状态
} else {
//3-option历史的值,结合当前的状态的值累加
val sum: Int = option.getOrElse(0) + state.getOption().getOrElse(0)
val keyFreq: (String, Int) = (word, sum)
//4-state的update的方法执行
state.update(sum)
keyFreq
}
}
}

窗口计算

  • spark.assets/image-20210403105136557.png
  • 窗口长度:被包含在窗口中的数据需要被处理

  • 窗口滑动时间间隔:窗口每过多久滑动一次

  • 数据处理时间5s处理一次,streamigcontect(second(5))

  • spark.assets/image-20210403105628523.png
  • 切分批次:没隔多久处理一次,一般设置5s

  • 窗口长度:被包括在窗口中的数据需要被计算

  • 窗口滑动时间间隔;窗口每隔多久计算一次

  • 当窗口的长度=窗口的滑动时间间隔,不会造成数据丢失或重复

  • 当窗口的长度<滑动的时间间隔,会造成数据丢失

  • 当窗口的长度>滑动时间间隔,造成数据重复,

    重复得看最后的数据怎么使用,如果说是每5min统计最近1小时的数据,只要这一小时的数据,没有重复可言,如果是要保留数据最后累计,就会有重复累计问题。

  • 上述案例下,窗口的长度设置为17s,不能随便设置

  • 一般设置,窗口的滑动时间间隔和窗口的长度必须是数据处理时间的整数倍,否则rdd需要被分为两部分,违反了rdd的不可变

  • 案例:

  • 每隔10s统计一下10s的数据

  • spark.assets/image-20210403110227029.png
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package cn.spark.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* DESC:
* 1-准备上下文环境
* 2-读取数据
* 3-flatMap
* 4-map
* 5-reduceBykey
* 6-start
* 7-awaitTermination
*/
object _07windowsOpration {


def main(args: Array[String]): Unit = {
// 1-准备上下文环境
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[3]")
//batchDuration the time interval at which streaming data will be divided into batches
val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("data/baseoutput/output-6")
// 2-读取数据--从socket读数据,需要在linux或windows中安装nc,如何安装nc;yum install -y nc 执行nc -lk 9999
val receiveRDD: ReceiverInputDStream[String] = ssc.socketTextStream("node1.spark.cn", 9999)

// 3-flatMap
val flatRDD: DStream[String] = receiveRDD.flatMap(_.split("\\s+"))
// 4-map
val mapRDD: DStream[(String, Int)] = flatRDD.map(x => (x, 1))
// 5-reduceBykey
val resultRDD: DStream[(String, Int)] = mapRDD.reduceByKeyAndWindow(
(a: Int, b: Int) => a + b,
Seconds(10),
Seconds(10))
resultRDD.print()
// 6-start
//Start the execution of the streams.
ssc.start()
// 7-awaitTermination--Wait for the execution to stop
ssc.awaitTermination()
}
}
  • 换种API实现

  • spark.assets/image-20210403110649150.png
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package cn.spark.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* DESC:
* 1-准备上下文环境
* 2-读取数据
* 3-flatMap
* 4-map
* 5-reduceBykey
* 6-start
* 7-awaitTermination
*/
object _08windowsOpration {


def main(args: Array[String]): Unit = {
// 1-准备上下文环境
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[3]")
//batchDuration the time interval at which streaming data will be divided into batches
val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("data/baseoutput/output-6")
// 2-读取数据--从socket读数据,需要在linux或windows中安装nc,如何安装nc;yum install -y nc 执行nc -lk 9999
val receiveRDD: ReceiverInputDStream[String] = ssc.socketTextStream("node1.spark.cn", 9999)

// 3-flatMap
val flatRDD: DStream[String] = receiveRDD.flatMap(_.split("\\s+"))
// 4-map
val mapRDD: DStream[(String, Int)] = flatRDD.map(x => (x, 1))
// 5-reduceBykey
val resultRDD: DStream[(String, Int)] = mapRDD.window(Seconds(10), Seconds(10))
// 6-tranform
val resultRDD1: DStream[(String, Int)] = resultRDD.transform(rdd => {
rdd.reduceByKey(_ + _)
})
resultRDD1.print()
// 6-start
//Start the execution of the streams.
ssc.start()
// 7-awaitTermination--Wait for the execution to stop
ssc.awaitTermination()
}
}
  • spark.assets/image-20210403110809607.png
  • 增加排序的功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package cn.spark.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* DESC:
* 1-准备上下文环境
* 2-读取数据
* 3-flatMap
* 4-map
* 5-reduceBykey
* 6-start
* 7-awaitTermination
*/
object _08windowsOpration {


def main(args: Array[String]): Unit = {
// 1-准备上下文环境
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[3]")
//batchDuration the time interval at which streaming data will be divided into batches
val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("data/baseoutput/output-6")
// 2-读取数据--从socket读数据,需要在linux或windows中安装nc,如何安装nc;yum install -y nc 执行nc -lk 9999
val receiveRDD: ReceiverInputDStream[String] = ssc.socketTextStream("node1.spark.cn", 9999)

// 3-flatMap
val flatRDD: DStream[String] = receiveRDD.flatMap(_.split("\\s+"))
// 4-map
val mapRDD: DStream[(String, Int)] = flatRDD.map(x => (x, 1))
// 5-reduceBykey
val resultRDD: DStream[(String, Int)] = mapRDD.window(Seconds(10), Seconds(5))
// 6-tranform
val resultRDD1: DStream[(String, Int)] = resultRDD.transform(rdd => {
val reduceRDD: RDD[(String, Int)] = rdd.reduceByKey(_ + _)
reduceRDD.sortBy(_._2, false)
})
resultRDD1.print()
// 6-start
//Start the execution of the streams.
ssc.start()
// 7-awaitTermination--Wait for the execution to stop
ssc.awaitTermination()
}
}

SparkStreaming和SparkSQL整合

SparkStreaming和基本数据源(监控HDFS文件夹下内容同格式文件)

  • 基础数据源
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package cn.spark.sparkstreaming.filesource

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* DESC:
* 1-申请资源
* 2-读取数据源,设置文件夹
* 3-通过flatMap,map,updateStateByKey统计
* 4-ssc.start
* 5-ssc.awitTermination
* 6-ssc.stop
*/
object _01textFileSource {

def updateFunc(currenValue: Seq[Int], historyValue: Option[Int]): Option[Int] = {
val sum: Int = currenValue.sum + historyValue.getOrElse(0)
Some(sum)
}

def main(args: Array[String]): Unit = {
// 1-申请资源
val ssc: StreamingContext = {
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5))
ssc
}
ssc.checkpoint("data/baseoutput/output-7")
// 2-读取数据源,设置文件夹
val rddDS: DStream[String] = ssc.textFileStream("hdfs://node1.spark.cn:8020/wordcount/trans/")
// 3-通过flatMap,map,updateStateByKey统计
val resultRDD: DStream[(String, Int)] = rddDS
.flatMap(_.split("\\s+"))
.map((_, 1))
.updateStateByKey(updateFunc)
resultRDD.print()
// 4-ssc.start
ssc.start()
// 5-ssc.awitTermination
ssc.awaitTermination()
// 6-ssc.stop
ssc.stop(true,true)
}
}
  • spark.assets/image-20210403114432612.png
  • 通过SparkSQL整合SparkSTreaming

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package cn.spark.sparkstreaming.filesource

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* DESC:
* 1-申请资源
* 2-读取数据源,设置文件夹
* 3-通过flatMap,map,updateStateByKey统计
* 4-ssc.start
* 5-ssc.awitTermination
* 6-ssc.stop
*/
object _03textFileSourceSparkSQL {

def updateFunc(currenValue: Seq[Int], historyValue: Option[Int]): Option[Int] = {
val sum: Int = currenValue.sum + historyValue.getOrElse(0)
Some(sum)
}

def main(args: Array[String]): Unit = {
// 1-申请资源
val ssc: StreamingContext = {
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5))
ssc
}
ssc.checkpoint("data/baseoutput/output-7")
// 2-读取数据源,设置文件夹
val rddDS: DStream[String] = ssc.textFileStream("hdfs://node1.spark.cn:8020/wordcount/trans/")
// 3-通过flatMap,map,updateStateByKey统计
val resultRDD: DStream[String] = rddDS.flatMap(_.split("\\s+"))

val resultValue: DStream[(String, Int)] = resultRDD.map((_, 1)).updateStateByKey(updateFunc)

resultValue.foreachRDD(rdd => {
// Get the singleton instance of SparkSession
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
val df: DataFrame = rdd.toDF("word", "count")
//dsl
//sql
df.createOrReplaceTempView("table_view")
val result: DataFrame = spark.sql(
"""
|select *
|from table_view
|""".stripMargin)
result.show()
})

// 4-ssc.start
ssc.start()
// 5-ssc.awitTermination
ssc.awaitTermination()
// 6-ssc.stop
ssc.stop(true, true)
}
}

SparkStreaming和Kafka整合

kafka原理

  • Kafka
  • 什么是消息:应用之间传输的数据
  • 什么是消息队列:应用之间保障消息传递的准确性
  • 消息队列分类:点对点,发布订阅者方式
  • 消息队列应用:限流消峰,应用解耦
  • Kafka:Scala+Java混编,LinkedIn(ActiveMQ)
  • Kafka:生产者,消费者,Broker
  • 当前的虚拟机的Kafka的版本是1,1.0大家之前学习的是2,4,1,高版本命令更多使用的是boostrap-server替代zk
  • spark.assets/image-20210403151702229.png

Kafka安装

  • 脚本:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 当前版本是1.1.0 大家使用的是2.4.1
# 1-kafka启动
nohup bin/kafka-server-start.sh config/server.properties &
# 2-查看kafka内部的topic的内容
/export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181
#3-描述topic
/export/server/kafka/bin/kafka-topics.sh --describe --zookeeper node1:2181 --topic spark_kafka
# 创建topic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic spark_kafka
#描述topic
/export/server/kafka/bin/kafka-topics.sh --describe --zookeeper node1:2181 --topic spark_kafka
# 删除opic
/export/server/kafka/bin/kafka-topics.sh --zookeeper node1:2181 --delete --topic test

#生产者和消费者
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic spark_kafka
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic spark_kafka --from-beginning

整合两种方式区别

  • 实现SparkStreaming和kafka整合
  • 两个版本:0.8级0.10以上,目前企业中使用的是0.10版本以上
  • 两种消费方式:
    • Receiver:启动多个Receiver接收器线程拉取kafka数据过来合并,使用WAL和checkpount等
      • 问题1:需要开启多个Receiver线程拉取数据,需要合并数据源
      • 问题2:使用的HighLevelAPI对接的是offset维护在ZK中
      • 问题3:需要大量的WAL或checkpoint影响效率
    • Direct:直接采用直接连接kafka的topic的partition数据,获取区中offset
      • 解决1:直接使用spark的rdd的一个分区对接的是kafka的一个partition
      • 解决2:使用的是lowlevel api将offset在kafka的topic中存放的,还可以手动设置
      • 解决3:没有提供WAL的方式
  • spark.assets/image-20210403153026537.png
  • spark.assets/image-20210403153254881.png
  • spark.assets/image-20210403153950592.png
  • spark.assets/image-20210403153957570.png
  • spark.assets/image-20210403153421089.png
  • spark.assets/image-20210403153551432.png
  • spark.assets/image-20210403153758585.png
  • 几个特点:
  • spark.assets/image-20210403154242521.png
  • spark.assets/image-20210403154320025.png
  • spark.assets/image-20210403154606414.png
  • kafka+sparkstreaming消费者消费数据
  • spark.assets/image-20210403155215474.png

SparkStreaming010整合Kafka

  • 010以上kafka版本,使用Directly的方式

  • 需求:【自动提交偏移量】使用sparkstreaming提供的API读取kafka中的数据,使用该数据完成wordcount

  • 步骤:

    • 1-导入有kafka和spark整合的Jar包
1
2
3
4
5
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.4.5</version>
</dependency>
  • 2-调用streamingCOntext

  • 3-KafkaUtils.creatDriectlyStream的方法直接连接Kafka集群的分区

  • spark.assets/image-20210403161707554.png
  • spark.assets/image-20210403162224275.png
  • 在大多数情况下使用此功能,它将在所有执行程序之间一致地分配分区。

  • LocationStrategies 的几种方式。

1
1. LocationStrategies.PreferBrokers()

仅仅在你 spark 的 executor 在相同的节点上,优先分配到存在 kafka broker 的机器上;

1
2. LocationStrategies.PreferConsistent();

大多数情况下使用,一致性的方式分配分区所有 executor 上。(主要是为了分布均匀)

1
2
3. LocationStrategies.PreferFixed(hostMap: collection.Map[TopicPartition, String])
4. LocationStrategies.PreferFixed(hostMap: ju.Map[TopicPartition, String])

如果你的负载不均衡,可以通过这两种方式来手动指定分配方式,其他没有在 map 中指定的,均采用 preferConsistent() 的方式分配;

  • spark.assets/image-20210403162530139.png
  • spark.assets/image-20210403162734342.png
  • spark.assets/image-20210403163144926.png
  • 4-获取record记录中的value的值

  • 5-根据value进行累加求和wordcount

  • 6-ssc.statrt

  • 7-ssc.awaitTermination

  • 8-ssc.stop(true,true)

  • 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package cn.spark.sparkstreaming.kafka

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* DESC:
* 1-导入有kafka和spark整合的Jar包
* 2-调用streamingCOntext
* 3-KafkaUtils.creatDriectlyStream的方法直接连接Kafka集群的分区
* 4-获取record记录中的value的值
* 5-根据value进行累加求和wordcount
* 6-ssc.statrt
* 7-ssc.awaitTermination
* 8-ssc.stop(true,true)
*/
object _01SparkStreamingKafkaAuto {
def updateFunc(curentValue: Seq[Int], histouryValue: Option[Int]): Option[Int] = {
val sum: Int = curentValue.sum + histouryValue.getOrElse(0)
Option(sum)
}

val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "node1:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "spark_group",
//offset的偏移量自动设置为最新偏移量,有几种设置偏移量的方法
// //这里的auto.offset.reset代表的是自动重置offset为latest就表示的是最新的偏移量,如果没有偏移从最新的位置开始
"auto.offset.reset" -> "latest",
//是否自动提交,这里设置为自动提交,提交到kafka指导的__consumeroffset中,由kafka自己维护,如果设置为false可以使用ckeckpoint或者是将offset存入mysql
// //这里如果是false手动提交,默认由SparkStreaming提交到checkpoint中,在这里也可以根据用户或程序员将offset偏移量提交到mysql或redis中
"enable.auto.commit" -> (true: java.lang.Boolean),
//自动设置提交的时间
"auto.commit.interval.ms" -> "1000"
)



def main(args: Array[String]): Unit = {
//1-导入有kafka和spark整合的Jar包
//2-调用streamingCOntext
val ssc: StreamingContext = {
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5))
ssc
}
ssc.checkpoint("data/baseoutput/cck1")
//3-KafkaUtils.creatDriectlyStream的方法直接连接Kafka集群的分区
//ssc: StreamingContext,
//locationStrategy: LocationStrategy,
//consumerStrategy: ConsumerStrategy[K, V]
val streamRDD: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Array("spark_kafka"), kafkaParams))
//4-获取record记录中的value的值
val mapValue: DStream[String] = streamRDD.map(_.value())
//5-根据value进行累加求和wordcount
val resultRDD: DStream[(String, Int)] = mapValue
.flatMap(_.split("\\s+"))
.map((_, 1))
.updateStateByKey(updateFunc)
resultRDD.print()
//6-ssc.statrt
ssc.start()
//7-ssc.awaitTermination
ssc.awaitTermination()
//8-ssc.stop(true,true)
ssc.stop(true, true)
}
}

手动提交偏移量

(实现恰好一次,但还是存在断网没有及时提交数据)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package cn.spark.sparkstreaming.kafka

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* DESC:
* 1-导入有kafka和spark整合的Jar包
* 2-调用streamingCOntext
* 3-KafkaUtils.creatDriectlyStream的方法直接连接Kafka集群的分区
* 4-获取record记录中的value的值
* 5-根据value进行累加求和wordcount
* 6-ssc.statrt
* 7-ssc.awaitTermination
* 8-ssc.stop(true,true)
*/
object _02SparkStreamingKafkaByPass {
def updateFunc(curentValue: Seq[Int], histouryValue: Option[Int]): Option[Int] = {
val sum: Int = curentValue.sum + histouryValue.getOrElse(0)
Option(sum)
}

val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "node1:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "spark_group",
//offset的偏移量自动设置为最新偏移量,有几种设置偏移量的方法
//这里的auto.offset.reset代表的是自动重置offset为latest就表示的是最新的偏移量,如果没有偏移从最新的位置开始
"auto.offset.reset" -> "latest",
//这里如果是false手动提交,默认由SparkStreaming提交到checkpoint中,在这里也可以根据用户或程序员将offset偏移量提交到mysql或redis中
"enable.auto.commit" -> (false: java.lang.Boolean)
)


def main(args: Array[String]): Unit = {
//1-导入有kafka和spark整合的Jar包
//2-调用streamingCOntext
val ssc: StreamingContext = {
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5))
ssc
}
ssc.checkpoint("data/baseoutput/cck2")
//3-KafkaUtils.creatDriectlyStream的方法直接连接Kafka集群的分区
//ssc: StreamingContext,
//locationStrategy: LocationStrategy,
//consumerStrategy: ConsumerStrategy[K, V]
val streamRDD: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Array("spark_kafka"), kafkaParams))
//实现获取offset然后手动提交offset
streamRDD.foreachRDD(f => {
if (f.count() > 0) {
//这里仅仅是为了打印
println("rdd is:", f)
f.foreach(record => {
println("record result is:", record)
val value: String = record.value()
println("value is:", value)
})
} //end id
//获取offset
val offsetRanges: Array[OffsetRange] = f.asInstanceOf[HasOffsetRanges].offsetRanges
//这里仅仅是为了打印
for(offsetRange <- offsetRanges){
println(s"topic:${offsetRange.topic} partition:${offsetRange.partition} fromoffset:${offsetRange.fromOffset} endoffset:${offsetRange.untilOffset}")
}//end for
//提交offset,手动的方式默认提交到checkpoint的目录中
streamRDD.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})


//4-获取record记录中的value的值
val mapValue: DStream[String] = streamRDD.map(_.value())
//5-根据value进行累加求和wordcount
val resultRDD: DStream[(String, Int)] = mapValue
.flatMap(_.split("\\s+"))
.map((_, 1))
.updateStateByKey(updateFunc)
resultRDD.print()
//6-ssc.statrt
ssc.start()
//7-ssc.awaitTermination
ssc.awaitTermination()
//8-ssc.stop(true,true)
ssc.stop(true, true)
}
}
  • spark.assets/image-20210403170114199.png
  • spark.assets/image-20210403170300844.png

代码模板抽取

spark.assets/image-20210403170918166.png
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package cn.spark.sparkstreaming.kafka

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* DESC:
* 1-导入有kafka和spark整合的Jar包
* 2-调用streamingCOntext
* 3-KafkaUtils.creatDriectlyStream的方法直接连接Kafka集群的分区
* 4-获取record记录中的value的值
* 5-根据value进行累加求和wordcount
* 6-ssc.statrt
* 7-ssc.awaitTermination
* 8-ssc.stop(true,true)
*/
object _03SparkStreamingKafkaModel {
def updateFunc(curentValue: Seq[Int], histouryValue: Option[Int]): Option[Int] = {
val sum: Int = curentValue.sum + histouryValue.getOrElse(0)
Option(sum)
}

val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "node1:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "spark_group",
//offset的偏移量自动设置为最新偏移量,有几种设置偏移量的方法
//这里的auto.offset.reset代表的是自动重置offset为latest就表示的是最新的偏移量,如果没有偏移从最新的位置开始
"auto.offset.reset" -> "latest",
//这里如果是false手动提交,默认由SparkStreaming提交到checkpoint中,在这里也可以根据用户或程序员将offset偏移量提交到mysql或redis中
"enable.auto.commit" -> (false: java.lang.Boolean)
)


def main(args: Array[String]): Unit = {
//1-导入有kafka和spark整合的Jar包
//2-调用streamingCOntext
val ssc: StreamingContext = {
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5))
ssc
}
ssc.checkpoint("data/baseoutput/cck3")
compute(ssc)
//6-ssc.statrt
ssc.start()
//7-ssc.awaitTermination
ssc.awaitTermination()
//8-ssc.stop(true,true)
ssc.stop(true, true)
}


def compute(ssc: StreamingContext): Unit = {
//3-KafkaUtils.creatDriectlyStream的方法直接连接Kafka集群的分区
//ssc: StreamingContext,
//locationStrategy: LocationStrategy,
//consumerStrategy: ConsumerStrategy[K, V]
val streamRDD: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Array("spark_kafka"), kafkaParams))
//实现获取offset然后手动提交offset
streamRDD.foreachRDD(f => {
if (f.count() > 0) {
//这里仅仅是为了打印
println("rdd is:", f)
f.foreach(record => {
println("record result is:", record)
val value: String = record.value()
println("value is:", value)
})
} //end id
//获取offset
val offsetRanges: Array[OffsetRange] = f.asInstanceOf[HasOffsetRanges].offsetRanges
//这里仅仅是为了打印
for (offsetRange <- offsetRanges) {
println(s"topic:${offsetRange.topic} partition:${offsetRange.partition} fromoffset:${offsetRange.fromOffset} endoffset:${offsetRange.untilOffset}")
} //end for
//提交offset,手动的方式默认提交到checkpoint的目录中
streamRDD.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})
//4-获取record记录中的value的值
val mapValue: DStream[String] = streamRDD.map(_.value())
//5-根据value进行累加求和wordcount
val resultRDD: DStream[(String, Int)] = mapValue
.flatMap(_.split("\\s+"))
.map((_, 1))
.updateStateByKey(updateFunc)
resultRDD.print()
}
}

Checkpoint 恢复

  • spark.assets/image-20210403171127519.png
  • spark.assets/image-20210403171318133.png
  • spark.assets/image-20210403172152495.png

    当Streaming Application再次运行时,从Checkpoint检查点目录恢复时,有时有问题,比如修改程序,再次从运行时,可能出现类型转换异常,如下所示:

    ERROR Utils: Exception encountered java.lang.ClassCastException: cannot assign instance of cn.spark.spark.ckpt.StreamingCkptState$$anonfun$streamingProcess$1 to field org.apache.spark.streaming.dstream.ForEachDStream.org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc of type scala.Function2 in instance of org.apache.spark.streaming.dstream.ForEachDStream at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133) at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)

    原因在于修改DStream转换操作,在检查点目录中存储的数据没有此类的相关代码,所以保存ClassCastException异常。

    此时无法从检查点读取偏移量信息和转态信息,所以实际开发人员:SparkStreaming中Checkpoint功能,属于鸡肋,食之无味,弃之可惜。解决方案:

    l 1)、针对状态信息:当应用启动时,从外部存储系统读取最新状态,比如从MySQL表读取,或者从Redis读取;

    l 2)、针对偏移量数据:自己管理偏移量,将偏移量存储到MySQL表、Zookeeper、HBase或Redis;

    spark.assets/的.png
  • 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package cn.spark.sparkstreaming.kafka

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* DESC:
* 1-导入有kafka和spark整合的Jar包
* 2-调用streamingCOntext
* 3-KafkaUtils.creatDriectlyStream的方法直接连接Kafka集群的分区
* 4-获取record记录中的value的值
* 5-根据value进行累加求和wordcount
* 6-ssc.statrt
* 7-ssc.awaitTermination
* 8-ssc.stop(true,true)
*/
object _04getActiveOrCreate {
def updateFunc(curentValue: Seq[Int], histouryValue: Option[Int]): Option[Int] = {
val sum: Int = curentValue.sum + histouryValue.getOrElse(0)
Option(sum)
}

val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "node1:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "spark_group",
//offset的偏移量自动设置为最新偏移量,有几种设置偏移量的方法
//这里的auto.offset.reset代表的是自动重置offset为latest就表示的是最新的偏移量,如果没有偏移从最新的位置开始
"auto.offset.reset" -> "latest",
//这里如果是false手动提交,默认由SparkStreaming提交到checkpoint中,在这里也可以根据用户或程序员将offset偏移量提交到mysql或redis中
"enable.auto.commit" -> (false: java.lang.Boolean)
)


def main(args: Array[String]): Unit = {
//1-导入有kafka和spark整合的Jar包
val CHECKPOINT = "data/baseoutput/cck5"
//2-调用streamingCOntext
val ssc: StreamingContext = StreamingContext.getActiveOrCreate(CHECKPOINT, () => {
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint(CHECKPOINT)
compute(ssc)
ssc
})
//6-ssc.statrt
ssc.start()
//7-ssc.awaitTermination
ssc.awaitTermination()
//8-ssc.stop(true,true)
ssc.stop(true, true)
}


def compute(ssc: StreamingContext): Unit = {
//3-KafkaUtils.creatDriectlyStream的方法直接连接Kafka集群的分区
//ssc: StreamingContext,
//locationStrategy: LocationStrategy,
//consumerStrategy: ConsumerStrategy[K, V]
val streamRDD: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Array("spark_kafka"), kafkaParams))
//实现获取offset然后手动提交offset
streamRDD.foreachRDD(f => {
if (f.count() > 0) {
//这里仅仅是为了打印
println("rdd is:", f)
f.foreach(record => {
println("record result is:", record)
val value: String = record.value()
println("value is:", value)
})
} //end id
//获取offset
val offsetRanges: Array[OffsetRange] = f.asInstanceOf[HasOffsetRanges].offsetRanges
//这里仅仅是为了打印
for (offsetRange <- offsetRanges) {
println(s"topic:${offsetRange.topic} partition:${offsetRange.partition} fromoffset:${offsetRange.fromOffset} endoffset:${offsetRange.untilOffset}")
} //end for
//提交offset,手动的方式默认提交到checkpoint的目录中
streamRDD.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})
//4-获取record记录中的value的值
val mapValue: DStream[String] = streamRDD.map(_.value())
//5-根据value进行累加求和wordcount
val resultRDD: DStream[(String, Int)] = mapValue
.flatMap(_.split("\\s+"))
.map((_, 1))
.updateStateByKey(updateFunc)
resultRDD.print()
}
}

偏移量存储在MySQL中

  • 实际工作中,将offset存储在mysql或redis中,可以监控redis或mysql中offset的变化,从而更好管理offset

  • spark.assets/image-20210403173545345.png
  • 需要使用MySQL的驱动包,DriverManager结合

1
2
3
4
5
6
7
CREATE TABLE `t_offset` (
`topic` varchar(255) NOT NULL,
`partition` int(11) NOT NULL,
`groupid` varchar(255) NOT NULL,
`offset` bigint(20) DEFAULT NULL,
PRIMARY KEY (`topic`,`partition`,`groupid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • 查询topic的信息

  • spark.assets/image-20210403174537296.png
  • 查看官网对于TopicPerition的讲解

  • spark.assets/image-20210403174143467.png
  • spark.assets/image-20210403174318353.png
  • 1-MySQL的偏移量查询或保存的工具类提供

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package cn.spark.sparkstreaming.kafka.toMySQL

import java.sql.{DriverManager, ResultSet}

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

import scala.collection.mutable

object OffsetUtil {

//从数据库读取偏移量
def getOffsetMap(groupid: String, topic: String) = {
val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")
val pstmt = connection.prepareStatement("select * from t_offset where groupid=? and topic=?")
pstmt.setString(1, groupid)
pstmt.setString(2, topic)
val rs: ResultSet = pstmt.executeQuery()
val offsetMap = mutable.Map[TopicPartition, Long]()
while (rs.next()) {
offsetMap += new TopicPartition(rs.getString("topic"), rs.getInt("partition")) -> rs.getLong("offset")
}
rs.close()
pstmt.close()
connection.close()
offsetMap
}

//将偏移量保存到数据库
def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]) = {
val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")
//replace into表示之前有就替换,没有就插入
val pstmt = connection.prepareStatement("replace into t_offset (`topic`, `partition`, `groupid`, `offset`) values(?,?,?,?)")
for (o <- offsetRange) {
pstmt.setString(1, o.topic)
pstmt.setInt(2, o.partition)
pstmt.setString(3, groupid)
pstmt.setLong(4, o.untilOffset)
pstmt.executeUpdate()
}
pstmt.close()
connection.close()
}
}
}
  • 2-使用MySQL偏移量的工具类给出偏移量

  • 分析一下,什么时候需要从mysql中读取偏移量

    • 首先,MysQL中是有offset的记录,才可以读取,判断Map的数是否有空
    • 接着,如果Mysql没有记录,拉取最新的偏移量直接消费即可
    • 接着,如果MySQL存放了记录,直接读取,直接在subscribe增加第三个参数
    • spark.assets/image-20210403175320379.png
  • 分析一下,什么时候应该将偏移量写入Mysql

    • 每次有新的数据消费需要更新MySQL的Offset(utilOffset)
    • spark.assets/image-20210403175612958.png
  • 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package cn.spark.sparkstreaming.kafka.toMySQL

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

/**
* DESC:
* 1-导入有kafka和spark整合的Jar包
* 2-调用streamingCOntext
* 3-KafkaUtils.creatDriectlyStream的方法直接连接Kafka集群的分区
* 4-获取record记录中的value的值
* 5-根据value进行累加求和wordcount
* 6-ssc.statrt
* 7-ssc.awaitTermination
* 8-ssc.stop(true,true)
*/
object _01getActiveOrCreate {
def updateFunc(curentValue: Seq[Int], histouryValue: Option[Int]): Option[Int] = {
val sum: Int = curentValue.sum + histouryValue.getOrElse(0)
Option(sum)
}

val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "node1:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "spark_group",
//offset的偏移量自动设置为最新偏移量,有几种设置偏移量的方法
//这里的auto.offset.reset代表的是自动重置offset为latest就表示的是最新的偏移量,如果没有偏移从最新的位置开始
"auto.offset.reset" -> "latest",
//这里如果是false手动提交,默认由SparkStreaming提交到checkpoint中,在这里也可以根据用户或程序员将offset偏移量提交到mysql或redis中
"enable.auto.commit" -> (false: java.lang.Boolean)
)


def main(args: Array[String]): Unit = {
//1-导入有kafka和spark整合的Jar包
val CHECKPOINT = "data/baseoutput/cck6"
//2-调用streamingCOntext
val ssc: StreamingContext = StreamingContext.getActiveOrCreate(CHECKPOINT, () => {
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint(CHECKPOINT)
compute(ssc)
ssc
})
//6-ssc.statrt
ssc.start()
//7-ssc.awaitTermination
ssc.awaitTermination()
//8-ssc.stop(true,true)
ssc.stop(true, true)
}


def compute(ssc: StreamingContext): Unit = {
//1-首先获取偏移量
val offsetMap: mutable.Map[TopicPartition, Long] = OffsetUtil.getOffsetMap("spark_group", "spark_kafka")
var streamRDD: InputDStream[ConsumerRecord[String, String]] = null
if (offsetMap.size > 0) {
println("如果MySQL中有记录offset,则应该从该offset处开始消费")
//3-KafkaUtils.creatDriectlyStream的方法直接连接Kafka集群的分区
streamRDD = KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Array("spark_kafka"), kafkaParams, offsetMap))
} else {
println("如果MySQL中没有记录offset,则直接连接,从latest开始消费")
//3-KafkaUtils.creatDriectlyStream的方法直接连接Kafka集群的分区
streamRDD = KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Array("spark_kafka"), kafkaParams))
}
//实现获取offset然后手动提交offset
streamRDD.foreachRDD(f => {
if (f.count() > 0) {
//这里仅仅是为了打印
//println("rdd is:", f)
f.foreach(record => {
println("record result is:", record)
val value: String = record.value()
//println("value is:", value)
})
} //end id
//获取offset
val offsetRanges: Array[OffsetRange] = f.asInstanceOf[HasOffsetRanges].offsetRanges
//这里仅仅是为了打印
for (offsetRange <- offsetRanges) {
println(s"topic:${offsetRange.topic} partition:${offsetRange.partition} fromoffset:${offsetRange.fromOffset} endoffset:${offsetRange.untilOffset}")
} //end for
//提交offset,手动的方式默认提交到checkpoint的目录中
//streamRDD.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
//手动保存在MySQL中
OffsetUtil.saveOffsetRanges("spark_group",offsetRanges)
})
//4-获取record记录中的value的值
val mapValue: DStream[String] = streamRDD.map(_.value())
//5-根据value进行累加求和wordcount
val resultRDD: DStream[(String, Int)] = mapValue
.flatMap(_.split("\\s+"))
.map((_, 1))
.updateStateByKey(updateFunc)
resultRDD.print()
}
}
  • spark.assets/image-20210403180041343.png
  • spark.assets/image-20210403180025016.png

偏移量也可以保存至Zookeeper上或者Redis中,原因如下:

l 1)、保存Zookeeper上:方便使用Kafka 监控工具管理Kafka 各个Topic被消费信息;

l 2)、保存Redis上:从Redis读取数据和保存数据很快,基于内存数据库;

代码可以进一步优化,提高性能:由于每批次数据结果RDD输出以后,都需要向MySQL数据库表更新偏移量数据,频繁连接数据库,建议构建数据库连接池,每次从池子中获取连接。

StructuredStreamig引入

StructuredStreamig定义

  • StructuredStreaming结构化流–处理实时数据(流式数据)
  • 流式处理的方式:(1)原生的处理方式:来一条数据处理一条 结构化流和Flink(2)微批次处理方式:SParkStreaming
  • 底层数据结构:底层数据结构类似SparkSQL,使用DataFrame和DataSet
  • spark.assets/image-20210405103325480.png
  • spark.assets/image-20210405103642322.png

StructuredStreaming与sparkstreaming区别

SparkStreaming问题四点

  • 1-SparkStreaing的时间是基于ProcessingTime处理时间,而不是EventTime事件时间
  • 2-SparkStreaming编程API偏底层,本质上就是要去构造RDD的DAG执行图
  • 3-SparkStreaming无法实现端到端的一致性,DStream 只能保证自己的一致性语义是 exactly-once 的,而 input 接入 Spark Streaming 和 Spark Straming 输出到外部存储的语义往往需要用户自己来保证;
  • 4-SparkStreaming无法实现流批统一,有时候确实需要将的流处理逻辑运行到批数据上面,转化成RDD需要一定的工作量

StructuredStreaming应用场景

  • 结构化流解决SparkStreaing的问题
  • 1-结构化流是基于EventTime事件时间处理
  • 2-使用DataFrame和DataSet的API
  • 3-结构化流实现从source到sink的端到端的一致性exactly-once
  • 4-结构化流实现批处理和流式处理的统一
  • 结构化流使用两种API
    • ProcesstimgTime–微批次处理场景
    • CotinuceTime-实时的场景
  • spark.assets/image-20210405105542562.png

结构化流编程模型

  • spark.assets/image-20210405105945362.png
  • spark.assets/image-20210405110054378.png

入门案例

StructuredStreaming第一个案例-Socket

  • 需求:实现wordcount案例

  • spark.assets/image-20210405110902911.png
  • 分析:

  • spark.assets/image-20210405110615601.png
  • l 第一行表示从socket不断接收数据,

    l 第二行是时间轴,表示每隔1秒进行一次数据处理,

    l 第三行可以看成是之前提到的“unbound table”,

    l 第四行为最终的wordCounts是结果集。

  • 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package cn.ispark.structedstreaming

import java.util.concurrent.TimeUnit

import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
* DESC:
* 1-首先准备sparksession,底层是sparksql的引擎
* 2-读取socket的流式数据
* 3-wordcount统计
* 4-输出到控制台
*/
object _01socketSource {
def main(args: Array[String]): Unit = {
//1-首先准备sparksession,底层是sparksql的引擎
val conf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
//.set("spark.sql.shuffle.partitions","4")
.setMaster("local[*]")
val spark: SparkSession = SparkSession
.builder()
.config(conf)
.config("spark.sql.shuffle.partitions","4")
.getOrCreate()
//spark.sparkContext.setLogLevel("WARN")
import spark.implicits._
//2-读取socket的流式数据
val streamData: DataFrame = spark.readStream
.format("socket")
.option("host", "node1")
.option("port", 9999)
.load()
streamData.printSchema() // |-- value: string (nullable = true)
//3-wordcount统计
val result: Dataset[Row] = streamData
.as[String]
.filter(StringUtils.isNoneBlank(_))
.flatMap(x => x.split("\\s+"))
.groupBy("value")
.count()
.orderBy('count.desc)
//value.printSchema()
//4-输出到控制台
val query: StreamingQuery = result.writeStream
.format("console")
//.outputMode("complete")
.outputMode(OutputMode.Complete())
.option("numRows", 5)
.option("truncate", "false")
//.trigger(Trigger.ProcessingTime(10))
.trigger(Trigger.ProcessingTime(0))
.start() //Starts the execution of the streaming query,

query.awaitTermination()
query.stop()
}
}
  • 总结:

  • 一定是spark.sql.shuffle.partition分区个数默认为200需要设置小一些

StructuredStreaming文本数据源

  • 需求:通过文件夹不断放入文件然后统计年龄小于25岁的人群的爱好排行榜。

  • 分析:

  • 1-指定SparkSession

  • 2-读取文本数据源

  • 3-执行统计

  • 4-输出到控制台

  • 5-query.awaitTermination

  • 6-query.stop

  • 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package cn.ispark.structedstreaming.filesource

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}

/**
* DESC:
*/
object _01FileSource {
def main(args: Array[String]): Unit = {
//1-指定SparkSession
//1-首先准备sparksession,底层是sparksql的引擎
val conf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[*]")
val spark: SparkSession = SparkSession
.builder()
.config(conf)
.config("spark.sql.shuffle.partitions", "4")
.getOrCreate()
//2-读取文本数据源---将不同名称的文件放入到文件夹中统计
val schema: StructType = StructType(
StructField("name", DataTypes.StringType, true) ::
StructField("age", DataTypes.IntegerType, true) ::
StructField("hobby", DataTypes.StringType, true) :: Nil
)
//思考:批处理如何读取数据设定查询条件?
val streamDF: DataFrame = spark.readStream
.format("csv")
.option("sep", ";")
.option("header", "false")
.schema(schema)
.load("data/baseinput/struct/")
streamDF.printSchema()
import spark.implicits._
//3-执行统计-统计年龄小于25岁的人群的爱好排行榜。
val result: DataFrame = streamDF
.filter('age < 25)
.groupBy('hobby)
.count()
//4-输出到控制台
val query: StreamingQuery = result.writeStream
.format("console")
.outputMode(OutputMode.Complete())
.option("numRows", 10)
.option("truncate", "false")
.trigger(Trigger.ProcessingTime(0))
.start()
//5-query.awaitTermination
query.awaitTermination()
//6-query.stop
query.stop()
}
}
  • 总结:

  • 读取文本数据源的参考

StructuredStreaming的Kakfa整合

  • 需求:通过StructuredStreaming结合Kakfa消费Kafka中的数据实现单词统计计数

  • 步骤:

    • 1-准备上下文环境
    • 2-读取Kafka的数据
    • 3-将Kafka的数据转化,实现单词统计技术
    • 4-将得到结果写入控制台
    • 5.query.awaitTermination
    • 6-query.stop
  • 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package cn.ispark.structedstreaming.kafka

import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
* DESC:
* * 1-准备上下文环境
* * 2-读取Kafka的数据
* * 3-将Kafka的数据转化,实现单词统计技术
* * 4-将得到结果写入控制台
* * 5.query.awaitTermination
* * 6-query.stop
*/
object _01KafkaSourceWordcount {
def main(args: Array[String]): Unit = {
//1-准备上下文环境
val conf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[*]")
val spark: SparkSession = SparkSession
.builder()
.config(conf)
.config("spark.sql.shuffle.partitions", "4")
.getOrCreate()
//spark.sparkContext.setLogLevel("WARN")
import spark.implicits._
//2-读取Kafka的数据
val streamDF: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "node1:9092")
.option("subscribe", "wordstopic")
.load()
//streamDF.printSchema()
//root
// |-- key: binary (nullable = true)
// |-- value: binary (nullable = true)
// |-- topic: string (nullable = true)
// |-- partition: integer (nullable = true)
// |-- offset: long (nullable = true)
// |-- timestamp: timestamp (nullable = true)
// |-- timestampType: integer (nullable = true)
//3-将Kafka的数据转化,实现单词统计技术
val result: Dataset[Row] = streamDF
.selectExpr("cast (value as string)") //因为kafka得到的数据是binary类型的数据需要使用cast转换
.as[String]
.flatMap(x => x.split("\\s+")) // |-- value: string (nullable = true)
.groupBy($"value")
.count()
.orderBy('count.desc)
//.groupBy("value")
//4-将得到结果写入控制台
val query: StreamingQuery = result
.writeStream
.format("console")
.outputMode(OutputMode.Complete())
.trigger(Trigger.ProcessingTime(0))
.option("numRows", 10)
.option("truncate", false)
.start()
//5.query.awaitTermination
query.awaitTermination()
//6-query.stop
query.stop()
}
}
  • 总结:

  • Kafka整合代码需要掌握

  • spark.assets/image-20210405155454255.png
  • spark.assets/image-20210405155649481.png
  • spark.assets/image-20210405155750912.png
  • 数据结果写入kafka

  • spark.assets/image-20210405160238993.png
1
2
3
4
5
6
7
8
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
val ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()

运营商基站数据ETL

  • 案例

  • spark.assets/image-20210405160605762.png
  • 准备Kafka的topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 实时ETL案例1
#查看topic信息
/export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181
#删除topic
/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic stationTopic
/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic etlTopic

#创建topic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic stationTopic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic etlTopic

#模拟生产者
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic stationTopic
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic etlTopic

#模拟消费者----------stationTopic
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic stationTopic --from-beginning
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic etlTopic --from-beginning
  • Mock模拟一个Kafka的Producer生产Station数据

  • spark.assets/image-20210405161003547.png
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package cn.ispark.structedstreaming.kafka

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import scala.util.Random

/**
* 模拟产生基站日志数据,实时发送Kafka Topic中,数据字段信息:
* 基站标识符ID, 主叫号码, 被叫号码, 通话状态, 通话时间,通话时长
*/
object MockStationLog {
def main(args: Array[String]): Unit = {
// 发送Kafka Topic
val props = new Properties()
props.put("bootstrap.servers", "node1:9092")
props.put("acks", "1")
props.put("retries", "3")
props.put("key.serializer", classOf[StringSerializer].getName)
props.put("value.serializer", classOf[StringSerializer].getName)
val producer = new KafkaProducer[String, String](props)

val random = new Random()
val allStatus = Array(
"fail", "busy", "barring", "success", "success", "success",
"success", "success", "success", "success", "success", "success"
)

while (true) {
val callOut: String = "1860000%04d".format(random.nextInt(10000))
val callIn: String = "1890000%04d".format(random.nextInt(10000))
val callStatus: String = allStatus(random.nextInt(allStatus.length))
val callDuration = if ("success".equals(callStatus)) (1 + random.nextInt(10)) * 1000L else 0L

// 随机产生一条基站日志数据
val stationLog: StationLog = StationLog(
"station_" + random.nextInt(10),
callOut,
callIn,
callStatus,
System.currentTimeMillis(),
callDuration
)
println(stationLog.toString)
Thread.sleep(100 + random.nextInt(100))

val record = new ProducerRecord[String, String]("stationTopic", stationLog.toString)
producer.send(record)
}

producer.close() // 关闭连接
}

/**
* 基站通话日志数据
*/
case class StationLog(
stationId: String, //基站标识符ID
callOut: String, //主叫号码
callIn: String, //被叫号码
callStatus: String, //通话状态
callTime: Long, //通话时间
duration: Long //通话时长
) {
override def toString: String = {
s"$stationId,$callOut,$callIn,$callStatus,$callTime,$duration"
}
}

}
  • StationTopic开启消费者消费Producer产生的数据

  • 接着通过结构化流获取staiontopic的kafka数据,进行过滤

步骤

  • 1-首先将Kafka两个Topic构建好
  • 2-启动模拟程序模拟生产者生产station数据到staiontopic中
  • 3-启动StructuredStreaming消费staiontopic的数据
  • 4-进行etl的操作———过滤下标三个字段(callStatus=‘success’)
  • 5-将过滤出来的数据写入到etltopic中
  • 6-查看etltopic的消费者查看结果是否正确

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package cn.ispark.structedstreaming.kafka

import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
* DESC:
*/
object _02StationDataProcess {
def main(args: Array[String]): Unit = {
//1-首先将Kafka两个Topic构建好
//1-准备上下文环境
val conf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[*]")
val spark: SparkSession = SparkSession
.builder()
.config(conf)
.config("spark.sql.shuffle.partitions", "4")
.getOrCreate()
import spark.implicits._
//2-启动模拟程序模拟生产者生产station数据到staiontopic中
//3-启动StructuredStreaming消费staiontopic的数据
val streamDF: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "node1:9092")
.option("subscribe", "stationTopic")
.load()
//4-进行etl的操作---------过滤下标三个字段(callStatus=‘success’)
val result: Dataset[String] = streamDF
.selectExpr("cast (value as string)")
.as[String]
.filter(line => StringUtils.isNotBlank(line) && "success".equals(line.trim.split(",")(3)))
//5-将过滤出来的数据写入到etltopic中
val query: StreamingQuery = result.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "node1:9092")
.option("topic", "etlTopic")
.option("checkpointLocation","data/baseoutput/checkpoint-2")
.start()
//6-查看etltopic的消费者查看结果是否正确
query.awaitTermination()
query.stop()
}
}

结果

  • spark.assets/image-20210405162557574.png

物联网案例

  • IOT

  • 基本配置

1
2
3
4
5
6
7
8
9
10
11
12
13
# 物联网设备案例
#查看topic信息
/export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181
#删除topic
/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic iotTopic

#创建topic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic iotTopic

#模拟生产者
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic iotTopic
#模拟消费者
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic iotTopic --from-beginning
  • 对物联网设备状态信号数据,实时统计分析:

    1)、信号强度大于30的设备;

    2)、各种设备类型的数量;

    3)、各种设备类型的平均信号强度;

  • 梳理流程:

  • 1-首先准备生产者生产IOT的设备的数据,IOT可以开启topic的消费者查看是否成功消费

  • 2-使用StructuredStreaming读取Kafka的数据

  • 3-处理IOTTopic的数据–DSL&SQL

  • 4-将数据结果写入控制台

  • 5-query.awaitTermination

  • 6-query.stop

  • 代码:

  • SQL的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package cn.ispark.structedstreaming.iot

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.types.DoubleType

/**
* DESC:
* 1-首先准备生产者生产IOT的设备的数据,IOT可以开启topic的消费者查看是否成功消费
* 2-使用StructuredStreaming读取Kafka的数据
* 3-处理IOTTopic的数据--DSL&SQL
* 4-将数据结果写入控制台
* 5-query.awaitTermination
* 6-query.stop
*/
object _01IOTStreamProcess {
def main(args: Array[String]): Unit = {
//1-首先准备生产者生产IOT的设备的数据,IOT可以开启topic的消费者查看是否成功消费
//2-使用StructuredStreaming读取Kafka的数据
val conf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[*]")
val spark: SparkSession = SparkSession
.builder()
.config(conf)
.config("spark.sql.shuffle.partitions", "4")
.getOrCreate()
import spark.implicits._
val streamDF: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "node1:9092")
.option("subscribe", "iotTopic")
.load()
//3-处理IOTTopic的数据--DSL&SQL
val parseJsonData: DataFrame = streamDF
.selectExpr("cast (value as string)")
.as[String]
//{"device":"device_10","deviceType":"db","signal":74.0,"time":1617612014757}
.select(
get_json_object($"value", "$.device").as("device"),
get_json_object($"value", "$.deviceType").as("deviceType"),
get_json_object($"value", "$.signal").cast(DoubleType).as("signal"),
get_json_object($"value", "$.time").as("time")
)
//SQL:signal > 30 所有数据,按照设备类型 分组,统计数量、平均信号强度
parseJsonData.createOrReplaceTempView("table_view")
val sql =
"""
|select round(avg(signal),2) as avg_signal,count(deviceType) as device_counts
|from table_view
|where signal >30
|group by deviceType
|""".stripMargin
val result: DataFrame = spark.sql(sql)
//4-将数据结果写入控制台
val query: StreamingQuery = result.writeStream
.format("console")
.outputMode(OutputMode.Complete())
.option("numRows", 10)
.option("truncate", false)
.trigger(Trigger.ProcessingTime(0))
.start()
//5-query.awaitTermination
query.awaitTermination()
//6-query.stop
query.stop()
}
}
  • 方法2:使用DSL方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package cn.ispark.structedstreaming.iot

import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
* DESC:
* 1-首先准备生产者生产IOT的设备的数据,IOT可以开启topic的消费者查看是否成功消费
* 2-使用StructuredStreaming读取Kafka的数据
* 3-处理IOTTopic的数据--DSL&SQL
* 4-将数据结果写入控制台
* 5-query.awaitTermination
* 6-query.stop
*/
object _02IOTStreamProcessDSL {
def main(args: Array[String]): Unit = {
//1-首先准备生产者生产IOT的设备的数据,IOT可以开启topic的消费者查看是否成功消费
//2-使用StructuredStreaming读取Kafka的数据
val conf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[*]")
val spark: SparkSession = SparkSession
.builder()
.config(conf)
.config("spark.sql.shuffle.partitions", "4")
.getOrCreate()
import spark.implicits._
val streamDF: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "node1:9092")
.option("subscribe", "iotTopic")
.load()
//3-处理IOTTopic的数据--DSL&SQL
val parseJsonData: DataFrame = streamDF
.selectExpr("cast (value as string)")
.as[String]
//{"device":"device_10","deviceType":"db","signal":74.0,"time":1617612014757}
.select(
get_json_object($"value", "$.device").as("device"),
get_json_object($"value", "$.deviceType").as("deviceType"),
get_json_object($"value", "$.signal").cast(DoubleType).as("signal"),
get_json_object($"value", "$.time").as("time")
)
//SQL:signal > 30 所有数据,按照设备类型 分组,统计数量、平均信号强度
parseJsonData.createOrReplaceTempView("table_view")
/* val sql =
"""
|select round(avg(signal),2) as avg_signal,count(deviceType) as device_counts
|from table_view
|where signal >30
|group by deviceType
|""".stripMargin
val result: DataFrame = spark.sql(sql)*/
val result: DataFrame = parseJsonData
//.select("signal", "deviceType")
.filter('signal > 30)
.groupBy("deviceType")
.agg(
round(avg("signal"), 2).as("avg_signal"),
count("deviceType").as("device_counts")
)

//4-将数据结果写入控制台
val query: StreamingQuery = result.writeStream
.format("console")
.outputMode(OutputMode.Complete())
.option("numRows", 10)
.option("truncate", false)
.trigger(Trigger.ProcessingTime(0))
.start()
//5-query.awaitTermination
query.awaitTermination()
//6-query.stop
query.stop()
}
}
  • 总结:

  • 解析Json的方式可以采用SparkSQL的getJsonObject的方式

  • 更复杂的Json格式解析

对于重复数据去重操作

  • 对于指定的字段或多个字段进行去重操作
  • spark.assets/image-20210405173831039.png
  • spark.assets/image-20210405174316849.png
StructureedStreaming的Foreach及ForeachBatch
  • 结构化流写入MySQL中
  • spark.assets/image-20210405145120162.png

Foreach

  • 了解foreach

  • 1-驱动包导入

1
2
3
4
5
6
7
8
9
url="jdbc:mysql://localhost:3306/database_name"
驱动:com.mysql.jdbc.Driver
驱动包Maven:
<!-- MySQL Client 5.1.38依赖 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
  • 2-在MySQL中创建表
1
2
3
4
5
6
7
8
CREATE TABLE `t_word` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`word` varchar(255) NOT NULL,
`count` int(11) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `word` (`word`)
) ENGINE=InnoDB AUTO_INCREMENT=26 DEFAULT CHARSET=utf8;

  • 2-JDBC的驱动类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//注意这里不要加format(“console”)
class JDBCSink(url: String, username: String, password: String)
extends ForeachWriter[Row] with Serializable {
var connection: Connection = _ //_表示占位符,后面会给变量赋值
var preparedStatement: PreparedStatement = _

//开启连接
override def open(partitionId: Long, version: Long): Boolean = {
connection = DriverManager.getConnection(url, username, password)
true
}

/*
CREATE TABLE `t_word` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`word` varchar(255) NOT NULL,
`count` int(11) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `word` (`word`)
) ENGINE=InnoDB AUTO_INCREMENT=26 DEFAULT CHARSET=utf8;
*/
//replace INTO `bigdata`.`t_word` (`id`, `word`, `count`) VALUES (NULL, NULL, NULL);
//处理数据--存到MySQL
override def process(row: Row): Unit = {
val word: String = row.get(0).toString
val count: String = row.get(1).toString //这可能需要转换为int,row.getLong()
println(word + ":" + count)
//REPLACE INTO:表示如果表中没有数据则插入,如果有数据则替换
//注意:REPLACE INTO要求表有主键或唯一索引
val sql = "REPLACE INTO `t_word` (`id`, `word`, `count`) VALUES (NULL, ?, ?);"
preparedStatement = connection.prepareStatement(sql)
preparedStatement.setString(1, word)
preparedStatement.setInt(2, Integer.parseInt(count))
preparedStatement.executeUpdate()
}

//关闭资源
override def close(errorOrNull: Throwable): Unit = {
if (connection != null) {
connection.close()
}
if (preparedStatement != null) {
preparedStatement.close()
}
}
}
  • 演示如何使用这个工具
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package cn.ispark.structedstreaming.toMySQL

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

/**
* DESC:
* 1-首先申请资源
* 2-通过Spark读取Socket的数据,实现wordcount
* 3-统计wordcountt
* 4-将wordcount写入MySQL
* 5-query.awaitTermination
* 6-query.stop
*/
object _01ForeachWayToMySQL {
def main(args: Array[String]): Unit = {
// 1-首先申请资源
//1-首先准备sparksession,底层是sparksql的引擎
val conf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[*]")
val spark: SparkSession = SparkSession
.builder()
.config(conf)
.config("spark.sql.shuffle.partitions", "4")
.getOrCreate()
//spark.sparkContext.setLogLevel("WARN")
import spark.implicits._
// 2-通过Spark读取Socket的数据,实现wordcount
val lines = spark.readStream
.format("socket")
.option("host", "node1")
.option("port", 9999)
.load()


// 3-统计wordcount
// Split the lines into words
val words = lines
.as[String]
.flatMap(_.split("\\s+"))
// Generate running word count
val wordCounts = words
.groupBy("value")
.count()
val jDBCSink = new JDBCSink("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")
// 4-将wordcount写入MySQL
val query: StreamingQuery = wordCounts
.writeStream
.foreach(jDBCSink)
.outputMode(OutputMode.Complete())
.trigger(Trigger.ProcessingTime(0))
.start()
// 5-query.awaitTermination
query.awaitTermination()
// 6-query.stop
query.stop()
}

//注意这里不要加format(“console”)
class JDBCSink(url: String, username: String, password: String)
extends ForeachWriter[Row] with Serializable {
var connection: Connection = _ //_表示占位符,后面会给变量赋值
var preparedStatement: PreparedStatement = _

//开启连接
override def open(partitionId: Long, version: Long): Boolean = {
connection = DriverManager.getConnection(url, username, password)
true
}

/*
CREATE TABLE `t_word` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`word` varchar(255) NOT NULL,
`count` int(11) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `word` (`word`)
) ENGINE=InnoDB AUTO_INCREMENT=26 DEFAULT CHARSET=utf8;
*/
//replace INTO `bigdata`.`t_word` (`id`, `word`, `count`) VALUES (NULL, NULL, NULL);
//处理数据--存到MySQL
override def process(row: Row): Unit = {
val word: String = row.get(0).toString
val count: String = row.get(1).toString //这可能需要转换为int,row.getLong()
println(word + ":" + count)
//REPLACE INTO:表示如果表中没有数据则插入,如果有数据则替换
//注意:REPLACE INTO要求表有主键或唯一索引
val sql = "REPLACE INTO `t_word` (`id`, `word`, `count`) VALUES (NULL, ?, ?);"
preparedStatement = connection.prepareStatement(sql)
preparedStatement.setString(1, word)
preparedStatement.setInt(2, Integer.parseInt(count))
preparedStatement.executeUpdate()
}

//关闭资源
override def close(errorOrNull: Throwable): Unit = {
if (connection != null) {
connection.close()
}
if (preparedStatement != null) {
preparedStatement.close()
}
}
}

  • 上述的方法就是集成ForeachWriter的类实现其中open,process和close方法,通过foreach传入对应参数即可

ForeachBatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
  package cn.ispark.structedstreaming.toMySQL

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SaveMode, SparkSession}

/**
* DESC:
* 1-首先申请资源
* 2-通过Spark读取Socket的数据,实现wordcount
* 3-统计wordcountt
* 4-将wordcount写入MySQL
* 5-query.awaitTermination
* 6-query.stop
*/
object _02ForeeachBatchWayToMySQL {
def main(args: Array[String]): Unit = {
// 1-首先申请资源
//1-首先准备sparksession,底层是sparksql的引擎
val conf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[*]")
val spark: SparkSession = SparkSession
.builder()
.config(conf)
.config("spark.sql.shuffle.partitions", "4")
.getOrCreate()
//spark.sparkContext.setLogLevel("WARN")
import spark.implicits._
// 2-通过Spark读取Socket的数据,实现wordcount
val lines = spark.readStream
.format("socket")
.option("host", "node1")
.option("port", 9999)
.load()


// 3-统计wordcount
// Split the lines into words
val words = lines
.as[String]
.flatMap(_.split("\\s+"))
// Generate running word count
val wordCounts = words
.groupBy("value")
.count()
// 4-将结果写入到MySQL中
val query: StreamingQuery = wordCounts
.writeStream
.outputMode(OutputMode.Complete())
.foreachBatch((data: DataFrame, batchID: Long) => {
println("BatchId is:", batchID)
data
.coalesce(1)
.write
.mode(SaveMode.Overwrite)
.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
.option("url", "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8")
.option("user", "root")
.option("password", "root")
.option("dbtable", "bigdata.tb_word_count2")
.save()
}).start()

// 5-query.awaitTermination
query.awaitTermination()
// 6-query.stop
query.stop()
}
}
  • 需要注意的是foreach需要传入的是数据集本身,batchId

  • 默认情况下,foreachBatch仅提供至少一次写保证。 但是,可以使用提供给该函数的batchId作为重复数据删除输出并获得一次性保证的方法。

    foreachBatch不适用于连续处理模式,因为它从根本上依赖于流式查询的微批量执行。 如果以连续模式写入数据,请改用foreach。

背压处理

在结构化流中自动实现反压机制,控制kafka数据接入

spark.streaming积压

spark.assets/image-20210405184406571.png spark.assets/image-20210405184455933.png

事件时间–水印机制

  • spark.assets/image-20210405174638291.png
  • SparkStreaming框架仅仅支持处理时间ProcessTime,

    StructuredStreaming支持事件时间和处理时间,

    Flink框架支持三种时间数据操作,

  • 基于事件时间处理

  • spark.assets/image-20210405175656065.png
  • spark.assets/image-20210405180128446.png
  • spark.assets/image-20210405180409294.png
  • 让Spark SQL引擎****自动追踪数据中当前事件时间EventTime,依据规则清除旧的状态数据****。

  • 图中的processing time是触发计算(多个未销毁的窗口)和水位线更新的时间,但不一定窗口会销毁,窗口销毁的时间是wartermark>=窗口的结束时间。

  • 加了水印之后就可以查到之前的数据,至于查多久,关注即将销毁的窗口数据(水位线在开始时间较小的那个窗口内,从这个窗口之后的聚合数据都还保存,只不过输出模式只输出有更新的key),销毁之后之前的窗口就不会再更新,意味着那个窗口的累计数据已经确定了。

  • spark.assets/image-20210405180756809.png

    获取上一个窗口水印–>计算该事件时间分布在哪个窗口(滑动窗口会有两个),判断该事件窗口是否还没被销毁,没被销毁就聚合更新–>有更新的窗口数据,等待按照update模式触发时间拉取输出

  • spark.assets/image-20210405181735338-1677680980955.png
  • spark.assets/image-20210405182046499.png
  • spark.assets/image-20210405182310907.png
  • spark.assets/image-20210405182404774.png spark.assets/image-20230301223705309.png spark.assets/image-20230301224233613.png spark.assets/image-20230301224330200.png spark.assets/image-20230301224841089.png
  • spark.assets/image-20230301225122318.png
  • 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package cn.ispark.structedstreaming.kafka

import java.sql.Timestamp

import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
* 基于Structured Streaming 读取TCP Socket读取数据,事件时间窗口统计词频,将结果打印到控制台
* 每5秒钟统计最近10秒内的数据(词频:WordCount),设置水位Watermark时间为10秒
* 2019-10-10 12:00:07,dog
* 2019-10-10 12:00:08,owl
*
* 2019-10-10 12:00:14,dog
* 2019-10-10 12:00:09,cat
*
* 2019-10-10 12:00:15,cat
* 2019-10-10 12:00:08,dog
* 2019-10-10 12:00:13,owl
* 2019-10-10 12:00:21,owl
*
* 2019-10-10 12:00:04,donkey --丢失
* 2019-10-10 12:00:17,owl --不丢失
*/
object StructuredWindow {
def main(args: Array[String]): Unit = {
// 1. 构建SparkSession实例对象,传递sparkConf参数
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[*]")
.config("spark.sql.shuffle.partitions", "3")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
import org.apache.spark.sql.functions._
import spark.implicits._

// 2. 使用SparkSession从TCP Socket读取流式数据
val inputStreamDF: DataFrame = spark.readStream
.format("socket")
.option("host", "node1")
.option("port", 9999)
.load()

// 3. 针对获取流式DStream进行词频统计
val resultStreamDF = inputStreamDF
.as[String]
.filter(StringUtils.isNotBlank(_))
// 将每行数据进行分割单词: 2019-10-12 09:00:02,cat dog
.flatMap(line => {
val arr = line.trim.split(",")
val timestampStr: String = arr(0)
val wordsStr: String = arr(1)
wordsStr
.split("\\s+")
//(时间戳,单词)
.map((Timestamp.valueOf(timestampStr), _))
})
// 设置列的名称
.toDF("timestamp", "word")
// TODO:设置水位Watermark
.withWatermark("timestamp", "10 seconds")
//依据事件时间生成窗口
// TODO:设置基于事件时间(event time)窗口 -> time, 每5秒统计最近10秒内数据
.groupBy(
//structed streaming window 滑动步长不填默认窗口步长使用窗口长度,spark streaming 不填默认是使用 ssc 读取源文件每个batch 的步长,两者不一样。
window($"timestamp", "10 seconds", "5 seconds"),
$"word"
).count()
// 按照窗口字段降序排序
//.orderBy($"window")

/*
root
|-- window: struct (nullable = true)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- word: string (nullable = true)
|-- count: long (nullable = false)
*/
//resultStreamDF.printSchema()

// 4. 将计算的结果输出,打印到控制台
val query: StreamingQuery = resultStreamDF.writeStream
.outputMode(OutputMode.Update())
.format("console")
.option("numRows", "100")
.option("truncate", "false")
.trigger(Trigger.ProcessingTime("5 seconds"))
.start()
query.awaitTermination()
query.stop()
}
}
spark.assets/image-20210405183458836.png

StructuredStreaming的Continue的连续处理机制

  • 连续处理(Continuous Processing)是Spark 2.3中引入的一种新的实验性流执行模式,可实现毫秒级端到端延迟(类似Storm、Flink),但只能保证At-Least-Once 最少一次:即不会丢失数据,但可能会有重复结果。

    默认的微批处理(micro-batch processing)引擎,可以实现Exactly-Once 精确一次保证,但最多可实现100ms的延迟。

  • 结构化流提供了处于试验阶段的Continue的连续处理机制

  • 需要在writeStream.trigger(Trigger.ContinueTime())

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
* 从Spark 2.3版本开始,StructuredStreaming结构化流中添加新流式数据处理方式:Continuous processing
* 持续流数据处理:当数据一产生就立即处理,类似Storm、Flink框架,延迟性达到100ms以下,目前属于实验开发阶段
*/
object StructuredContinuous {
def main(args: Array[String]): Unit = {
// 1. 构建SparkSession会话实例对象,设置属性信息
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[*]")
.config("spark.sql.shuffle.partitions", "3")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
import org.apache.spark.sql.functions._
import spark.implicits._

// 1. 从KAFKA读取数据
val kafkaStreamDF: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "node1:9092")
.option("subscribe", "stationTopic")
.load()

// 2. 对基站日志数据进行ETL操作
// station_0,18600004405,18900009049,success,1589711564033,9000
val etlStreamDF: Dataset[String] = kafkaStreamDF
// 获取value字段的值,转换为String类型
.selectExpr("CAST(value AS STRING)")
// 转换为Dataset类型
.as[String]
// 过滤数据:通话状态为success
.filter(log => StringUtils.isNotBlank(log) && "success".equals(log.trim.split(",")(3)))

// 3. 针对流式应用来说,输出的是流
val query: StreamingQuery = etlStreamDF.writeStream
.outputMode(OutputMode.Append())
.format("kafka")
.option("kafka.bootstrap.servers", "node1:9092")
.option("topic", "etlTopic")
.option("checkpointLocation", "./ckp" + System.currentTimeMillis())
// TODO: 设置持续流处理 Continuous Processing, 指定CKPT时间间隔
//the continuous processing engine will records the progress of the query every second
//持续流处理引擎,将每1秒中记录当前查询Query进度状态
.trigger(Trigger.Continuous("1 second"))
.start()
query.awaitTermination()
query.stop()
}
}
  • 注意其中有很多限制,使用的时候需要注意