Spark WordCount
1. Fourteen ways to implement WordCount
// Data preparation and environment setup
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("map")
val sc = new SparkContext(sparkConf)
val rdd1: RDD[String] = sc.makeRDD(List("hadoop scala", "spark spark hello", "scala"), 2)
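With this sample data, every method below should produce the same counts, up to ordering: (spark,2), (scala,2), (hadoop,1), (hello,1).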
1.1 Method 1: groupBy + map
// Method 1: groupBy + map — group identical words, the group size is the count
rdd1.flatMap(_.split(" "))
  .groupBy(word => word)
  .map { case (word, iter) => (word, iter.size) }
  .collect()
  .foreach(println)
println("======= Method 1 above ========")
1.2 Method 2: reduceByKey
// Method 2: reduceByKey — the most common implementation
rdd1.flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .collect()
  .foreach(println)
println("======= Method 2 above ========")
1.3 Method 3: aggregateByKey
// Method 3: aggregateByKey(zeroValue)(seqOp, combOp) — both functions are addition here
rdd1.flatMap(_.split(" "))
  .map((_, 1))
  .aggregateByKey(0)(_ + _, _ + _)
  .collect()
  .foreach(println)
println("======= Method 3 above ========")
1.4 Method 4: foldByKey
// Method 4: foldByKey — aggregateByKey with the same function for both phases
rdd1.flatMap(_.split(" "))
  .map((_, 1))
  .foldByKey(0)(_ + _)
  .collect()
  .foreach(println)
println("======= Method 4 above ========")
1.5 Method 5: combineByKey
// Method 5: combineByKey(createCombiner, mergeValue, mergeCombiners)
rdd1.flatMap(_.split(" "))
  .map((_, 1))
  .combineByKey(
    v => v,                          // the first value of a key becomes the initial combiner
    (v1: Int, v2: Int) => v1 + v2,   // merge a value into the combiner within a partition
    (v1: Int, v2: Int) => v1 + v2    // merge combiners across partitions
  )
  .collect()
  .foreach(println)
println("======= Method 5 above ========")
1.6 Method 6: groupByKey
// Method 6: groupByKey — collect the 1s per word, then count them
rdd1.flatMap(_.split(" "))
  .map((_, 1))
  .groupByKey()
  .map { case (word, iter) => (word, iter.size) }
  .collect()
  .foreach(println)
println("======= Method 6 above ========")
1.7 Method 7: countByKey
// Method 7: countByKey — an action that returns a Map[String, Long] to the driver
rdd1.flatMap(_.split(" "))
  .map((_, 1))
  .countByKey()
  .foreach(println)
println("======= Method 7 above ========")
1.8 Method 8: countByValue
// Method 8: countByValue — counts occurrences of each (1, word) element
rdd1.flatMap(_.split(" "))
  .map((1, _))
  .countByValue()
  .map { case ((num, word), count) => (word, count) }  // drop the constant 1
  .foreach(println)
println("======= Method 8 above ========")
1.9 Method 9: cogroup
// Method 9: cogroup — join with a helper RDD, then sum the group sizes per word
val rdd2 = sc.makeRDD(List(("a", 0)))
rdd1.flatMap(_.split(" "))
  .map((_, 1))
  .cogroup(rdd2)
  .map { case (word, (iter1, iter2)) => (word, iter1.size + iter2.size) }
  .filter(_._1 != "a")   // drop the helper key "a", which does not appear in the text
  .collect()
  .foreach(println)
println("======= Method 9 above ========")
1.10 Method 10: mapValues
// Method 10: mapValues — group the (word, 1) pairs by the pair itself, then count each group
rdd1.flatMap(_.split(" "))
  .map((_, 1))
  .groupBy(pair => pair)     // key is the (word, 1) tuple
  .mapValues(_.size)         // ((word, 1), count)
  .map { case ((word, num), count) => (word, count) }
  .collect()
  .foreach(println)
println("======= Method 10 above ========")
1.11 Method 11: reduce + foldLeft
// Method 11: map every word to a one-entry Map, then reduce by merging the Maps
val wordcount: Map[String, Int] = rdd1.flatMap(_.split(" "))
  .map(word => Map[String, Int](word -> 1))
  .reduce((map1, map2) =>
    map1.foldLeft(map2) { (map, kv) =>
      val (word, count) = kv
      map.updated(word, map.getOrElse(word, 0) + count)
    }
  )
wordcount.foreach(println)
println("======= Method 11 above ========")
1.12 Method 12: aggregate
// Method 12: aggregate — accumulate into a mutable Map, then merge the per-partition Maps
import scala.collection.mutable

rdd1.flatMap(_.split(" "))
  .map((_, 1))
  .aggregate(mutable.Map[String, Int]())(
    (map, kv) => {                     // fold one (word, 1) pair into the partition-local map
      val (word, num) = kv
      map(word) = map.getOrElse(word, 0) + num
      map
    },
    (map1, map2) => {                  // merge two partition-local maps
      map1.foldLeft(map2) { case (map, (word, num)) =>
        map(word) = map.getOrElse(word, 0) + num
        map
      }
    }
  )
  .foreach(println)
println("======= Method 12 above ========")
1.13 Method 13: fold
// Method 13: fold — map every word to a one-entry mutable Map, then fold the Maps together
rdd1.flatMap(_.split(" "))
  .map(word => mutable.Map[String, Int](word -> 1))
  .fold(mutable.Map[String, Int]()) { (map1, map2) =>
    map1.foldLeft(map2) { case (map, (word, num)) =>
      map(word) = map.getOrElse(word, 0) + num
      map
    }
  }
  .foreach(println)
println("======= Method 13 above ========")
1.14 Method 14: accumulator
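The original section stops at this heading without showing the code. Below is a minimal sketch of the accumulator approach, assuming a custom AccumulatorV2 that collects word counts; the class name WordCountAccumulator and the register name "wordCount" are placeholders, not from the original.

// Method 14 (sketch): a custom AccumulatorV2 that accumulates word counts on the driver
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable

class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
  private val counts = mutable.Map[String, Long]()

  override def isZero: Boolean = counts.isEmpty
  override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
    val acc = new WordCountAccumulator
    acc.counts ++= counts
    acc
  }
  override def reset(): Unit = counts.clear()
  override def add(word: String): Unit =
    counts(word) = counts.getOrElse(word, 0L) + 1L          // count one occurrence of a word
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit =
    other.value.foreach { case (word, num) =>                // merge counts from another task
      counts(word) = counts.getOrElse(word, 0L) + num
    }
  override def value: mutable.Map[String, Long] = counts
}

val wcAcc = new WordCountAccumulator
sc.register(wcAcc, "wordCount")
rdd1.flatMap(_.split(" ")).foreach(word => wcAcc.add(word))  // no shuffle, counts flow back via the accumulator
wcAcc.value.foreach(println)
println("======= Method 14 above ========")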