首先看一下Spark官网对该方法的讲解:
大致的意思是aggregate接收两个函数,和一个初始化值。seqOp函数用于聚集每一个分区,combOp用于聚集所有分区聚集后的结果。每一个分区的聚集,和最后所有分区的聚集都需要初始化值的参与。
举例如下:
集群环境:一台Master,三台Worker,在spark-shell中测试
1
2
3
4
5
6
7
8
9
10
11
12
13 scala> val seqOp:(Int,Int)=>Int={(a,b)=>{println("seqOp"+a+"\t"+b);math.min(a,b)}}
seqOp: (Int, Int) => Int = <function2>
scala> val combOp:(Int,Int)=>Int={(a,b)=>{println("combOp"+a+"\t"+b);a+b}}
combOp: (Int, Int) => Int = <function2>
scala> val z=sc.parallelize(List(1,2,3,4,5,6,7,8),2)
z: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> z.aggregate(3)(seqOp,combOp)
[Stage 13:> (0 + 0) / 2]combOp3 1
combOp 4 3
res13: Int = 7
为什么会等于7呢?
我们首先看一下集群中任务:
可以看出有两个任务,原因是我们将List并发数设置为了2,Spark会将List拆分成2部分同时执行。再进一步看这两个任务的的统计信息:
可以看出这两个任务在两个worker上执行,可以看到任务的启动时间,执行了多久等信息。再进一步看任务的stdout输出日志:
可以看出spark将List拆分成了两部分,启动两个任务分别执行。再来看看seqOp函数表达的意思,seqOp取的是两个数中的较小值。如第一半部分List(1,2,3,4),spark会拿初始值3与这个List中的每一个元素分别比较,最后得出的结果是1.同时,第二半部分List得出的结果是3,然后spark再将这两部分得出的结果调用combOp处理,combOp是两个数的相加,spark首先将初始值3加上1得出4,再加上3得到7.
接下来再看一下使用aggregate方法编写wordcount例子。
1 | import org.apache.spark.rdd.RDD |
v1.5.2