SparkStreaming
SparkStreaming
一、SparkStreaming 介绍
--0. 几个概念
a、'实时':数据处理的延迟,以毫秒级进行响应
b、'离线':数据处理的延迟,以小时、天、月、年为级别响应
c、'批处理':数据处理的方式,一次处理一批数据
d、'流式处理':数据处理的方式,和水流相似,来一条数据处理一条数据,来一点处理一点,一个一个的处理。
--1. 什么是SparkStreaming
a、流式数据处理,但是实际情况是无法到达真正流式处理,因为底层还是RDD,不能来一条数据处理一条数据
那样IO会非常多。
b、接收来自kafka、flume、Twitter等框架的数据
c、是"准实时""微批次"数据处理引擎,使用类似RDD算子进行数据处理,这里的处理的方法称为原语,数据处理的逻辑
是相对比较简单的,如果处理的逻辑很复杂,计算时间就会比较长,那么就不能算是准实时的处理
"实时数仓将是未来的方向"
d、'处理的数据结果集'可以发送到hdfs、mysql等一些框架中进存储。
--2. 基本流程
a、内部使用离散化流:用DStream表示。
b、DStream是随时间推移而收到数据的序列,在内部,每个时间区间收到的数据作为RDD存在
c、DStream是作为这些RDD所组成的序列
--3. 和SparkCore/SparkSQL之间的区别
a、从环境对象上的区别:
SparkCore --> SparkContext,使用sc表示
SparkSQL --> SparkSeeion,内部是对SparkContext的封装,使用spark表示
SparkStreaming--> StreamingContext,内部是对SparkContext的封装
b、处理抽象的区别:
SparkCore --> RDD
SparkSQL --> DataFrame / DataSet
SparkStreaming --> DStream
-- 4. 关键词:
在SparkStreaming中:
a、准实时
b、微批次
c、采集周期:每采集一次数据,就会生成一个RDD
d、DStream
--5. 框架图
a、在executor中有一个采集器,通过采集器,按照采集周期从指定的地方进行采集数据
b、生成的rdd传递给到Driver端,driver端的StreamingContext用来处理所有数据的Spark作业。
c、底层就封装成了RDD,后面的数据处理流程就和SparkCore处理的流程一致。
--6. 背压机制:
a、背景:
由于数据是源源不断向采集器中发送,那么当生产数据的速度>数据处理的速度时,就会产生背压。
b、解决方法:
在spark1.5之前:
通过参数“spark.streaming.receiver.maxRate”来限制接收器接收数据的效率,但是当生产数据的速度 > 设定接收 数据的速度,且集群处理数据的速度>设定接收数据的速度,那么则产生了性能的浪费。
Spark1.5以后:
通过参数“spark.streaming.backpressure.enabled”动态的协调数据的接收速率和资源处理能力。
c、背压机制:根据jobSchedule返回作业的执行信息来动态调整接收器接收数据的速度。
![image-20200617020834367](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200617020835.png)
![image-20200714002519802](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200714002526.png)
二、DStream入门
2.1 连接SparkContext
// 连接SparkStreaming
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming")
/*
1.方法:StreamingContext(形参)
2.形参:
形参1:conf: SparkConf:spark配置对象
形参2:batchDuration: Duration:采集时间
*/
val ssc = new StreamingContext(sparkConf,Seconds(5))
2.2 案例
-- 1. Spark从socket中获取数据:一行一行的获取
-- 2. Driver程序执行时,streaming处理过程不能结束
-- 3. 采集器在正常情况下启动后就不应该停止,除非特殊情况
-- 4. 采集器位于一个executor中,是一个线程,执行时需要一个核,如果设定的总核数为1时,那么在运行时因为没有核数,所以不会有打印结果,所以sparkStreaming使用的核数至少为2个
-- 5. print()方法,默认是打印10行结果
-- 6. netcat的指令:
在Windows下:nc -lp 9999
在linux下: nc -lk 9999
- 导入依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>2.4.5</version>
</dependency>
- 代码实现
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming")
val ssc = new StreamingContext(sparkConf,Seconds(5))
// 需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数
// 1. 获取netcat工具9999端口的连接,并开始接收数据
// 从socket中获取数据:一行一行的获取
val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost",9999)
// 2. 数据处理
val wordDS: DStream[String] = socketDS.flatMap(_.split(" "))
val wordToSumDS: DStream[(String, Int)] = wordDS.map((_,1)).reduceByKey(_ + _ )
// 3. 打印数据
wordToSumDS.print()
// 4. Driver程序执行时,streaming处理过程不能结束
// 采集器在正常情况下启动后就不应该停止,除非特殊情况
// 启动采集器
ssc.start()
// 等待采集器的结束
ssc.awaitTermination()
- wordcount解析
a、采集周期时间之间,每一个采集周期生成一个RDD,按照时间的顺序依次进行
b、在每一个采集周期内,会执行wordcount计算,最终得出:统计出每一个采集周期时间的wordcount
三、Dstream的创建
3.1 RDD队列
-- 1. 队列:
a、使用场景:测试
b、实现方式: 通过ssc.queueStream(queueOfRDDs)创建DStream,每一个推送这个队列的RDD,都会作为一个DStream处理
val sparkconf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("stream")
val ssc = new StreamingContext(sparkconf,Seconds(3))
// 创建一个队列对象,队列中存放的是RDD
val queue = new mutable.Queue[RDD[String]]()
// 通过队列创建DStream
val queueDS: InputDStream[String] = ssc.queueStream(queue)
queueDS.print()
// 启动采集器
ssc.start()
//这个操作之所以放在这个位置,是为了模拟流式的感觉,数据源源不断的生产
for(i <- 1 to 5 ){
// 循环创建rdd
val rdd: RDD[String] = ssc.sparkContext.makeRDD(List(i.toString))
// 将RDD存放到队列中
queue.enqueue(rdd)
// 当前线程休眠1秒
Thread.sleep(6000)
}
// 等待采集器的结束
ssc.awaitTermination()
}
3.2 textFileStream
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("textFileStream")
val ssc = new StreamingContext(sparkConf,Seconds(3))
//从文件中读取数据
val textDS: DStream[String] = ssc.textFileStream("in")
textDS.print()
// 启动采集器
ssc.start()
// 等待采集器的结束
ssc.awaitTermination()
3.3 DIY采集器
--1. 自定义采集器
--2. 什么情况下需要自定采集器呢?
比如从mysql、hbase中读取数据。
采集器的作用是从指定的地方,按照采集周期对数据进行采集。
目前有:采集kafka、采集netcat工具的指定端口的数据、采集文件目录中的数据等
--3. 自定义采集器的步骤,模仿socketTextStream
a、自定采集器类,继承extends,并指定数据泛型,同时对父类的属性赋值,指定数据存储的级别
b、重写onStart和onStop方法
onStart:采集器的如何启动
onStop:采集的如何停止
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DIY")
val ssc = new StreamingContext(sparkConf, Seconds(3))
// 获取采集的流
val ds: ReceiverInputDStream[String] = ssc.receiverStream(new MyReciver("localhost",9999))
ds.print()
ssc.start()
ssc.awaitTermination()
}
// 继承extends Reciver,并指定数据泛型,同时对父类的属性赋值,指定数据存储的级别
class MyReciver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
private var socket: Socket = _
def receive = {
// 获取输入流
val reader = new BufferedReader(
new InputStreamReader(
socket.getInputStream,
"UTF-8"
)
)
// 设定一个间接变量
var s: String = null
while (true) {
// 按行读取数据
s = reader.readLine()
if (s != null) {
// 将数据进行封装
store(s)
}
}
}
// 1. 启动采集器
override def onStart(): Unit = {
socket = new Socket(host, port)
new Thread("Socket Receiver") {
setDaemon(true)
override def run() {
receive
}
}.start()
}
// 2. 停止采集器
override def onStop(): Unit = {
socket.close()
socket = null
}
}
3.4 kafka数据源【重点】
-- DirectAPI:是由计算的Executor来主动消费Kafka的数据,速度由自身控制。
-- 配置信息基本上是固定写法
- 实现方式
// TODO Spark环境
// SparkStreaming使用核数最少是2个
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")
val ssc = new StreamingContext(sparkConf, Seconds(3))
// TODO 使用SparkStreaming读取Kafka的数据
// Kafka的配置信息
val kafkaPara: Map[String, Object] = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop105:9092,hadoop106:9092,hadoop107:9092",
ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
)
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara)
)
// 获取数据,key是null,value是真实的数据
val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
valueDStream.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.print()
ssc.start()
// 等待采集器的结束
ssc.awaitTermination()
四、DStream的转换
4.1 介绍
--1. Dstream:DStream的操作和RDD类似,Transformations(转换)和Output Operations(输出)
a、Transformations(转换):类似RDD的转换算子,还有一些比较特殊的原语:
updateStateByKey()、transform()以及各种Window相关的原语
b、Output Operations(输出):类似RDD的行动算子
--2.转换分为:
a、无状态转换:计算结果完成以后,不会保存数据
b、有状态转换:把中间的数据保存起来
-- 3. 代码执行的位置:
a、rdd算子外的代码在driver端执行
b、rdd算子内的代码在executor端执行
"每一个采集周期会生产一个rdd
4.2 无状态转换
--1. 什么是无状态转换:
就是将简单的RDD操作转化操作应用到每个批次上,也就是转化DStream每一个RDD。
--2. 部分无状态转换的操作有
map、flatmap、filter、repartition、reducebykey
--3. 注意:
a、针对键值对的DStream操作,如reduceBykey需要添加import StreamingContext._才能在Scala中使用。
b、reduceBykey会规约每一个时间区间中的数据,但是不会规约不同时间区间的数据
--4. 部分无状态转换算子如下:
![image-20200622012037225](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200622012037.png)
4.2.1 transform
--1. 方法:transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]
--2. 泛型:可以指定一个泛型,这个泛型则规定了转换后的RDD和DStream的类型
--3. 形参:是一个函数:transformFunc
函数的形参:每个采集周期生成的RDD;
函数的返回值:RDD[U],返回一个RDD,RDD的数据类型和指定的泛型一致。
--4. 操作的作用:
每个采集周期生成的RDD都会调用一次这个操作。主要是进行RDD的转换,在转换的过程,可以对数据进行处理。
--5. 什么时候使用?
与采集周期相关的时候使用,这里主要的因为代码执行的次数有关
//code1
transform(rdd => {
// code2
rdd.flatMap{
// code3
}
)
"说明":
code1:在driver中执行,只会执行1次
code2:每一个采集周期会生产一个RDD,每生成一个RDD会执行一次
code3:每一个RDD中的数据,会执行一次
val transformDS: DStream[(String, Int)] = ds.transform(rdd => {
val value: RDD[(String, Int)] = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
value
})
![image-20200714011137027](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200714011137.png)
![image-20200714011247518](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200714011247.png)
4.2.2 join
--无状态转换操作之join
1. 操作:join
2. 说明:必须是kv形式的RDD
3. 作用:
a、两个流之间的join需要两个流的批次大小一致,这样才能做到同时触发计算。
b、计算过程就是对当前批次的两个流中各自的RDD进行join,与两个RDD的join效果相同。
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("join")
val ssc = new StreamingContext(sparkConf,Seconds(3))
val ds1: ReceiverInputDStream[String] = ssc.socketTextStream("localhost",9999)
val ds2: ReceiverInputDStream[String] = ssc.socketTextStream("localhost",8888)
val ds11: DStream[(String, Int)] = ds1.map((_,1))
val ds22: DStream[(String, Int)] = ds2.map((_,1))
ds11.join(ds22).print()
ssc.start()
ssc.awaitTermination()
4.3 有状态转换
有状态的转换:将spark每个采集周期处理结果保存起来,然后再一次数据的处理中可以继续使用
4.3.1 updateStateByKey
--1. updateStateByKey(形参)
--2. 泛型:[S: ClassTag]:指定缓冲区中数据的泛型和操作返回值的v返回值类型
--3. 形参:是一个函数,
函数有两个形参
形参1:seq:Seq[V]:相同key的value的集合
形参2:buffer:Option[S]:缓冲区,相同key的缓冲区数据,有可能为空
函数的返回值:Option[S]
--4. 操作的返回值:DStream[(K, S)]:
是一个新的DStream,其内部的RDD序列是由每个时间区间对应的(键,状态)对组成的
--5. 作用:
计算的最后结果需要使用到前面的采集周期的计算结果,如计算wordcount
--6. 说明:
a、用于键值对形式的DStream
b、使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态
![image-20200714012326922](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200714012326.png)
- 代码实现
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("state")
val ssc = new StreamingContext(sparkConf,Seconds(3))
//设置检查点的目录
ssc.sparkContext.setCheckpointDir("./ck")
val ds: ReceiverInputDStream[String] = ssc.socketTextStream("localhost",9999)
val stateDS: DStream[(String, Int)] = ds.flatMap(_.split(" ")).map((_, 1)).updateStateByKey {
case (values: Seq[Int], buffer: Option[Int]) => {
// 对相同key的value进行求和
val sum: Int = values.sum
// 将相同key的value计算结果保存到buffer中
Option(buffer.getOrElse(0) + sum)
}
}
stateDS.print()
ssc.start()
ssc.awaitTermination()
4.3.2 reduceByKeyandwindow
![image-20200714012641927](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200714012641.png)
![image-20200714013055313](https://lian-zp.oss-cn-shenzhen.aliyuncs.com/pic GO/20200714013055.png)