import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
/**
 * Demonstrates eleven different ways of computing a word count with Spark RDD operators.
 *
 * The input is a small pair RDD of (word, count) tuples spread over two partitions.
 * Each approach prints its per-word totals to stdout; for the sample data every word
 * (a, b, c) totals 3.
 *
 * @author LZH
 *
 * Created: 2020/12/1 10:17
 */
object WordCount {

  /**
   * Program entry point.
   *
   * An explicit `main` is used rather than `extends App`: the Spark documentation notes
   * that subclasses of `scala.App` may not work correctly.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    val sc = new SparkContext(conf)
    // Always release the context, even if one of the demos throws.
    try runAll(sc)
    finally sc.stop()
  }

  /**
   * Builds the sample RDD and runs every word-count variant against it.
   *
   * @param sc the active Spark context used to create the sample RDD
   */
  private def runAll(sc: SparkContext): Unit = {
    val rdd: RDD[(String, Int)] = sc.makeRDD(
      List(
        ("a", 1), ("a", 2), ("c", 1),
        ("b", 1), ("b", 2), ("c", 2)
      ),
      2
    )

    println("第1种:reduceByKey()")
    rdd.reduceByKey(_ + _).collect().foreach(println)

    println("第2种:groupByKey()")
    rdd.groupByKey().mapValues(_.sum).collect().foreach(println)

    println("第3种:groupBy()")
    rdd.groupBy(_._1).mapValues(_.map(_._2).sum).collect().foreach(println)

    println("第4种:aggregateByKey()")
    rdd.aggregateByKey(0)(_ + _, _ + _).collect().foreach(println)

    println("第5种:foldByKey()")
    rdd.foldByKey(0)(_ + _).collect().foreach(println)

    println("第6种:combineByKey()")
    rdd.combineByKey(
      (v: Int) => v,
      (acc: Int, v: Int) => acc + v,
      (left: Int, right: Int) => left + right
    ).collect().foreach(println)

    println("第7种:countByKey()")
    expandWords(rdd).map((_, 1)).countByKey().foreach(println)

    println("第8种:countByValue()")
    expandWords(rdd).countByValue().foreach(println)

    println("第9种:aggregate()")
    expandWords(rdd)
      .map(word => mutable.Map(word -> 1))
      .aggregate(mutable.Map[String, Int]())(mergeCounts, mergeCounts)
      .foreach(println)

    println("第10种:fold()")
    rdd
      .map { case (word, count) => mutable.Map(word -> count) }
      .fold(mutable.Map[String, Int]())(mergeCounts)
      .foreach(println)

    println("第11种:reduce()")
    rdd
      .map { case (word, count) => mutable.Map(word -> count) }
      .reduce(mergeCounts)
      .foreach(println)
  }

  /**
   * Turns each (word, count) pair into `count` separate occurrences of `word`.
   *
   * The word is repeated `count` times separated by spaces and then split again, so
   * the resulting RDD holds one element per occurrence.
   *
   * @param pairs RDD of (word, count) tuples
   * @return RDD with one element per word occurrence
   */
  private def expandWords(pairs: RDD[(String, Int)]): RDD[String] =
    pairs
      .map { case (word, count) => (word + " ") * count }
      .flatMap(_.split(" "))

  /**
   * Adds every count from `other` into `acc` and returns `acc`.
   *
   * Only the first argument is mutated, which is the form the RDD `aggregate` and
   * `fold` documentation explicitly allows.
   *
   * @param acc   accumulator map, updated in place
   * @param other counts to merge in; left unmodified
   * @return `acc`, now containing the combined counts
   */
  private def mergeCounts(
      acc: mutable.Map[String, Int],
      other: mutable.Map[String, Int]
  ): mutable.Map[String, Int] = {
    other.foreach { case (word, count) =>
      acc.update(word, acc.getOrElse(word, 0) + count)
    }
    acc
  }
}