
11 common ways to write word count in Spark
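All eleven variants below start from an RDD of pre-aggregated (word, count) pairs. For reference, a minimal sketch of the classic word count over raw text lines might look like the following (a sketch added for illustration, assuming a local master and an in-memory list of lines; it is not one of the eleven variants):

import org.apache.spark.{SparkConf, SparkContext}

object BaselineWordCount extends App {
  val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("BaselineWordCount"))
  sc.makeRDD(List("hello spark", "hello scala"))
    .flatMap(_.split(" "))   // split each line into words
    .map((_, 1))             // pair each word with an initial count of 1
    .reduceByKey(_ + _)      // sum the counts per word
    .collect()
    .foreach(println)        // e.g. (hello,2), (spark,1), (scala,1)
  sc.stop()
}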


import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

/**
 * @Author LZH
 * @Date 2020/12/1 10:17
 */
object WordCount extends App {

  val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
  val sc = new SparkContext(conf)

  // Source data: (word, partial count) pairs spread over 2 partitions.
  val rdd: RDD[(String, Int)] = sc.makeRDD(
    List(("a", 1), ("a", 2), ("c", 1), ("b", 1), ("b", 2), ("c", 2)),
    2
  )

println("第1种:reduceByKey()") rdd.reduceByKey(+).collect().foreach(println)

println("第2种:groupByKey()") rdd.groupByKey().mapValues(_.sum).collect().foreach(println)

println("第3种:groupBy()") rdd.groupBy(.1).mapValues(.map(._2).sum).collect().foreach(println)

println("第4种:aggregateByKey()") rdd.aggregateByKey(0)(+,+).collect().foreach(println)

println("第5种:foldByKey()") rdd.foldByKey(0)(+).collect().foreach(println)

println("第6种:combineByKey()") rdd.combineByKey( v=>v, (x: Int, y) => x+y, (x: Int, y: Int) => x+y ).collect().foreach(println)

println("第7种:countByKey()") rdd.map{ case (str,sum)=> (str+" ")*sum }.flatMap(.split(" ")).map((,1)).countByKey().foreach(println)

println("第8种:countByValue()") rdd.map{ case (str,sum)=> (str+" ")*sum }.flatMap(_.split(" ")).countByValue().foreach(println)

println("第9种:aggregate()") rdd.map{ case (str,sum)=> (str+" ")*sum }.flatMap(_.split(" ")).map(s => mutable.Map(s -> 1)) .aggregate(mutable.MapString, Int)( (map1: mutable.Map[String, Int], map2: mutable.Map[String, Int]) => { map1.foldLeft(map2)( (innerMap, kv) => { innerMap(kv._1) = innerMap.getOrElse(kv._1, 0) + kv._2 innerMap } ) }, (map1: mutable.Map[String, Int], map2: mutable.Map[String, Int]) => { map1.foldLeft(map2)( (innerMap, kv) => { innerMap(kv._1) = innerMap.getOrElse(kv._1, 0) + kv._2 innerMap } ) } ).foreach(println)

println("第10种:fold()") rdd.map(t => mutable.Map((t._1,t._2))).fold(mutable.MapString, Int)( (map1, map2) => { map2.foreach{ case (word,cnt)=>{ val oldCnt= map1.getOrElse(word,0) map1.update(word,oldCnt+cnt) } } map1 } ).foreach(println)

println("第11种:reduce()") rdd.map(t=>mutable.Map((t._1,t._2))).reduce( (map1, map2) => { map2.foreach{ case (word,cnt)=>{ val oldCnt= map1.getOrElse(word,0) map1.update(word,oldCnt+cnt) } } map1 } ).foreach(println)

}
