首先看一下Spark官网对该方法的讲解:
大致的意思是aggregate接收两个函数,和一个初始化值。seqOp函数用于聚集每一个分区,combOp用于聚集所有分区聚集后的结果。每一个分区的聚集,和最后所有分区的聚集都需要初始化值的参与。
举例如下:
集群环境:一台Master,三台Worker,在spark-shell中测试
1
2
3
4
5
6
7
8
9
10
11
12
13 scala> val seqOp:(Int,Int)=>Int={(a,b)=>{println("seqOp"+a+"\t"+b);math.min(a,b)}}
seqOp: (Int, Int) => Int = <function2>
scala> val combOp:(Int,Int)=>Int={(a,b)=>{println("combOp"+a+"\t"+b);a+b}}
combOp: (Int, Int) => Int = <function2>
scala> val z=sc.parallelize(List(1,2,3,4,5,6,7,8),2)
z: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> z.aggregate(3)(seqOp,combOp)
[Stage 13:> (0 + 0) / 2]combOp3 1
combOp 4 3
res13: Int = 7
为什么会等于7呢?
我们首先看一下集群中任务:
可以看出有两个任务,原因是我们将List并发数设置为了2,Spark会将List拆分成2部分同时执行。再进一步看这两个任务的的统计信息:
可以看出这两个任务在两个worker上执行,可以看到任务的启动时间,执行了多久等信息。再进一步看任务的stdout输出日志:
可以看出spark将List拆分成了两部分,启动两个任务分别执行。再来看看seqOp函数表达的意思,seqOp取的是两个数中的较小值。如第一半部分List(1,2,3,4),spark会拿初始值3与这个List中的每一个元素分别比较,最后得出的结果是1.同时,第二半部分List得出的结果是3,然后spark再将这两部分得出的结果调用combOp处理,combOp是两个数的相加,spark首先将初始值3加上1得出4,再加上3得到7.
接下来再看一下使用aggregate方法编写wordcount例子。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
/**
* User:cool coding
* Date:20171214
* Time:16:12:20
*
*/
object WordCount {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("wordcount").setMaster("local[2]")
val sc=new SparkContext(conf)
val data=sc.textFile("H:/hadoop/wordcount.txt");
val words: RDD[String] = data.flatMap(_.split(" "))
val countsMap=words.aggregate(mutable.HashMap[String,Int]())((agg:mutable.HashMap[String,Int], word)=>{
if(!agg.contains(word)){
agg.put(word,1)
}else{
agg.put(word,agg(word)+1)
}
agg
},(agg1:mutable.HashMap[String,Int],agg2:mutable.HashMap[String,Int])=> {
for((word,count)<-agg1){
if(!agg2.contains(word)){
agg2.put(word,1)
}else{
agg2.put(word,agg2(word)+count)
}
}
agg2
}
)
println(countsMap.toList)
}
}