Spark编程2
...大约 5 分钟
Spark编程2
接2.Spark架构及编程
五、Spark核心编程
5.6 累加器
5.6.1 累加器基本介绍
-- 1.什么是累加器?
分布式共享只写变量,使用累加器完成数据的累加。
1. 分布式:每一个executor都拥有这个累加器
2. 共享:Driver中的变量原封不动的被executor拥有一份副本
3. 只写:同一个executor中可以对这个变量进行改值,其他的executor不能读取。
-- 2.累加器用来解决什么问题?
1. 想通过没有shuffle过程的算子来实现数据的累加" 所谓累加器,一般作用就是累加(可以是数值的累加,也可以是数据的累加)",我们实现的方式是:在driver代码中,声明一个变量"类似一个容器",来进行接收累加的结果,但是发现,当前情况,driver端的变量传递给executor以后,并在executor进行计算,该变量无法返回给到driver
原因是:
a、driver端能够传递给到executor,是因为存在闭包的原因
b、executor不能传递过来是因为没有闭包的原因。
因此:我们使用了累计器的方式,将上诉声明的变量封装成累加器的方式,使的executor端计算的累加结果能够传回给到driver端。
-- 3.累计器实现过程:
累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge
![image-20200608203039476](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200608203039.png)
5.6.2 累加器的使用
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
//1.声明累加器
val sum: LongAccumulator = sc.longAccumulator("sum")
rdd.foreach(num =>{
//2. 调用累加器
sum.add(num)
})
// 3. 获取累加器的值
println(sum.value)
-- 一共有三种自带的累加器类型
longAccumulator 、doubleAccumulator()、collectionAccumulator()
5.6.3 累加器的具体流程
1. 将累加器变量注册到spark中
2. 执行计算时,spark会将累加器发送到executor执行计算
3. 计算完毕后,executor会将累加器的计算结果返回到driver端。
4. driver端获取到多个累加器的结果,然后两两合并。最后得到累加器的执行结果
![image-20200608204813705](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200608204813.png)
5.6.4 自定累加器
--步骤:
1. 自定义累加器类,继承extends AccumulatorV2[IN, OUT]
2. IN:累加器输入数据的类型
OUT:累加器返回值的数据类型
需指定如上两个参数的数据类型
3. 重写AccumulatorV2中6个方法
--方法1:判断当前的累加器是初始化
override def isZero: Boolean = ???
--方法2:复制一个累加器
override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = ???
--方法3:重置累加器
override def reset(): Unit = ???
--方法4:向累加器中增加值
override def add(v: String): Unit = ???
--方法5:合并当前累加器和其他累加器,两两合并,此方法由Driver端调用,合并由executor返回的多个累加器
override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = ???
--方法6: 返回当前累加器的值
override def value: mutable.Map[String, Int] = ???
4. 在Driver端的代码
a、 创建累加器
b、 注册累加器
c、 使用累加器
d、 获取累加器的值
-- 说明:方法1/2/3在闭包检测和序列化时会使用到。依次进行调用,调用的顺序是:
copy --> reset --> isZero
- 自定义累加器实现wordcount
object Scala3_ACC {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("acc")
val sc = new SparkContext(sparkConf)
val rdd= sc.makeRDD(List("spark hadoop", "scala", "java hello scala"))
//1. 创建累加器
val acc = new WordCountAccumulator
//2. 注册累加器
sc.register(acc)
//3. 调用累加器
rdd.flatMap(_.split(" ")).foreach(
word => acc.add(word)
)
//4. 获取累加器的值
println(acc.value)
sc.stop()
}
//自定义累加器
class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Int]] {
private var wordCountMap: mutable.Map[String, Int] = mutable.Map[String, Int]()
//方法1:判断当前的累加器是初始化
override def isZero: Boolean = {
wordCountMap.isEmpty
}
//方法2:复制一个累加器
override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {
new WordCountAccumulator
}
//方法3:重置累加器
override def reset(): Unit = {
wordCountMap.clear()
}
//-方法4:向累加器中增加值
override def add(word: String): Unit = {
wordCountMap(word) = wordCountMap.getOrElse(word, 0) + 1
}
//方法5:合并当前累加器和其他累加器,两两合并,此方法由Driver端调用,合并由executor返回的多个累加器
override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
val map1 = wordCountMap
val map2 = other.value
wordCountMap = map1.foldLeft(map2)((map, kv) => {
map(kv._1) = map.getOrElse(kv._1, 0) + kv._2
map
}
)
}
//方法6:返回当前累加器的值
override def value: mutable.Map[String, Int] = {
wordCountMap
}
}
}
5.7 广播变量
5.7.1 介绍
--1. 当前现状
一个Executor有多个core,所以可以同时执行多个task,当Driver需要传递一个数据量很大的对象时,由于每一个task中都含有这么一个变量,这样一来,数据在executor中就存在多份。
--2. 导致问题:
1.在Executor数据冗余
2.Executor内存可能溢出
3.如果存在shuffle阶段,数据传输效率将会非常低。
为了解决出现这种性能的问题,可以将数据独立出来,在executor的内存中只保留一份,防止shuffle操作。
--3. 由于数据是保存在task中,如何独立出来呢?
使用广播变量的模式。
--4. 什么是广播变量?
分布式共享只读变量
1. 只读:只能被访问,不能被修改
2. 共享:可以被当前executor中所有task访问,还可以被其他的executor访问
--5. 广播变量的声明和使用
val list = List( ("a",4), ("b", 5), ("c", 6), ("d", 7) )
--声明广播变量
val broadcast: Broadcast[List[(String, Int)]] = sc.broadcast(list)
--使用广播变量
for ((k, v) <- broadcast.value)
![image-20200608213525912](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200608213525.png)
5.7.2 编程实现
val rdd1 = sc.makeRDD(List( ("a",1), ("b", 2), ("c", 3), ("d", 4) ),4)
val list = List( ("a",4), ("b", 5), ("c", 6), ("d", 7) )
// 声明广播变量
val broadcast: Broadcast[List[(String, Int)]] = sc.broadcast(list)
val resultRDD: RDD[(String, (Int, Int))] = rdd1.map {
case (key, num) => {
var num2 = 0
// 使用广播变量
for ((k, v) <- broadcast.value) {
if (k == key) {
num2 = v
}
}
(key, (num, num2))
}
}
resultRDD.collect().foreach(println)
赞助