Big Data · Spark · KNOWU · 2024-11-13
Spark Primer
Four Generations of Compute Engines
First-generation engine: MapReduce (MR)
Second-generation engines: Hive (on MR, Spark, or Tez), with partial support for DAGs (directed acyclic graphs)
Third-generation engines: Spark and Impala (full DAG support)
Fourth-generation engine: Flink, unifying batch and streaming (full DAG support)
Technology evolution:
Interview question: what are the pros and cons of Hadoop's process-based computation versus Spark's thread-based model?
Deploying Spark
Local mode
# Submit a job in Spark local mode
bin/spark-submit \
--master local[3] \
--class org.apache.spark.examples.SparkPi \
/export/server/spark/examples/jars/spark-examples_2.11-2.4.5.jar \
10
Standalone
2 - Standalone mode: learn how to inspect it
How to install?
Install according to the planned architecture
Changes needed in the configuration files:
1 - Specify which node is the Master and which nodes are Workers
2 - Specify the Master's communication address (port 7077) and the Master's Web UI address (port 8080)
3 - Optionally specify the Worker communication address and the Worker Web UI address
4 - Configure the Spark history server
Why configure it? Because the 4040 UI started by spark-shell can no longer be viewed once the application is closed
How to configure it? Write the Spark history server's event logs to the HDFS distributed file system
Configuration:
View the Web UI
How to start spark-shell?
bin/spark-shell --master spark://node1:7077
How to submit a Spark job?
bin/spark-submit \
--master spark://node1:7077 \
--class org.apache.spark.examples.SparkPi \
/export/server/spark/examples/jars/spark-examples_2.11-2.4.5.jar \
10
StandaloneHA
bin/spark-submit \
--master spark://node1:7077,node2:7077 \
--class org.apache.spark.examples.SparkPi \
/export/server/spark/examples/jars/spark-examples_2.11-2.4.5.jar \
10
SparkOnYarn
bin/spark-submit \
--master yarn \
--class org.apache.spark.examples.SparkPi \
/export/server/spark/examples/jars/spark-examples_2.11-2.4.5.jar \
10
Key points and common difficulties
Spark deployment modes
How the two deploy modes of Spark on YARN work
Master writing the Scala-based WordCount in IDEA
Learn to submit the jar and run it on the cluster
Build up your own submission scripts
Spark's two deploy modes (available in any cluster mode)
bin/spark-submit \
--master spark://node1:7077 \
--deploy-mode client \
--class org.apache.spark.examples.SparkPi \
/export/server/spark-2.4.5-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.5.jar \
10
# Submit a job in Spark standalone mode - cluster
bin/spark-submit \
--master spark://node1:7077 \
--deploy-mode cluster \
--class org.apache.spark.examples.SparkPi \
/export/server/spark-2.4.5-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.5.jar \
10
A detailed look at how Spark on YARN works
1 - The Driver starts on the client side
2 - The client asks the ResourceManager to launch an ApplicationMaster
3 - The ApplicationMaster requests resources from the ResourceManager
4 - The ResourceManager returns a resource list to the ApplicationMaster, which then has the designated NodeManagers launch Executor processes
5 - The Executors register themselves back with the Driver once their resources are ready
6 - The Driver then runs the computation through the DAGScheduler and TaskScheduler
Note: when an action operator is executed, a Job is triggered; Stages are split at wide dependencies, each Stage produces a corresponding TaskSet, and the Tasks are then dispatched to the Executors for execution.
Commands
bin/spark-submit \
--master yarn \
--deploy-mode client \
--class org.apache.spark.examples.SparkPi \
/export/server/spark/examples/jars/spark-examples_2.11-2.4.5.jar \
10
bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--class org.apache.spark.examples.SparkPi \
/export/server/spark/examples/jars/spark-examples_2.11-2.4.5.jar \
10
How to check the result (copy conf/log4j.properties.template to log4j.properties, otherwise nothing shows up on stdout):
The relationship between YARN Containers and Executors
Answer: an Executor runs inside a Container
An Executor is a process and a Task is a thread; in the end, one thread executes the Task for one partition.
Spark programming in IDEA: creating the project
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 <properties > <encoding > UTF-8</encoding > <scala.version > 2.11.12</scala.version > <scala.binary.version > 2.11</scala.binary.version > <spark.version > 2.4.5</spark.version > <hadoop.version > 2.7.5</hadoop.version > </properties > <dependencies > <dependency > <groupId > org.scala-lang</groupId > <artifactId > scala-library</artifactId > <version > ${scala.version}</version > </dependency > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-core_${scala.binary.version}</artifactId > <version > ${spark.version}</version > </dependency > <dependency > <groupId > org.apache.hadoop</groupId > <artifactId > hadoop-client</artifactId > <version > ${hadoop.version}</version > </dependency > <dependency > <groupId > junit</groupId > <artifactId > junit</artifactId > <version > 4.10</version > <scope > provided</scope > </dependency > <dependency > <groupId > mysql</groupId > <artifactId > mysql-connector-java</artifactId > <version > 5.1.38</version > </dependency > <dependency > <groupId > com.alibaba</groupId > <artifactId > fastjson</artifactId > <version > 1.2.47</version > </dependency > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-hive_2.11</artifactId > <version > ${spark.version}</version > </dependency > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-hive-thriftserver_2.11</artifactId > <version > ${spark.version}</version > </dependency > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-streaming_2.11</artifactId > <version > ${spark.version}</version > </dependency > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-mllib_2.11</artifactId > <version > ${spark.version}</version > </dependency > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-streaming-kafka-0-10_2.11</artifactId > <version > ${spark.version}</version > </dependency > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-sql-kafka-0-10_2.11</artifactId > <version > ${spark.version}</version > </dependency > <dependency > <groupId > com.hankcs</groupId > <artifactId > hanlp</artifactId > <version > portable-1.7.7</version > </dependency > <dependency > <groupId > redis.clients</groupId > <artifactId > jedis</artifactId > <version > 2.9.0</version > </dependency > </dependencies > <build > <outputDirectory > target/classes</outputDirectory > <testOutputDirectory > target/test-classes</testOutputDirectory > <resources > <resource > <directory > ${project.basedir}/src/main/resources</directory > </resource > </resources > <plugins > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-compiler-plugin</artifactId > <version > 3.0</version > <configuration > <source > 1.8</source > <target > 1.8</target > <encoding > UTF-8</encoding > </configuration > </plugin > <plugin > <groupId > net.alchim31.maven</groupId > <artifactId > scala-maven-plugin</artifactId > <version > 3.2.0</version > <executions > <execution > <goals > <goal > compile</goal > <goal > testCompile</goal > </goals > </execution > </executions > </plugin > </plugins > </build >
Scala programming guide [must master]
3 - Build the basic Scala WordCount version
Requirement: implement the WordCount case in IDEA
Steps:
1 - First create the SparkContext
2 - Read data from an external file data source
3 - Apply flatMap to flatten the lines into words
4 - Apply a map transformation to produce (word, 1) pairs
5 - reduceByKey (with map-side pre-aggregation)
6 - Write the result to the file system or simply print it
Result:
package cn.spark.sparkbase

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _01SparkWordCount {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkContext
    val conf: SparkConf = new SparkConf().setAppName("_01SparkWordCount").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    // 2. Read the input file
    val fileRDD: RDD[String] = sc.textFile("data/baseinput/words.txt")
    // 3. Flatten each line into words
    val valueRDD: RDD[String] = fileRDD.flatMap(x => x.split("\\s+"))
    // 4. Map each word to (word, 1)
    val mapRDD: RDD[(String, Int)] = valueRDD.map(x => (x, 1))
    // 5. Aggregate the counts per word
    val resultRDD: RDD[(String, Int)] = mapRDD.reduceByKey((a, b) => a + b)
    // 6. Output: print on the executors, collect to the driver, and save to disk
    resultRDD.foreach(x => println(x))
    println("=============================")
    resultRDD.collect().foreach(println(_))
    println("=============================")
    resultRDD.saveAsTextFile("data/baseoutput/output-1")
    sc.stop()
  }
}
Java programming guide [for awareness]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 package cn.spark.sparkbase;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import scala.Tuple2;import java.util.Arrays;import java.util.List;public class _01SparkFirst { public static void main (String[] args) { SparkConf conf = new SparkConf ().setAppName("_01SparkFirst" ).setMaster("local[*]" ); JavaSparkContext jsc = new JavaSparkContext (conf); JavaRDD<String> fileRDD = jsc.textFile("data/baseinput/words.txt" ); JavaRDD<String> flatMapRDD = fileRDD.flatMap(x -> Arrays.asList(x.split("\\s+" )).iterator()); JavaPairRDD<String, Integer> mapRDD = flatMapRDD.mapToPair(x -> new Tuple2 <>(x, 1 )); JavaPairRDD<String, Integer> resultRDD = mapRDD.reduceByKey((a, b) -> a + b); List<Tuple2<String, Integer>> collectResult = resultRDD.collect(); collectResult.forEach(System.out::println); jsc.stop(); } }
Summary 1:
For setAppName, use the this.getClass.getSimpleName.stripSuffix("$") idiom
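A minimal sketch of that idiom, assuming nothing beyond spark-core on the classpath; the object name AppNameDemo is made up for illustration:

import org.apache.spark.SparkConf

object AppNameDemo { // hypothetical object name, just for illustration
  def main(args: Array[String]): Unit = {
    // getSimpleName on a Scala object returns e.g. "AppNameDemo$"; stripSuffix("$") removes the trailing "$"
    val conf = new SparkConf()
      .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
      .setMaster("local[*]")
    println(conf.get("spark.app.name")) // prints "AppNameDemo"
  }
}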
Summary 2:
// Sleep here so you have time to open the Web UI and look at the DAG and other monitoring pages
Thread.sleep(100 * 1000)
Package, upload, and submit the job
Packaging and uploading are covered in more detail later
Requirement: implement the code and submit it to run on the cluster
Steps:
1 - First run the Maven package goal defined in the pom
2 - Upload the jar to the virtual machine
3 - Run the job
# Run your own code on the cluster, here using Spark yarn-client mode
bin/spark-submit \
--master yarn \
--deploy-mode client \
--class cn.spark.sparkbase._01SparkWordCountTar \
/export/data/spark-base_2.11-1.0.0.jar \
hdfs://node1.spark.cn:8020/wordcount/input \
hdfs://node1.spark.cn:8020/wordcount/output-8
Check the result
You need to master this task
The spark-submit parameters
# YARN:
--master MASTER_URL       Cluster resource manager: spark://host:port,host1:port, mesos://host:port,
                          yarn, k8s://https://host:port, or local (Default: local[*]).
--deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or on one of the
                          worker machines inside the cluster ("cluster") (Default: client).
--class CLASS_NAME        Your application's main class (for Java / Scala apps).
--jars JARS               Comma-separated list of jars to include on the driver and executor classpaths.
--conf PROP=VALUE         Arbitrary Spark configuration property.

# The key part: Driver-side and Executor-side configuration
# The Driver requests resources and drives the computation
--driver-memory MEM       Driver memory, 1G by default. Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
--driver-cores NUM        Number of cores used by the driver, only in cluster mode (Default: 1).
                          Note: local mode does not need driver-cores, because local[*] simulates
                          multithreading on the local machine.

# Executors are what actually hold resources and run the computation
--num-executors NUM       Number of executors to launch (Default: 2).
                          If dynamic allocation is enabled, the initial number of executors will be at least NUM.
--executor-memory MEM     Memory per executor, 1G by default (e.g. 1000M, 2G) (Default: 1G).
--executor-cores NUM      Cores per executor; 1 by default on YARN. (Default: 1 in YARN mode, or all
                          available cores on the worker in standalone mode)
--queue QUEUE_NAME        The YARN queue to submit to (Default: "default").

# For a given data volume, the total Executor memory should exceed that volume, and more cpu cores is
# generally better (each core backs one thread, each thread processes one partition; the more partitions
# your data has, the more cores you can use).

bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 2g \
--driver-cores 2 \
--num-executors 10 \
--executor-memory 2g \
--executor-cores 3 \
--class cn.spark.apple.mainclass \
<path-to-jar> \
<program-arguments>
Reading HDFS files directly from IDEA
SparkCore - RDD: key points and common difficulties
Use IDEA to read files from HDFS
Introducing RDDs
RDD properties - a guaranteed interview question [important]
Creating RDDs
RDD transformation operators
RDD action operators
Hands-on RDD cases [important]
What is an RDD? An RDD (Resilient Distributed Dataset) is a resilient distributed dataset: an immutable, partitionable collection that can be computed in parallel.
The five main properties of an RDD
1 - A list of partitions: every RDD is made up of partitions
2 - A compute function that is applied to each partition
3 - Dependencies: each RDD has dependencies on other RDDs
4 - Optionally, a partitioner for key-value RDDs (HashPartitioner by default)
5 - Optionally, preferred locations: move the computation, not the data
We'll walk through the five properties with the WordCount example shortly
1 - How to check the number of partitions
2 - How to check the partitioner
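A minimal sketch of both checks, assuming an existing local SparkContext named sc (as in the WordCount example above); the data is made up:

// Assumes an existing SparkContext `sc`
val rdd = sc.parallelize(Seq(("a", 1), ("b", 2)), 3)
println(rdd.getNumPartitions)               // number of partitions -> 3
println(rdd.partitions.length)              // same information via the partition array
println(rdd.partitioner)                    // None: no partitioner until a shuffle like reduceByKey/partitionBy
println(rdd.reduceByKey(_ + _).partitioner) // Some(HashPartitioner) after a key-based shuffle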
Supplementary diagram
Ways to create an RDD
Parallelism
Think: what is spark.default.parallelism?
How to set this parameter in code
Look at how textFile controls parallelism
How do you read many small files with a Spark RDD?
1 - Dataset meaning: userid-movieid-rating-timestamp, i.e. which user rated which movie at what time
2 - sc.textFile cannot merge small files well; even overriding the second argument does not help
Spark offers sc.wholeTextFiles for handling small files; it does not create one partition per file
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package cn.spark.sparkbase.rddoprationimport org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf , SparkContext }object _02SparkRDDFirst { def main (args: Array [String ]): Unit = { val conf: SparkConf = new SparkConf () .setMaster("local[*]" ) .setAppName(this .getClass.getSimpleName.stripSuffix("$" )) .set("spark.default.parallelism" , "4" ) val sc = new SparkContext (conf) val fileRDD: RDD [String ] = sc.textFile("data/baseinput/words.txt" ) println("partitons length:" + fileRDD.getNumPartitions) println(s"partiiton length is:${fileRDD.partitions.length} " ) val filesRDD1: RDD [String ] = sc.textFile("data/baseinput/ratings100/" ) println("partitons length:" + filesRDD1.getNumPartitions) val filesRDD2: RDD [(String , String )] = sc.wholeTextFiles("data/baseinput/ratings100/" ) filesRDD2.take(3 ).foreach(println(_)) sc.stop() } }
Supplementary case
With textFile, the number of partitions is related to the number of files in the directory (the example uses three xxx.log files): one file gives one partition (so three files means partitions = 3). That estimate holds when file sizes are fairly even; the exact behavior follows Hadoop's file-split rules (goalSize -> splitSize -> the 1.1x split rule, under which each large file is split further).
The textFile split logic in the source (set the log level to INFO and search for "input split")
Spark's wholeTextFiles merges small files in a fairly simple way, whereas MapReduce's CombineTextInputFormat merging process is more complex.
https://blog.csdn.net/qq_35241080/article/details/106065442
Categories of RDD operators
Transformation operators
Action operators: an action does not return an RDD; it returns the result to the driver.
First kind: single-value RDD operators
Second kind: two-RDD (double-value) operators
Third kind: key-value RDD operators
Single-value RDD operators
The map operator
1 - Meaning: applies a function to each element and produces a new RDD
2 - Usage
The filter operator
1 - Meaning: produces a new RDD keeping only the elements that satisfy the predicate
2 - Usage
The flatMap operator
1 - Meaning: map then flatten
2 - Usage
The mapPartitions operator
1 - Meaning
2 - Usage
The mapPartitionsWithIndex operator
1 - Meaning
2 - Usage
The sample operator
1 - Meaning
seed: makes the random sampling reproducible across runs
2 - Usage
The glom operator
1 - Meaning: view the contents of each partition as an array
2 - Usage
The sortBy operator
1 - Meaning: sort by a key or value derived from each element
2 - Usage:
The coalesce operator (merge partitions)
To reduce the partition count, just pass the target number; to increase it you must set shuffle = true
1 - Meaning
2 - Usage
The repartition operator
1 - Meaning: repartition calls coalesce(numPartitions, shuffle = true) under the hood, so prefer coalesce when shrinking the partition count.
2 - Usage (see the sketch below)
Source code:
repartitionAndSortWithinPartitions sorts while repartitioning, which is more efficient than repartitioning first and sorting afterwards
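A compact sketch of the single-value operators above, again assuming a local SparkContext sc; the numbers are made up for illustration:

val nums = sc.parallelize(1 to 10, 4)                  // 4 partitions
nums.map(_ * 2).collect()                              // map: Array(2, 4, ..., 20)
nums.filter(_ % 2 == 0).collect()                      // filter: keep even numbers
sc.parallelize(Seq("a b", "c d")).flatMap(_.split(" ")).collect() // flatMap: Array(a, b, c, d)
nums.mapPartitions(iter => iter.map(_ + 100)).collect()           // one function call per partition
nums.mapPartitionsWithIndex((idx, iter) => iter.map(v => (idx, v))).collect() // tag values with their partition id
nums.sample(withReplacement = false, fraction = 0.5, seed = 42L).collect()    // reproducible sample
nums.glom().collect()                                  // Array of per-partition arrays
nums.sortBy(x => -x).collect()                         // descending sort
val fewer = nums.coalesce(2)                           // shrink to 2 partitions, no shuffle
val more  = nums.repartition(8)                        // grow to 8 partitions, shuffles data
println(fewer.getNumPartitions, more.getNumPartitions) // (2, 8)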
Double-value (two-RDD) operators
Set operations: intersection, union, difference
union - set union
intersection - set intersection
distinct - deduplication
subtract - set difference
zip - pairwise zipping
zip requires both RDDs to have the same number of elements and partitions (see the sketch below)
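A small sketch of these two-RDD operators under the same local sc assumption:

val a = sc.parallelize(Seq(1, 2, 3, 4), 2)
val b = sc.parallelize(Seq(3, 4, 5, 6), 2)
a.union(b).collect()            // Array(1, 2, 3, 4, 3, 4, 5, 6) - union keeps duplicates
a.intersection(b).collect()     // Array(3, 4)
a.union(b).distinct().collect() // deduplicated union
a.subtract(b).collect()         // Array(1, 2) - elements of a not in b
a.zip(b).collect()              // Array((1,3), (2,4), (3,5), (4,6)) - needs equal element and partition counts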
Key-value RDD operators
partitionBy:
1 - Meaning:
2 - Usage:
Note: Spark uses three kinds of partitioning. First, horizontal partitioning, which is what sc.makeRDD does when it splits elements by index. Second, hash partitioning, which deterministically assigns each record to a partition from its key; usually you only supply the number of partitions. Third, range partitioning, which splits records into partitions by value range, each partition covering one range: for example, if an array holds random numbers in [0, 100], the range partitioner first splits the range into 10 buckets, then sends each number to its bucket (18 goes to the (10, 20] partition), and finally sorts within each partition. A sketch follows below.
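A minimal partitionBy sketch with the two built-in partitioners (local sc assumed; keys are made up):

import org.apache.spark.{HashPartitioner, RangePartitioner}

val pairs = sc.parallelize(Seq((18, "a"), (55, "b"), (3, "c"), (77, "d")), 2)
// Hash partitioning: partition chosen from the key's hashCode modulo the partition count
val hashed = pairs.partitionBy(new HashPartitioner(4))
// Range partitioning: keys are bucketed into sorted, non-overlapping ranges
val ranged = pairs.partitionBy(new RangePartitioner(4, pairs))
println(hashed.partitioner) // Some(HashPartitioner)
println(ranged.partitioner) // Some(RangePartitioner)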
reduceByKey
1 - Meaning: pre-aggregates on the map side before the shuffle, reducing the amount of data pulled during the shuffle
2 - Usage:
groupByKey
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 package cn.spark.sparkbase.baseimport org.apache.spark.rdd.{RDD , ShuffledRDD }import org.apache.spark.{Aggregator , SparkConf , SparkContext }import scala.collection.mutable.ArrayBuffer object _01groupByKeyOperation { def main (args: Array [String ]): Unit = { val conf: SparkConf = new SparkConf ().setAppName(this .getClass.getSimpleName.stripSuffix("$" )).setMaster("local[*]" ) val sc: SparkContext = new SparkContext (conf) val filerdd: RDD [String ] = sc.makeRDD(Array ("hello you" , "hello me hello she" , "hello spark" )) val flatmapRDD: RDD [String ] = filerdd.flatMap(_.split("\\s+" )) val mapRDD: RDD [(String , Int )] = flatmapRDD.map(x => (x, 1 )) val groupRDD1: RDD [(String , Iterable [Int ])] = mapRDD.groupByKey() groupRDD1.collect().foreach(println(_)) val createCombiner = (v: Int ) => ArrayBuffer (v) val mergeValue = (buf: ArrayBuffer [Int ], v: Int ) => buf += v val mergeCombiners = (c1: ArrayBuffer [Int ], c2: ArrayBuffer [Int ]) => c1 ++= c2 val valueRDD: ShuffledRDD [String , Int , ArrayBuffer [Int ]] = new ShuffledRDD [String , Int , ArrayBuffer [Int ]](mapRDD, new org.apache.spark.HashPartitioner (4 )) .setAggregator(new Aggregator ( createCombiner, mergeValue, mergeCombiners)) .setMapSideCombine(false ) println("source code create groupBykey" ) valueRDD.foreach(println(_)) sc.stop() } }
What is the difference between reduceByKey, foldByKey, aggregateByKey, and combineByKey?
All four operators ultimately call the same method, combineByKeyWithClassTag; they just pass different parameters:
reduceByKey: the first value for a key is used as-is with no computation; the within-partition and across-partition rules are the same. foldByKey: the first value for a key is combined with the initial value inside the partition; the within-partition and across-partition rules are the same. aggregateByKey: the first value for a key is combined with the initial value inside the partition; the within-partition and across-partition rules can differ. combineByKey: when the data shape does not match the desired result, the first value can be converted into a new structure; the within-partition and across-partition rules can differ.
combineByKey is the same as combineByKeyWithClassTag; it is kept for compatibility: "This method is here for backward compatibility. It does not provide combiner classtag information to the shuffle."
This is the JVM type-erasure issue: generic type parameters T, K, U, V are only available at compile time in Java-like languages and are erased once the code runs on the JVM.
1. When defining a generic function or class, add ": ClassTag" after the type parameter and import scala.reflect.ClassTag
2. Or use ": Manifest", which needs no import
Usage example: foldByKey (within one partition a key is combined with the initial value once; across n partitions the same key is combined with the initial value n times)
aggregateByKey: take the max within each partition, then add the per-partition maxima (see the sketch after this quote)
To avoid memory issues with reduceByKey, aggregateByKey can be used.
To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U.
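A hedged sketch of the aggregateByKey pattern described above (max within each partition, then sum of the per-partition maxima), assuming a local sc:

val pairs = sc.parallelize(Seq(("a", 3), ("a", 2), ("b", 5), ("a", 7), ("b", 4)), 2)
val maxThenSum = pairs.aggregateByKey(0)(
  (acc, v) => math.max(acc, v), // seqOp: max within a partition (first value is compared with the initial 0)
  (m1, m2) => m1 + m2           // combOp: add the per-partition maxima
)
// with the default element placement this prints (a,10) and (b,5)
maxThenSum.collect().foreach(println)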
combineByKey (can be used to compute averages)
sortByKey
Think: is sortBy a global sort? Yes. rdd.sortBy(_._2, false).foreach(println) // although sortBy is a global sort, with more than one partition the order in which partitions are printed by foreach is random, so the globally sorted data appears shuffled again.
Shuffling with a RangePartitioner can suffer from data skew
1 - Meaning:
2 - Usage:
join
1 - Meaning:
2 - Usage:
cogroup
1 - Meaning:
2 - Usage:
cartesian
1 - Meaning:
2 - Usage:
mapValues
1 - Meaning:
Operates only on the values, leaving the keys untouched
2 - Usage: (see the sketch below)
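A short sketch of join, cogroup, cartesian, and mapValues on made-up data (local sc assumed):

val emp  = sc.parallelize(Seq((1, "zhangsan"), (2, "lisi"), (3, "wangwu")))
val dept = sc.parallelize(Seq((1, "sales"), (2, "tech")))
emp.join(dept).collect()          // inner join on the key: Array((1,(zhangsan,sales)), (2,(lisi,tech)))
emp.leftOuterJoin(dept).collect() // keeps key 3, with None on the right side
emp.cogroup(dept).collect()       // per key: (Iterable of emp values, Iterable of dept values)
sc.parallelize(Seq(1, 2)).cartesian(sc.parallelize(Seq("x", "y"))).collect() // all 4 pairs
emp.mapValues(_.toUpperCase).collect() // transforms values only; keys and partitioning are preserved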
RDD action operators
reduce
collect (if the collected partition results are very large this can cause an out-of-memory error on the driver; print with foreach instead, or save to disk with saveAsTextFile and inspect there)
count
first
take (starts from the first partition and stops scanning once enough elements are found, then collects them)
takeSample
takeOrdered
aggregate
fold
countByKey
foreach
rdd.foreach prints inside the Executors: each Executor prints as soon as it finishes its own work, so when running locally with multiple threads, List(3, 4) may well finish first and the output order can be scrambled. This only happens with multiple threads and multiple partitions. By contrast, value.collect().foreach(println) prints on the Driver after the data has been collected there, so the order is preserved. A sketch of a few of these actions follows.
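A brief sketch of the main actions above (local sc; the values in the comments assume this exact data):

val nums = sc.parallelize(Seq(3, 1, 4, 1, 5), 2)
nums.reduce(_ + _)              // 14
nums.count()                    // 5
nums.first()                    // 3
nums.take(3)                    // Array(3, 1, 4)
nums.takeOrdered(2)             // Array(1, 1) - the two smallest
nums.aggregate(0)(_ + _, _ + _) // 14: same seqOp and combOp here
nums.fold(0)(_ + _)             // 14
sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3))).countByKey() // Map(a -> 2, b -> 1)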
Common RDD statistics operations
Practice with the key RDD operators
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 package cn.spark.sparkbase.baseimport org.apache.spark.{SparkConf , SparkContext , TaskContext }import org.junit.Test class _02RDDTest { private val sc = new SparkContext (new SparkConf ().setAppName("_02RDDTest" ).setMaster("local[*]" )) @Test def test01 (): Unit = { sc.parallelize(Seq (1 , 2 , 3 , 4 , 5 )) .map(x => x * 2 ) .foreach(println(_)) sc.parallelize(Seq (1 , 2 , 3 , 4 , 5 )) .filter(x => x > 3 ) .foreach(println(_)) } @Test def test02 : Unit = { sc.parallelize(Array (1 , 2 , 3 , 4 , 5 , 6 ), 2 ) .map(x => (x * 2 )) .foreach(println(_)) sc.parallelize(Array (1 , 2 , 3 , 4 , 5 , 6 ), 2 ) .mapPartitions(iter => { iter.foreach(println(_)) iter }).collect() sc.parallelize(Array (1 , 2 , 3 , 4 , 5 , 6 ), 2 ) .mapPartitions(iter => { val iterator: Iterator [Int ] = iter.map(item => item * 2 ) iterator }).collect().foreach(println(_)) println("上面的等价写法" ) sc.parallelize(Array (1 , 2 , 3 , 4 , 5 , 6 ), 2 ) .mapPartitions(iter => { iter.map(item => item * 2 ) }).collect().foreach(println(_)) } @Test def test03 : Unit = { sc.parallelize(Array (1 , 2 , 3 , 4 , 5 , 6 ), 2 ) .mapPartitionsWithIndex((index, iter) => { println("index:" + index) iter.map(_ * 2 ) }).collect().foreach(println(_)) println("改进的方法" ) sc.parallelize(Array (1 , 2 , 3 , 4 , 5 , 6 ), 2 ) .mapPartitionsWithIndex((index, iter) => { iter.map(x => "index is:" + index + "\tvalue is:" + x * 2 ) }).collect().foreach(println(_)) println("想用mapPartition的方法实现分区有哪些元素" ) sc.parallelize(Array (1 , 2 , 3 , 4 , 5 , 6 ), 2 ) .mapPartitions(iter => { println("partitionID:" , TaskContext .getPartitionId()) iter.map(x => x * 2 ) }).collect().foreach(println(_)) println("想用mapPartition的方法实现分区有哪些元素,改进方法" ) sc.parallelize(Array (1 , 2 , 3 , 4 , 5 , 6 ), 2 ) .mapPartitions(iter => { iter.map(x => "partitionID:" + TaskContext .getPartitionId() + "\tvalue is:" + x * 2 ) }).collect().foreach(println(_)) } }
Case 1: groupByKey (to get a total you still need to sum afterwards)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Test def test01 (): Unit = { val value: RDD [(String , Int )] = sc.parallelize(Seq (("a" , 1 ), ("a" , 1 ), ("b" , 1 ))) value .groupByKey() .collect() .foreach(println(_)) value .groupByKey() .map(x => (x._1, x._2.sum)) .collect() .foreach(println(_)) }
Case 2: combineByKey (computing an average)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Test def test02 : Unit = { val rdd: RDD [(String , Double )] = sc.parallelize(Seq ( ("zhangsan" , 99.0 ), ("zhangsan" , 96.0 ), ("lisi" , 97.0 ), ("lisi" , 98.0 ), ("zhangsan" , 97.0 )) ) val createCombiner = (curr: Double ) => (curr, 1 ) val mergeValue = (curr: (Double , Int ), nextValue: Double ) => (curr._1 + nextValue, curr._2 + 1 ) val mergeCombiners = (curr1: (Double , Int ), agg1: (Double , Int )) => (curr1._1 + agg1._1, curr1._2 + agg1._2) val valueRDD: RDD [(String , (Double , Int ))] = rdd.combineByKey(createCombiner, mergeValue, mergeCombiners) valueRDD.foreach(println(_)) valueRDD.map(x => (x._1, x._2._1 / x._2._2)).foreach(println(_)) }
Case 3: foldByKey
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Test def test03 : Unit = { sc.parallelize(Seq (("a" , 1 ), ("a" , 1 ), ("b" , 1 ))) .aggregateByKey(0 )(_ + _, _ + _) .collect() .foreach(println(_)) sc.parallelize(Seq (("a" , 1 ), ("a" , 1 ), ("b" , 1 ))) .foldByKey(0 )(_ + _) .collect() .foreach(println(_)) sc.parallelize(Seq (("a" , 1 ), ("a" , 1 ), ("b" , 1 ))) .foldByKey(0 )((a, b) => a + b) .collect() .foreach(println(_)) }
1 2 3 4 5 @Test def test04 (): Unit ={ val rdd = sc.parallelize(List (("a" , 1 ), ("a" , 3 ), ("b" , 2 ))) rdd.collectAsMap().foreach(println(_)) }
1 2 3 4 5 6 7 8 9 10 11 12 13 @Test def test04 (): Unit ={ val rdd = sc.parallelize(List (("a" , 1 ), ("a" , 3 ), ("b" , 2 ))) rdd.collectAsMap().foreach(println(_)) val array: Array [RDD [(String , Int )]] = rdd.randomSplit(Array (0.6 , 0.4 ), 123 L) val traingSet: RDD [(String , Int )] = array(0 ) val testSet: RDD [(String , Int )] = array(1 ) println("=======================" ) traingSet.collect().foreach(println(_)) }
1 2 3 4 5 6 7 8 9 10 11 12 13 @Test def testJoin (): Unit ={ val empRDD: RDD [(Int , String )] = sc.parallelize( Seq ((1001 , "zhangsan" ), (1002 , "lisi" ), (1003 , "wangwu" ), (1004 , "zhangliu" )) ) val deptRDD: RDD [(Int , String )] = sc.parallelize( Seq ((1001 , "sales" ), (1002 , "tech" )) ) empRDD.join(deptRDD).foreach(println(_)) println("============" ) empRDD.leftOuterJoin(deptRDD).foreach(println(_)) }
Hands-on cases. Case 1: PV, UV, and top-N traffic sources with Spark
Log format:
127.0.0.1 - - [28/Nov/2019:08:37:25 +0800] "GET / HTTP/1.1" 200 57621 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36"
1. PV stands for page view: the number of pages viewed or clicked, a common metric for a website or even a single news article. PV counts page loads, so every refresh counts again. PV grows with the number of visitors, but it is not the number of visitors; it is the number of pages served.
2. UV stands for unique visitor: a natural person who visits the page over the internet. One client machine counts as one visitor, and the same client is counted only once per day.
Steps:
Implementing PV (page views):
1 - Read the data
2 - With a map transformation, count every line as one PV
3 - Accumulate with reduceByKey (aggregateByKey or foldByKey also work)
4 - Sort the PV results
5 - Print or save the output
Implementing UV (unique visitors)
1 - Read the data
2 - With a map transformation, extract the IP address
3 - Deduplicate with distinct and count
4 - Use reduceByKey to sum the values for the same key
5 - Sort and output the UV result
6 - saveAsTextFile
Implementing top-K (the highest-ranked entries)
1 - Read the data
2 - With map, select the URL or app the user clicked, i.e. field x(10)
3 - Filter and count
4 - Output the top K
5 - saveAsTextFile
Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 package cn.spark.sparkbase.proimport org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf , SparkContext }object _01SparkPvUvTopK { def main (args: Array [String ]): Unit = { val sc: SparkContext = { val conf: SparkConf = new SparkConf ().setAppName(this .getClass.getSimpleName.stripSuffix("$" )).setMaster("local[*]" ) val sc = new SparkContext (conf) sc } val fileRDD: RDD [String ] = sc.textFile("data/baseinput/access.log" ) val mapRDD: RDD [(String , Int )] = fileRDD.map(x => ("PV" , 1 )) val resuleRDD: RDD [(String , Int )] = mapRDD.reduceByKey(_ + _) resuleRDD.foreach(println(_)) val ipValue: RDD [String ] = fileRDD.map(x => x.split("\\s+" )).map(x => x(0 )) val uvValue: RDD [(String , Int )] = ipValue.distinct().map(x => ("UV" , 1 )) val uvResult: RDD [(String , Int )] = uvValue.reduceByKey(_ + _) uvResult.foreach(println(_)) val value1RDD: RDD [(String , Int )] = fileRDD.map(_.split("\\s+" )).filter(x => x.length > 10 ).map(x => (x(10 ), 1 )) val result1RDD: RDD [(String , Int )] = value1RDD.reduceByKey(_ + _).sortBy(x => x._2, false ).filter(x => x._1 != "\"-\"" ) result1RDD.take(5 ).foreach(println(_)) sc.stop() } }
Case 2: regional hotspot analysis with Spark
Requirement:
Using log data (produced by the carrier or by the site itself) together with city IP-range data, determine which IP range each user falls into and count hotspots by longitude and latitude
Datasets:
User IP log: records the user's IP
20090121000132095572000|125.213.100.12|show.51.com|/
The field we care about is the second one (the IP).
1.0.1.0|1.0.3.255|16777472|16778239|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
125.213.100.12 cannot be compared directly against 1.0.1.0|1.0.3.255
Approach: the 1.0.1.0|1.0.3.255 range is already provided as Long values, 16777472|16778239
If 125.213.100.12 is also converted to a Long, it can be compared against the data in ip.txt
How to compare? We need to find which contiguous numeric range a number falls into; binary search is used here
Counting: once a user IP is matched to a city IP range, statistics can be computed
Step analysis:
1 - Create the SparkContext
2 - Read the two files: the IP ranges and the user logs
3 - Match each user IP against the IP ranges
3-1 First convert the user IP to a Long
3-2 Look the Long IP up in the IP-range table (binary search)
3-3 Use the found index to get the longitude and latitude
4 - Count regional hotness by longitude and latitude
Code:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 package cn.spark.sparkbase.proimport org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf , SparkContext }object _05IPCheck {/ def binarySearch (ipLong: Long , boradcastValue: Array [(String , String , String , String )]): Int = { var start = 0 var end = boradcastValue.length - 1 while (start <= end) { var middle = (start + end) / 2 if (ipLong >= boradcastValue(middle)._1.toLong && ipLong <= boradcastValue(middle)._2.toLong) { return middle } if (ipLong > boradcastValue(middle)._2.toLong) { start = middle + 1 } if (ipLong < boradcastValue(middle)._1.toLong) { end = middle - 1 } } -1 } def ipToLong (ip: String ): Long = { val ipArr: Array [Int ] = ip.split("[.]" ).map(s => Integer .parseInt(s)) var ipnum = 0 L for (i <- ipArr) { 0.00000000 .00000000 .11110000 ipnum = i | (ipnum << 8 ) } ipnum } def main (args: Array [String ]): Unit = { val sc: SparkContext = { val conf: SparkConf = new SparkConf ().setAppName(this .getClass.getSimpleName.stripSuffix("$" )).setMaster("local[*]" ) val sc = new SparkContext (conf) sc } val userIPRDD: RDD [String ] = sc.textFile("data/baseinput/ip/20190121000132.394251.http.format" ) val userSplitRDD: RDD [String ] = userIPRDD.map(_.split("\\|" )).map(x => x(1 )) val ipRangeRDD: RDD [String ] = sc.textFile("data/baseinput/ip/ip.txt" ) val ipRDD: RDD [(String , String , String , String )] = ipRangeRDD.map(_.split("\\|" )).map(x => (x(2 ), x(3 ), x(x.length - 2 ), x(x.length - 1 ))) val broadcastIpValue: Broadcast [Array [(String , String , String , String )]] = sc.broadcast(ipRDD.collect()) val resultRDD: RDD [((String , String ), Int )] = userSplitRDD.mapPartitions(iter => { val boradcastValue: Array [(String , String , String , String )] = broadcastIpValue.value iter.map(ip => { val ipLong: Long = ipToLong(ip) val index: Int = binarySearch(ipLong, boradcastValue) ((boradcastValue(index)._3, boradcastValue(index)._4), 1 ) }) }) resultRDD.reduceByKey(_ + _).sortBy(_._2, false ).take(5 ).foreach(println(_)) sc.stop() } }
Summary:
Use a utility function to convert the IP directly to a Long
Note the use of binary search
A broadcast variable is needed
Case 3: Sogou query-log word-segmentation statistics with Spark
<dependency>
  <groupId>com.hankcs</groupId>
  <artifactId>hanlp</artifactId>
  <version>portable-1.7.7</version>
</dependency>
Read the data
Search-keyword statistics
Per-user search and click statistics
Statistics by search time period
Steps
Read the data with sc.textFile and process it with map or mapPartitions, e.g. map(x => x.split("\\s+"))
Search-keyword statistics: take the query string, segment it into words, and count by word
Per-user search and click statistics: take the user ID and query, then count by (user ID, query)
Statistics by time period: extract the time period of each search, then count and sort by period
Code
1 - Word-segmentation code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package cn.spark.sparkbase.proimport java.utilimport com.hankcs.hanlp.HanLP import com.hankcs.hanlp.seg.common.Term import com.hankcs.hanlp.tokenizer.StandardTokenizer import scala.collection.JavaConverters ._object _02HanLp { def main (args: Array [String ]): Unit = { val terms: util.List [Term ] = HanLP .segment("我是黑马程序员的一匹黑马" ) println(terms) println(terms.asScala.map(_.word.trim)) val terms1: util.List [Term ] = StandardTokenizer .segment("我是黑马程序员的一匹黑马" ) println(terms1.asScala.map(_.word.trim)) val arr: Array [String ] = """00:00:00 2982199073774412 [360安全卫士] 8 3 download.it.com.cn/softweb/software/firewall/antivirus/20067/17938.html""" .split("\\s+" ) println(arr(2 ).replaceAll("\\[|\\]" , "" )) } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 package cn.spark.sparkbase.proimport java.utilimport com.hankcs.hanlp.HanLP import com.hankcs.hanlp.seg.common.Term import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf , SparkContext }import scala.collection.JavaConverters ._import scala.collection.mutableobject _04SougouCount { def main (args: Array [String ]): Unit = { val sc: SparkContext = { val conf: SparkConf = new SparkConf ().setAppName(this .getClass.getSimpleName.stripSuffix("$" )).setMaster("local[*]" ) val sc = new SparkContext (conf) sc } val sougouRDD: RDD [String ] = sc.textFile("data/baseinput/sougu/SogouQ.reduced" ) println(s"sougouRDD count value is:${sougouRDD.count()} " ) val recordRDD: RDD [SogouRecord ] = sougouRDD .filter(line => line != null && line.trim.split("\\s+" ).length == 6 ) .mapPartitions(iter => { iter.map(record => { val arr: Array [String ] = record.split("\\s+" ) SogouRecord (arr(0 ), arr(1 ), arr(2 ).replaceAll("\\[|\\]" , "" ), arr(3 ).toInt, arr(4 ).toInt, arr(5 )) }) }) recordRDD.take(5 ).foreach(println(_)) val valueRDD: RDD [String ] = recordRDD.mapPartitions(iter => { iter.flatMap(record => { val terms: util.List [Term ] = HanLP .segment(record.queryWords) terms.asScala.map(_.word.trim) }) }) val keyWordCount: RDD [(String , Int )] = valueRDD.map(x => (x, 1 )).reduceByKey(_ + _).sortBy(_._2, false ) println("========搜索关键词统计============" ) keyWordCount.take(5 ).foreach(println(_)) sc.stop() } }
The RDD DAG
Why do we need a DAG?
As a third-generation compute engine, Spark builds its jobs as a directed acyclic graph.
What is a DAG?
A directed acyclic graph; Spark, Impala, and Flink all build jobs on top of a DAG
You can view the DAG on port 4040
How does the DAG get divided into Stages?
Stages are split at wide dependencies: walking backwards from the last operator of the job, every time a wide dependency is encountered, the operators accumulated so far form one stage
What is the benefit of splitting the DAG into Stages?
Within a Stage, tasks can run in parallel, which speeds up the computation. The sketch below shows one way to inspect the lineage.
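A small sketch, assuming a local sc and the words.txt path used earlier, that prints the lineage so you can see where the shuffle (and hence the stage boundary) sits:

val counts = sc.textFile("data/baseinput/words.txt")
  .flatMap(_.split("\\s+"))
  .map((_, 1))
  .reduceByKey(_ + _) // wide dependency: everything after this belongs to a new stage
println(counts.toDebugString) // the indentation shows the ShuffledRDD and its parent stage
counts.collect()              // triggers one job; its stages and DAG are visible on the 4040 UI while it runs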
Spark scheduling flow
Spark scheduling splits into resource allocation and task scheduling.
In the diagram below, steps 1-10 are resource allocation and steps 12-19 are task scheduling.
In steps 1-2, the DAGScheduler and TaskScheduler are actually created when the driver initializes after starting; the TaskScheduler then requests resources from the ResourceManager through a background process.
Coarse-grained resource allocation (Spark)
All resources are requested before the Application runs; only once the resources are granted does task scheduling begin, and the resources are released only after all tasks have finished.
Advantage: since all resources are acquired before the Application runs, each task can use them directly and does not have to request resources itself before executing, so tasks start faster, tasks run faster, stages run faster, jobs run faster, and the application runs faster.
Disadvantage: resources are not released until the last task finishes, so the cluster's resources cannot be fully utilized; this is worse under data skew.
Fine-grained resource allocation (MapReduce)
The Application does not request resources up front; it starts right away and each task in the job requests its own resources before executing and releases them when it finishes.
Advantage: cluster resources can be fully utilized.
Disadvantage: each task requests its own resources, so tasks start more slowly and the Application correspondingly runs more slowly.
RDD dependencies: examples
Why do dependencies exist?
How many kinds of dependency are there?
Narrow dependency:
"A child RDD depends on one parent RDD" (this phrasing is wrong)
One parent RDD partition is consumed by only one child RDD partition: that is a narrow dependency
Wide dependency:
"A child RDD depends on multiple parent RDDs" (this phrasing is wrong)
One parent RDD partition is consumed by multiple child RDD partitions: that is a wide dependency
shuffle
Summary:
To tell wide from narrow, ask whether a parent RDD partition is shared by multiple child partitions: if it is shared, the dependency is wide, otherwise it is narrow.
To tell whether a shuffle happens, ask whether the parent RDD's data gets re-bucketed (into one or more buckets; the partition count may be the same before and after the shuffle). A sketch for inspecting dependencies follows.
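A minimal sketch, assuming a local sc, for checking the dependency type programmatically:

val pairs   = sc.parallelize(Seq(("a", 1), ("b", 2)), 2)
val mapped  = pairs.mapValues(_ + 1)   // narrow: each child partition reads exactly one parent partition
val reduced = pairs.reduceByKey(_ + _) // wide: parent partitions are re-bucketed across child partitions
println(mapped.dependencies)           // List(org.apache.spark.OneToOneDependency@...)
println(reduced.dependencies)          // List(org.apache.spark.ShuffleDependency@...)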
RDD caching
What is caching?
One reason Spark is so fast is that a dataset can be persisted or cached in memory and reused across operations.
Why cache?
For expensive operators, caching the result lets it be reused and speeds up the computation
Caching also helps with fault tolerance, since the data can be kept in memory or on disk
How many caching APIs are there?
Two: cache and persist
What storage levels are there?
1 - Memory
2 - Disk
3 - Off-heap memory (OFF_HEAP, not managed by the JVM)
4 - Serialized - for network transfer and compact storage
5 - Replicas - for fault tolerance
How to choose a level?
Prefer memory; if it does not fit, try serialized storage; use disk only when the operator is expensive enough to justify it; add replicas if you need fault-tolerant recovery
How to use caching? (see the sketch after this list)
cache
persist
unpersist
Cached data noticeably speeds up computation; caching is usually applied after expensive operators such as shuffles
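A brief sketch, assuming a local sc and the words.txt file from earlier, of the three calls listed above:

import org.apache.spark.storage.StorageLevel

val shuffled = sc.textFile("data/baseinput/words.txt")
  .flatMap(_.split("\\s+"))
  .map((_, 1))
  .reduceByKey(_ + _)

shuffled.cache()                                   // shorthand for persist(StorageLevel.MEMORY_ONLY)
shuffled.count()                                   // first action materializes and caches the data
shuffled.count()                                   // reuses the cache instead of recomputing the shuffle
shuffled.unpersist()                               // drop the cached blocks
shuffled.persist(StorageLevel.MEMORY_AND_DISK_SER) // explicit level: serialized, spilling to disk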
What is the problem with caching?
Cached data can be lost, so consider a non-volatile medium such as the HDFS distributed file system
This leads to the checkpoint mechanism
RDD checkpointing
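The heading above has no example in the notes, so here is a minimal checkpoint sketch under the same local sc assumption; the HDFS directory is illustrative:

sc.setCheckpointDir("hdfs://node1:8020/spark/checkpoint") // reliable storage; a local dir also works for testing
val counts = sc.textFile("data/baseinput/words.txt")
  .flatMap(_.split("\\s+"))
  .map((_, 1))
  .reduceByKey(_ + _)
counts.cache()      // cache first so the checkpoint job does not recompute the lineage from scratch
counts.checkpoint() // marks the RDD; the data is written when the next action runs
counts.count()      // triggers the job and the checkpoint write; the lineage is truncated afterwards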
Shared variables: broadcast variables
Key points:
1 - A broadcast variable is defined on the driver; each executor holds a copy, and the value cannot be changed on the executor side
2 - The value is fetched from the local BlockManager; if it is not there, a copy is pulled from the Driver
3 - How to use it: sc.broadcast(map.collect)
Example:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package cn.spark.sparkbase.baseimport org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf , SparkContext }object _04broadcast { def main (args: Array [String ]): Unit = { val sc:SparkContext ={ val conf: SparkConf = new SparkConf ().setAppName(this .getClass.getSimpleName.stripSuffix("$" )).setMaster("local[*]" ) val sc = new SparkContext (conf) sc } val kvFruit: RDD [(Int , String )] = sc.parallelize(List ((1 ,"apple" ),(2 ,"orange" ),(3 ,"banana" ),(4 ,"grape" ))) val fruitMap: collection.Map [Int , String ] =kvFruit.collectAsMap val fruitsIds: RDD [Int ] = sc.parallelize(Array (2 , 4 , 1 , 3 )) fruitsIds.map(x=>fruitMap(x)).collect().foreach(println(_)) val broadMap: Broadcast [collection.Map [Int , String ]] = sc.broadcast(fruitMap) fruitsIds.map(x=>broadMap.value(x)).collect().foreach(println(_)) sc.stop() } }
Accumulators
An Accumulator only supports accumulation: it can only be added to, never decreased.
Accumulators can only be created on the Driver and their value can only be read on the Driver; tasks can only add to them.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 package cn.spark.sparkbase.baseimport org.apache.spark.rdd.RDD import org.apache.spark.util.LongAccumulator import org.apache.spark.{Accumulator , SparkConf , SparkContext }object _05accumulate { def main (args: Array [String ]): Unit = { val sc: SparkContext = { val conf: SparkConf = new SparkConf ().setAppName(this .getClass.getSimpleName.stripSuffix("$" )).setMaster("local[*]" ) val sc = new SparkContext (conf) sc } var counter1 = 0 val seq = Seq (1 , 2 , 3 ) seq.map(x => counter1 += x) println("counter result is:" + counter1) var counter2 = 0 val rdd1: RDD [Int ] = sc.parallelize(seq) rdd1.foreach(x => counter2 += x) println(counter2) val acc: Accumulator [Int ] = sc.accumulator(0 ) rdd1.foreach(x=>acc+=x) println(acc) val acc_count: LongAccumulator = sc.longAccumulator("acc_count" ) rdd1.foreach(x=>acc_count.add(x)) println(acc_count) println(acc_count.value) } }
The repeated-accumulation problem
A series of transformations in Spark builds up a long task chain that only runs when an action triggers it (laziness), and accumulators follow the same rule.
So before any action has run, calling value shows no change at all
After the first action, calling value shows 5
After a second action, calling value shows 10
The reason is that the second action re-runs the accumulator updates: the same accumulator gets another 5 added on top of the existing value, giving 10
Solution: concretely, the accumulator shows repeated accumulation in the following scenarios:
Without caching: if cache or persist is not used, every action (count, collect, etc.) re-executes all upstream transformations, so the accumulator is incremented again on every action.
With caching: cache or persist stores the RDD's intermediate result in memory or on disk, so later actions read from the cache instead of re-running the whole chain, and the accumulator is not incremented again. The sketch below illustrates this.
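A small sketch of the behaviour just described, assuming a local sc:

val acc = sc.longAccumulator("acc_count")
val rdd = sc.parallelize(Seq(1, 2, 3)).map { x => acc.add(1); x * 2 }

rdd.count(); println(acc.value) // 3 - the first action runs the map once
rdd.count(); println(acc.value) // 6 - without caching, the map runs again

acc.reset()
rdd.cache()
rdd.count(); println(acc.value) // 3 - the map runs once and the result is cached
rdd.count(); println(acc.value) // 3 - the second action reads the cache, no extra increments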
Shared-variable example
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 package cn.spark.sparkbase.baseimport org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.util.LongAccumulator import org.apache.spark.{SparkConf , SparkContext }object _06acc_broadcast { def main (args: Array [String ]): Unit = { val sc: SparkContext = { val conf: SparkConf = new SparkConf ().setAppName(this .getClass.getSimpleName.stripSuffix("$" )).setMaster("local[*]" ) val sc = new SparkContext (conf) sc } val list: List [String ] = List ("," , "." , "!" , "#" , "$" , "%" ) val broadcastList: Broadcast [List [String ]] = sc.broadcast(list) val acc_count: LongAccumulator = sc.longAccumulator("acc_count" ) val fileRDD: RDD [String ] = sc.textFile("data/baseinput/words1.txt" ) val wordscount: RDD [String ] = fileRDD .filter(line => line != null && line.trim.length > 0 ) .flatMap(line => line.split("\\s+" )) .filter(word => { val listValue: List [String ] = broadcastList.value val isFlag: Boolean = listValue.contains(word) if (isFlag) { acc_count.add(1 L) } !isFlag }) println("wordcount的结果" ) wordscount.map((_, 1 )).reduceByKey(_ + _).collect().foreach(println(_)) println("非单词的组合:" , acc_count.value) } }
Kryo serialization
1. spark.serializer: the class used for serialization; set it to org.apache.spark.serializer.KryoSerializer. This setting controls not only the format used to shuffle data between worker nodes, but also the format used when storing RDDs on disk and when serializing broadcast variables.
2. spark.kryoserializer.buffer: each core of each Executor has its own serialization buffer. If your objects are large you may need to increase it; it cannot exceed spark.kryoserializer.buffer.max.
3. spark.kryoserializer.buffer.max: the maximum allowed serialization buffer size.
4. spark.kryo.classesToRegister: custom classes to register with Kryo, comma-separated.
5. spark.kryo.referenceTracking: track references to the same object, useful when there are circular references or multiple copies of one object; setting it to false improves performance.
6. spark.kryo.registrationRequired: must classes be registered with Kryo? If true, serializing an unregistered class throws an exception.
7. spark.kryo.registrator: the class Kryo uses to register your custom types. If you do not register the custom types you serialize, Kryo still works, but every serialized instance carries the full class name, which wastes space.
8. spark.kryo.unsafe: for an extra performance boost, use Kryo's unsafe mode.
Using Kryo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class MyKryoRegistrator implements KryoRegistrator { @Override public void registerClasses( Kryo kryo) { kryo.register(User .class ); kryo.register(User [].class ); kryo.register(scala.collection.convert.Wrappers $.class ); } } @Data public class User implements Serializable { int id; String name; String city; List <String > hobby; } val conf = new SparkConf ().setAppName(this .getClass.getSimpleName) conf .set("spark.serializer" , "org.apache.spark.serializer.KryoSerializer" ) .set("spark.kryo.registrationRequired" , "true" ) .set("spark.kryo.registrator" ,classOf[MyKryoRegistrator ].getName) val sc = new SparkContext (conf) conf .set("spark.serializer" , "org.apache.spark.serializer.KryoSerializer" ) .registerKryoClasses(Array (classOf[User ]))
Declare Kryo as the serializer
Extension 2: when is groupBy a narrow dependency and when is it a wide one?
Memory model
Question 1:
Spark differs from Flink: Flink's memory cannot be tuned by the user, while Spark's memory can be configured
The SparkCore memory model
Question 2:
Spark's shuffle
Shuffle across Spark versions
Before 1.2: HashShuffleManager
From 1.2 on: SortShuffleManager
Shuffle phases
Shuffle write: the mapper phase, where the previous stage writes out its final results. Shuffle read: the reduce phase, where the next stage pulls the previous stage's output and merges it.
HashShuffleManager
The unoptimized HashShuffleManager
The optimized HashShuffleManager
SortShuffleManager
The bypass merge mechanism is triggered when:
the number of shuffle write tasks is less than or equal to spark.shuffle.sort.bypassMergeThreshold (default 200), and
the operator is not an aggregating one such as reduceByKey; what really matters is whether map-side combine is true
In this mode, each task creates one temporary disk file per reduce-side task, hashes records by key, and writes each key to the corresponding file. As usual, records go into an in-memory buffer first and are spilled to the disk file when the buffer fills up. Finally, all the temporary files are merged into a single disk file, and a separate index file is created.
The disk-write mechanism here is exactly the same as the unoptimized HashShuffleManager, since both create a startling number of disk files; the only difference is the final merge. Because far fewer final files remain, shuffle read performs better than with the unoptimized HashShuffleManager. (The configuration sketch below shows the threshold setting.)
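A hedged configuration sketch for the threshold mentioned above; the app name and the value 400 are only examples:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("shuffle-bypass-demo") // illustrative app name
  // bypass is considered when the number of map tasks <= this threshold
  // and the shuffle operator does not use map-side combine
  .set("spark.shuffle.sort.bypassMergeThreshold", "400")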
The normal (sort) mechanism
(1) In this mode, data is first written to an in-memory data structure (5 MB by default); which structure is used depends on the shuffle operator. For aggregating operators such as reduceByKey, a Map is used and records are aggregated as they are written; for plain shuffle operators such as join, an Array is used and records are written directly.
(2) After each record is written into the in-memory structure, Spark checks whether a threshold has been reached; if so, it tries to spill the in-memory data to disk and then clears the structure.
Note:
The shuffle monitor checks the size of the in-memory structure; if the structure is running out of space, extra memory is requested according to the formula:
applyMemory = nowMemory * 2 - oldMemory
memory requested = current data memory usage * 2 - memory granted previously
In other words, the in-memory structure grows dynamically: when the stored data exceeds its current size, Spark requests (current data size * 2 - currently allotted size) of additional memory. If the request is granted, the structure grows; if memory is short and the request fails, a spill happens.
Because Spark is an in-memory framework that cannot strictly bound Executor memory, it can only monitor it. The initial size is 5 MB; when that is exceeded, say at 5.02 MB, the monitor requests 5.02 * 2 - 5 = 5.04 MB. If the request succeeds there is no spill; otherwise a spill occurs.
Differences (vs. MapReduce):
(a) Spark's in-memory structure starts at 5 MB and can grow on request, whereas MR uses a fixed 100 MB buffer
(b) The spilled disk file comes with an index file that describes it and records each partition's start offset and end offset.
(3) Sorting
Before spilling to disk, the data already in the in-memory structure is sorted by key.
(4) Spilling
After sorting, the data is written to the disk file in batches of 10,000 records by default. Writing goes through Java's BufferedOutputStream, which buffers data in memory and flushes to disk only when the buffer fills, reducing disk I/O and improving performance.
(5) Merge
While a task writes all of its data into the in-memory structure, it may spill several times and thus produce several temporary files. At the end, all temporary disk files are merged into a single disk file: the data in every temporary file is read back and written sequentially into the final file. Since one task ends up with exactly one disk file, meaning all the data it prepared for the reduce-side stage's tasks lives in that one file, a separate index file is also written, marking each downstream task's start offset and end offset within the file.
Because SortShuffleManager merges disk files, it greatly reduces the file count. For example, if the first stage has 50 tasks across 10 Executors (5 tasks each) and the second stage has 100 tasks, then since each task produces only one disk file, each Executor holds just 5 disk files and all Executors together hold only 50.
Differences between bypass and the normal mechanism
First, the disk-write mechanism differs;
Second, no sorting is performed. In other words, the biggest benefit of enabling bypass is that shuffle write skips the sort entirely, saving that overhead.
Both ultimately produce 2M small disk files, where M is the number of map tasks (one data file plus one index file per task).
Summary: SortShuffle has a normal mechanism and a bypass mechanism. The normal mechanism sorts within the in-memory structure (5 MB by default). When the number of shuffle map tasks is below spark.shuffle.sort.bypassMergeThreshold and the operator is not an aggregating shuffle operator (such as reduceByKey), the bypass mechanism is triggered; it does not sort, which greatly improves its performance.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 下面列出Shuffle 类不同分类算子 去重 def distinct ()def distinct (numPartitions: Int )聚合 def reduceByKey (func: (V , V ) => V , numPartitions: Int ): RDD [(K , V )]def reduceByKey (partitioner: Partitioner , func: (V , V ) => V ): RDD [(K , V )]def groupBy [K ](f: T => K , p: Partitioner ):RDD [(K , Iterable [V ])]def groupByKey (partitioner: Partitioner ):RDD [(K , Iterable [V ])]def aggregateByKey [U : ClassTag ](zeroValue: U , partitioner: Partitioner ): RDD [(K , U )]def aggregateByKey [U : ClassTag ](zeroValue: U , numPartitions: Int ): RDD [(K , U )]def combineByKey [C ](createCombiner: V => C , mergeValue: (C , V ) => C , mergeCombiners: (C , C ) => C ): RDD [(K , C )]def combineByKey [C ](createCombiner: V => C , mergeValue: (C , V ) => C , mergeCombiners: (C , C ) => C , numPartitions: Int ): RDD [(K , C )]def combineByKey [C ](createCombiner: V => C , mergeValue: (C , V ) => C , mergeCombiners: (C , C ) => C , partitioner: Partitioner , mapSideCombine: Boolean = true , serializer: Serializer = null ): RDD [(K , C )]排序 def sortByKey (ascending: Boolean = true , numPartitions: Int = self.partitions.length): RDD [(K , V )]def sortBy [K ](f: (T ) => K , ascending: Boolean = true , numPartitions: Int = this .partitions.length)(implicit ord: Ordering [K ], ctag: ClassTag [K ]): RDD [T ]重分区 def coalesce (numPartitions: Int , shuffle: Boolean = false , partitionCoalescer: Option [PartitionCoalescer ] = Option .empty)def repartition (numPartitions: Int )(implicit ord: Ordering [T ] = null )集合或者表操作 def intersection (other: RDD [T ]): RDD [T ]def intersection (other: RDD [T ], partitioner: Partitioner )(implicit ord: Ordering [T ] = null ): RDD [T ]def intersection (other: RDD [T ], numPartitions: Int ): RDD [T ]def subtract (other: RDD [T ], numPartitions: Int ): RDD [T ]def subtract (other: RDD [T ], p: Partitioner )(implicit ord: Ordering [T ] = null ): RDD [T ]def subtractByKey [W : ClassTag ](other: RDD [(K , W )]): RDD [(K , V )]def subtractByKey [W : ClassTag ](other: RDD [(K , W )], numPartitions: Int ): RDD [(K , V )]def subtractByKey [W : ClassTag ](other: RDD [(K , W )], p: Partitioner ): RDD [(K , V )]def join [W ](other: RDD [(K , W )], partitioner: Partitioner ): RDD [(K , (V , W ))]def join [W ](other: RDD [(K , W )]): RDD [(K , (V , W ))]def join [W ](other: RDD [(K , W )], numPartitions: Int ): RDD [(K , (V , W ))]def leftOuterJoin [W ](other: RDD [(K , W )]): RDD [(K , (V , Option [W ]))]
Introducing SparkSQL
Writing SparkCore code is quite involved, so SparkSQL was introduced for processing structured data
SparkSQL builds on and improves upon Hive
What is SparkSQL?
The relationship between SparkSQL and Hive (history)
SparkSQL's data structures
SparkCore data structure: RDD
SparkSQL data structures: DataFrame and Dataset
Think: how do the three data structures relate?
Since 2.0, DataFrame = Dataset[Row]
A DataFrame is type-checked at runtime, whereas a Dataset is type-checked at compile time, so Dataset is safer
Four ways to create a SparkSQL DataFrame
SparkSession = SQLContext + HiveContext + SparkContext
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 package cn.spark.sparksqlimport org.apache.spark.SparkContext import org.apache.spark.sql.{Dataset , SparkSession }object _01SparkSession { def main (args: Array [String ]): Unit = { val spark: SparkSession = SparkSession .builder() .appName(this .getClass.getSimpleName.stripSuffix("$" )) .master("local[*]" ) .getOrCreate() val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN" ) val valueDS: Dataset [String ] = spark.read.textFile("data/baseinput/words.txt" ) println("counts:" + valueDS.count()) } }
Ways to convert an RDD to a DataFrame
1 - Convert an RDD to a DataFrame using a case class
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 package cn.spark.sparksqlimport org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame , SparkSession }case class People (id: Int , name: String , age: Int )object _02toDFWay1 { def main (args: Array [String ]): Unit = { val spark: SparkSession = SparkSession .builder() .appName(this .getClass.getSimpleName.stripSuffix("$" )) .master("local[*]" ) .getOrCreate() import spark.implicits._ val sc: SparkContext = spark.sparkContext val fileRDD: RDD [String ] = sc.textFile("data/baseinput/sql/people1.txt" ) val peopleRDD: RDD [People ] = fileRDD.map(_.split("\\s+" )).map(x => People (x(0 ).toInt, x(1 ), x(2 ).toInt)) val peopleDF: DataFrame = peopleRDD.toDF() peopleDF.show() peopleDF.printSchema() spark.stop() } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 package cn.spark.sparksqlimport org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.types.{DataTypes , StructType }import org.apache.spark.sql.{DataFrame , Row , SparkSession }object _04toDFWay3 { def main (args: Array [String ]): Unit = { val spark: SparkSession = SparkSession .builder() .appName(this .getClass.getSimpleName.stripSuffix("$" )) .master("local[*]" ) .getOrCreate() import spark.implicits._ val sc: SparkContext = spark.sparkContext val fileRDD: RDD [String ] = sc.textFile("data/baseinput/sql/people1.txt" ) val peopleRDD: RDD [Row ] = fileRDD.map(_.split("\\s+" )).map(x => Row (x(0 ).toInt, x(1 ), x(2 ).toInt)) val schema: StructType = new StructType () .add("id" , DataTypes .IntegerType , true ) .add("name" , "string" , true ) .add("age" , "int" , true ) val peopleDF: DataFrame = spark.createDataFrame(peopleRDD, schema) peopleDF.show(2 ,false ) peopleDF.printSchema() spark.stop() } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 package cn.spark.sparksqlimport org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.types.{DataTypes , IntegerType , LongType , StringType , StructField , StructType }import org.apache.spark.sql.{DataFrame , Row , SparkSession }object _04toDFWay4 { def main (args: Array [String ]): Unit = { val spark: SparkSession = SparkSession .builder() .appName(this .getClass.getSimpleName.stripSuffix("$" )) .master("local[*]" ) .getOrCreate() val sc: SparkContext = spark.sparkContext val fileRDD: RDD [String ] = sc.textFile("data/baseinput/sql/people1.txt" ) val peopleRDD: RDD [Row ] = fileRDD.map(_.split("\\s+" )).map(x => Row (x(0 ).toInt, x(1 ), x(2 ).toInt)) val schema: StructType = StructType ( StructField ("f1" , IntegerType , true ) :: StructField ("f2" , StringType , false ) :: StructField ("f3" , IntegerType , false ) :: Nil ) val peopleDF: DataFrame = spark.createDataFrame(peopleRDD, schema) peopleDF.show(2 , false ) peopleDF.printSchema() spark.stop() } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 package cn.spark.sparksqlimport org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame , SparkSession }object _03toDFWay2 { def main (args: Array [String ]): Unit = { val spark: SparkSession = SparkSession .builder() .appName(this .getClass.getSimpleName.stripSuffix("$" )) .master("local[*]" ) .getOrCreate() import spark.implicits._ val sc: SparkContext = spark.sparkContext val fileRDD: RDD [String ] = sc.textFile("data/baseinput/sql/people1.txt" ) val peopleRDD = fileRDD.map(_.split("\\s+" )).map(x => (x(0 ).toInt, x(1 ), x(2 ).toInt)) val peopleDF: DataFrame = peopleRDD.toDF("id" ,"name" ,"age" ) peopleDF.show() peopleDF.printSchema() spark.stop() } }
Additional notes:
The case-class route from RDD to DataFrame is the most commonly used, especially when there are many fields
If fields need to be added dynamically, use the StructType approach
DataFrame operations in SparkSQL
DSL example
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 package cn.spark.sparksqlimport org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame , SparkSession }import org.apache.spark.sql.functions._case class People1 (id: Int , name: String , age: Int )object _06DataFrameOpration { def main (args: Array [String ]): Unit = { val spark: SparkSession = SparkSession .builder() .appName(this .getClass.getSimpleName.stripSuffix("$" )) .master("local[*]" ) .getOrCreate() import spark.implicits._ val sc: SparkContext = spark.sparkContext val fileRDD: RDD [String ] = sc.textFile("data/baseinput/sql/people1.txt" ) val peopleRDD: RDD [People1 ] = fileRDD.map(_.split("\\s+" )).map(x => People1 (x(0 ).toInt, x(1 ), x(2 ).toInt)) val peopleDF: DataFrame = peopleRDD.toDF() peopleDF.show() peopleDF.printSchema() peopleDF.select("name" ).show() peopleDF.select(col("name" )).show() peopleDF.select(column("name" )).show() peopleDF.select('name).show() peopleDF.select("name" , "age" ).show() peopleDF.select(col("name" ), col("age" )).show() peopleDF.select(column("name" ), column("age" )).show() peopleDF.select('name, 'age).show() peopleDF.select(col("name" ), column("age" )).show() peopleDF.select(col("name" ), 'age).show() peopleDF.select(col("name" ), col("age" ) + 1 ).show() peopleDF.select(col("name" ), column("age" ) + 1 ).show() peopleDF.select('name, 'age + 1 ).show() peopleDF.filter(col("age" ) > 25 ).show() peopleDF.filter('age > 25 ).show() println(peopleDF.filter('age > 25 ).count()) val re1: DataFrame = peopleDF.groupBy("age" ).count() re1.show() re1.printSchema() re1.orderBy("count" ).show() re1.orderBy('count).show() peopleDF.groupBy("age" ).count().orderBy("count" ).show() spark.stop() } }
SQL example
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 package cn.spark.sparksqlimport org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame , SparkSession }import org.apache.spark.sql.functions._object _06_1DataFrameOpration { def main (args: Array [String ]): Unit = { val spark: SparkSession = SparkSession .builder() .appName(this .getClass.getSimpleName.stripSuffix("$" )) .master("local[*]" ) .getOrCreate() import spark.implicits._ val sc: SparkContext = spark.sparkContext val fileRDD: RDD [String ] = sc.textFile("data/baseinput/sql/people1.txt" ) val peopleRDD: RDD [People1 ] = fileRDD.map(_.split("\\s+" )).map(x => People1 (x(0 ).toInt, x(1 ), x(2 ).toInt)) val peopleDF: DataFrame = peopleRDD.toDF() peopleDF.createOrReplaceTempView("peopleTable" ) spark.sql("select name from peopleTable" ).show() spark.sql("select name,age from peopleTable" ).show() spark.sql("select * from peopleTable order by age desc limit 2" ).show() spark.sql( """ |select * |from peopleTable |order by age desc |limit 2 |""" .stripMargin).show() spark.stop() } }
The SparkSQL Dataset
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package cn.spark.sparksql.wordcountimport org.apache.spark.sql.{DataFrame , Dataset , Row , SparkSession }object _01DSL { def main (args: Array [String ]): Unit = { val spark: SparkSession = SparkSession .builder() .appName(this .getClass.getSimpleName.stripSuffix("$" )) .master("local[*]" ) .getOrCreate() import spark.implicits._ val df1: DataFrame = spark.read.text("data/baseinput/words.txt" ) val ds1: Dataset [String ] = spark.read.textFile("data/baseinput/words.txt" ) val value: Dataset [String ] = ds1.flatMap(_.split("\\s+" )) value.show() val result: Dataset [Row ] = value.groupBy("value" ).count().orderBy('count.desc) result.show() spark.stop() } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 package cn.spark.sparksql.wordcountimport org.apache.spark.sql.{DataFrame , Dataset , Row , SparkSession }object _02SQL { def main (args: Array [String ]): Unit = { val spark: SparkSession = SparkSession .builder() .appName(this .getClass.getSimpleName.stripSuffix("$" )) .master("local[*]" ) .getOrCreate() import spark.implicits._ val df1: DataFrame = spark.read.text("data/baseinput/words.txt" ) val ds1: Dataset [String ] = spark.read.textFile("data/baseinput/words.txt" ) val value: Dataset [String ] = ds1.flatMap(_.split("\\s+" )) value.show() value.createOrReplaceTempView("table" ) val result: DataFrame = spark.sql( """ |select value,count(value) as counts |from table |group by value |order by counts desc |""" .stripMargin) result.show() spark.stop() } }
Converting between SparkSQL data structures (must master)
Differences among RDD, DataFrame, and Dataset, with exercises
An RDD becomes a DataFrame by adding a schema; a DataFrame becomes a Dataset by adding a type parameter
A DataFrame is type-checked at runtime; RDD and Dataset are type-checked at compile time
Since Spark 2.0, Dataset and DataFrame are essentially unified: Dataset[Row] = DataFrame. A conversion sketch follows.
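A compact conversion sketch, assuming a SparkSession named spark and the People case class defined earlier in these notes:

import spark.implicits._ // needed for toDF/toDS and the encoders

// case class People(id: Int, name: String, age: Int) -- defined earlier in the notes
val rdd = spark.sparkContext.parallelize(Seq(People(1, "zhangsan", 30), People(2, "lisi", 40)))

val df = rdd.toDF()                     // RDD -> DataFrame (schema inferred from the case class)
val ds = df.as[People]                  // DataFrame -> Dataset[People] (adds the type parameter)
val dsDirect = rdd.toDS()               // RDD -> Dataset directly
val backToRdd = ds.rdd                  // Dataset -> RDD
println(ds.filter(_.age > 35).count())  // compile-time typed access to the fields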
Movie-rating case (must master)
Requirements analysis for the movie-rating statistics
Requirement: run statistical analysis on the movie-rating data
Find the top-10 movies (highest average rating, with each movie rated more than 2000 times)
Understanding the dataset:
Steps
1 - Read the dataset
2 - Parse the dataset and convert the RDD to a DataFrame (case class)
3 - Implement it with SQL
4 - Implement it with the DSL
5 - Save the result to a CSV file
6 - Save the result to MySQL
Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 package cn.spark.sparksql.moviesProimport java.util.Properties import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame , Dataset , Row , SaveMode , SparkSession }import org.apache.spark.sql.functions._case class MoviesRatings (userId: String , moviesId: String , ratings: Double , timestamp: String )object _01MoviesRatingLoader { def main (args: Array [String ]): Unit = { val spark: SparkSession = { val conf: SparkConf = new SparkConf () .setAppName(this .getClass.getSimpleName.stripSuffix("$" )) .setMaster("local[*]" ) .set("spark.sql.shuffle.partitions" , "4" ) val spark: SparkSession = SparkSession .builder().config(conf).getOrCreate() spark } import spark.implicits._ val fileRDD: RDD [String ] = spark.sparkContext.textFile("data/baseinput/ml-1m/ratings.dat" ) val moviesDF: DataFrame = fileRDD .filter(line => line != null && line.trim.split("::" ).length == 4 ) .mapPartitions(iter => { iter.map(line => { val arr: Array [String ] = line.split("::" ) MoviesRatings (arr(0 ), arr(1 ), arr(2 ).toDouble, arr(3 )) }) }).toDF moviesDF.show(3 , false ) moviesDF.printSchema() moviesDF.createOrReplaceTempView("movies_table" ) val sql = """ |select moviesId,round(avg(ratings),2) as avg_rating,count(moviesId) as rnt_rating |from movies_table |group by moviesId |having rnt_rating>2000 |order by avg_rating desc,rnt_rating desc |limit 10 |""" .stripMargin println("sql funtions way is:..........." ) val resultDF: Dataset [Row ] = moviesDF .select("moviesId" , "ratings" ) .groupBy("moviesId" ) .agg( round(avg("ratings" ), 2 ).as("avg_rating" ), count("moviesId" ).as("rnt_rating" ) ) .filter('rnt_rating > 2000 ) .orderBy($"avg_rating" .desc, 'rnt_rating.desc) .limit(10 ) println("dsl funtions way is:..........." ) resultDF.show() resultDF.write .format("jdbc" ) .mode(SaveMode .Overwrite ) .option("url" , "jdbc:mysql://localhost:3306/bigdata" ) .option("dbtable" , "tb_top10_movie" ) .option("user" , "root" ) .option("password" , "root" ) .save() println("data write finished!" ) println("data reader finished!" ) val connectionProperties = new Properties () connectionProperties.put("user" , "root" ) connectionProperties.put("password" , "root" ) val jdbcDF2 = spark.read .jdbc("jdbc:mysql://localhost:3306/bigdata" , "tb_top10_movie" , connectionProperties) jdbcDF2.show() spark.stop() } }
Integrating Spark SQL with Hive. Local SparkSQL-Hive integration (for testing)
Steps for testing SparkSQL with Hive locally
0 - Data
1,zhangsan,30
2,lisi,40
3,wangwu,50
1 - Add the Maven dependency spark-hive
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.11</artifactId>
  <version>2.4.5</version>
</dependency>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 package cn.spark.sparksql.toHiveimport org.apache.spark.sql.SparkSession object _01SparkToHive { def main (args: Array [String ]): Unit = { val spark: SparkSession = SparkSession .builder() .appName(this .getClass.getSimpleName.stripSuffix("$" )) .master("local[*]" ) .enableHiveSupport() .getOrCreate() spark.sql("show databases" ).show() spark.sql("create table if not exists student2(id int,name String,age int) row format delimited fields terminated by ','" ) spark.sql("load data local inpath 'data/baseinput/sql/hive/students.csv' overwrite into table student2" ) spark.sql("select * from student2 where age >30" ).show() } }
Integrating Spark with the cluster's Hive
Spark on Hive
Hive - Hive on MR
Shark - Hive on Spark - still mostly Hive internals
SparkSQL - an independently implemented engine
SparkSQL + Hive integration - Spark on Hive - uses Hive only for its metastore; everything else is SparkSQL
How to set up the Spark-on-Hive environment?
1 - Tell Spark where the Hive metastore lives by putting hive-site.xml into Spark's conf directory
2 - Distribute hive-site.xml to the other two nodes
3 - Put the MySQL JDBC jar into Spark's jars directory
Start the metastore
nohup /export/server/hive/bin/hive --service metastore 2>&1 >> /var/log.log &
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package cn.spark.sparkbaseimport org.apache.spark.sql.SparkSession object _02SparkToHive { def main (args: Array [String ]): Unit = { val spark: SparkSession = SparkSession .builder() .appName(this .getClass.getSimpleName.stripSuffix("$" )) .master("local[*]" ) .config("spark.sql.shuffle.partitions" , "4" ) .config("spark.sql.warehouse.dir" , "hdfs://node1:8020/user/hive/warehouse" ) .config("hive.metastore.uris" , "thrift://node3:9083" ) .enableHiveSupport() .getOrCreate() spark.sql("show databases" ).show() spark.sql("use sparkhive3" ) spark.sql("create table if not exists student2(id int,name String,age int) row format delimited fields terminated by ','" ) spark.sql("show tables" ).show() spark.sql("load data local inpath 'data/baseinput/sql/hive/students.csv' overwrite into table student2" ) spark.sql( """ |select * |from student2 |where age >30 |""" .stripMargin).show() spark.stop() } }
两种启动方式
Spark的CLI方式
Spark的Beeline方式,thrift
Spark的ThriftServer服务(类似于Hive的HiveServer2),再通过Beeline连接执行SQL
# 启动Spark的ThriftServer(若同时连Hive,可换一个端口,如这里的10001)
SPARK_HOME=/export/server/spark
$SPARK_HOME/sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=10001 \
--hiveconf hive.server2.thrift.bind.host=node3 \
--master local[2]
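启动后可以用Beeline连接该ThriftServer执行SQL,下面是一个简单示意(主机node3、端口10001与上面的启动参数一致,用户名root仅为假设):
# 启动beeline并连接Spark ThriftServer
$SPARK_HOME/bin/beeline
!connect jdbc:hive2://node3:10001
# 按提示输入用户名(如root)和密码后即可执行SQL
show databases;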
SparkSQL的UDF函数(必须掌握)
编程模式sparksession使用UDF 需要先注册
spark.udf.register() 方式
spark.udf.register("strLen", (str: String) => str.length())
spark.sql("select name,strLen(name) as name_len from user").show(false)
def getStrLen(str: String): Int = {
  str.length
}
spark.udf.register("strLen", getStrLen _)
spark.sql("select name,strLen(name) as name_len from user").show(false)
SparkSQL中使用UDF(如果是打包单个类,注意指定META-INF/MANIFEST.MF在src下,不要放在resources,不然会找不到主类;可带一个空main函数类,注册时就不会报类路径错误)
方式一:在启动spark-sql时通过 --jars 指定
cd $SPARK_HOME/bin
spark-sql --jars /home/hadoop/lib/udf.jar
CREATE TEMPORARY FUNCTION hello AS 'com.luogankun.udf.HelloUDF';
select hello(url) from page_views limit 1;
方式二:通过SPARK_CLASSPATH配置
1)需要先将udf.jar的路径配置到spark-env.sh的SPARK_CLASSPATH中,形如:
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/software/mysql-connector-java-5.1.27-bin.jar:/home/hadoop/lib/udf.jar
2)再启动spark-sql,直接CREATE TEMPORARY FUNCTION即可;
cd $SPARK_HOME/bin
spark-sql
CREATE TEMPORARY FUNCTION hello AS 'com.luogankun.udf.HelloUDF';
select hello(url) from page_views limit 1;
方式三:Thrift JDBC Server中使用UDF
在beeline命令行中执行:
-- 方式A:使用HDFS上的jar创建函数(hdfs路径)
create function base_analizer as 'com.zhengkw.udf.BaseFieldUDF' using jar 'hdfs://hadoop102:9000/user/hive/jars/hivefunction-1.0-SNAPSHOT.jar';
-- 方式B:添加本地机器路径上的jar,再创建临时函数
add jar /home/hadoop/lib/udf.jar;
CREATE TEMPORARY FUNCTION hello AS 'com.luogankun.udf.HelloUDF';
select hello(url) from page_views limit 1;
spark/hive共用自定义函数(见hive udf)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 package cn.spark.sparksql.udfimport org.apache.spark.rdd.RDD import org.apache.spark.sql.api.java.UDF1 import org.apache.spark.sql.types.StringType import org.apache.spark.sql.{DataFrame , SparkSession }import org.apache.spark.sql.functions._case class Smaller (line: String )object _01wordsToBigger { def main (args: Array [String ]): Unit = { val spark: SparkSession = SparkSession .builder() .appName(this .getClass.getSimpleName.stripSuffix("$" )) .master("local[*]" ) .getOrCreate() import spark.implicits._ val fileRDD: RDD [String ] = spark.sparkContext.textFile("data\\baseinput\\sql\\udf\\udf.txt" ) val wordsDF: DataFrame = fileRDD.map(x => Smaller (x)).toDF wordsDF.show() wordsDF.printSchema() spark.udf.register("wordToBigger" ,(line:String )=>{ line.toUpperCase() }) wordsDF.createOrReplaceTempView("word_view" ) val result1: DataFrame = spark.sql("select line,wordToBigger(line) as bigger from word_view" ) result1.show() val result2: DataFrame = wordsDF.select('line, callUDF("wordToBigger" , 'line).as("bigger" )) result2.show() } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 package cn.spark.sparksql.udfimport org.apache.spark.rdd.RDD import org.apache.spark.sql.api.java.UDF1 import org.apache.spark.sql.types.StringType import org.apache.spark.sql.{DataFrame , SparkSession }import org.apache.spark.sql.functions._object _02udfDemo { def main (args: Array [String ]): Unit = { val spark: SparkSession = SparkSession .builder() .appName(this .getClass.getSimpleName.stripSuffix("$" )) .master("local[*]" ) .getOrCreate() import spark.implicits._ val df = Seq (("id1" , 1 ), ("id2" , 4 ), ("id3" , 5 )).toDF("id" , "value" ) spark.udf.register("simpleUDF" , (n: Int ) => n * n) df.select($"id" , callUDF("simpleUDF" , $"value" ).as("pow(x,2)" )).show() val df1 = Seq (("id1" , 1 ,6 ), ("id2" , 4 ,7 ), ("id3" , 5 ,8 )).toDF("id" , "value1" ,"value2" ) spark.udf.register("simpleUDF2" ,(v1:Int ,v2:Int )=>{ v1*v2 }) df1.createOrReplaceTempView("table_df" ) spark.sql( """ |select id,simpleUDF2(value1,value2) as simple |from table_df |""" .stripMargin).show() df1.select($"id" , callUDF("simpleUDF2" ,'value1,'value2).as("bigger" )).show() } }
UDAF了解
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 package cn.spark.sparksql.udfimport org.apache.spark.sql.{DataFrame , Row , SparkSession }import org.apache.spark.sql.expressions.{MutableAggregationBuffer , UserDefinedAggregateFunction }import org.apache.spark.sql.types._class SparkFunctionUDAF extends UserDefinedAggregateFunction { override def inputSchema : StructType = { StructType (StructField ("input" ,LongType )::Nil ) } override def bufferSchema : StructType = { StructType (StructField ("sum" ,LongType )::StructField ("total" ,LongType )::Nil ) } override def dataType : DataType = { DoubleType } override def deterministic : Boolean = { true } override def initialize (buffer: MutableAggregationBuffer ): Unit = { buffer(0 ) = 0 L buffer(1 ) = 0 L } override def update (buffer: MutableAggregationBuffer , input: Row ): Unit = { buffer(0 ) = buffer.getLong(0 ) + input.getLong(0 ) buffer(1 ) = buffer.getLong(1 ) + 1 } override def merge (buffer1: MutableAggregationBuffer , buffer2: Row ): Unit = { buffer1(0 ) =buffer1.getLong(0 ) + buffer2.getLong(0 ) buffer1(1 ) = buffer1.getLong(1 ) + buffer2.getLong(1 ) } override def evaluate (buffer: Row ): Any = { buffer.getLong(0 ).toDouble / buffer.getLong(1 ) } } object SparkFunctionUDAF { def main (args: Array [String ]): Unit = { val sparkSession: SparkSession = SparkSession .builder().appName("sparkUDAF" ).master("local[2]" ).getOrCreate() val employeeDF: DataFrame = sparkSession.read.json("data\\baseinput\\sql\\udf\\udaf.txt" ) employeeDF.createOrReplaceTempView("employee_table" ) sparkSession.udf.register("avgSal" ,new SparkFunctionUDAF ) sparkSession.sql("select avgSal(salary) from employee_table" ).show() sparkSession.close() } }
SparkSQL函数
Hive中的开窗函数
聚合类开窗函数
count() over(partition by … order by …)
排序类开窗函数
row_number() over(partition by … order by …) 连续编号,如 1 2 3 4 5
rank() over(partition by … order by …) 并列排名会跳号,如 4 4 6
dense_rank() over(partition by … order by …) 并列排名不跳号,如 4 4 5
代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 package cn.spark.sparksql.funcimport org.apache.spark.sql.SparkSession case class Score (name: String , clazz: Int , score: Int )object _01class { def main (args: Array [String ]): Unit = { val spark: SparkSession = SparkSession .builder() .appName(this .getClass.getSimpleName.stripSuffix("$" )) .master("local[*]" ) .getOrCreate() import spark.implicits._ val scoreDF = spark.sparkContext.makeRDD(Array ( Score ("a1" , 1 , 80 ), Score ("a2" , 1 , 78 ), Score ("a3" , 1 , 95 ), Score ("a4" , 2 , 74 ), Score ("a5" , 2 , 92 ), Score ("a6" , 3 , 99 ), Score ("a7" , 3 , 99 ), Score ("a8" , 3 , 45 ), Score ("a9" , 3 , 55 ), Score ("a10" , 3 , 78 ), Score ("a11" , 3 , 100 )) ).toDF("name" , "class" , "score" ) scoreDF.createOrReplaceTempView("scores" ) scoreDF.printSchema() spark.sql("select count(name) from scores" ).show() spark.sql("select name,class,score,count(name) over() ccc from scores" ).show() spark.sql("select name,class,score,count(name) over(partition by class) ccc from scores" ).show() spark.sql("select name,class,score,row_number() over(order by class) sss from scores" ).show() spark.sql("select name,class,score,row_number() over(partition by class order by score) sss from scores" ).show() spark.sql("select name,class,score,rank() over(order by class) sss from scores" ).show() spark.sql("select name,class,score,rank() over(partition by class order by score) sss from scores" ).show() spark.sql("select name,class,score,dense_rank() over(order by class) sss from scores" ).show() spark.sql("select name,class,score,dense_rank() over(partition by class order by score) sss from scores" ).show() spark.sql("select name,class,score,ntile(6) over(order by class) sss from scores" ).show() spark.sql("select name,class,score,ntile(6) over(partition by class order by score) sss from scores" ).show() } }
SparkSQL底层如何执行解析成RDD
SparkStreaming引入–RDD–DStream 流数据的处理模式
SparkStreaming数据结构 SparkStreaming原理架构
SparkStreaming原理 基础原理
SparkStreaming原理深入
DStream两种算子
底层RDD@Time时间序列构成
RDD的算子分为Transformation算子和Action算子,那么DStream如何划分呢?
答案:Transformation操作和OutputOperation操作
问题:SparkStreaming的DStream支持的算子并没有那么多,比如缺少排序算子
SparkStreaming的解决方案是利用transform方法拿到DStream底层的RDD,因为RDD的算子要丰富得多,如下面的示意
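一个最小示意(假设 resultDS: DStream[(String, Int)] 是前面 wordcount 聚合后的结果):
import org.apache.spark.streaming.dstream.DStream

// transform 内部拿到的是该批次对应的 RDD,可以使用 RDD 的全部算子,例如排序
val sortedDS: DStream[(String, Int)] = resultDS.transform(rdd => {
  rdd.sortBy(_._2, ascending = false)
})
sortedDS.print()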
SparkStreaming初体验-无状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 package cn.spark.sparkstreamingimport org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream , ReceiverInputDStream }import org.apache.spark.streaming.{Seconds , StreamingContext }object _01baseStreaming { def main (args: Array [String ]): Unit = { val conf: SparkConf = new SparkConf ().setAppName(this .getClass.getSimpleName.stripSuffix("$" )).setMaster("local[*]" ) val ssc = new StreamingContext (conf, Seconds (5 )) val receiveRDD: ReceiverInputDStream [String ] = ssc.socketTextStream("node1.spark.cn" , 9999 ) val flatRDD: DStream [String ] = receiveRDD.flatMap(_.split("\\s+" )) val mapRDD: DStream [(String , Int )] = flatRDD.map(x => (x, 1 )) val resultRDD: DStream [(String , Int )] = mapRDD.reduceByKey((a: Int , b: Int ) => a + b) resultRDD.print() ssc.start() ssc.awaitTermination() } }
状态计算 updateStateByKey
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 package cn.spark.sparkstreamingimport org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream , ReceiverInputDStream }import org.apache.spark.streaming.{Seconds , StreamingContext }object _02baseStreaming { def updateFunc (currentValue: Seq [Int ], historyValue: Option [Int ]): Option [Int ] = { val sumV: Int = currentValue.sum + historyValue.getOrElse(0 ) Option (sumV) } def main (args: Array [String ]): Unit = { val conf: SparkConf = new SparkConf ().setAppName(this .getClass.getSimpleName.stripSuffix("$" )).setMaster("local[*]" ) val ssc = new StreamingContext (conf, Seconds (5 )) ssc.checkpoint("data/baseoutput/output-4" ) val receiveRDD: ReceiverInputDStream [String ] = ssc.socketTextStream("node1.spark.cn" , 9999 ) val flatRDD: DStream [String ] = receiveRDD.flatMap(_.split("\\s+" )) val mapRDD: DStream [(String , Int )] = flatRDD.map(x => (x, 1 )) val resultRDD: DStream [(String , Int )] = mapRDD.updateStateByKey(updateFunc) val sortResultDS: DStream [(String , Int )] = resultRDD.transform(rdd => { rdd.sortBy(_._2, false ) }) sortResultDS.print() new Thread (new Runnable { override def run (): Unit = { while ( true ) { try { Thread .sleep(5000 ) } catch { case ex : Exception => println(ex) } val fs: FileSystem = FileSystem .get(new URI ("hdfs://node1:8020" ), new Configuration (), "root" ) val state: StreamingContextState = ssc.getState() if ( state == StreamingContextState .ACTIVE ) { val flg: Boolean = fs.exists(new Path ("hdfs://node1:8020/spark/stopSparkHistory/" +appName+"@" +ssc.sparkContext.getConf.getAppId)) if ( flg ) { ssc.stop(true , true ) System .exit(0 ) } } } } }).start() ssc.start() ssc.awaitTermination() } }
双流合并
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 package cn.spark.sparkstreamingimport org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream , ReceiverInputDStream }import org.apache.spark.streaming.{Seconds , StreamingContext }object _06twoSourceStreaming { def updateFunc (curentValue: Seq [Int ], histouryValue: Option [Int ]): Option [Int ] = { val sum: Int = curentValue.sum + histouryValue.getOrElse(0 ) Option (sum) } def main (args: Array [String ]): Unit = { val conf: SparkConf = new SparkConf ().setAppName(this .getClass.getSimpleName.stripSuffix("$" )).setMaster("local[3]" ) val ssc = new StreamingContext (conf, Seconds (5 )) ssc.checkpoint("data/baseoutput/output-6" ) val receiveRDD1: ReceiverInputDStream [String ] = ssc.socketTextStream("node1.spark.cn" , 9999 ) val receiveRDD2: ReceiverInputDStream [String ] = ssc.socketTextStream("node1.spark.cn" , 9998 ) val receiveRDD: DStream [String ] = receiveRDD1.union(receiveRDD2) val flatRDD: DStream [String ] = receiveRDD.flatMap(_.split("\\s+" )) val mapRDD: DStream [(String , Int )] = flatRDD.map(x => (x, 1 )) val resultRDD: DStream [(String , Int )] = mapRDD.updateStateByKey(updateFunc) resultRDD.print() ssc.start() ssc.awaitTermination() } }
mapWithState
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 package cn.spark.sparkstreamingimport org.apache.commons.lang3.StringUtils import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{MapWithStateDStream , ReceiverInputDStream }import org.apache.spark.streaming.{Seconds , State , StateSpec , StreamingContext }object _04mapWithState { def main (args: Array [String ]): Unit = { val ssc: StreamingContext = { val conf: SparkConf = new SparkConf ().setAppName(this .getClass.getSimpleName.stripSuffix("$" )).setMaster("local[*]" ) val ssc = new StreamingContext (conf, Seconds (5 )) ssc } ssc.checkpoint("data/baseoutput/output-5" ) val receiveStream: ReceiverInputDStream [String ] = ssc.socketTextStream("node1.spark.cn" , 9999 ) val result: MapWithStateDStream [String , Int , Int , Any ] = receiveStream .filter(line => StringUtils .isNotBlank(line)) .flatMap(_.split("\\s+" )) .map((_, 1 )) .mapWithState(StateSpec .function(mappingFunction)) result.print() ssc.start() ssc.awaitTermination() } val mappingFunction = (word: String , option: Option [Int ], state: State [Int ]) => { if (state.isTimingOut()) { println(word + "is time out.." ) } else { val sum: Int = option.getOrElse(0 ) + state.getOption().getOrElse(0 ) val keyFreq: (String , Int ) = (word, sum) state.update(sum) keyFreq } } }
窗口计算
窗口长度:被包含在窗口中的数据需要被处理
窗口滑动时间间隔:窗口每过多久滑动一次
批处理间隔:每5s处理一次,即 StreamingContext(conf, Seconds(5))
切分批次:每隔多久处理一次,一般设置5s
窗口长度:被包括在窗口中的数据需要被计算
窗口滑动时间间隔;窗口每隔多久计算一次
当窗口的长度=窗口的滑动时间间隔,不会造成数据丢失或重复
当窗口的长度<滑动的时间间隔,会造成数据丢失
当窗口的长度>滑动时间间隔,造成数据重复,
(重复得看最后的数据怎么使用,如果说是每5min统计最近1小时的数据,只要这一小时的数据,没有重复可言,如果是要保留数据最后累计,就会有重复累计问题。 )
上述案例下,窗口的长度不能随便设置成17s这类数值
一般要求:窗口的长度和窗口的滑动时间间隔必须是批处理间隔的整数倍,否则一个RDD需要被切成两部分,违反了RDD的不可变性
案例:
每隔10s统计一下10s的数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 package cn.spark.sparkstreamingimport org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream , ReceiverInputDStream }import org.apache.spark.streaming.{Seconds , StreamingContext }object _07windowsOpration { def main (args: Array [String ]): Unit = { val conf: SparkConf = new SparkConf ().setAppName(this .getClass.getSimpleName.stripSuffix("$" )).setMaster("local[3]" ) val ssc = new StreamingContext (conf, Seconds (5 )) ssc.checkpoint("data/baseoutput/output-6" ) val receiveRDD: ReceiverInputDStream [String ] = ssc.socketTextStream("node1.spark.cn" , 9999 ) val flatRDD: DStream [String ] = receiveRDD.flatMap(_.split("\\s+" )) val mapRDD: DStream [(String , Int )] = flatRDD.map(x => (x, 1 )) val resultRDD: DStream [(String , Int )] = mapRDD.reduceByKeyAndWindow( (a: Int , b: Int ) => a + b, Seconds (10 ), Seconds (10 )) resultRDD.print() ssc.start() ssc.awaitTermination() } }
换种API实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 package cn.spark.sparkstreamingimport org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream , ReceiverInputDStream }import org.apache.spark.streaming.{Seconds , StreamingContext }object _08windowsOpration { def main (args: Array [String ]): Unit = { val conf: SparkConf = new SparkConf ().setAppName(this .getClass.getSimpleName.stripSuffix("$" )).setMaster("local[3]" ) val ssc = new StreamingContext (conf, Seconds (5 )) ssc.checkpoint("data/baseoutput/output-6" ) val receiveRDD: ReceiverInputDStream [String ] = ssc.socketTextStream("node1.spark.cn" , 9999 ) val flatRDD: DStream [String ] = receiveRDD.flatMap(_.split("\\s+" )) val mapRDD: DStream [(String , Int )] = flatRDD.map(x => (x, 1 )) val resultRDD: DStream [(String , Int )] = mapRDD.window(Seconds (10 ), Seconds (10 )) val resultRDD1: DStream [(String , Int )] = resultRDD.transform(rdd => { rdd.reduceByKey(_ + _) }) resultRDD1.print() ssc.start() ssc.awaitTermination() } }
增加排序的功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 package cn.spark.sparkstreamingimport org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.dstream.{DStream , ReceiverInputDStream }import org.apache.spark.streaming.{Seconds , StreamingContext }object _08windowsOpration { def main (args: Array [String ]): Unit = { val conf: SparkConf = new SparkConf ().setAppName(this .getClass.getSimpleName.stripSuffix("$" )).setMaster("local[3]" ) val ssc = new StreamingContext (conf, Seconds (5 )) ssc.checkpoint("data/baseoutput/output-6" ) val receiveRDD: ReceiverInputDStream [String ] = ssc.socketTextStream("node1.spark.cn" , 9999 ) val flatRDD: DStream [String ] = receiveRDD.flatMap(_.split("\\s+" )) val mapRDD: DStream [(String , Int )] = flatRDD.map(x => (x, 1 )) val resultRDD: DStream [(String , Int )] = mapRDD.window(Seconds (10 ), Seconds (5 )) val resultRDD1: DStream [(String , Int )] = resultRDD.transform(rdd => { val reduceRDD: RDD [(String , Int )] = rdd.reduceByKey(_ + _) reduceRDD.sortBy(_._2, false ) }) resultRDD1.print() ssc.start() ssc.awaitTermination() } }
SparkStreaming和SparkSQL整合 SparkStreaming和基本数据源(监控HDFS文件夹下新增的同格式文件)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 package cn.spark.sparkstreaming.filesourceimport org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Seconds , StreamingContext }object _01textFileSource { def updateFunc (currenValue: Seq [Int ], historyValue: Option [Int ]): Option [Int ] = { val sum: Int = currenValue.sum + historyValue.getOrElse(0 ) Some (sum) } def main (args: Array [String ]): Unit = { val ssc: StreamingContext = { val conf: SparkConf = new SparkConf ().setAppName(this .getClass.getSimpleName.stripSuffix("$" )).setMaster("local[*]" ) val ssc = new StreamingContext (conf, Seconds (5 )) ssc } ssc.checkpoint("data/baseoutput/output-7" ) val rddDS: DStream [String ] = ssc.textFileStream("hdfs://node1.spark.cn:8020/wordcount/trans/" ) val resultRDD: DStream [(String , Int )] = rddDS .flatMap(_.split("\\s+" )) .map((_, 1 )) .updateStateByKey(updateFunc) resultRDD.print() ssc.start() ssc.awaitTermination() ssc.stop(true ,true ) } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 package cn.spark.sparkstreaming.filesourceimport org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame , SparkSession }import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Seconds , StreamingContext }object _03textFileSourceSparkSQL { def updateFunc (currenValue: Seq [Int ], historyValue: Option [Int ]): Option [Int ] = { val sum: Int = currenValue.sum + historyValue.getOrElse(0 ) Some (sum) } def main (args: Array [String ]): Unit = { val ssc: StreamingContext = { val conf: SparkConf = new SparkConf ().setAppName(this .getClass.getSimpleName.stripSuffix("$" )).setMaster("local[*]" ) val ssc = new StreamingContext (conf, Seconds (5 )) ssc } ssc.checkpoint("data/baseoutput/output-7" ) val rddDS: DStream [String ] = ssc.textFileStream("hdfs://node1.spark.cn:8020/wordcount/trans/" ) val resultRDD: DStream [String ] = rddDS.flatMap(_.split("\\s+" )) val resultValue: DStream [(String , Int )] = resultRDD.map((_, 1 )).updateStateByKey(updateFunc) resultValue.foreachRDD(rdd => { val spark = SparkSession .builder.config(rdd.sparkContext.getConf).getOrCreate() import spark.implicits._ val df: DataFrame = rdd.toDF("word" , "count" ) df.createOrReplaceTempView("table_view" ) val result: DataFrame = spark.sql( """ |select * |from table_view |""" .stripMargin) result.show() }) ssc.start() ssc.awaitTermination() ssc.stop(true , true ) } }
SparkStreaming和Kafka整合 kafka原理
Kafka
什么是消息:应用之间传输的数据
什么是消息队列:应用之间保障消息传递的准确性
消息队列分类:点对点,发布订阅者方式
消息队列应用:限流消峰,应用解耦
Kafka:Scala+Java混编,LinkedIn(ActiveMQ)
Kafka:生产者,消费者,Broker
当前虚拟机的Kafka版本是1.1.0,大家之前学习的是2.4.1;高版本命令更多使用 --bootstrap-server 替代 zookeeper 地址
Kafka安装
# 当前版本是1.1.0 大家使用的是2.4.1
# 1-kafka启动
nohup bin/kafka-server-start.sh config/server.properties &
# 2-查看kafka内部的topic的内容
/export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181
# 3-描述topic
/export/server/kafka/bin/kafka-topics.sh --describe --zookeeper node1:2181 --topic spark_kafka
# 创建topic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic spark_kafka
# 描述topic
/export/server/kafka/bin/kafka-topics.sh --describe --zookeeper node1:2181 --topic spark_kafka
# 删除topic
/export/server/kafka/bin/kafka-topics.sh --zookeeper node1:2181 --delete --topic test
# 生产者和消费者
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic spark_kafka
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic spark_kafka --from-beginning
整合两种方式区别
实现SparkStreaming和kafka整合
两个版本:0.8及0.10以上,目前企业中使用的是0.10及以上版本
两种消费方式:
Receiver:启动多个Receiver接收器线程拉取kafka数据过来合并,使用WAL和checkpoint等
问题1:需要开启多个Receiver线程拉取数据,需要合并数据源
问题2:使用的HighLevelAPI对接的是offset维护在ZK中
问题3:需要大量的WAL或checkpoint影响效率
Direct:直接连接kafka的topic的partition读取数据,自行获取分区中的offset
解决1:直接使用spark的rdd的一个分区对接的是kafka的一个partition
解决2:使用low-level API,offset默认存放在kafka内部的topic中,也可以手动维护
解决3:没有提供WAL的方式
几个特点:
kafka+sparkstreaming消费者消费数据
SparkStreaming010整合Kafka
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.4.5</version>
</dependency>
2-创建StreamingContext
3-通过KafkaUtils.createDirectStream方法直接连接Kafka集群的分区
在大多数情况下使用此功能,它将在所有执行程序之间一致地分配分区。
LocationStrategies 的几种方式。
1. LocationStrategies.PreferBrokers()
仅当你的 spark executor 和 kafka broker 部署在相同节点上时使用,会把分区优先分配到该分区所在 broker 的机器上;
2. LocationStrategies.PreferConsistent()
大多数情况下使用,一致性的方式分配分区所有 executor 上。(主要是为了分布均匀)
3. LocationStrategies.PreferFixed(hostMap: collection.Map[TopicPartition, String])
4. LocationStrategies.PreferFixed(hostMap: ju.Map[TopicPartition, String])
如果你的负载不均衡,可以通过这两种方式来手动指定分配方式,其他没有在 map 中指定的,均采用 preferConsistent() 的方式分配;
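下面是使用 PreferFixed 手动指定分区到主机映射的一个简单示意(topic 名 spark_kafka、主机名 node1/node2 均为假设值):
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.LocationStrategies

// 将 spark_kafka 的 0 号分区固定分配到 node1,1 号分区固定分配到 node2
// 未在 map 中指定的分区,仍按 PreferConsistent 的方式分配
val hostMap: collection.Map[TopicPartition, String] = Map(
  new TopicPartition("spark_kafka", 0) -> "node1",
  new TopicPartition("spark_kafka", 1) -> "node2"
)
val locationStrategy = LocationStrategies.PreferFixed(hostMap)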
4-获取record记录中的value的值
5-根据value进行累加求和wordcount
6-ssc.start()
7-ssc.awaitTermination
8-ssc.stop(true,true)
代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 package cn.spark.sparkstreaming.kafkaimport org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream , InputDStream }import org.apache.spark.streaming.kafka010.{ConsumerStrategies , KafkaUtils , LocationStrategies }import org.apache.spark.streaming.{Seconds , StreamingContext }object _01SparkStreamingKafkaAuto { def updateFunc (curentValue: Seq [Int ], histouryValue: Option [Int ]): Option [Int ] = { val sum: Int = curentValue.sum + histouryValue.getOrElse(0 ) Option (sum) } val kafkaParams = Map [String , Object ]( "bootstrap.servers" -> "node1:9092" , "key.deserializer" -> classOf[StringDeserializer ], "value.deserializer" -> classOf[StringDeserializer ], "group.id" -> "spark_group" , "auto.offset.reset" -> "latest" , "enable.auto.commit" -> (true : java.lang.Boolean ), "auto.commit.interval.ms" -> "1000" ) def main (args: Array [String ]): Unit = { val ssc: StreamingContext = { val conf: SparkConf = new SparkConf ().setAppName(this .getClass.getSimpleName.stripSuffix("$" )).setMaster("local[*]" ) val ssc = new StreamingContext (conf, Seconds (5 )) ssc } ssc.checkpoint("data/baseoutput/cck1" ) val streamRDD: InputDStream [ConsumerRecord [String , String ]] = KafkaUtils .createDirectStream[String , String ](ssc, LocationStrategies .PreferConsistent , ConsumerStrategies .Subscribe [String , String ](Array ("spark_kafka" ), kafkaParams)) val mapValue: DStream [String ] = streamRDD.map(_.value()) val resultRDD: DStream [(String , Int )] = mapValue .flatMap(_.split("\\s+" )) .map((_, 1 )) .updateStateByKey(updateFunc) resultRDD.print() ssc.start() ssc.awaitTermination() ssc.stop(true , true ) } }
手动提交偏移量(可实现恰好一次语义,但仍存在断网等导致偏移量没有及时提交的情况)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 package cn.spark.sparkstreaming.kafkaimport org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream , InputDStream }import org.apache.spark.streaming.kafka010.{CanCommitOffsets , ConsumerStrategies , HasOffsetRanges , KafkaUtils , LocationStrategies , OffsetRange }import org.apache.spark.streaming.{Seconds , StreamingContext }object _02SparkStreamingKafkaByPass { def updateFunc (curentValue: Seq [Int ], histouryValue: Option [Int ]): Option [Int ] = { val sum: Int = curentValue.sum + histouryValue.getOrElse(0 ) Option (sum) } val kafkaParams = Map [String , Object ]( "bootstrap.servers" -> "node1:9092" , "key.deserializer" -> classOf[StringDeserializer ], "value.deserializer" -> classOf[StringDeserializer ], "group.id" -> "spark_group" , "auto.offset.reset" -> "latest" , "enable.auto.commit" -> (false : java.lang.Boolean ) ) def main (args: Array [String ]): Unit = { val ssc: StreamingContext = { val conf: SparkConf = new SparkConf ().setAppName(this .getClass.getSimpleName.stripSuffix("$" )).setMaster("local[*]" ) val ssc = new StreamingContext (conf, Seconds (5 )) ssc } ssc.checkpoint("data/baseoutput/cck2" ) val streamRDD: InputDStream [ConsumerRecord [String , String ]] = KafkaUtils .createDirectStream[String , String ](ssc, LocationStrategies .PreferConsistent , ConsumerStrategies .Subscribe [String , String ](Array ("spark_kafka" ), kafkaParams)) streamRDD.foreachRDD(f => { if (f.count() > 0 ) { println("rdd is:" , f) f.foreach(record => { println("record result is:" , record) val value: String = record.value() println("value is:" , value) }) } val offsetRanges: Array [OffsetRange ] = f.asInstanceOf[HasOffsetRanges ].offsetRanges for (offsetRange <- offsetRanges){ println(s"topic:${offsetRange.topic} partition:${offsetRange.partition} fromoffset:${offsetRange.fromOffset} endoffset:${offsetRange.untilOffset} " ) } streamRDD.asInstanceOf[CanCommitOffsets ].commitAsync(offsetRanges) }) val mapValue: DStream [String ] = streamRDD.map(_.value()) val resultRDD: DStream [(String , Int )] = mapValue .flatMap(_.split("\\s+" )) .map((_, 1 )) .updateStateByKey(updateFunc) resultRDD.print() ssc.start() ssc.awaitTermination() ssc.stop(true , true ) } }
代码模板抽取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 package cn.spark.sparkstreaming.kafkaimport org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream , InputDStream }import org.apache.spark.streaming.kafka010._import org.apache.spark.streaming.{Seconds , StreamingContext }object _03SparkStreamingKafkaModel { def updateFunc (curentValue: Seq [Int ], histouryValue: Option [Int ]): Option [Int ] = { val sum: Int = curentValue.sum + histouryValue.getOrElse(0 ) Option (sum) } val kafkaParams = Map [String , Object ]( "bootstrap.servers" -> "node1:9092" , "key.deserializer" -> classOf[StringDeserializer ], "value.deserializer" -> classOf[StringDeserializer ], "group.id" -> "spark_group" , "auto.offset.reset" -> "latest" , "enable.auto.commit" -> (false : java.lang.Boolean ) ) def main (args: Array [String ]): Unit = { val ssc: StreamingContext = { val conf: SparkConf = new SparkConf ().setAppName(this .getClass.getSimpleName.stripSuffix("$" )).setMaster("local[*]" ) val ssc = new StreamingContext (conf, Seconds (5 )) ssc } ssc.checkpoint("data/baseoutput/cck3" ) compute(ssc) ssc.start() ssc.awaitTermination() ssc.stop(true , true ) } def compute (ssc: StreamingContext ): Unit = { val streamRDD: InputDStream [ConsumerRecord [String , String ]] = KafkaUtils .createDirectStream[String , String ](ssc, LocationStrategies .PreferConsistent , ConsumerStrategies .Subscribe [String , String ](Array ("spark_kafka" ), kafkaParams)) streamRDD.foreachRDD(f => { if (f.count() > 0 ) { println("rdd is:" , f) f.foreach(record => { println("record result is:" , record) val value: String = record.value() println("value is:" , value) }) } val offsetRanges: Array [OffsetRange ] = f.asInstanceOf[HasOffsetRanges ].offsetRanges for (offsetRange <- offsetRanges) { println(s"topic:${offsetRange.topic} partition:${offsetRange.partition} fromoffset:${offsetRange.fromOffset} endoffset:${offsetRange.untilOffset} " ) } streamRDD.asInstanceOf[CanCommitOffsets ].commitAsync(offsetRanges) }) val mapValue: DStream [String ] = streamRDD.map(_.value()) val resultRDD: DStream [(String , Int )] = mapValue .flatMap(_.split("\\s+" )) .map((_, 1 )) .updateStateByKey(updateFunc) resultRDD.print() } }
Checkpoint 恢复
当Streaming Application再次运行、从Checkpoint检查点目录恢复时,有时会出现问题:比如修改程序后再次运行,可能出现类型转换异常,如下所示:
ERROR Utils: Exception encountered java.lang.ClassCastException: cannot assign instance of cn.spark.spark.ckpt.StreamingCkptState$$anonfun$streamingProcess$1 to field org.apache.spark.streaming.dstream.ForEachDStream.org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc of type scala.Function2 in instance of org.apache.spark.streaming.dstream.ForEachDStream at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133) at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
原因在于修改了DStream转换操作后,检查点目录中存储的数据里没有新代码对应的类信息,所以报ClassCastException异常。
此时无法从检查点读取偏移量信息和状态信息,所以实际开发中大家认为:SparkStreaming的Checkpoint功能属于鸡肋,食之无味,弃之可惜。解决方案:
1)、针对状态信息:当应用启动时,从外部存储系统读取最新状态,比如从MySQL表读取,或者从Redis读取;
2)、针对偏移量数据:自己管理偏移量,将偏移量存储到MySQL表、Zookeeper、HBase或Redis;
代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 package cn.spark.sparkstreaming.kafkaimport org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream , InputDStream }import org.apache.spark.streaming.kafka010._import org.apache.spark.streaming.{Seconds , StreamingContext }object _04getActiveOrCreate { def updateFunc (curentValue: Seq [Int ], histouryValue: Option [Int ]): Option [Int ] = { val sum: Int = curentValue.sum + histouryValue.getOrElse(0 ) Option (sum) } val kafkaParams = Map [String , Object ]( "bootstrap.servers" -> "node1:9092" , "key.deserializer" -> classOf[StringDeserializer ], "value.deserializer" -> classOf[StringDeserializer ], "group.id" -> "spark_group" , "auto.offset.reset" -> "latest" , "enable.auto.commit" -> (false : java.lang.Boolean ) ) def main (args: Array [String ]): Unit = { val CHECKPOINT = "data/baseoutput/cck5" val ssc: StreamingContext = StreamingContext .getActiveOrCreate(CHECKPOINT , () => { val conf: SparkConf = new SparkConf ().setAppName(this .getClass.getSimpleName.stripSuffix("$" )).setMaster("local[*]" ) val ssc = new StreamingContext (conf, Seconds (5 )) ssc.checkpoint(CHECKPOINT ) compute(ssc) ssc }) ssc.start() ssc.awaitTermination() ssc.stop(true , true ) } def compute (ssc: StreamingContext ): Unit = { val streamRDD: InputDStream [ConsumerRecord [String , String ]] = KafkaUtils .createDirectStream[String , String ](ssc, LocationStrategies .PreferConsistent , ConsumerStrategies .Subscribe [String , String ](Array ("spark_kafka" ), kafkaParams)) streamRDD.foreachRDD(f => { if (f.count() > 0 ) { println("rdd is:" , f) f.foreach(record => { println("record result is:" , record) val value: String = record.value() println("value is:" , value) }) } val offsetRanges: Array [OffsetRange ] = f.asInstanceOf[HasOffsetRanges ].offsetRanges for (offsetRange <- offsetRanges) { println(s"topic:${offsetRange.topic} partition:${offsetRange.partition} fromoffset:${offsetRange.fromOffset} endoffset:${offsetRange.untilOffset} " ) } streamRDD.asInstanceOf[CanCommitOffsets ].commitAsync(offsetRanges) }) val mapValue: DStream [String ] = streamRDD.map(_.value()) val resultRDD: DStream [(String , Int )] = mapValue .flatMap(_.split("\\s+" )) .map((_, 1 )) .updateStateByKey(updateFunc) resultRDD.print() } }
偏移量存储在MySQL中
CREATE TABLE `t_offset` (
  `topic` varchar(255) NOT NULL,
  `partition` int(11) NOT NULL,
  `groupid` varchar(255) NOT NULL,
  `offset` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`topic`,`partition`,`groupid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
查询topic的信息
查看官网对于TopicPartition的讲解
1-MySQL的偏移量查询或保存的工具类提供
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 package cn.spark.sparkstreaming.kafka.toMySQLimport java.sql.{DriverManager , ResultSet }import org.apache.kafka.common.TopicPartition import org.apache.spark.streaming.kafka010.OffsetRange import scala.collection.mutableobject OffsetUtil { def getOffsetMap (groupid: String , topic: String ) = { val connection = DriverManager .getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8" , "root" , "root" ) val pstmt = connection.prepareStatement("select * from t_offset where groupid=? and topic=?" ) pstmt.setString(1 , groupid) pstmt.setString(2 , topic) val rs: ResultSet = pstmt.executeQuery() val offsetMap = mutable.Map [TopicPartition , Long ]() while (rs.next()) { offsetMap += new TopicPartition (rs.getString("topic" ), rs.getInt("partition" )) -> rs.getLong("offset" ) } rs.close() pstmt.close() connection.close() offsetMap } def saveOffsetRanges (groupid: String , offsetRange: Array [OffsetRange ]) = { val connection = DriverManager .getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8" , "root" , "root" ) val pstmt = connection.prepareStatement("replace into t_offset (`topic`, `partition`, `groupid`, `offset`) values(?,?,?,?)" ) for (o <- offsetRange) { pstmt.setString(1 , o.topic) pstmt.setInt(2 , o.partition) pstmt.setString(3 , groupid) pstmt.setLong(4 , o.untilOffset) pstmt.executeUpdate() } pstmt.close() connection.close() } } }
2-使用MySQL偏移量的工具类给出偏移量
分析一下,什么时候需要从mysql中读取偏移量
首先,MySQL中有offset的记录才可以读取,所以要判断查询返回的Map是否为空
接着,如果MySQL没有记录,直接连接、从latest拉取最新的偏移量消费即可
接着,如果MySQL存放了记录,直接读取出来,在Subscribe中增加第三个参数传入
分析一下,什么时候应该将偏移量写入MySQL
每次消费到新的数据后,需要将本批次的untilOffset更新到MySQL
代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 package cn.spark.sparkstreaming.kafka.toMySQLimport org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream , InputDStream }import org.apache.spark.streaming.kafka010._import org.apache.spark.streaming.{Seconds , StreamingContext }import scala.collection.mutableobject _01getActiveOrCreate { def updateFunc (curentValue: Seq [Int ], histouryValue: Option [Int ]): Option [Int ] = { val sum: Int = curentValue.sum + histouryValue.getOrElse(0 ) Option (sum) } val kafkaParams = Map [String , Object ]( "bootstrap.servers" -> "node1:9092" , "key.deserializer" -> classOf[StringDeserializer ], "value.deserializer" -> classOf[StringDeserializer ], "group.id" -> "spark_group" , "auto.offset.reset" -> "latest" , "enable.auto.commit" -> (false : java.lang.Boolean ) ) def main (args: Array [String ]): Unit = { val CHECKPOINT = "data/baseoutput/cck6" val ssc: StreamingContext = StreamingContext .getActiveOrCreate(CHECKPOINT , () => { val conf: SparkConf = new SparkConf ().setAppName(this .getClass.getSimpleName.stripSuffix("$" )).setMaster("local[*]" ) val ssc = new StreamingContext (conf, Seconds (5 )) ssc.checkpoint(CHECKPOINT ) compute(ssc) ssc }) ssc.start() ssc.awaitTermination() ssc.stop(true , true ) } def compute (ssc: StreamingContext ): Unit = { val offsetMap: mutable.Map [TopicPartition , Long ] = OffsetUtil .getOffsetMap("spark_group" , "spark_kafka" ) var streamRDD: InputDStream [ConsumerRecord [String , String ]] = null if (offsetMap.size > 0 ) { println("如果MySQL中有记录offset,则应该从该offset处开始消费" ) streamRDD = KafkaUtils .createDirectStream[String , String ](ssc, LocationStrategies .PreferConsistent , ConsumerStrategies .Subscribe [String , String ](Array ("spark_kafka" ), kafkaParams, offsetMap)) } else { println("如果MySQL中没有记录offset,则直接连接,从latest开始消费" ) streamRDD = KafkaUtils .createDirectStream[String , String ](ssc, LocationStrategies .PreferConsistent , ConsumerStrategies .Subscribe [String , String ](Array ("spark_kafka" ), kafkaParams)) } streamRDD.foreachRDD(f => { if (f.count() > 0 ) { f.foreach(record => { println("record result is:" , record) val value: String = record.value() }) } val offsetRanges: Array [OffsetRange ] = f.asInstanceOf[HasOffsetRanges ].offsetRanges for (offsetRange <- offsetRanges) { println(s"topic:${offsetRange.topic} partition:${offsetRange.partition} fromoffset:${offsetRange.fromOffset} endoffset:${offsetRange.untilOffset} " ) } OffsetUtil .saveOffsetRanges("spark_group" ,offsetRanges) }) val mapValue: DStream [String ] = streamRDD.map(_.value()) val resultRDD: DStream [(String , Int )] = mapValue .flatMap(_.split("\\s+" )) .map((_, 1 )) .updateStateByKey(updateFunc) resultRDD.print() } }
偏移量也可以保存至Zookeeper上或者Redis中,原因如下:
1)、保存Zookeeper上:方便使用Kafka监控工具管理Kafka各个Topic被消费的信息;
2)、保存Redis上:从Redis读取数据和保存数据很快,基于内存数据库;
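下面是一个把偏移量保存到Redis的简单示意(使用pom中已有的Jedis依赖,Redis地址localhost:6379与key的命名约定 spark:offset:<groupid>:<topic> 均为假设):
import java.{util => ju}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.Jedis
import scala.collection.JavaConverters._
import scala.collection.mutable

object RedisOffsetUtil {
  // 以 hash 结构存储:field 为 partition,value 为 offset
  def saveOffsetRanges(groupid: String, offsetRanges: Array[OffsetRange]): Unit = {
    val jedis = new Jedis("localhost", 6379)
    for (o <- offsetRanges) {
      jedis.hset(s"spark:offset:$groupid:${o.topic}", o.partition.toString, o.untilOffset.toString)
    }
    jedis.close()
  }

  // 读取某个消费者组、某个topic的所有分区偏移量
  def getOffsetMap(groupid: String, topic: String): mutable.Map[TopicPartition, Long] = {
    val jedis = new Jedis("localhost", 6379)
    val offsetMap = mutable.Map[TopicPartition, Long]()
    val all: ju.Map[String, String] = jedis.hgetAll(s"spark:offset:$groupid:$topic")
    all.asScala.foreach { case (partition, offset) =>
      offsetMap += new TopicPartition(topic, partition.toInt) -> offset.toLong
    }
    jedis.close()
    offsetMap
  }
}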
代码可以进一步优化,提高性能:由于每批次数据结果RDD输出以后,都需要向MySQL数据库表更新偏移量数据,频繁连接数据库,建议构建数据库连接池,每次从池子中获取连接。
StructuredStreaming引入 StructuredStreaming定义
StructuredStreaming结构化流 –处理实时数据(流式数据)
流式处理的方式:(1)原生的处理方式:来一条数据处理一条,如结构化流(连续处理模式)和Flink;(2)微批次处理方式:如SparkStreaming
底层数据结构 :底层数据结构类似SparkSQL,使用DataFrame和DataSet
StructuredStreaming与sparkstreaming区别 SparkStreaming问题四点
1-SparkStreaming的时间是基于ProcessingTime处理时间,而不是EventTime事件时间
2-SparkStreaming编程API偏底层,本质上就是要去构造RDD的DAG执行图
3-SparkStreaming无法实现端到端的一致性,DStream只能保证自己的一致性语义是exactly-once的,而input接入Spark Streaming以及Spark Streaming输出到外部存储的语义往往需要用户自己来保证;
4-SparkStreaming无法实现流批统一,有时候确实需要将流处理逻辑运行到批数据上面,转化成RDD需要一定的工作量
StructuredStreaming应用场景
结构化流解决SparkStreaing的问题
1-结构化流是基于EventTime事件时间处理
2-使用DataFrame和DataSet的API
3-结构化流实现从source到sink的端到端的一致性exactly-once
4-结构化流实现批处理和流式处理的统一
结构化流使用两种API
ProcessingTime–微批次处理场景
ContinuousProcessing(连续处理)–实时场景,示例见下
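一个使用连续处理模式触发器的最小示意(Spark 2.3+ 提供 Trigger.Continuous,仅部分source/sink支持;此处用 rate 测试源和 console 输出仅作演示假设):
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object ContinuousDemo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("ContinuousDemo")
      .master("local[*]")
      .getOrCreate()

    // rate 测试源每秒产生若干行数据,方便演示连续处理模式
    val rateDF = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()

    val query = rateDF.writeStream
      .format("console")
      .outputMode("append")
      // Continuous 的参数是 checkpoint 的周期,而不是微批间隔
      .trigger(Trigger.Continuous("1 second"))
      .start()

    query.awaitTermination()
  }
}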
结构化流编程模型
入门案例 StructuredStreaming第一个案例-Socket
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 package cn.ispark.structedstreamingimport java.util.concurrent.TimeUnit import org.apache.commons.lang3.StringUtils import org.apache.spark.SparkConf import org.apache.spark.sql.streaming.{OutputMode , StreamingQuery , Trigger }import org.apache.spark.sql.{DataFrame , Dataset , Row , SparkSession }object _01socketSource { def main (args: Array [String ]): Unit = { val conf: SparkConf = new SparkConf () .setAppName(this .getClass.getSimpleName.stripSuffix("$" )) .setMaster("local[*]" ) val spark: SparkSession = SparkSession .builder() .config(conf) .config("spark.sql.shuffle.partitions" ,"4" ) .getOrCreate() import spark.implicits._ val streamData: DataFrame = spark.readStream .format("socket" ) .option("host" , "node1" ) .option("port" , 9999 ) .load() streamData.printSchema() val result: Dataset [Row ] = streamData .as[String ] .filter(StringUtils .isNoneBlank(_)) .flatMap(x => x.split("\\s+" )) .groupBy("value" ) .count() .orderBy('count.desc) val query: StreamingQuery = result.writeStream .format("console" ) .outputMode(OutputMode .Complete ()) .option("numRows" , 5 ) .option("truncate" , "false" ) .trigger(Trigger .ProcessingTime (0 )) .start() query.awaitTermination() query.stop() } }
StructuredStreaming文本数据源
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 package cn.ispark.structedstreaming.filesourceimport org.apache.spark.SparkConf import org.apache.spark.sql.types.{DataTypes , StructField , StructType }import org.apache.spark.sql.{DataFrame , SparkSession }import org.apache.spark.sql.functions._import org.apache.spark.sql.streaming.{OutputMode , StreamingQuery , Trigger }object _01FileSource { def main (args: Array [String ]): Unit = { val conf: SparkConf = new SparkConf () .setAppName(this .getClass.getSimpleName.stripSuffix("$" )) .setMaster("local[*]" ) val spark: SparkSession = SparkSession .builder() .config(conf) .config("spark.sql.shuffle.partitions" , "4" ) .getOrCreate() val schema: StructType = StructType ( StructField ("name" , DataTypes .StringType , true ) :: StructField ("age" , DataTypes .IntegerType , true ) :: StructField ("hobby" , DataTypes .StringType , true ) :: Nil ) val streamDF: DataFrame = spark.readStream .format("csv" ) .option("sep" , ";" ) .option("header" , "false" ) .schema(schema) .load("data/baseinput/struct/" ) streamDF.printSchema() import spark.implicits._ val result: DataFrame = streamDF .filter('age < 25 ) .groupBy('hobby) .count() val query: StreamingQuery = result.writeStream .format("console" ) .outputMode(OutputMode .Complete ()) .option("numRows" , 10 ) .option("truncate" , "false" ) .trigger(Trigger .ProcessingTime (0 )) .start() query.awaitTermination() query.stop() } }
StructuredStreaming的Kafka整合
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 package cn.ispark.structedstreaming.kafkaimport org.apache.spark.SparkConf import org.apache.spark.sql.streaming.{OutputMode , StreamingQuery , Trigger }import org.apache.spark.sql.{DataFrame , Dataset , Row , SparkSession }object _01KafkaSourceWordcount { def main (args: Array [String ]): Unit = { val conf: SparkConf = new SparkConf () .setAppName(this .getClass.getSimpleName.stripSuffix("$" )) .setMaster("local[*]" ) val spark: SparkSession = SparkSession .builder() .config(conf) .config("spark.sql.shuffle.partitions" , "4" ) .getOrCreate() import spark.implicits._ val streamDF: DataFrame = spark.readStream .format("kafka" ) .option("kafka.bootstrap.servers" , "node1:9092" ) .option("subscribe" , "wordstopic" ) .load() val result: Dataset [Row ] = streamDF .selectExpr("cast (value as string)" ) .as[String ] .flatMap(x => x.split("\\s+" )) .groupBy($"value" ) .count() .orderBy('count.desc) val query: StreamingQuery = result .writeStream .format("console" ) .outputMode(OutputMode .Complete ()) .trigger(Trigger .ProcessingTime (0 )) .option("numRows" , 10 ) .option("truncate" , false ) .start() query.awaitTermination() query.stop() } }
总结:
Kafka整合代码需要掌握
数据结果写入kafka
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()
运营商基站数据ETL
案例
准备Kafka的topic
# 实时ETL案例1
# 查看topic信息
/export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181
# 删除topic
/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic stationTopic
/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic etlTopic
# 创建topic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic stationTopic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic etlTopic
# 模拟生产者
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic stationTopic
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic etlTopic
# 模拟消费者----------stationTopic
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic stationTopic --from-beginning
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic etlTopic --from-beginning
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 package cn.ispark.structedstreaming.kafkaimport java.util.Properties import org.apache.kafka.clients.producer.{KafkaProducer , ProducerRecord }import org.apache.kafka.common.serialization.StringSerializer import scala.util.Random object MockStationLog { def main (args: Array [String ]): Unit = { val props = new Properties () props.put("bootstrap.servers" , "node1:9092" ) props.put("acks" , "1" ) props.put("retries" , "3" ) props.put("key.serializer" , classOf[StringSerializer ].getName) props.put("value.serializer" , classOf[StringSerializer ].getName) val producer = new KafkaProducer [String , String ](props) val random = new Random () val allStatus = Array ( "fail" , "busy" , "barring" , "success" , "success" , "success" , "success" , "success" , "success" , "success" , "success" , "success" ) while (true ) { val callOut: String = "1860000%04d" .format(random.nextInt(10000 )) val callIn: String = "1890000%04d" .format(random.nextInt(10000 )) val callStatus: String = allStatus(random.nextInt(allStatus.length)) val callDuration = if ("success" .equals(callStatus)) (1 + random.nextInt(10 )) * 1000 L else 0 L val stationLog: StationLog = StationLog ( "station_" + random.nextInt(10 ), callOut, callIn, callStatus, System .currentTimeMillis(), callDuration ) println(stationLog.toString) Thread .sleep(100 + random.nextInt(100 )) val record = new ProducerRecord [String , String ]("stationTopic" , stationLog.toString) producer.send(record) } producer.close() } case class StationLog ( stationId: String , callOut: String , callIn: String , callStatus: String , callTime: Long , duration: Long ) { override def toString : String = { s"$stationId ,$callOut ,$callIn ,$callStatus ,$callTime ,$duration " } } }
步骤
1-首先将Kafka两个Topic构建好
2-启动模拟程序,模拟生产者生产station数据到stationTopic中
3-启动StructuredStreaming消费stationTopic的数据
4-进行ETL操作———按下标为3的字段过滤(callStatus='success')
5-将过滤出来的数据写入到etlTopic中
6-通过etlTopic的消费者查看结果是否正确
代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 package cn.ispark.structedstreaming.kafkaimport org.apache.commons.lang3.StringUtils import org.apache.spark.SparkConf import org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.sql.{DataFrame , Dataset , SparkSession }object _02StationDataProcess { def main (args: Array [String ]): Unit = { val conf: SparkConf = new SparkConf () .setAppName(this .getClass.getSimpleName.stripSuffix("$" )) .setMaster("local[*]" ) val spark: SparkSession = SparkSession .builder() .config(conf) .config("spark.sql.shuffle.partitions" , "4" ) .getOrCreate() import spark.implicits._ val streamDF: DataFrame = spark.readStream .format("kafka" ) .option("kafka.bootstrap.servers" , "node1:9092" ) .option("subscribe" , "stationTopic" ) .load() val result: Dataset [String ] = streamDF .selectExpr("cast (value as string)" ) .as[String ] .filter(line => StringUtils .isNotBlank(line) && "success" .equals(line.trim.split("," )(3 ))) val query: StreamingQuery = result.writeStream .format("kafka" ) .option("kafka.bootstrap.servers" , "node1:9092" ) .option("topic" , "etlTopic" ) .option("checkpointLocation" ,"data/baseoutput/checkpoint-2" ) .start() query.awaitTermination() query.stop() } }
结果
物联网案例
# 物联网设备案例
# 查看topic信息
/export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181
# 删除topic
/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic iotTopic
# 创建topic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic iotTopic
# 模拟生产者
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic iotTopic
# 模拟消费者
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic iotTopic --from-beginning
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 package cn.ispark.structedstreaming.iotimport org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame , SparkSession }import org.apache.spark.sql.functions._import org.apache.spark.sql.streaming.{OutputMode , StreamingQuery , Trigger }import org.apache.spark.sql.types.DoubleType object _01IOTStreamProcess { def main (args: Array [String ]): Unit = { val conf: SparkConf = new SparkConf () .setAppName(this .getClass.getSimpleName.stripSuffix("$" )) .setMaster("local[*]" ) val spark: SparkSession = SparkSession .builder() .config(conf) .config("spark.sql.shuffle.partitions" , "4" ) .getOrCreate() import spark.implicits._ val streamDF: DataFrame = spark.readStream .format("kafka" ) .option("kafka.bootstrap.servers" , "node1:9092" ) .option("subscribe" , "iotTopic" ) .load() val parseJsonData: DataFrame = streamDF .selectExpr("cast (value as string)" ) .as[String ] .select( get_json_object($"value" , "$.device" ).as("device" ), get_json_object($"value" , "$.deviceType" ).as("deviceType" ), get_json_object($"value" , "$.signal" ).cast(DoubleType ).as("signal" ), get_json_object($"value" , "$.time" ).as("time" ) ) parseJsonData.createOrReplaceTempView("table_view" ) val sql = """ |select round(avg(signal),2) as avg_signal,count(deviceType) as device_counts |from table_view |where signal >30 |group by deviceType |""" .stripMargin val result: DataFrame = spark.sql(sql) val query: StreamingQuery = result.writeStream .format("console" ) .outputMode(OutputMode .Complete ()) .option("numRows" , 10 ) .option("truncate" , false ) .trigger(Trigger .ProcessingTime (0 )) .start() query.awaitTermination() query.stop() } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 package cn.ispark.structedstreaming.iotimport org.apache.spark.SparkConf import org.apache.spark.sql.functions._import org.apache.spark.sql.streaming.{OutputMode , StreamingQuery , Trigger }import org.apache.spark.sql.types.DoubleType import org.apache.spark.sql.{DataFrame , SparkSession }object _02IOTStreamProcessDSL { def main (args: Array [String ]): Unit = { val conf: SparkConf = new SparkConf () .setAppName(this .getClass.getSimpleName.stripSuffix("$" )) .setMaster("local[*]" ) val spark: SparkSession = SparkSession .builder() .config(conf) .config("spark.sql.shuffle.partitions" , "4" ) .getOrCreate() import spark.implicits._ val streamDF: DataFrame = spark.readStream .format("kafka" ) .option("kafka.bootstrap.servers" , "node1:9092" ) .option("subscribe" , "iotTopic" ) .load() val parseJsonData: DataFrame = streamDF .selectExpr("cast (value as string)" ) .as[String ] .select( get_json_object($"value" , "$.device" ).as("device" ), get_json_object($"value" , "$.deviceType" ).as("deviceType" ), get_json_object($"value" , "$.signal" ).cast(DoubleType ).as("signal" ), get_json_object($"value" , "$.time" ).as("time" ) ) parseJsonData.createOrReplaceTempView("table_view" ) val result: DataFrame = parseJsonData .filter('signal > 30 ) .groupBy("deviceType" ) .agg( round(avg("signal" ), 2 ).as("avg_signal" ), count("deviceType" ).as("device_counts" ) ) val query: StreamingQuery = result.writeStream .format("console" ) .outputMode(OutputMode .Complete ()) .option("numRows" , 10 ) .option("truncate" , false ) .trigger(Trigger .ProcessingTime (0 )) .start() query.awaitTermination() query.stop() } }
对于重复数据去重操作
对于指定的一个或多个字段进行去重(dropDuplicates)操作,示意如下
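一个按指定字段去重的最小示意(结合水印防止状态无限增长;streamDF 以及字段名 userId、eventTime 均为假设):
import org.apache.spark.sql.DataFrame

// 假设 streamDF 中包含 userId、eventTime(Timestamp 类型)等列
val dedupDF: DataFrame = streamDF
  .withWatermark("eventTime", "10 minutes")    // 超过水印的迟到数据不再参与去重,避免状态无限增长
  .dropDuplicates("userId", "eventTime")       // 按 userId + eventTime 组合去重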
StructuredStreaming的Foreach及ForeachBatch
结构化流写入MySQL中
Foreach
url="jdbc:mysql://localhost:3306/database_name"
驱动:com.mysql.jdbc.Driver
驱动包Maven:
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
</dependency>
CREATE TABLE `t_word` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `word` varchar(255) NOT NULL,
  `count` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `word` (`word`)
) ENGINE=InnoDB AUTO_INCREMENT=26 DEFAULT CHARSET=utf8;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 class JDBCSink (url: String , username: String , password: String ) extends ForeachWriter [Row ] with Serializable { var connection: Connection = _ var preparedStatement: PreparedStatement = _ override def open (partitionId: Long , version: Long ): Boolean = { connection = DriverManager .getConnection(url, username, password) true } override def process (row: Row ): Unit = { val word: String = row.get(0 ).toString val count: String = row.get(1 ).toString println(word + ":" + count) val sql = "REPLACE INTO `t_word` (`id`, `word`, `count`) VALUES (NULL, ?, ?);" preparedStatement = connection.prepareStatement(sql) preparedStatement.setString(1 , word) preparedStatement.setInt(2 , Integer .parseInt(count)) preparedStatement.executeUpdate() } override def close (errorOrNull: Throwable ): Unit = { if (connection != null ) { connection.close() } if (preparedStatement != null ) { preparedStatement.close() } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 package cn.ispark.structedstreaming.toMySQLimport java.sql.{Connection , DriverManager , PreparedStatement }import org.apache.spark.SparkConf import org.apache.spark.sql.streaming.{OutputMode , StreamingQuery , Trigger }import org.apache.spark.sql.{ForeachWriter , Row , SparkSession }object _01ForeachWayToMySQL { def main (args: Array [String ]): Unit = { val conf: SparkConf = new SparkConf () .setAppName(this .getClass.getSimpleName.stripSuffix("$" )) .setMaster("local[*]" ) val spark: SparkSession = SparkSession .builder() .config(conf) .config("spark.sql.shuffle.partitions" , "4" ) .getOrCreate() import spark.implicits._ val lines = spark.readStream .format("socket" ) .option("host" , "node1" ) .option("port" , 9999 ) .load() val words = lines .as[String ] .flatMap(_.split("\\s+" )) val wordCounts = words .groupBy("value" ) .count() val jDBCSink = new JDBCSink ("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8" , "root" , "root" ) val query: StreamingQuery = wordCounts .writeStream .foreach(jDBCSink) .outputMode(OutputMode .Complete ()) .trigger(Trigger .ProcessingTime (0 )) .start() query.awaitTermination() query.stop() } class JDBCSink (url: String , username: String , password: String ) extends ForeachWriter [Row ] with Serializable { var connection: Connection = _ var preparedStatement: PreparedStatement = _ override def open (partitionId: Long , version: Long ): Boolean = { connection = DriverManager .getConnection(url, username, password) true } override def process (row: Row ): Unit = { val word: String = row.get(0 ).toString val count: String = row.get(1 ).toString println(word + ":" + count) val sql = "REPLACE INTO `t_word` (`id`, `word`, `count`) VALUES (NULL, ?, ?);" preparedStatement = connection.prepareStatement(sql) preparedStatement.setString(1 , word) preparedStatement.setInt(2 , Integer .parseInt(count)) preparedStatement.executeUpdate() } override def close (errorOrNull: Throwable ): Unit = { if (connection != null ) { connection.close() } if (preparedStatement != null ) { preparedStatement.close() } } }
The approach above is to extend ForeachWriter, implement its open, process, and close methods, and then pass an instance of the class to foreach; see the variant sketched below.
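One detail worth noting: open runs once per partition (and epoch), while process runs once per row, so the PreparedStatement can also be created once in open and only re-bound per row instead of being re-prepared for every record. A minimal sketch of that variant, assuming the same t_word table and JDBC URL as above; the class name PreparedJDBCSink is made up for illustration:

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{ForeachWriter, Row}

// Hypothetical variant of the JDBCSink above: the statement is prepared once per partition
// in open() and only re-bound in process().
class PreparedJDBCSink(url: String, username: String, password: String)
  extends ForeachWriter[Row] with Serializable {

  var connection: Connection = _
  var statement: PreparedStatement = _

  override def open(partitionId: Long, version: Long): Boolean = {
    connection = DriverManager.getConnection(url, username, password)
    statement = connection.prepareStatement(
      "REPLACE INTO `t_word` (`id`, `word`, `count`) VALUES (NULL, ?, ?)")
    true
  }

  override def process(row: Row): Unit = {
    statement.setString(1, row.get(0).toString)
    statement.setInt(2, row.get(1).toString.toInt)
    statement.executeUpdate()
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (statement != null) statement.close()
    if (connection != null) connection.close()
  }
}

It is handed to writeStream.foreach(...) exactly like the JDBCSink above.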
ForeachBatch
package cn.ispark.structedstreaming.toMySQL

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SaveMode, SparkSession}

object _02ForeeachBatchWayToMySQL {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
      .setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder()
      .config(conf)
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()
    import spark.implicits._

    // Socket source: one line of text per record
    val lines = spark.readStream
      .format("socket")
      .option("host", "node1")
      .option("port", 9999)
      .load()

    val words = lines
      .as[String]
      .flatMap(_.split("\\s+"))
    val wordCounts = words
      .groupBy("value")
      .count()

    // foreachBatch hands each micro-batch to a regular batch write (here the JDBC data source)
    val query: StreamingQuery = wordCounts
      .writeStream
      .outputMode(OutputMode.Complete())
      .foreachBatch((data: DataFrame, batchID: Long) => {
        println("BatchId is:", batchID)
        data
          .coalesce(1)
          .write
          .mode(SaveMode.Overwrite)
          .format("jdbc")
          .option("driver", "com.mysql.jdbc.Driver")
          .option("url", "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8")
          .option("user", "root")
          .option("password", "root")
          .option("dbtable", "bigdata.tb_word_count2")
          .save()
      }).start()

    query.awaitTermination()
    query.stop()
  }
}
Note that the function passed to foreachBatch receives two arguments: the micro-batch's DataFrame itself and the batchId.
By default, foreachBatch provides only at-least-once write guarantees. However, the batchId passed to the function can be used to deduplicate the output and obtain exactly-once guarantees.
foreachBatch does not work with the continuous processing mode, because it fundamentally relies on the micro-batch execution of a streaming query. If you write data in continuous mode, use foreach instead.
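To make "use the batchId as a deduplication key" concrete, here is a minimal sketch (not an official recipe): the foreachBatch function, which runs on the driver, records the last batchId it wrote in a hypothetical local marker file and skips any batch that is replayed from the checkpoint after a restart. The table name tb_word_count_log, the marker path, and the checkpoint path are made-up examples.

import java.io.{File, PrintWriter}

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

import scala.io.Source

object ForeachBatchDedup {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ForeachBatchDedup")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val wordCounts = spark.readStream
      .format("socket").option("host", "node1").option("port", 9999).load()
      .as[String]
      .flatMap(_.split("\\s+"))
      .groupBy("value").count()

    // Hypothetical driver-side marker recording the last batch that reached MySQL
    val marker = new File("/tmp/last_committed_batch")

    val query = wordCounts.writeStream
      .outputMode(OutputMode.Complete())
      .option("checkpointLocation", "/tmp/ckp_dedup") // replays after a restart come from here
      .foreachBatch((batch: DataFrame, batchId: Long) => {
        val lastCommitted =
          if (marker.exists()) Source.fromFile(marker).mkString.trim.toLong else -1L
        if (batchId > lastCommitted) {
          batch
            .withColumn("batch_id", lit(batchId)) // tag rows with the batch that produced them
            .coalesce(1)
            .write
            .mode(SaveMode.Append)
            .format("jdbc")
            .option("driver", "com.mysql.jdbc.Driver")
            .option("url", "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8")
            .option("user", "root")
            .option("password", "root")
            .option("dbtable", "bigdata.tb_word_count_log") // hypothetical append-only table
            .save()
          val out = new PrintWriter(marker)
          out.write(batchId.toString)
          out.close()
        } else {
          println(s"Skipping replayed batch $batchId")
        }
      })
      .start()

    query.awaitTermination()
  }
}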
Backpressure handling: in Structured Streaming, a backpressure mechanism is applied automatically to control how Kafka data is ingested.
By contrast, the older Spark Streaming (DStream) API can accumulate a backlog and relies on explicit settings such as spark.streaming.backpressure.enabled.
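Structured Streaming has no direct equivalent of spark.streaming.backpressure.enabled, but the Kafka source can be rate-limited explicitly with the maxOffsetsPerTrigger option, which caps how many offsets a single micro-batch may consume. A minimal sketch, reusing the node1:9092 broker and stationTopic names from the examples in this document:

import org.apache.spark.sql.SparkSession

object KafkaRateLimit {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("KafkaRateLimit")
      .master("local[*]")
      .getOrCreate()

    // Never plan more than 10,000 offsets into one micro-batch, so a large
    // backlog is drained gradually instead of flooding a single trigger.
    val kafkaDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("subscribe", "stationTopic")
      .option("maxOffsetsPerTrigger", "10000")
      .load()

    kafkaDF.selectExpr("CAST(value AS STRING)")
      .writeStream
      .format("console")
      .start()
      .awaitTermination()
  }
}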
Event time and the watermark mechanism
package cn.ispark.structedstreaming.kafka

import java.sql.Timestamp

import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}

object StructuredWindow {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")

    import org.apache.spark.sql.functions._
    import spark.implicits._

    // Socket source: each line is "timestamp,word1 word2 ..."
    val inputStreamDF: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node1")
      .option("port", 9999)
      .load()

    val resultStreamDF = inputStreamDF
      .as[String]
      .filter(StringUtils.isNotBlank(_))
      .flatMap(line => {
        val arr = line.trim.split(",")
        val timestampStr: String = arr(0)
        val wordsStr: String = arr(1)
        wordsStr
          .split("\\s+")
          .map((Timestamp.valueOf(timestampStr), _))
      })
      .toDF("timestamp", "word")
      // Drop data arriving more than 10 seconds behind the max event time seen so far
      .withWatermark("timestamp", "10 seconds")
      // Sliding windows: 10 seconds long, sliding every 5 seconds
      .groupBy(
        window($"timestamp", "10 seconds", "5 seconds"),
        $"word"
      ).count()

    val query: StreamingQuery = resultStreamDF.writeStream
      .outputMode(OutputMode.Update())
      .format("console")
      .option("numRows", "100")
      .option("truncate", "false")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()

    query.awaitTermination()
    query.stop()
  }
}
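For reference, the socket input StructuredWindow expects is one line per event in the form "<event time>,<space-separated words>", where the event time must parse with java.sql.Timestamp.valueOf, i.e. "yyyy-mm-dd hh:mm:ss". A small standalone check of that parsing logic; the sample line is made up:

import java.sql.Timestamp

object WindowInputFormatCheck {
  def main(args: Array[String]): Unit = {
    // What you might type into `nc -lk 9999` when testing the job above
    val line = "2024-11-13 10:00:07,spark hadoop spark"
    val Array(timestampStr, wordsStr) = line.trim.split(",")
    val pairs = wordsStr.split("\\s+").map(w => (Timestamp.valueOf(timestampStr), w))
    pairs.foreach(println)
    // Prints three (timestamp, word) rows; in the streaming job, rows whose event time falls
    // more than 10 seconds behind the current max event time are dropped by the watermark.
  }
}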
Structured Streaming's Continuous processing mode
Continuous Processing is an experimental streaming execution mode introduced in Spark 2.3. It achieves millisecond-level end-to-end latency (comparable to Storm and Flink), but it only guarantees at-least-once semantics: no data is lost, but duplicate results are possible.
The default micro-batch engine, by contrast, can provide exactly-once guarantees, but its latency is at best around 100 ms.
Structured Streaming exposes this Continuous mode as an experimental feature.
It is enabled by specifying a continuous trigger on the write, e.g. writeStream.trigger(Trigger.Continuous("1 second")), as in the example below.
import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object StructuredContinuous {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")

    import org.apache.spark.sql.functions._
    import spark.implicits._

    // Kafka source: consume raw station logs from stationTopic
    val kafkaStreamDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("subscribe", "stationTopic")
      .load()

    // ETL: keep only records whose 4th comma-separated field is "success"
    val etlStreamDF: Dataset[String] = kafkaStreamDF
      .selectExpr("CAST(value AS STRING)")
      .as[String]
      .filter(log => StringUtils.isNotBlank(log) && "success".equals(log.trim.split(",")(3)))

    // Kafka sink with a continuous trigger: checkpoints taken every 1 second
    val query: StreamingQuery = etlStreamDF.writeStream
      .outputMode(OutputMode.Append())
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("topic", "etlTopic")
      .option("checkpointLocation", "./ckp" + System.currentTimeMillis())
      .trigger(Trigger.Continuous("1 second"))
      .start()

    query.awaitTermination()
    query.stop()
  }
}
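To check that etlTopic actually receives the filtered records, a simple batch (non-streaming) Kafka query can be used. A minimal sketch, reusing the node1:9092 broker and etlTopic names from the example above:

import org.apache.spark.sql.SparkSession

object CheckEtlTopic {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CheckEtlTopic")
      .master("local[*]")
      .getOrCreate()

    // One-shot batch read of everything currently in etlTopic
    val df = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("subscribe", "etlTopic")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()

    df.selectExpr("CAST(value AS STRING)").show(20, truncate = false)
  }
}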