Spark架构及编程
Spark架构及编程
接Spark环境的安装
四、Spark运行框架
4.1 运行框架
![image-20200602225134787](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200602225135.png)
--1. Spark框架可以理解三个部分组成
第一部分: Driver + Executor --> 任务执行和调度
第二部分: Master + Worker --> Saprk自身的资源调度框架
第三部分: Cluster Manager --> 集群管理,中间件
- Driver + Executor
-- Driver:驱动器
1. 用于执行Spark任务中的main()方法,负责实际代码的执行
2. 将用户程序转化为job;
3. 调度Executor之间的任务(task);
4. 跟踪Executor的执行情况;
5. 通过UI展示查询运行的结果。
-- Executor: 执行器
1. 是worker的一个JVM进程,负责Spark执行具体的任务(task);
2. 负责运行组成Spark应用的任务,并将结果返回给驱动器进程;
3. 在执行器内有块管理器,为用户程序中要求缓存的RDD提供内存的缓存。
- Master + worker
-- Master
1. 类似YARN中的RM;
2. 负责资源的调度和分配,并进行集群的监控
-- worker
1. 类似YARN中的NM;
2. 是一个进程,一个Worker运行在集群的一个服务器上,由Master分配资源对数据进行并行的处理和计算。
-- 只有独立部署的模式下才有Master和Worker.
- ApplicationMaster
ApplicationMaster就是用来为RM(资源) 和Driver(计算) 之间解耦合
-- 如何理解解耦合?
Application作为Driver和RM的中间人,Driver和RM不需要直接连接,这样当Driver出现变动时,RM并不需要做任何代码的修改,也就是"你的改变,我看不见~~
![image-20200603085844513](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200603085844.png)
4.2 核心概念
4.2.1 执行器Exerutor
如下是执行器的参数,有执行器的个数、每个执行器的内存大小、每个执行器虚拟的CPU 的核数
名称 | 说明 |
---|---|
--num-executors | 配置Executor的数量 |
--executor-memory | 配置每个Executor的内存大小 |
--executor-cores | 配置每个Executor的虚拟CPU core数量 |
4.2.2 并行度(Parallelism)
-- 什么是并行度
整个集群并行执行任务的数量称为并行度。
4.2.3 有向无环图(DAG)
-- 什么是有向无环图
DAG(Directed Acycli Graph):由点和线组成的拓扑图形,该图形具有方向,不会闭环。
-- tez :是作业和作业之间的有向无环图
-- Spark:是作业内部的有向无环图
![image-20200602232357360](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200602232357.png)
4.2.4 提交流程
"先作为了解,后续再详细讲解"
Spark应用程序提交到Yarn环境中执行的时候,一般会有两种部署执行的方式:Client和Cluster。两种模式,主要区别在于:"Driver程序的运行节点"
![image-20200602232448093](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200602232448.png)
五、Spark核心编程
--Spark计算框架为了提供高吞吐、高并发的数据处理,提供了三大数据结构,处理不同数据的应用场景。
框架1:RDD:(Resillient distribute dataset) 弹性分布式数据集
框架2:累加器:分布式共享"只写"变量
框架3:广播变量:分布式共享"只读"变量
5.1 RDD
-- 什么是RDD?
弹性分布式数据集,一种数据处理的"模型"&"数据结构","可以理解为在java中我们创建了一个类,在类中构建了很多属性和方法,然后最后使用一个行动算子,将这些方法全部启动运行,并将运行的结果返回",是一个抽象类。如汽车模型,航母模型、手机模型等。
-- RDD的特点:
1. 可分区:提高消费能力,更适合并发计算,类似kafka的消费者消费数据,"一个分区对应一个task,在executor中,一个core对应一个task,这样就体现了并发计算"。
2. 弹性:变化,可变。
a、存储弹性:可以在磁盘和内存之间自动切换;"shuffle阶段,就会将数据存入磁盘中,避免数据量过大,导致任务失败。一个任务分很多个阶段,每个阶段内的运行,则是基于内存的。"
b、容错弹性:数据丢失可以自动恢复;
c、计算弹性:计算出错后重试机制;
d、分区弹性: 根据计算结果动态改变分区的数量。"每次计算以后,可能数据会减少,这样一来,就会造成数据倾斜的状况,通过动态修改分区的数量,这样就可以数据使尽量均匀分布在不同的分区内。"
3. 不可变:类似不可变集合
RDD只存储计算的逻辑,不存储数据,计算的逻辑是不可变的,一旦改变,则会创建新的RDD;
4. RDD :一个抽象类,需要子类具体实现,说明有很多种数据处理方式
5.2 IO
-- IO流分为:
字节流 字符流
输入流 inputStream read
输出流 outPutStream write
节点流 File + 字符流/字节流
处理流 buffer + ( File+ 字符流/字节流 )
-- 解读如下三张图流程
图1:使用字节流读取一个文件的内容并打印到控制台,使用一个文件节点流,一次只能读取一部分数据,然后打印,然后再读取一部分数据,再进行打印,慢;
图2:增加一个缓冲流,将获取的数据暂时先存放在内存的一个缓冲区内,等到达一定的数据量以后,再统一处理;
图3:发现字节流获取的数据,打印到控制台,我们是不认识的,中间使用一个字节流转字符流,将读取的数据转化为字符流,然后再将字符缓存到内存,待达到一定的数据量以后,再往控制台上打印。
综上,发现,如上的过程属于装饰者设计模式,前者的结果传递到后者,一层一层的包裹起来。
![image-20200602235042403](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200602235042.png)
![image-20200602235122170](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200602235122.png)
![image-20200602235154706](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200602235154.png)
5.3 RDD执行原理
-- RDD的执行原理:
1. 类似IO处理;
2. 装饰者设计模式,通过new的方式体现装饰者设计模式;
3. 延迟加载,RDD只是封装了逻辑,只要当行动算子"如collect()"执行时,才会开始执行整个逻辑。
-- 与IO的区别:
RDD不保存数据,只保留逻辑;
IO会保存数据
-- 如何理解RDD
不可变集合 --> 增加新的数据 --> 创建新的集合
RDD --> 扩展新的功能 --> 创建新的RDD
-- 解读RDD:
1. Executor有多个core
2. 默认一个分区生成一个task
3. 一个task由一个core执行
4. 所以一个Executor可以执行多个task
![image-20200603101333142](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200603101333.png)
5.4 RDD的核心属性
5.4.1 分区列表
-- 什么是分区列表
就是RDD中的多个分区
![image-20200603003135290](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200603003135.png)
5.4.2 分区计算函数
-- 什么是分区计算函数
就是RDD中的计算逻辑,不同的分区具有相同的计算逻辑。
![image-20200603003153013](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200603003153.png)
5.4.3 RDD之间的依赖关系
-- 什么是RDD之间的依赖关系
当需求中需要将多个计算模型进行组合时,就需要将多个RDD建立依赖关系
之所以有依赖关系,是因为假如将所有的执行逻辑封装在一个RDD中时,那么不知道哪个先执行,哪个后执行。
![image-20200603003356558](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200603003356.png)
5.4.4 分区器(可选)
-- 什么是分区器
数据进入分区的规则,对数据进行分区。
1. 只能是KV键值对的数据可以指定进入分区的规则
2. 默认没有分区器
"分区器和分区的个数有什么关系呢?"
分区器:是一种规则,针对kv数据结构的rdd,指明什么样的数据进入哪个分区
![image-20200603003507224](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200603003507.png)
5.4.5 首选位置(可选)
-- 什么是首选位置
根据本地化级别,确定task去到哪一个Executor中进行计算
![image-20200603003641655](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200603003641.png)
![](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200603003308.png)
5.5 基础编程
5.5.1 创建RDD
-- 4种创建RDD的方式:
1. 从内存(集合)中创建
2. 从磁盘中创建
3. 从其他RDD中创建:RDD调用新的逻辑就是创建新的RDD
4. 直接创建:通过new的方式,spark框架内部会使用
5.5.1.1 从内存(集合)中创建
-- 方法1:parallelize(形参)
1. 方法:使用parallelize(形参):创建一个RDD
2. 形参:有两个参数:
"参数1":seq:Seq[T],带泛型的序列,可以传递一个List集合
"参数2":numSlices:int = defaultParallelism,分片的数量,"后面讲平行度和分区时详细讲"。
-- 方法2:makeRDD(形参)
1.发现:底层还是调用了parallelize方法,所以参数和parallelize方法一模一样,只是方法名更好理解
源码:
parallelize(seq, numSlices)
//构建Spark的环境和创建和Spark的连接说明
/*
setMaster(master:String):指明Spark运行的环境
形参:local[] 学习期间暂时使用本地环境
local[1]:代表单核
local[4]:代表4核
local[*]:代表最大核数,在设备管理器中可以查看,我的电脑是12核
SetAppName(name:String):执行程序的名称。
*/
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("wordCount")
val sc = new SparkContext(sparkConf)
val list = List(1, 2, 3, 4)
//方法1:parallelize(list)
val datas: RDD[Int] = sc.parallelize(list)
datas.saveAsTextFile("output")//local,单核,数据只存在一个分区
datas.saveAsTextFile("output1")//local[3],三核,数据存在3个分区
datas.saveAsTextFile("output2")
//local[*],最大核数:12核,生成12个分区,但是由于list数据没有那么多,所以有些分区没有数据
//方法2:makeRDD(list)
val rdd: RDD[Int] = sc.makeRDD(list)
//将RDD的处理后的数据保存到分区文件中
rdd.saveAsTextFile("output3")
5.5.1.2 从外部(Disk)存储中创建RDD
-- 从本地磁盘中创建RDD
1. 方法:textFile(形参)
2. 形参:有两个参数
"参数1":path:String,表示文件的路径
"表示方式":
a、可以表示一个文件
b、可以表示一个文件夹
c、还可以使用通配符"星号"的方式表示多个文件
如val rdd: RDD[String] = sc.textFile("input/*.txt")
"路径说明":
a、可以是相对路径,在IDEA中,从当前项目的根目录下往下找,path路径根据环境的不同自动发生改变
b、也可以是绝对路径
c、还可以指向第三方存储路径,如HDFS
"参数2":minPartitions:Int = defaultMinPartitions
指建议产生的RDD的最小分区数,"后面与并行度重点展开"。
suggested minimum number of partitions for the resulting RDD
-- 说明:spark读取文件时,默认是采用hadoop读取文件的规则,按行读取。
//环境准备和连接Spark
val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("wordCount")
val sc = new SparkContext(sparkConf)
//创建RDD
val rdd: RDD[String] = sc.textFile("input")
//将RDD处理后的数据保存到分区文件中
rdd.saveAsTextFile("output")
5.5.2 并行度与分区
-- 1. 理解一下什么是并行度和分区
并行度:parallelism,指整个集群同时执行任务的数量
分区:partitions,数据在RDD中分区,在RDD模型中,一个分区将生成一个task,且task执行互不影响。
-- 2. 并行度和分区的关系
默认情况下(即资源充足的情况下),一个分区生成的一个task,一个task为一个并行度。
也就是:分区数量 = 并行度。
-- 说明:并行度还和集群的总核数有关,所以资源充足就是指集群可用的核数 core >= task数量,如果可用的core < task数量(即分区数量),那么并行度就比task小。
5.5.2.1 从内存中读取数据
1. 创建RDD的方式为:
-- 方法1:parallelize(形参)
-- 方法2:makeRDD(形参)
2. 形参为:( seq:Seq[T],numSlices:int = defaultParallelism)
形参2:numSlices:int = defaultParallelism
"参数的含义":表示集合被切分分区数量,有默认值: number of partitions to divide the collection into。
分区数量:
a、如果传参数了,那么按照传递的参数为分区数量;
b、如果没有传参数,则使用默认值,默认值的情况如下:
源码中:
默认值:defaultParallelism,"默认并行度",调用下面这个函数:
scheduler.conf.getInt("spark.default.parallelism", totalCores)
①如果连接spark的配置中设定spark.default.parallelism参数了,那么就等于设定的参数
②如果没有设置,那么就等于机器总核数
"什么是机器总核数?"
机器总核数 = 当前环境中可用核数
local => 单核(单线程)=> 1
local[4] => 4核(4个线程) => 4
local[*] => 最大核数 => 我的电脑最大核数为12
3. 通过参数2知道了数据的分区数量,那么数据进入不同分区的规则是什么?
如下有三个案例,通过设置不同的分区数量,确认数据在分区中的情况。
--规则总结:
内存中的数据在分区中基本上是平均分配的。
如果:数据条数 % 分区数 == 0 --> 平均分配,如有4个数据,2个分区,则前面两个数据进入分区0,后面两个数据进入分区1
数据条数 % 分区数 !=0 --> 会采用一种基本算法实现分配,源码中具体的实现方法如下
-- 计算每个分区中数据的起止索引
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}
}
-- Seq表示内存中的集合
seq match {
case _ =>
val array = seq.toArray -- 将集合中的元素转换为数组
-- 调用了上面的计算分区索引的方法
positions(array.length, numSlices).map { case (start, end) =>
array.slice(start, end).toSeq -- 根据索引位置对数据进行切分,确定哪些数据进入哪个分区
}.toSeq
// 情况1:从内存中读取数据时,集群的并行度、分区及数据进入分区的规则
//创建spark环境和连接spark
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
val sc = new SparkContext(sparkConf)
//准备数据
val list = List(1,2,3,4)
/*
情况1:设定分区为2。
解析结果文件:
1. 生成两个分区文件
2. 分区数据如下:
分区0:1 2
分区1:3 4
*/
val rdd1: RDD[Int] = sc.makeRDD(list,2)
rdd1.saveAsTextFile("output1")
/*
情况2:设定分区为4。
解析结果文件:
1. 生成4个分区文件
2. 分区数据如下:
分区0:1
分区1:2
分区2:3
分区3:4
*/
val rdd2: RDD[Int] = sc.makeRDD(list,4)
rdd1.saveAsTextFile("output2")
/*
情况3:设定分区为3。
解析结果文件:
1. 生成3个分区文件
2. 分区数据如下:
分区0:1
分区1:2
分区2:3 4
*/
val rdd3: RDD[Int] = sc.makeRDD(list,3)
rdd1.saveAsTextFile("output3")
5.5.2.2 从文件中读取数据
- 单文件读取情况
-- 问题1:分区数如何确定:
方法解读:textFile(path:String,minPartitions:Int = defaultMinPartitions)
--1.建议最小分区数:
参数2:minPartitions:Int = defaultMinPartitions,指建议产生的RDD的最小分区数,有默认值
--2. 情况一:假如使用默认值:"取defaultParallelism【默认并行度】和2的最小值"
默认值:defaultMinPartitions,源码中调用了如下方法:"取defaultParallelism【默认并行度】和2的最小值"
源码:def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
defaultParallelism:源码中调用:
scheduler.conf.getInt("spark.default.parallelism", totalCores)
①如果连接spark的配置中设定spark.default.parallelism参数了,那么就等于设定的参数
②如果没有设置,那么就等于总核数
"什么是总核数?"
机器总核数 = 当前环境中可用核数
local => 单核(单线程)=> 1
local[4] => 4核(4个线程) => 4
local[*] => 最大核数 => 我的电脑最大核数为12
--3. 情况二:当传递了参数以后:使用传递的参数值。
--4. 什么是最小分区数?
所谓的最小分区数,取决于总的字节数是否能整除分区数,并且看剩下的字节数/每个分区的字节数是否
大于10%,如果大于10%,则剩余的字节数会生产一个新的分区。
--5. 实际上的分区数是多少呢?
1. 实际的分区数 >= 设定的RDD最小分区数
2. 算法:文件的字节数 / RDD设定的最小分区数 = result
a、如果恰好整除,则实际的分区数=设定的RDD最小分区数
b、如果除不尽,有余数,如果余数 / result <= 10% ,则实际的分区数=设定的RDD最小分区数
如果余数 / result > 10% ,则实际的分区数= 设定的RDD最小分区数 + 1
============================================================================================================
--问题2:在没有指定分区器的情况下,数据依据什么规则进入不同的分区?
通过查看源码发现,Spark读取文件采用的是hadoop的读取规则。
1、切片规则:"以字节的方式来切分数据"
2、数据读取规则:"按行读取"。
回答问题2之前,我们先来回答如下两个问题。
问题a:文件到底切成几片(也就是分区的数量)
按照文件的字节数,确定预计的切片数量。
问题b:分区数据是如何进行存储的?
1.换行符为2个字节
2.分区数据按行为单位进行读取,一行的数据不会被拆分。
"规则":数据进行不同分区的规则有二,二者合并一起使用。
规则一:数据起始偏移量和字节数。
规则二:数据偏移量(offset)
--具体是什么意思?一起跟着下面案例来详细认识。
- 案例1:
举例1:
--word1数据:
12@@
234
说明:换行符(用@@代替)为2个字节,Spark是按照行进行读取数据。
-- 第一步:建议生成RDD的最小分区数: => 2
-- 第二步:计算实际的分区数:=> 3
1.计算文件的字节数:=> 7个字节
2.计算整除的结果和余数: 7/2=3 ..1
3.计算余数和每个分区字节数的比率:1/3 > 10% => 生成一个新的分区 => 3
-- 第三步:计算每个分区的数据起始偏移量和每个分区的字节数
分区0:(起始偏移量,字节数) =>(0,3) =>(0,3)
分区1:(起始偏移量,字节数) =>(3,3) =>(3,6)
分区2:(起始偏移量,字节数) =>(6,1) =>(6,7)
-- 第四步:计算每行数据的偏移量
12@@ => 0 1 2 3
234 => 4 5 6
-- 第五步:数据的分配:按行读取,数据只会被读取一次
分区0 => 读取索引为0/1/2/3的数据,读取12
分区1 => 读取索引为3/4/5/6的数据,发现3已经被读取了,所以读取4/5/6索引的数据,读取:234
分区2 => 读取索引为6/7的数据,发现6已经被读取,索引7无数据,所以分区2没有数据。
*/
val rdd1: RDD[String] = sc.textFile("input/word1",2)
rdd1.saveAsTextFile("output")
- 案例2
举例2:
--word2数据:
1@@
2@@
3@@
4
说明:换行符(用@@代替)为2个字节,Spark是按照行进行读取数据。
-- 第一步:建议生成RDD的最小分区数: => 3
-- 第二步:计算实际的分区数:=> 4
1.计算文件的字节数:=> 10个字节
2.计算整除的结果和余数: 10/3=3 ..1
3.计算余数和每个分区字节数的比率:1/3 > 10% => 生成一个新的分区
-- 第三步:计算每个分区的数据起始偏移量和每个分区的字节数
分区0:(起始偏移量,字节数)=(0,3) =>(0,3)
分区1:(起始偏移量,字节数)=(3,3) =>(3,6)
分区2:(起始偏移量,字节数)=(6,3) =>(6,9)
分区3:(起始偏移量,字节数)=(9,1) =>(9,10)
-- 第四步:计算数据的偏移量
1@@ => 0 1 2
2@@ => 3 4 5
3@@ => 6 7 8
4 => 9
-- 第五步:数据的分配:按行读取,数据只会被读取一次
分区0 => 读取索引为0/1/2/3的数据,因为第一行数据不够,所以第二行也被读取,
所以实际读取了索引为0/1/2/3/4/5的数据,读取1 2
分区1 => 读取索引为3/4/5/6的数据,发现3/4/5已经被读取了,所以只能读取索引为6的数据,
所以读取6索引所在行,最后读取了索引为6/7/8的数据,读取:3
分区2 => 读取索引为6/7/8/9的数据,发现6/7/8已经被读取,所以只能读取索引为9的数据,
所以读取索引9所在行,最后读取了索引为9的数据,读取:4。
分区3 => 读取索引为9/10的数据,发现9已经被读取,索引10无数据,所以分区2没有数据。
val rdd2: RDD[String] = sc.textFile("input/word2",3)
rdd2.saveAsTextFile("output")
- 多文件读取情况
多文件和单文件异同:
1) 字节数为递归计算所有文件的字节数总和
2)不能跨文件读取数据
3)依然是按行读取
4)计算每个分区的字节数的算法保持不变,
但是总分区数增加的个数依据每个文件的字节数是否能整除每个分区的字节数而定。
- 案例解析
object Spark_FliePartitions {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("FileParallelism")
val sc = new SparkContext(sparkConf)
/*
文件1:
12@@ => 0,1,2,3
234 => 4,5,6
文件2:
1@@ => 0,1,2
2@@ => 3,4,5
3@@ => 6,7,8
4 => 9
计算过程:
1.字节总数:7+10=17
2.每个分区字节数:17/3=5 .. 2
3.文件1:
分区0:(0,5) =>12 234
分区1:(5,7) =>空
文件2:
分区0:(0,5) =>1 2
分区1:(5,10) => 3 4
*/
val rdd: RDD[String] = sc.textFile("input",3)
rdd.saveAsTextFile("output")
sc.stop()
}
}
5.5.3 RDD算子
RDD转换算子:
-- 1. 什么是算子?
认知心理学,解决问题的思路,也就是方法。
-- 2. 所谓的RDD算子,其实就是将旧的RDD通过方法的调用转换为新的RDD
-- 3. 既然算子也是方法,那么为什么叫做算子呢?
算子是RDD所拥有的,RDD只是封装逻辑,由Driver将task分配给Executor执行,所以是分布式计算的逻辑。
方法:则是在当前的虚拟机中执行。
-- 4. 算子的分类:
根据RDD处理数据的方式不同分为:value类型、双value类型、key-value类型。
value:单值数据,如List(1,2,3,4)
双value:指多个RDD之间的运算
key-value:指数据结构是kv键值对。List(("a",1),("b",2))
那么为什么要这么设计呢?
因为不同的数据结构,处理数据的需要不一样,也就有对应的算子来应对。
5.5.3.1 map
--算子:map(形参):
1. 作用:将处理的数据逐条进行映射处理,"类比scala中的map,对数据进行结构转换"
2. 形参:def map[U: ClassTag](f: T => U): RDD[U]
3. 基本使用如下:
val list = List(1,2,3,4)
val rdd: RDD[Int] = sc.makeRDD(list)
val rddmap: RDD[Int] = rdd.map(_*2)
println(rddmap.collect().mkString(","))
-- 关于map算子的两个问题
--问题1:分区的问题:RDD有分区列表,每个RDD都有相同的分区计算函数,那么新的RDD与旧的RDD的分区关系是什么?
默认分区的数量保持不变,数据会转换后输出。
--问题2:Map中数据处理的顺序是怎么样的?
通过如下验证发现:
a、分区内数据按照顺序依次执行,且第一条数据的所有逻辑执行完成以后再执行第二条数据,依次类推
b、分区间的数据执行是没有顺序,而且无需等待,即分区间执行逻辑互不影响,各自执行各自的逻辑。
- 验证如下:
//测试:新旧RDD分区的关系
val rdd1: RDD[Int] = sc.makeRDD(list,2)
val rddmap1: RDD[Int] = rdd1.map( num => num * 2})
//将数据输出到本地文件中,查看分区数量及分区内的数据
rdd1.saveAsTextFile("output1")
rddmap1.saveAsTextFile("output")
//测试:分区间的执行顺序
val rddmap2: RDD[Int] = rdd1.map( num => {println("mapA ->" + num );num * 2})
val rddmap3: RDD[Unit] = rddmap1.map(num => println("mapB ->" + num))
//collect方法不会转换RDD,会触发作业的执行,所以将collect这样的方法称之为行动(action)算子
rddmap3.collect()
- 练习:
//练习:从服务器日志数据apache.log中获取用户请求URL资源路径
val rdd: RDD[String] = sc.textFile("input/apache.log")
val result: Array[String] = rdd.map(
str => {
//按照空格拆分一条数据
val array: Array[String] = str.split(" ")
//只取URL资源数据
array(6)
}
).collect()
//遍历结果集
result.foreach(println)
5.5.3.2 mapPartitions
--map()算子问题:
在分区内每次只能获取一个数据,而且只有当前一个数据的所有逻辑执行完成以后才会执行下一个数据,这样一来,效率就相对比较慢。
--引出了另外一个算子:mapPartitions(形参)
--1. 形参:(f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U]
形参1:f: Iterator[T] => Iterator[U],是一个函数
函数的形参:一个迭代器,内容为一个分区中所有的数据;
函数的返回:分区内数据经过转换以后数据形成的"迭代器"。
参数2:暂时不管。
--2. 返回结果:返回一个新的RDD
--3. 算子的作用:
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行"任意的处理",哪怕是过滤数据
--4. 与map()算子的不同点:
map 算子是一个全量数据处理,不能丢失数据;
mapPartitions 算子一次获取分区中所有的数据,那么可以执行迭代器所有的操作,如可以进行数据的过滤。
--5. mapPartitions算子存在的问题
如果一个分区的数据没有处理完,那么该分区内所有的数据都不会释放,即使是前面已经处理完的数据也不会释放,
容易出现内存溢出。
--6. map和mapPartitions()算子的选择:
如果内存空间足够大,为了提高效率时,推荐使用mapPartitions()算子
- 代码演示
val list = List(1,2,3,4)
val rdd: RDD[Int] = sc.makeRDD(list,2)
val rdd1: RDD[Int] = rdd.mapPartitions(iter => {
//只要分区内为偶数的数据
iter.filter(_% 2 ==0)
})
println(rdd1.collect().mkString(","))
- 练习
//练习:获取每个数据分区的最大值
val list = List(1,5,6,4,3,6)
val rdd: RDD[Int] = sc.makeRDD(list,3)
val result: RDD[Int] = rdd.mapPartitions(
iter => {
//求分区内的最大值,返回值为一个值,不是迭代器,所以使用List集合进行包装
List(iter.max).iterator
}
)
println(result.collect().mkString(","))
5.5.3.3 mapPartitionsWithIndex
--1. 算子:mapPartitionsWithIndex (形参)
--2. 形参:(f: (Int, Iterator[T]) => Iterator[U],preservesPartitioning: Boolean = false)
形参1:f: (Int, Iterator[T]) => Iterator[U],是一个函数
函数的形参:
参数1:为分区号
参数2:为一个迭代器,内容为一个分区中所有的数据;
函数的返回:分区内每个数据经过转换以后数据形成的"迭代器"。
形参2:暂时不管。
--3.算子的作用:将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,
"在处理时同时可以获取当前分区索引"
- 代码演示
val list = List(1,2,3,4)
val rdd: RDD[Int] = sc.makeRDD(list,2)
//获取每个分区最大值以及分区号
val rdd1: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex(
(num, iter) => {
List((num, iter.max)).iterator
})
println(rdd1.collect().mkString(","))
- 练习
//练习:获取第二个数据分区的数据
val list = List(1, 2, 3, 4, 5, 6)
val rdd: RDD[Int] = sc.makeRDD(list, 3)
val rdd1= rdd.mapPartitionsWithIndex(
(num, iter) => {
if (num == 1) {
iter
} else {
Nil.iterator
}
})
println(rdd1.collect().mkString(","))
5.5.3.4 flatmap
--1. 算子:flatMap(形参)
--2. 形参:(f: T => TraversableOnce[U]):是一个函数
函数的参数:分区内的一个一个的元素
返回值:经过映射以后将数据进行扁平化,返回一个可迭代的集合
--3. 作用:和scala中的作用完全一致,映射扁平
- 代码演示
val list = List(List(1,2),List(3,4))
val rdd: RDD[List[Int]] = sc.makeRDD(list)
val rdd1: RDD[Int] = rdd.flatMap(list=>list)
println(rdd1.collect().mkString(","))
- 练习
//将List(List(1,2),3,List(4,5))进行扁平化操作
val list = List(List(1, 2), 3, List(4, 5))
val rdd: RDD[Any] = sc.makeRDD(list)
val rdd1: RDD[Any] = rdd.flatMap(
data => {
data match {
case list: List[_] => list
case b => List(b)
}
}
)
println(rdd1.collect().mkString(","))
5.5.3.5 glom
--1. 算子:glom(形参)
--2. 形参:空,无形参
--3. 返回值:RDD[Array[T]],返回一个一个的数组,数组的数据来自同一个分区
--4. 作用:将同一个分区内的数据转换成数组。
- 代码实现
val list = List(1, 2, 5, 6, 4, 3)
val rdd: RDD[Int] = sc.makeRDD(list, 3)
val rdd1: RDD[Array[Int]] = rdd.glom()
rdd1.collect().foreach(array=>{println(array.mkString(","))})
- 练习
// 求每个分区的最大值的和
val list = List(1, 10, 8, 6, 2, 3)
val rdd: RDD[Int] = sc.makeRDD(list, 3)
//方法1:以分区来单位,进行处理
val sum: Double = rdd.mapPartitions(
iter => {
List(iter.max).iterator
}
).sum()
println(sum)
//方法2:使用glom
println(rdd.glom().flatMap(array => List(array.max).iterator).sum())
5.5.3.6 groupBy
--1. 算子:groupBy(形参)
--2. 形参:def groupBy[K](f: T => K,p:Partitioner)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
形参1:f: T => K:是一个函数
函数的形参为:数据集中的一个一个的元素
返回值为:返回分组的K
形参2:p:Partitioner,指设定下游的分区数量,如果不设置,则默认为旧RDD的分区数量
--3. 算子的返回值:返回一个元组
元组的第一个元素:表示分组的Key
元组的第二个元素:表示相同的key形成可迭代的集合
--4. 作用:将数据根据指定的规则进行分组。
--5. 特点:
a、分区默认不变
b、不同分区的数据会被重新打乱进入到不同的分区中;
c、我们将上游的分区数据打乱重新组合到下游的分区中,这个操作称之为shuffle
d、极限情况下,所有的数据会被分到一个分区
e、一个组的数据在一个分区,但是并不是说一个分区中只有一个组,
"如当分组数量大于分区数量时,那么一个分区就可能有多个组"。
--6. 存在的问题:
groupby方法会导致数据重新组合以后不均匀
--7. 解决方案:
通过传递参数,改变下游分区的数量。
![image-20200606000907169](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200606000907.png)
- 代码
/*
1.一个组的数据在一个分区,但是并不是说一个分区中只有一个组
奇偶分组,将数据分成两个组,结果文件中只有一个分区文件,分区文件中有两个分组。
*/
val list = List(1,2,3,4,5,6,7,8)
val rdd: RDD[Int] = sc.makeRDD(list,1)
val rdd1: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)
rdd1.saveAsTextFile("output")
/*
2.当前有4个分区,奇偶分组只会有两个分组,所以结果文件中有4个分区文件,但是有两个分区分件中没有数据
*/
val rdd: RDD[Int] = sc.makeRDD(list,4)
val rdd1: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)
rdd1.saveAsTextFile("output")
/*
3.通过设置下游的分区数量解决分区无数据的情况,此时生成的结果文件只有两个分区
*/
val rdd: RDD[Int] = sc.makeRDD(list,4)
val rdd1: RDD[(Int, Iterable[Int])] = rdd.groupBy(((num :Int) => num % 2),2)
rdd1.saveAsTextFile("output")
- 练习
// 小功能:将List("Hello", "hive", "hbase", "Hadoop")根据单词首写字母进行分组。
val list = List("Hello", "hive", "hbase", "Hadoop")
val rdd: RDD[String] = sc.makeRDD(list)
val rdd1: RDD[(String, Iterable[String])] = rdd.groupBy(word => word.substring(0,1))
println(rdd1.collect().mkString(","))
// 小功能:从服务器日志数据apache.log中获取每个时间段访问量。
val rdd: RDD[String] = sc.textFile("input/apache.log")
val rdd1: RDD[String] = rdd.flatMap(str => {
val datas: ArrayOps.ofRef[String] = str.split(" ")
List(datas(3).substring(11, 13))
})
val rdd2: RDD[(String, Iterable[String])] = rdd1.groupBy(time=>time)
println(rdd2.flatMap(data => List((data._1, data._2.size))).sortBy(_._1).collect().mkString(","))
// 小功能:WordCount。
val rdd: RDD[String] = sc.textFile("input/word1")
val wordcount: String = rdd.flatMap(_.split(" ")).groupBy(word => word)
.map(tuple => (tuple._1, tuple._2.size)).collect().mkString(",")
println(wordcount)
5.5.3.7 filter
--1. 算子:Filter(形参)
--2. 形参:(f: T => Boolean):是一个函数,用法和scala中的fliter类似
函数的形参:RDD中数据集的一个一个的数据
返回值:ture或者false
true:表示数据被保留下来
false:表示数据被过滤掉
--3. 作用:将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。
--4. 特点:
a、分区不变
b、分区内的数据可能不均衡,生产环境下,可能会导致数据倾斜
- 代码演示
val list = List(1,2,3,4)
val rdd: RDD[Int] = sc.makeRDD(list)
val rdd1: RDD[Int] = rdd.filter(data => data % 2 ==0)
println(rdd1.collect().mkString(","))
- 练习
//练习:从服务器日志数据apache.log中获取2015年5月17日的请求路径
val rdd: RDD[String] = sc.textFile("input/apache.log")
val rdd1: RDD[(String, String)] = rdd.map(data =>(data.split(" ")(3),data.split(" ")(6)) )
val rdd2: RDD[(String, String)] = rdd1.filter(tuple => {
tuple._1.substring(0, 10) == "17/05/2015"
})
rdd2.collect().foreach(println)
5.5.3.8 Sample
--1. 算子:Sample(形参)
--2. 形参: 有三个参数:
参数1:withReplacement: Boolean,表示数据从原数据集中抽取以后是否还放回
参数2:fraction: Double:和参数1配合一起使用,参数1的值不同,参数2表示含义不相同。
情况1:参数1为ture,表示抽取以后放回,此时参数2表示重复抽取的次数
情况2:参数1为false,表示抽取后不放回,此时参数2表示数据被抽取的几率。
说明:几率 != 返回数据集数量 / 原数据集的数量
参数3:seed: Long = Utils.random.nextLong:表示随机数的种子,可以确定数据抽取,可以理解
为数据的伪随机。所谓的随机是通过某种算法计算得来的,一旦设置了这个参数,每次获取的随机
数都是固定的。这个参数可选,如果没有设置,那么就是真的随机数,每次返回的结果集都可能是不一样的
--3. 作用:
根据指定的规则从数据中抽取数据。
--4. 使用场景:
在实际开发中,往往会出现数据倾斜的情况,那么可以从数据倾斜的分区中抽取部分数据,
通过抽取的数据,分析造成数据倾斜的原因。
val rdd: RDD[Int] = sc.makeRDD(List(1,3,5,6,7,8),2)
val rdd1: RDD[Int] = rdd.sample(false,0.5,1)
println(rdd1.collect().mkString(","))//1,7
val rdd2: RDD[Int] = rdd.sample(true,2,1)
println(rdd2.collect().mkString(","))//1,1,1,1,3,3,3,3,5,5,6,7,7,8,8,8,8
5.5.3.9 distinct
--1. 算子:distinct(形参)
--2. 形参:有两个重载的方法:
方法1:无参
方法2:有一个参数:numPartitions: Int:用来改变去重以后的分区数量。
--3. 作用:对数据进行去重操作
val list = List(1,2,3,5,4,12,3,1)
val rdd: RDD[Int] = sc.makeRDD(list,2)
val rdd1: RDD[Int] = rdd.distinct(3)
rdd1.saveAsTextFile("output")
5.5.3.10 coalesce
--1. 算子:coalesce(形参)
--2. 形参:有三个参数
参数1:numPartitions: Int:重置的分区数量
参数2:shuffle: Boolean = false:是否需要打乱数据,shuffle
参数3:partitionCoalescer: Option[PartitionCoalescer] = Option.empty
--3. 作用:缩减分区数量
--4. 说明:
a、该算子重点在减少分区,我们在重置分区的个数的时候,参数值不要比原有分区数量多,因为"该算子默认是不会
打乱数据重新,没有shuffle",所以分区设置多了,多余的分区不会有数据。
b、我们在使用这个算子的时候,只需要传递重置的分区数量即可,其他的参数使用默认值;
c、如果想扩大分区,有新的算子可以实现,不过底层还是调用coalesce,只是将参数2设置为true
--5. 应用场景:
a、数据经过过滤以后,发现数据不均匀,使用这个算子来减少分区的数量
b、数据分区设置的不合理,也可以使用这个方法。
val list = List(1,2,3,5,1,6,4,5)
val rdd: RDD[Int] = sc.makeRDD(list,4)
val rdd1: RDD[Int] = rdd.distinct()
val rdd2: RDD[Int] = rdd1.coalesce(2)
rdd2.saveAsTextFile("output")
5.5.3.11 rePartition
--1. 算子:rePartition(形参)
--2. 底层逻辑:调用了coalesce算子,只是将shuffle的值改为了true,执行shuffle过程。
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
--3. 参数:numPartitions: Int:重新分区的数量
--4. 作用:扩大分区,重分区
--5. 说明:
这个参数即可以扩大分区,也可以缩小分区的数量,但是我们一般用来扩大分区。
缩小分区可以使用coalesce算子
--6. coalesce 和 rePartition算子的使用选择
a、如果是减少分区,那么就使用coalesce
b、如果是扩大分区,那么就使用rePartition
val list = List(1, 2, 3, 5, 4, 12, 3, 1,6)
val rdd: RDD[Int] = sc.makeRDD(list,3)
val rdd1: RDD[Int] = rdd.distinct()
rdd1.saveAsTextFile("output1")
/*
output1:
分区0:6 3 12
分区1:4 1
分区2:5 2
*/
val rdd2: RDD[Int] = rdd1.coalesce(2)
rdd2.saveAsTextFile("output2")
/*
output2:同一分区的数据还在一起
分区0:6 3 12 4 1
分区1:5 2
*/
val rdd3: RDD[Int] = rdd1.repartition(2)
rdd3.saveAsTextFile("output3")
/*
output3:数据从原来的分区打乱重组
分区0:6 12 1 5
分区1:3 4 2
*/
5.5.3.12 sortBy
--1. 算子:sortBy(形参)
--2. 形参:参数有3个:
形参1:f: (T) => K:
T:数据集中的每一个元素
K:排序的K
形参2:ascending: Boolean = true
排序的方式,默认值为ture,为升序,
如果改为false,则是降序
形参3: numPartitions: Int = this.partitions.length
排序后的分区数量,默认值为前一个RDD的分区数量。
--3. 作用:按照指定的规则进行排序
val list = List(1,5,6,3,2,4)
val rdd: RDD[Int] = sc.makeRDD(list,3)
//降序排序:
val rdd1: RDD[Int] = rdd.sortBy(num => num,false)
println(rdd1.collect().mkString(","))//6,5,4,3,2,1
5.5.3.13 双Value数据类型
双Value:表示是两个RDD之间进行操作,类似sacla中集合的并集(union)、交集(intersect)、差集(diff)、拉链(zip)
--1. 算子:union
--2. 作用:并集
--3. 说明
a、分区:分区合并
b、数据:数据合并
c、两个RDD的数据类型必须保持一致,否者编译不通过
val rdd1 = sc.makeRDD(List(1,2,1,3), 4)
val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6), 2)
val result1: RDD[Int] = rdd1.union(rdd2)
println(result1.collect().mkString(","))
result1.saveAsTextFile("output1")
--1. 算子:intersection
--2. 作用:交集
--3. 说明
a、数据打乱重组,有shuffle过程;
b、两个RDD的数据类型必须保持一致,否者编译时报错
c、返回的RDD的分区数量保留两个RDD最大的分区数量
val result2 = rdd1.intersection(rdd2)
--1. 算子:subtract
--2. 作用:差集
--3. 说明
a、分区:返回的RDD的分区数量等于调用这个方法的RDD的分区数量
b、有数据打乱重组过程,有shuffle过程
c、数据:返回当前RDD除去和参数RDD共同的数据集
d、两个RDD的数据类型必须保持一致,否者编译时报错
val result3: RDD[Int] = rdd1.subtract(rdd2)
--1. 算子:zip
--2. 作用:拉链
--3. 说明:
a、分区数量相同,每个分区的数据量不相等,
报错:Can only zip RDDs with same number of elements in each partition
只有两个RDD的每个分区数据量相同才能拉链
b、分区数量不相同,每个分区的数量量相同,
报错:Can't zip RDDs with unequal numbers of partitions
RDD的分区数量不同不能拉链
综上:只要两个RDD的分区数量和每个分区数据量相等,才不会报错。
c、返回的RDD的数据是元组
val result4: RDD[(Int, Int)] = rdd1.zip(rdd2)
5.5.3.14 key-value类型
--1.Spark中有很多方法都是基于Key进行操作,所以数据格式应该为键值对(对偶元素)才能使用这些方法
--2.如果数据类型是kv类型,那么Spark会将RDD自动转换补充很多新的功能-->功能的扩展
--3.那么是如果实现的?
a、通过隐式转换
b、如果数据类型为kv类型,在RDD的伴生对象中会将当前的RDD会转换为PairRDDFunctions对象
c、如下的partitionBy就是来自PairRDDFunctions类中的方法
5.5.3.15 partitionBy
--1. 算子:partitionBy(形参)
--2. 形参:partitioner: Partitioner:是一个分区器对象。
--3. 作用:根据指定的规则对数据进行分区,指定数据进入到哪一个分区。
--4.当前能改变分区的算子有:
groupBy、coalesce、rePartition -->改变分区的数据,但是并不能指定数据去到指定的分区
而partitionBy就是来处理将数据指定去到哪个分区。
--5. 什么是分区器?
a、Partitioner是一个抽象类,有两个抽象方法:
方法1:def numPartitions: Int --用来获取当前的分区数量
方法2:def getPartition(key: Any): Int --根据数据的key,返回数据所在的分区号
b、Partitioner有三个实现类:
1.HashPartitioner
2.RangePartitioner
3.PythonPartitioner
c、HashPartitioner:
1.是Spark默认的分区器
2.分区规则:将当前数据的Key的哈希值 % 分区数量
3.形参需要传递分区的数量。
d、RangePartitioner:范围分区器,指定每个分区的key范围,在这个范围的key就进入这个分区
这需要key能比较大小。
--6.注意事项:如果重分区的分区器和当前RDD的分区器相同,那么数据不会重新分区。
--7.自定义分区器:
步骤:
a、创建一个类,有一个分区数量的属性,然后extends Partitioner
b、重写Partitioner中的两个抽象方法
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val list = List((1, "a"), (2, "b"), (2, "c"))
val rdd: RDD[(Int, String)] = sc.makeRDD(list, 3)
// rdd.saveAsTextFile("output1")
val rdd1: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(3))
// rdd1.saveAsTextFile("output2")
val list1 = List(
("cba", "消息1"),
("nba", "消息5"),
("wnba", "消息10"),
("cba", "消息2"),
("nba", "消息2"),
("wnba", "消息6"),
("cba", "消息1"),
)
val rddInfo: RDD[(String, String)] = sc.makeRDD(list1,2)
val partitionRDD: RDD[(String, String)] = rddInfo.partitionBy(new MyPartitioner(2))
partitionRDD.saveAsTextFile("output")
sc.stop()
}
//自定义分区器
class MyPartitioner(num:Int) extends Partitioner {
override def numPartitions: Int = num
override def getPartition(key: Any): Int = {
key match {
case "nba" => 0
case _ => 1
}
}
}
5.5.3.16 reduceByKey
--1. 算子:reduceByKey(形参)
--2. 形参:有三个重载的方法,我们这里介绍两种。
方法1:
形参:func: (V, V) => V ,是一个函数
函数的形参为:表示相同Key的value
函数返回值:经过聚合以后的结果,返回值数据类型和原数据value类型一致
方法2:
形参:func: (V, V) => V, numPartitions: Int
形参1:与方法1一致
形参2:定义聚合以后分区的数量
--3. 作用:根据数据的Key进行分组,对相同Key的value进行数据处理
val list = List(("a",2),("b",1),("a",3),("b",5))
val rdd: RDD[(String, Int)] = sc.makeRDD(list,2)
rdd.saveAsTextFile("output")
val rdd1: RDD[(String, Int)] = rdd.reduceByKey(_ + _)
rdd1.saveAsTextFile("output1")
![image-20200606000759888](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200606000759.png)
5.5.3.17 groupByKey
--1. 算子:groupByKey(形参)
--2. 形参:有三个重载的方法:分别是(无参)、(partitioner: Partitioner)、(numPartitions: Int)
--3. 返回值:RDD[(K, Iterable[V])],是一个元组
第一元素:表示用于分组的key
第二元素:表示分组后相同key的value的集合
--4. 作用:根据数据的key进行分组
--5. 对比groupBy:按照指定的规则进行分组
--6. 补充知识点:
1. shuffle过程必须落盘。
2. 一个分区就是一个task,如果处理过程有shuffle过程,那么将会把task一分为二
有shuffle过程,会生产新的分区,生产新的task。
3. 判断一个算子的效率,取决于shuffle的效率,落盘的数据越少,读取的数据越少,则效率越高
--7. 关于reduceByKey和groupByKey的区别。
1. 算子的作用:
reduceByKey:根据key进行分组,对相同的key的value进行操作
groupByKey:对key进行分组
2. groupByKey
a、对一个分区的数据分区后不能继续执行后续的操作,需要等到其他分区的数据全部到达后,才能执行后续的操作
b、groupByKey是面向整个数据集,而不是面向一个分区
c、但是如果在内存等待,那么可能由于内存不够,导致执行失败,所以这个等待的过程依靠落盘.
3.reduceByKey:
a、在shuffle之前进行分区内的聚合操作,称之为预聚合,这样shuffle时,落盘的数据就会减少,提高了shuff的
效率
b、分区内和分区间的规则相同。
val list = List(("a",2),("b",1),("a",3),("b",5))
val rdd: RDD[(String, Int)] = sc.makeRDD(list,2)
val rdd1: RDD[(String, Iterable[Int])] = rdd.groupByKey(1)
rdd1.saveAsTextFile("output")
println(rdd1.collect().mkString(","))
//(b,CompactBuffer(1, 5)),(a,CompactBuffer(2, 3))
![image-20200606000826608](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200606000826.png)
5.5.3.18 aggregateByKey
--1. 算子:aggregateByKey(形参)
--2. 形参:两个形参列表
a、形参列表1:(zeroValue: U)
参数:表示计算的初始值
b、形参列表2:(seqOp: (U, V) => U,combOp: (U, U) => U)
参数1:seqOp: (U, V) => U:是一个函数,表示分区内相同key的value的计算规则
函数的形参:第一个参数按照计算规则和第一个value计算的结果,类型和初始值类型相同,第二个参数为数据的V
函数的返回值:返回和第一个参数一样的数据类型
参数2:combOp: (U, U) => U:是一个函数,表示分区间,相同key的value的计算规则
函数的形参:两个参数为每个分区的计算结果,类型和初始值类型相同
函数的返回值:返回和第一个参数一样的数据类型
--3. 作用:根据key进行聚合,分区内和分区间的执行逻辑均是针对于value的操作
--4. 使用场景:
a、当出现分区内和分区间对数据处理的规则不一样时,使用这个算子。
--5. 当分区内计算规则和分区间的计算规则相同时,可以使用foldByKey进行替代。
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
--6. 说明:
a、初始值只参与分区内相同key的第一次运算,而且初始值为value值
b、可以初始值的方式改变数据结构
![image-20200606000641855](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200606000641.png)
/*
需求:求分区内相同key的value最大值,然后再求分区间相同key的value总和
分区内:
分区0:
("a",2), ("a",3), ("b",4),
=>("a",3),("b",4)
分区1:
("b",2), ("a",5), ("b",3)
=>("b",3),("a",5)
分区间:
("a",8)
("b",7)
*/
val list = List(
("a", 2), ("a", 3), ("b", 4),
("b", 2), ("a", 5), ("b", 3)
)
val rdd: RDD[(String, Int)] = sc.makeRDD(list, 2)
val rdd1: RDD[(String, Int)] = rdd.aggregateByKey(0)(
(x, y) => {
x + y
},
(x, y) => {
x + y
}
)
println(rdd1.collect().mkString(","))
val rdd2: RDD[(String, Int)] = rdd.foldByKey(0)(_ + _ )
println(rdd2.collect().mkString(","))
5.5.3.19 combineByKey
--1. 算子:combineByKey(形参)
--2. 形参: 相同key内进行操作
参数1:createCombiner: V => C,表示将计算的第一个值进行结构转化
形参:相同key组内的第一个value元素
返回:value经过转换后的数据
参数2:mergeValue: (C, V) => C,表示分区内的计算规则
形参:参数1为经过处理后value,参数2为组内的一个一个的value
返回:value经过处理后的数据
参数3:mergeCombiners: (C, C) => C):表示分区间的计算规则
形参:相同key,两个经过分区内处理过的v
返回:返回两个v的处理结果
--3. 作用:均是对key为组进行合并,对value进行数据处理
--4. 使用场景:当计算时发现key的value不符合计算规则的格式时,那么选择conbineByKey
/*
需求:将数据
List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))
求每个key的平均值
*/
val list = List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))
val rdd: RDD[(String, Int)] = sc.makeRDD(list, 2)
val rdd1: RDD[(String, (Int, Int))] = rdd.combineByKey(
v => (v, 1),
(tuple: (Int, Int), v) => (tuple._1 + v, tuple._2 + 1),
(tuple: (Int, Int), tuple2: (Int, Int)) => {
(tuple._1 + tuple2._1, tuple._2 + tuple2._2)
}
)
val mapRDD: RDD[(String, Int)] = rdd1.map {
case (word, (sum, count)) => (word, sum / count)
}
println(mapRDD.collect().mkString(","))
![image-20200606000722774](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200606000722.png)
5.5.3.20 几个ByKey的区别
-- reduceByKey、foldByKey、aggregateByKey、combineByKey的区别
1.从源码的角度发现,如上4个算子底层逻辑是相同,唯一不同的区别是参数不同。
参数1: createCombiner,分区内每个key的第一个v的转换逻辑
参数2: mergeValue,分区内部的计算逻辑
参数3: mergeCombiners,分区间的计算逻辑
def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)
--2.reduceByKey:
源码如下:
--参数1:没有任何的转换,对key的第一个value没有转换
--参数2和参数3相同,即分区内和分区间的计算逻辑保持一致。
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
--3.aggregateByKey
源码如下:
--参数1:传递的初始值会和每一个key的第一个value按照分区内计算逻辑进行计算
--参数2:分区内计算逻辑
--参数3:分区间的计算逻辑
combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),cleanedSeqOp, combOp, partitioner)
--4.foldByKey
源码如下:
--参数1:传递的初始值会和每一个key的第一个value按照分区内计算逻辑进行计算
--参数2和参数3一致:分区内和分区间的计算逻辑保持一致
combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v), cleanedFunc, cleanedFunc, partitioner)
--5.combineByKey
源码如下:
--参数1:分区内每个key的第一个v的转换逻辑,所以去无需传递初始值
--参数2:分区内计算逻辑
--参数3:分区间的计算逻辑
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)
5.5.3.21 sortByKey
--1. 算子:sortByKey(形参)
--2. 形参:有两个形参,均有默认值:
形参1:ascending: Boolean = true,
排序的顺序,默认是升序,如果需要降序,则输入false
形参2:numPartitions: Int = self.partitions.length:
排序以后分区的数量,默认等于上一个rdd的分区的数量。
--3. 作用:根据key进行排序,默认是升序
--4. 说明:
还可以自定义分区的规则。步骤:
1.继承与ordered,并混入serializable
2.重写compare方法,指定排序比较的规则
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 3), ("b", 4)), 2)
println(rdd.sortByKey().collect().mkString(","))
val user1 = User("scala", 20)
val user2: User = User("scala", 21)
val user3: User = User("hadoop", 25)
val rdd2: RDD[(User, Int)] = sc.makeRDD(List((user1, 1), (user2, 2),(user3,3)),3)
println(rdd2.sortByKey().collect().mkString(","))
}
//自定义排序方式
case class User(name:String,age:Int) extends Ordered[User] with Serializable{
override def compare(that: User): Int = {
if (this.name > that.name){
1
}else if (this.name == that.name){
this.age - that.age
}else{
-1
}
}
}
5.5.3.22 join
--1. 算子:Join(形参)
--2. 形参:(other: RDD[(K, W)]): 另外一个RDD
--3. 算子的返回值:RDD[(K, (V, W))]
返回一个元组:
元组的第一个元素:两个rdd连接的Key
元组的第二个元素:相同key的value,一个来自当前RDD,一个来自另外一个RDD
--4. 作用:将两个RDD中,key相同的value一一进行连接,类似mysql中的join,会出现笛卡尔积错误
--5. 说明:
情况1:如果当前RDD中key在连接的RDD中没有,那么这条数据就不会被关联,数据则没有
情况2:如果当前RDD中相同的Key有多条数据,且另外一个RDD与子相同的key也有多条数据,那么就出现了笛卡尔积错误
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("b",1)))
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a",21),("b",2),("c",2)))
println(rdd.join(rdd1).collect().mkString(","))//(a,(1,21)),(b,(1,2))
val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("b",1),("a",2)))
val rdd3: RDD[(String, Int)] = sc.makeRDD(List(("a",21),("b",2),("c",2),("a",2)))
println(rdd2.join(rdd3).collect().mkString(","))
//(a,(1,21)),(a,(1,2)),(a,(2,21)),(a,(2,2)),(b,(1,2))
5.5.3.23 Left/rightOuterJoin
--1. 算子:LeftOuterJoin/RightOuterJoin
--2. 形参:另外一个RDD
--3. 类似mysql中的左外连接和右外连接,同样会出现笛卡尔积错误
--4. 注意返回值:
如果两个RDD有相同的key,则为:(a,(1,Some(21)))
如果主RDD中的key,在从RDD没有对应的key,则为:(d,(2,None))
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("b",1),("d",2),("a",2)))
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a",21),("b",2),("a",2)))
println(rdd.leftOuterJoin(rdd1).collect().mkString(","))
//(a,(1,Some(21))),(a,(1,Some(2))),(a,(2,Some(21))),(a,(2,Some(2))),(b,(1,Some(2))),(d,(2,None))
println(rdd.rightOuterJoin(rdd1).collect().mkString(","))
5.5.3.24 cogroup
--1. 算子:cogroup(形参) -->co 是connect的简写
--2. 形参:other: RDD[(K, W)]:另外一个RDD
--3. 返回值:(K,(Iterable<V>,Iterable<W>)),是一个元组
元组的第一个元素:RDD的key
元组的第二个元素:还是一个元组
元组的第一个元素:当前相同key的所有value的集合,是一个迭代器
元组的第二个元素:另外一个RDD的key的所有value的集合,是一个迭代器
--4. 作用:将两个RDD中,key相同的value组合在一起。
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("b",1),("c",1)))
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a",21),("b",2),("b",2),("d",1)))
rdd.cogroup(rdd1).collect().foreach(println)
//(a,(CompactBuffer(1),CompactBuffer(21)))
//(b,(CompactBuffer(1),CompactBuffer(2, 2)))
//(c,(CompactBuffer(1),CompactBuffer()))
//(d,(CompactBuffer(),CompactBuffer(1)))
5.5.3.25 行动算子
--什么是行动算子:
a、所谓行动算子,就是不会产生一个新的RDD,而是触发作业的执行
b、而之前的转换算子,只是功能的扩展和包装,不会触发作业的执行
c、行动算子执行以后,会获取当前作业的执行结果
d、Spark的行动算子执行时,会产生job对象,然后提交job对象
5.5.3.26 reduce
--1. 算子:reduce
--2. 作用:与scala中的reduce使用方式相同
val rdd: RDD[Int] = sc.makeRDD(List(1, 4, 5, 2), 2)
val sum: Int = rdd.reduce(_ + _)
println(sum)
5.5.3.27 collect
--1. 算子:collect
--2. 作用:采集数据,纯粹的将所有分区的计算结果拉取到当前的节点,可能会出现内存溢出的情况
rdd.collect().foreach(println)
5.5.3.28 count
--1. 算子:count
--2. 作用:返回当前RDD中数据的个数
println(rdd.count())
5.5.3.29 first
--1. 算子:first
--2. 作用:获取RDD数据集中的第一个数据
println(rdd.first())
5.5.3.30 take
--1. 算子:take
--2. 返回当前RDD的前几个数据,是一个Array集合
rdd.take(2).foreach(println)
5.5.3.31 takeOrdered
--1. 算子:takeOrdered
--2. 先对RDD中的数据进行排序,默认升序,然后取前几个数据
rdd.takeOrdered(2).foreach(println)
5.5.3.32 sum
--1. 算子:sum
--2. 作用:对RDD集合数据求和,但是返回值类型为Double类型
println(rdd.sum())
5.5.3.33 aggregate
--1. 算子:aggregate ,与aggregateByKey的用法类似
--2. 形参:有两个形参列表:
形参列表1: (zeroValue: U) 初始值
形参列表2:(seqOp: (U, T) => U, combOp: (U, U) => U)
参数1:分区内的计算逻辑
参数2:分区间的计算逻辑
--3. 与aggregateByKey的区别:
aggregate的初始值参与分区间和分区内的计算
aggregateByKey的初始值只参与分区内的计算
--4. 重点:初始值可以和集合数据的类型不一致,分区内和分区间最后的结果和初始值的类型一致,说明是可以起到转换结构的作用。
println(rdd.aggregate(0)(_ + _, _ + _)) //12
println(rdd.aggregate(10)(_ + _, _ + _)) //42
5.5.3.34 fold
--1. 算子:fold
(zeroValue: T)(op: (T, T) => T): T
--2. 作用:当aggregate的分区间和分区内的计算逻辑相同时,可以使用fold进行简化
--3. 要求:初始值数据类型和集合中的数据类型保持一致
println(rdd.fold(10)(_ + _))//42
5.5.3.35 mapValues
--1. 算子:mapValues(形参)
--2. 形参:(f: V => U),是一个函数,仅对value进行处理,key不变
--3. 返回值:kv类型的RDD。
--4. 作用:对value的数据结构进行转换
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("a",2),("b",1)))
println(rdd1.mapValues(_ + 1).collect().mkString(","))
5.5.3.36 countBykey
--1. 算子:countBykey
--2. 形参:无参
--3. 返回值:Map[K, Long],返回一个元组
元组的第一个元素:RDD中的Key
元组的第二个元素:RDD中Key出现的次数
--4. 作用:计算key出现的次数
--5. 底层源码:
a、调用mapValues算子,将V转换为1
b、然后再调用reduceByKey,将相同的key的value值进行相加
c、最后转换成map结构
self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
println(rdd1.countByKey())//Map(a -> 2, b -> 1)
5.5.3.37 countByValues
--1. 算子:countByValues
--2. 形参:无
--3. 返回值:Map[T, Long] ,返回一个元组
元组的第一个元素:原数据的kv
元组的第二个元素:原数据kv出现的次数
--4. 作用:计算value出现的次数
--5. 源码:
a、将原数据进行处理,(k,v) =>(v,null)
b、调用countByKey算子,计算v出现的次数
map(value => (value, null)).countByKey()
println(rdd1.countByValue())
//Map((a,1) -> 1, (b,1) -> 1, (a,2) -> 1)
5.5.3.38 save
--1. 算子:
saveAsTextFile
savaAsObjectFile
savaAsSequenceFile
--2. 形参:无
--3. 作用:将RDD的结果以不同的形式保存到文件中,其中savaAsSequenceFile要求数据结构的kv键值对
5.5.3.39 foreach
--1. 算子:foreach
--2. 形参:无
--3. 作用:遍历RDD中的数据
--4. 算子与方法的区别:
a、rdd.collect().foreach(println) -->foreach:方法
b、rdd.foreach(println) -->foreach:算子
1.只要看到rdd的算子,一定要想到两个块,Driver和Executor
2.算子逻辑代码是在分布式计算节点executor中执行的,算子以外代码是在Driver端执行
3.foreach是算子时,那么将在不同的executor中同时执行,互不影响。
4.foreach是方法时,那么是在当前的节点的内存中完成数据的循环。
5.结果就是:两种方法的结果的顺序会不同。
rdd.collect().foreach(println)//1 4 5 2
println("=====================")
rdd.foreach(println) //5 2 1 4
5.5.4 RDD的序列化
1.如果算子中使用了算子以外的对象,那么在执行时,需要保证这个对象能序列化
2.样例类自动混入了可序列化特质
3.Spark算子的操作都是闭包,所以闭包有可能用到外部的变量,如果包含了外部的变量,那么一定要保证这个外部变量可序列化,所以Spark在提交作业之前,应对闭包中的变量进行检测,这个操作我们称为闭包检测。
- Kryo序列化框架
参考地址: https://github.com/EsotericSoftware/kryo
Java的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也比较大。Spark出于性能的考虑,Spark2.0开始支持另外一种Kryo序列化机制。Kryo速度是Serializable的10倍。"当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化"。
"注意:即使使用Kryo序列化,也要继承Serializable接口。
5.5.5 RDD的依赖关系
- toDebugString:打印当前RDD的血缘关系
- 获取每个RDD的血缘关系
//3.1 读取数据
val str: RDD[String] = sc.textFile("input")
println(str.toDebugString)
//3.2 扁平化数据
val words: RDD[String] = str.flatMap(_.split(" "))
println(words.toDebugString)
//3.3 分组
val wordtocount: RDD[(String, Iterable[String])] = words.groupBy(word => word)
println(wordtocount.toDebugString)
//3.4 结构化处理=
val wordcount: RDD[(String, Int)] = wordtocount.map {
case (word, iter) => (word, iter.size)
}
println(wordcount.toDebugString)
- 打印结果:
(4) input MapPartitionsRDD[1] at textFile at Spark_WordCount.scala:25 []
| input HadoopRDD[0] at textFile at Spark_WordCount.scala:25 []
(4) MapPartitionsRDD[2] at flatMap at Spark_WordCount.scala:29 []
| input MapPartitionsRDD[1] at textFile at Spark_WordCount.scala:25 []
| input HadoopRDD[0] at textFile at Spark_WordCount.scala:25 []
(4) ShuffledRDD[4] at groupBy at Spark_WordCount.scala:33 []
+-(4) MapPartitionsRDD[3] at groupBy at Spark_WordCount.scala:33 []
| MapPartitionsRDD[2] at flatMap at Spark_WordCount.scala:29 []
| input MapPartitionsRDD[1] at textFile at Spark_WordCount.scala:25 []
| input HadoopRDD[0] at textFile at Spark_WordCount.scala:25 []
(4) MapPartitionsRDD[5] at map at Spark_WordCount.scala:38 []
| ShuffledRDD[4] at groupBy at Spark_WordCount.scala:33 []
+-(4) MapPartitionsRDD[3] at groupBy at Spark_WordCount.scala:33 []
| MapPartitionsRDD[2] at flatMap at Spark_WordCount.scala:29 []
| input MapPartitionsRDD[1] at textFile at Spark_WordCount.scala:25 []
| input HadoopRDD[0] at textFile at Spark_WordCount.scala:25 []
1. RDD的依赖关系:RDD与直接上级RDD之间的关系
2. RDD的血缘关系:包含直接依赖和间接依赖
3. 如果Spark的计算过程中某个节点计算失败,那么框架会重新计算
4. Spark想要重新对失败的task重新计算,那么需要知道数据来源以及需要知道数据需要经过哪些计算
5. RDD不保存数据,仅保持计算的逻辑
6. 依赖关系主要是用来解决容错计算
- dependencies:获取当前RDD与直接上级的RDD的依赖关系
- 获取直接上级RDD的依赖关系
//3.2 扁平化数据
val words: RDD[String] = str.flatMap(_.split(" "))
println(words.dependencies)
//3.3 分组
val wordtocount: RDD[(String, Iterable[String])] = words.groupBy(word => word)
println(wordtocount.dependencies)
//3.4 结构化处理
val wordcount: RDD[(String, Int)] = wordtocount.map {
case (word, iter) => (word, iter.size)
}
println(wordcount.dependencies)
- 打印结果
List(org.apache.spark.OneToOneDependency@4ca8dbfa)
List(org.apache.spark.ShuffleDependency@4d654825)
List(org.apache.spark.OneToOneDependency@62db0521)
- 依赖关系解析
-- 当前RDD与上级RDD有两种关系:
--第一种:OneToOneDependency
指上游RDD的一个分区最多只能被下游RDD一个分区使用,称之为窄依赖,类比独生子女
--第二种:ShuffleDependency
指上游RDD的一个分区被下游RDD的多个分区使用,称之为宽依赖,形成1对n的关系
![image-20200608013041068](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200608013041.png)
![image-20200608013109377](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200608013109.png)
![image-20200608013128090](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200608013128.png)
5.5.6 RDD阶段的划分
阶段:stage
1. 如果Spark计算过程中存在落盘的操作,那么就应该划分阶段
2. 如果执行过程中没有落盘操作,那么就应该是一个完成的阶段
3. 如果执行过程有落盘的操作,那么应该task应该一分为二
spark中阶段的划分取决于shuffle依赖的个数:
阶段的个数 = shuffle依赖的数量 + 1
![image-20200608013517259](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200608013517.png)
5.5.7 RDD任务的划分
--几个概念
1. Application:应用程序,初始化一个sparkContext就会产生一个application
2. job:一个行动算子就会产生一个job
3. stage:宽依赖的个数 + 1
4. task:一个stage阶段中,最后一个RDD的分区个数就是task的个数。
-- 特列:在转换算子中如果调用了行动算子,那么在转换算子的内部也会有job的提交。
application > job > stage > task ,每一层都是1 对N 的关系。
5.5.8 RDD的持久性
5.5.8.1 问题
-- 问题
RDD中是不保存数据的,所以如果多个RDD需要共享其中一个RDD的数据,那么必须重头执行,效率非常低,所以需要将一些重复性比较高,比较耗时的操作的结果缓存起来,提高效率,这样,不需要重头执行。
![image-20200608192945181](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200608192945.png)
![image-20200608193135788](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200608193135.png)
5.5.8.2 cache
-- 上述问题的解决方法:将计算结果进行缓存,重复使用,提高效率
-- cache解析:
1. 缓存cache底层其实调用的persist方法
2. persist方法在持久化数据时会采用不同的存储级别对数据进行持久化操作
3. cache缓存的默认操作就是将数据保存到内存
4. cache存储的数据在内存中,如果内存不够用,executor可以将内存的数据进行整理,然后可以丢弃数据。
5. 如果由于executor端整理内存导致缓存的数据丢失,那么数据操作依然要重头执行。
6. 如果cache后的数据重头执行数据操作的话,那么必须要遵循血缘关系,所以cache操作不能删除血缘关系。
7. cache操作在行动算子执行后,会在血缘关系中增加和缓存相关的依赖
8. cache操作不会切断血缘,一旦发生错误,可以重新执行。
- 验证缓存的现象
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
val rdd1 = rdd.map(num => {
println("map...." + num)
num
})
//将RDD的计算结果缓存到内存中
rdd1.cache()
rdd1.map(_ * 2).collect().foreach(print)
println("\n*************")
rdd1.collect().foreach(print)
// 未加rdd1.cache()代码,打印结果为:
map....3
map....1
map....4
map....2
2468
*************
map....1
map....2
map....3
map....4
1234
// 加rdd1.cache()代码以后,打印结果为:
map....3
map....1
map....4
map....2
2468
*************
1234
- 验证缓存的依赖关系
val rdd1 = rdd.map(num => num)
rdd1.cache()
println(rdd1.toDebugString)
rdd1.collect()
println(rdd1.toDebugString)
//打印结果:
(2) MapPartitionsRDD[1] at map at Spark3_persist.scala:20 [Memory Deserialized 1x Replicated]
| ParallelCollectionRDD[0] at makeRDD at Spark3_persist.scala:18 [Memory Deserialized 1x Replicated]
(2) MapPartitionsRDD[1] at map at Spark3_persist.scala:20 [Memory Deserialized 1x Replicated]
| CachedPartitions: 2; MemorySize: 48.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| ParallelCollectionRDD[0] at makeRDD at Spark3_persist.scala:18 [Memory Deserialized 1x Replicated]
5.5.8.3 persist
-- 问题:默认的缓存是存储在Executor端的内存中,数据量大的时候,该如何处理?
可以使用persist,将数据保存到当前RDD的磁盘中,但是依然有数据丢失风险。所以我们一般也不这么使用。
rdd1.persist()
- persist的数据保存级别
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
5.5.8.4 checkPoint
1. 将比较耗时,比较重要的数据一般会保存到分布式文件系统中。
2. 使用checkpoint方法将数据保存到文件中
SparkException: Checkpoint directory has not been set in the SparkContext
3. 执行checkpoint方法前应该设定检查点的保存目录
4. 检查点的操作中为了保证数据的准确性,执行时,会启动新的job
5. 为了提高性能,检查点操作一般会和cache联合使用,先将数据缓存到内存中,这样再进行checkpoint,这样在执行checkpoint的
的时候,就不需重头执行。
//设定检查点的保存目录
sc.setCheckpointDir("output")
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
val rdd1 = rdd.map(num =>{
println("map...")
num
}
)
//缓存和检查点联合使用
rdd1.cache().checkpoint()
println(rdd1.map(_ * 2).collect().mkString(","))
println(rdd1.collect().mkString(","))
5.5.8.5 缓存和检查点区别
-- 缓存和检查点区别
1. Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。
2. Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。
3. 建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD
5.5.9 分区器
-- 1. 分区器类别:
a、Hash分区,"默认分区器"
b、Range分区
c、用户自定义分区
-- 2. 分区器的作用:
直接决定了RDD分区的个数、RDD中每条数据经过Shuffle后进入哪个分区,进而决定了Reduce的个数
--3. 说明
a、只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None
b、每个RDD的分区ID范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。
-- 4 分区规则:
Hash分区:对于给定的key,计算其hashCode,并除以分区个数取余
Range分区:将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序