WordCount

在spark官网讲解spark streaming的时候,举了一个word count的例子,通过监听一个端口的TCP连接,统计单词的个数。程序如下(in scala):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import org.apache.spark._
import org.apache.spark.streaming._

object NetworkWordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(5))
Logger.getRootLogger.setLevel(Level.ERROR)

val lines = ssc.socketTextStream("localhost", 9998)
val wordcount=lines.flatMap(_.split("\\W+"))
.map((_,1))
.reduceByKey(_+_)
wordcount.print()

ssc.start()
ssc.awaitTermination()

}
}

然后,再在linux命令窗口中使用nc -lk 9998,输入一串单词,就可以统计单词出现的频率。
但是上面的写法只能统计当时输入的内容,而不能加上以前统计的结果。

有两个方法可以实现:
一,使用一个HashMap来存储以前统计的结果
二,使用DStream提供的updateStateByKey方法

先来看第一种方法(只显示main方法中的code):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
   val wordCountMap=new mutable.HashMap[String,Int]()
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(5))
Logger.getRootLogger.setLevel(Level.ERROR)
val lines = ssc.socketTextStream("localhost", 9998)
val wordcount=lines.flatMap(_.split("\\W+"))
.map((_,1))
.reduceByKey(_+_)
//map与reduceByKey两步可以合成一步:countByValue(),但是此
//时,次数是Long类型,而不是Int类型了
wordcount.foreachRDD(line=>{
val array = line.collect()
array.map(w=>{
var count= wordCountMap.get(w._1)
if(count==None){
wordCountMap+=w
}else{
var count2=count.asInstanceOf[Some[Int]].get
count2+=w._2
wordCountMap+=Tuple2(w._1,count2)
}
})

var time=new Date(System.currentTimeMillis()).toString
println(s"---------------Time:$time------------------")
for(w<-wordCountMap.iterator) println(w)
println("---------------------end---------------")
})

ssc.start()
ssc.awaitTermination()

此方法使用DStream中的foreachRDD方法,操作RDD,当每一次有新内容进入时,统计单词出现的频率,并累加到之前统计的结果上。

第二种方法(只显示main中的code)

1
2
3
4
5
6
7
8
9
10
11
12
13
val wordCountMap=new mutable.HashMap[String,Long]()
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(5))
Logger.getRootLogger.setLevel(Level.ERROR)
val lines = ssc.socketTextStream("localhost", 9998)
ssc.sparkContext.setCheckpointDir("d:/spark_check")
val wordcount=lines.flatMap(_.split("\\W+"))
.map((_,1))
.updateStateByKey(updateFunction _)
wordcount.print()

ssc.start()
ssc.awaitTermination()

需要有一个更新状态的函数:updateFunction

1
2
3
4
5
6
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
var newCount = if(runningCount!=None) runningCount.get else 0
if(newValues.size>0)
newCount +=1
Some(newCount)
}

需要注意的是需要设置checkpoint目录,因为spark需要此目录保存状态信息,如果是在windows中运行这段程序,还需要设置HADOOP_HOME环境变量,而且HADOOP_HOME的bin目录下还需要有winutils.exe。每当有新内容进入时,spark都会调用updateFunction函数,newValues是新进入的内容,runningCount是上一个状态。