AAS代码第2章

[root@node1 aas]# pwd
/root/aas
[root@node1 aas]# wget http://archive.apache.org/dist/spark/spark-1.2.1/spark-1.2.1-bin-hadoop2.4.tgz 
[root@node1 aas]# tar zxvf spark-1.2.1-bin-hadoop2.4.tgz 

将conf目录下的spark-env.sh.template复制为conf/spark-env.sh,并在其中增加如下内容:

# Settings read by Spark in YARN client mode (see the option list in
# conf/spark-env.sh.template).

# Points Spark at the directory holding the Hadoop configuration files.
HADOOP_CONF_DIR=/etc/hadoop/conf/
# Number of executors (workers) to start.
SPARK_EXECUTOR_INSTANCES=3
# Number of cores for each executor.
SPARK_EXECUTOR_CORES=4
# Memory for each executor.
SPARK_EXECUTOR_MEMORY=4G
# Memory for the driver process.
SPARK_DRIVER_MEMORY=4G
# Application name for the YARN application.
SPARK_YARN_APP_NAME=AAS

将conf目录下的log4j.properties.template复制(或重命名)为log4j.properties,并将日志输出级别修改为WARN:

log4j.rootCategory=WARN, console

启动spark-shell

[root@node1 spark-1.2.1-bin-hadoop2.4]# ./bin/spark-shell --master yarn-client
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _ / _ / _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_   version 1.2.1
      /_/

Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_09-icedtea)
Type in expressions to have them evaluated.
Type :help for more information.
15/12/03 14:54:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context available as sc.

scala> :help
All commands can be abbreviated, e.g. :he instead of :help.
Those marked with a * have more detailed help, e.g. :help imports.

:cp <path>                 add a jar or directory to the classpath
:help [command]            print this summary or command-specific help
:history [num]             show the history (optional num is commands to show)
:h? <string>               search the history
:imports [name name ...]   show import history, identifying sources of names
:implicits [-v]            show the implicits in scope
:javap <path|class>        disassemble a file or class name
:load <path>               load and interpret a Scala file
:paste                     enter paste mode: all input up to ctrl-D compiled together
:quit                      exit the repl
:replay                    reset execution and replay all previous commands
:reset                     reset the repl to its initial state, forgetting all session entries
:sh <command line>         run a shell command (result is implicitly => List[String])
:silent                    disable/enable automatic printing of results
:fallback                  
disable/enable advanced repl changes, these fix some issues but may introduce others. 
This mode will be removed once these fixes stablize
:type [-v] <expr>          display the type of an expression without evaluating it
:warnings                  show the suppressed warnings from the most recent line which had any

scala> 

P10中的样例数据在国内无法下载。需要有代理才行。请到百度网盘上下载:http://pan.baidu.com/s/1pJvjHA7

[root@node1 linkage]# unzip donation.zip 
Archive:  donation.zip
 extracting: block_10.zip            
 extracting: block_1.zip             
 extracting: block_2.zip             
 extracting: block_3.zip             
 extracting: block_4.zip             
 extracting: block_5.zip             
 extracting: block_6.zip             
 extracting: block_7.zip             
 extracting: block_8.zip             
 extracting: block_9.zip             
  inflating: documentation           
  inflating: frequencies.csv         
[root@node1 linkage]# ll
total 110280
-rw-r--r-- 1 root root  5643837 Mar  9  2011 block_10.zip
-rw-r--r-- 1 root root  5643935 Mar  9  2011 block_1.zip
-rw-r--r-- 1 root root  5642577 Mar  9  2011 block_2.zip
-rw-r--r-- 1 root root  5644247 Mar  9  2011 block_3.zip
-rw-r--r-- 1 root root  5644264 Mar  9  2011 block_4.zip
-rw-r--r-- 1 root root  5645826 Mar  9  2011 block_5.zip
-rw-r--r-- 1 root root  5645291 Mar  9  2011 block_6.zip
-rw-r--r-- 1 root root  5645235 Mar  9  2011 block_7.zip
-rw-r--r-- 1 root root  5646395 Mar  9  2011 block_8.zip
-rw-r--r-- 1 root root  5643134 Mar  9  2011 block_9.zip
-rwxrw-rw- 1 root root     4516 Mar 10  2011 documentation
-rw-r--r-- 1 root root 56448373 Dec  3 14:22 donation.zip
-rw-r--r-- 1 root root      272 Mar  9  2011 frequencies.csv

解压:

[root@node1 linkage]# unzip block_1.zip
Archive:  block_1.zip
  inflating: block_1.csv             
[root@node1 linkage]# unzip block_2.zip
Archive:  block_2.zip
  inflating: block_2.csv             
[root@node1 linkage]# unzip block_3.zip
Archive:  block_3.zip
  inflating: block_3.csv             
[root@node1 linkage]# unzip block_4.zip
Archive:  block_4.zip
  inflating: block_4.csv             
[root@node1 linkage]# unzip block_5.zip
Archive:  block_5.zip
  inflating: block_5.csv             
[root@node1 linkage]# unzip block_6.zip
Archive:  block_6.zip
  inflating: block_6.csv             
[root@node1 linkage]# unzip block_7.zip
Archive:  block_7.zip
  inflating: block_7.csv             
[root@node1 linkage]# unzip block_8.zip
Archive:  block_8.zip
  inflating: block_8.csv             
[root@node1 linkage]# unzip block_9.zip
Archive:  block_9.zip
  inflating: block_9.csv  
[root@node1 linkage]# unzip block_10.zip
Archive:  block_10.zip
  inflating: block_10.csv              
[root@node1 linkage]# ll
total 366672
-rw-r--r-- 1 root root 26255957 Mar  9  2011 block_10.csv
-rw-r--r-- 1 root root  5643837 Mar  9  2011 block_10.zip
-rw-r--r-- 1 root root 26248574 Mar  9  2011 block_1.csv
-rw-r--r-- 1 root root  5643935 Mar  9  2011 block_1.zip
-rw-r--r-- 1 root root 26241784 Mar  9  2011 block_2.csv
-rw-r--r-- 1 root root  5642577 Mar  9  2011 block_2.zip
-rw-r--r-- 1 root root 26253247 Mar  9  2011 block_3.csv
-rw-r--r-- 1 root root  5644247 Mar  9  2011 block_3.zip
-rw-r--r-- 1 root root 26247471 Mar  9  2011 block_4.csv
-rw-r--r-- 1 root root  5644264 Mar  9  2011 block_4.zip
-rw-r--r-- 1 root root 26249424 Mar  9  2011 block_5.csv
-rw-r--r-- 1 root root  5645826 Mar  9  2011 block_5.zip
-rw-r--r-- 1 root root 26256126 Mar  9  2011 block_6.csv
-rw-r--r-- 1 root root  5645291 Mar  9  2011 block_6.zip
-rw-r--r-- 1 root root 26261911 Mar  9  2011 block_7.csv
-rw-r--r-- 1 root root  5645235 Mar  9  2011 block_7.zip
-rw-r--r-- 1 root root 26253911 Mar  9  2011 block_8.csv
-rw-r--r-- 1 root root  5646395 Mar  9  2011 block_8.zip
-rw-r--r-- 1 root root 26254012 Mar  9  2011 block_9.csv
-rw-r--r-- 1 root root  5643134 Mar  9  2011 block_9.zip
-rwxrw-rw- 1 root root     4516 Mar 10  2011 documentation
-rw-r--r-- 1 root root 56448373 Dec  3 14:22 donation.zip
-rw-r--r-- 1 root root      272 Mar  9  2011 frequencies.csv

将数据放到HDFS上

[root@node1 linkage]# hdfs dfs -mkdir linkage
[root@node1 linkage]# hdfs dfs -put block_*.csv linkage

运行实例代码:

scala> val rawblocks = sc.textFile("/root/aas/ch02/linkage/frequencies.csv")
rawblocks: org.apache.spark.rdd.RDD[String] = /root/aas/ch02/linkage/frequencies.csv MappedRDD[1] at textFile at <console>:12

scala> rawblocks.first
res3: String = "id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"

scala> val head = rawblocks.take(10)
head: Array[String] = Array("id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match", 37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE, 39086,47614,1,?,1,?,1,1,1,1,1,TRUE, 70031,70237,1,?,1,?,1,1,1,1,1,TRUE, 84795,97439,1,?,1,?,1,1,1,1,1,TRUE, 36950,42116,1,?,1,1,1,1,1,1,1,TRUE, 42413,48491,1,?,1,?,1,1,1,1,1,TRUE, 25965,64753,1,?,1,?,1,1,1,1,1,TRUE, 49451,90407,1,?,1,?,1,1,1,1,0,TRUE, 39932,40902,1,?,1,?,1,1,1,1,1,TRUE)

scala> head.length
res4: Int = 10

scala> head.foreach(println)
"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"
37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE
39086,47614,1,?,1,?,1,1,1,1,1,TRUE
70031,70237,1,?,1,?,1,1,1,1,1,TRUE
84795,97439,1,?,1,?,1,1,1,1,1,TRUE
36950,42116,1,?,1,1,1,1,1,1,1,TRUE
42413,48491,1,?,1,?,1,1,1,1,1,TRUE
25965,64753,1,?,1,?,1,1,1,1,1,TRUE
49451,90407,1,?,1,?,1,1,1,1,0,TRUE
39932,40902,1,?,1,?,1,1,1,1,1,TRUE

scala> head.foreach(println)
"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"
37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE
39086,47614,1,?,1,?,1,1,1,1,1,TRUE
70031,70237,1,?,1,?,1,1,1,1,1,TRUE
84795,97439,1,?,1,?,1,1,1,1,1,TRUE
36950,42116,1,?,1,1,1,1,1,1,1,TRUE
42413,48491,1,?,1,?,1,1,1,1,1,TRUE
25965,64753,1,?,1,?,1,1,1,1,1,TRUE
49451,90407,1,?,1,?,1,1,1,1,0,TRUE
39932,40902,1,?,1,?,1,1,1,1,1,TRUE

scala> def isHeader(line: String) = line.contains("id_1")
isHeader: (line: String)Boolean

scala> head.filter(isHeader).foreach(println)
"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"

scala> head.filterNot(isHeader).foreach(println)
37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE
39086,47614,1,?,1,?,1,1,1,1,1,TRUE
70031,70237,1,?,1,?,1,1,1,1,1,TRUE
84795,97439,1,?,1,?,1,1,1,1,1,TRUE
36950,42116,1,?,1,1,1,1,1,1,1,TRUE
42413,48491,1,?,1,?,1,1,1,1,1,TRUE
25965,64753,1,?,1,?,1,1,1,1,1,TRUE
49451,90407,1,?,1,?,1,1,1,1,0,TRUE
39932,40902,1,?,1,?,1,1,1,1,1,TRUE

scala> head.filter(x => !isHeader(x)).length
res8: Int = 9

scala> val noheader = rawblocks.filter(x => !isHeader(x))
noheader: org.apache.spark.rdd.RDD[String] = FilteredRDD[4] at filter at <console>:16

scala> noheader.first
res9: String = 37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE

 

scala> def toDouble(s: String) = {
| if ("?" .equals(s)) Double.NaN else s.toDouble
| }
toDouble: (s: String)Double


scala>


scala> def parse(line: String) = {
| val pieces = line.split(',' )
| val id1 = pieces(0).toInt
| val id2 = pieces(1).toInt
| val scores = pieces.slice(2, 11).map(toDouble)
| val matched = pieces(11).toBoolean
| (id1, id2, scores, matched)
| }
parse: (line: String)(Int, Int, Array[Double], Boolean)

scala> val line =head(5)
line: String = 36950,42116,1,?,1,1,1,1,1,1,1,TRUE

scala> val tup = parse(line)
tup: (Int, Int, Array[Double], Boolean) = (36950,42116,Array(1.0, NaN, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0),true)

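(在重新定义parse之前,需要先定义MatchData这个case class:)

scala> case class MatchData(id1: Int, id2: Int, scores: Array[Double], matched: Boolean)
defined class MatchData

scala> def parse(line: String) = {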
| val pieces = line.split(',' )
| val id1 = pieces(0).toInt
| val id2 = pieces(1).toInt
| val scores = pieces.slice(2, 11).map(toDouble)
| val matched = pieces(11).toBoolean
| MatchData(id1, id2, scores, matched)
| }
parse: (line: String)MatchData

scala> val md = parse(line)
md: MatchData = MatchData(36950,42116,[D@3c935226,true)

scala> val mds = head.filter(x => !isHeader(x)).map(x => parse(x))
mds: Array[MatchData] = Array(MatchData(37291,53113,[D@3bc5ac5,true), MatchData(39086,47614,[D@42eb3d6d,true), MatchData(70031,70237,[D@620de16d,true), MatchData(84795,97439,[D@7d4aed65,true), MatchData(36950,42116,[D@4227c226,true), MatchData(42413,48491,[D@403b6eb8,true), MatchData(25965,64753,[D@7de212f9,true), MatchData(49451,90407,[D@54bda00,true), MatchData(39932,40902,[D@36d538b7,true))

scala> val parsed = noheader.map(line => parse(line))
parsed: org.apache.spark.rdd.RDD[MatchData] = MappedRDD[5] at map at <console>:28

scala> parsed. cache()
res10: parsed.type = MappedRDD[5] at map at <console>:28


 两次调用parsed.count()

scala> parsed.count()
res11: Long = 5749132                                                                                                                                                                                    

scala> parsed.count()
res12: Long = 5749132

发现第二次的速度明显快了许多。原因在于第一次调用之后,parsed这个RDD的数据已经被缓存到内存里了。

观察Storage页面http://node1:8088/proxy/application_1448538943757_0008/storage/,发现占用缓存683.4 MB。下面将StorageLevel改成MEMORY_ONLY_SER,再观察一下内存占用的大小。

先调用unpersist(),这时观察到Storage页面上已经没有了缓存。

然后调用parsed.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)

scala> parsed.unpersist()
res13: parsed.type = MappedRDD[5] at map at <console>:28

scala> import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel

scala> parsed.persist(StorageLevel.MEMORY_ONLY_SER)
res18: parsed.type = MappedRDD[5] at map at <console>:28

scala> parsed.count()
res19: Long = 5749132                                                                                                                                                                                    

scala> parsed.count()

观察到采用MEMORY_ONLY_SER之后内存占用大小为580.1 MB,比采用默认的MEMORY_ONLY(即cache())时的683.4 MB少了约100 MB。

2.8节聚合

scala> val grouped = mds.groupBy(md => md.matched)
grouped: scala.collection.immutable.Map[Boolean,Array[MatchData]] = Map(true -> Array(MatchData(37291,53113,[D@3bc5ac5,true), MatchData(39086,47614,[D@42eb3d6d,true), MatchData(70031,70237,[D@620de16d,true), MatchData(84795,97439,[D@7d4aed65,true), MatchData(36950,42116,[D@4227c226,true), MatchData(42413,48491,[D@403b6eb8,true), MatchData(25965,64753,[D@7de212f9,true), MatchData(49451,90407,[D@54bda00,true), MatchData(39932,40902,[D@36d538b7,true)))

scala> grouped.mapValues(x => x.size).foreach(println)
(true,9)

2.9创建直方图

scala> val grouped = mds.groupBy(md => md.matched)
grouped: scala.collection.immutable.Map[Boolean,Array[MatchData]] = Map(true -> Array(MatchData(37291,53113,[D@3bc5ac5,true), MatchData(39086,47614,[D@42eb3d6d,true), MatchData(70031,70237,[D@620de16d,true), MatchData(84795,97439,[D@7d4aed65,true), MatchData(36950,42116,[D@4227c226,true), MatchData(42413,48491,[D@403b6eb8,true), MatchData(25965,64753,[D@7de212f9,true), MatchData(49451,90407,[D@54bda00,true), MatchData(39932,40902,[D@36d538b7,true)))

scala> grouped.mapValues(x => x.size).foreach(println)
(true,9)

scala> val matchCounts = parsed.map(md => md.matched).countByValue()
matchCounts: scala.collection.Map[Boolean,Long] = Map(true -> 20931, false -> 5728201)

scala> val matchCountsSeq = matchCounts.toSeq
matchCountsSeq: Seq[(Boolean, Long)] = ArrayBuffer((true,20931), (false,5728201))
scala> matchCountsSeq.sortBy(_._1).foreach(println)
(false,5728201)
(true,20931)

scala> matchCountsSeq.sortBy(_._2).reverse.foreach(println)
(false,5728201)
(true,20931)

2.10连续变量的概要统计

scala> parsed.map(md => md.scores(0)).stats()
res27: org.apache.spark.util.StatCounter = (count: 5749132, mean: NaN, stdev: NaN, max: NaN, min: NaN)

scala> import java.lang.Double.isNaN
import java.lang.Double.isNaN

scala> parsed.map(md => md.scores(0)).filter(!isNaN(_)).stats()
res28: org.apache.spark.util.StatCounter = (count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000)

2.11.为计算概要信息创建可重用代码

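编写一个新的文件,存放在/root/aas/ch02/StatsWithMissing.scala,然后在spark-shell中用 :load /root/aas/ch02/StatsWithMissing.scala 加载。注意:文件开头需要先 import org.apache.spark.util.StatCounter;2.12节用到的statsWithMissing函数也应定义在该文件中(本文未列出,参见原书)。代码如下: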

/**
 * Summary-statistics accumulator that tolerates missing values.
 *
 * Non-NaN observations are folded into a wrapped `StatCounter`; NaN
 * observations are only counted. Both mutating methods return `this`
 * so calls can be chained.
 */
class NAStatCounter extends Serializable {
  /** Running statistics over every non-NaN value added so far. */
  val stats: StatCounter = new StatCounter()

  /** Number of NaN (missing) values added so far. */
  var missing: Long = 0

  /**
   * Records a single observation.
   *
   * @param x the value to record; NaN is counted as missing
   * @return this counter, after the update
   */
  def add(x: Double): NAStatCounter = {
    if (x.isNaN) missing += 1
    else stats.merge(x)
    this
  }

  /**
   * Folds another counter into this one: merges the wrapped statistics
   * and adds up the missing-value counts.
   *
   * @param other the counter whose contents are absorbed
   * @return this counter, after the update
   */
  def merge(other: NAStatCounter): NAStatCounter = {
    stats.merge(other.stats)
    missing += other.missing
    this
  }

  /** Renders the wrapped statistics followed by the missing-value count. */
  override def toString = "stats: " + stats.toString + " NaN: " + missing
}

/** Companion factory so that `NAStatCounter(d)` builds a counter seeded with `d`. */
object NAStatCounter extends Serializable {
  /**
   * Creates a fresh counter and records `x` in it.
   *
   * @param x the first observation (may be NaN)
   * @return the newly created counter, already containing `x`
   */
  def apply(x: Double) = {
    val counter = new NAStatCounter()
    counter.add(x)
  }
}

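注意:这段代码中的stats.merge(x)调用的是StatCounter自身的merge(Double)方法,作用是把一个数值累加到统计量中,与apply无关。而伴生对象中定义的apply方法,使得NAStatCounter(d)这种写法等价于NAStatCounter.apply(d),也就是new NAStatCounter().add(d)——下面的代码正是这样创建NAStatCounter实例的。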

scala> val nasRDD = parsed.map(md => {md.scores.map(d=> NAStatCounter(d))})
nasRDD: org.apache.spark.rdd.RDD[Array[NAStatCounter]] = MappedRDD[18] at map at <console>:38

scala> val nas1 = Array(1.0, Double. NaN). map(d => NAStatCounter(d))
nas1: Array[NAStatCounter] = Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 0, stats: (count: 0, mean: 0.000000, stdev: NaN, max: -Infinity, min: Infinity) NaN: 1)

scala> val nas2 = Array(Double. NaN, 2.0). map(d => NAStatCounter(d))
nas2: Array[NAStatCounter] = Array(stats: (count: 0, mean: 0.000000, stdev: NaN, max: -Infinity, min: Infinity) NaN: 1, stats: (count: 1, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 0)

scala> val merged = nas1. zip(nas2). map(p => p. _1. merge(p. _2))
merged: Array[NAStatCounter] = Array(stats: (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000) NaN: 1, stats: (count: 1, mean: 2.000000, stdev: 0.000000, max: 2.000000, min: 2.000000) NaN: 1)

scala> val reduced = nasRDD. reduce((n1, n2) => {
     |  n1. zip(n2). map { case (a, b) => a. merge(b) }
     | })
reduced: Array[NAStatCounter] = Array(stats: (count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000) NaN: 1007, stats: (count: 103698, mean: 0.900018, stdev: 0.271316, max: 1.000000, min: 0.000000) NaN: 5645434, stats: (count: 5749132, mean: 0.315628, stdev: 0.334234, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 2464, mean: 0.318413, stdev: 0.368492, max: 1.000000, min: 0.000000) NaN: 5746668, stats: (count: 5749132, mean: 0.955001, stdev: 0.207301, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 5748337, mean: 0.224465, stdev: 0.417230, max: 1.000000, min: 0.000000) NaN: 795, stats: (count: 5748337, mean: 0.488855, stdev: 0.499876, max: 1.000000, min: 0.000000) NaN: 795, stats: (count: 5748337, mean: 0.222749, stdev: 0.416091, max: 1.000000, min: 0....
scala> reduced. foreach(println)

2.12.变量的选择和评分简介

scala> val statsm = statsWithMissing(parsed. filter(_. matched). map(_. scores))
statsm: Array[NAStatCounter] = Array(stats: (count: 20922, mean: 0.997316, stdev: 0.036506, max: 1.000000, min: 0.000000) NaN: 9, stats: (count: 1333, mean: 0.989890, stdev: 0.082489, max: 1.000000, min: 0.000000) NaN: 19598, stats: (count: 20931, mean: 0.997015, stdev: 0.043118, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 475, mean: 0.969370, stdev: 0.153291, max: 1.000000, min: 0.000000) NaN: 20456, stats: (count: 20931, mean: 0.987292, stdev: 0.112013, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 20925, mean: 0.997085, stdev: 0.053914, max: 1.000000, min: 0.000000) NaN: 6, stats: (count: 20925, mean: 0.997945, stdev: 0.045285, max: 1.000000, min: 0.000000) NaN: 6, stats: (count: 20925, mean: 0.996129, stdev: 0.062097, max: 1.000000, min: 0.000000) NaN: 6, stats: (cou...
scala> val statsn = statsWithMissing(parsed. filter(! _. matched). map(_. scores))
statsn: Array[NAStatCounter] = Array(stats: (count: 5727203, mean: 0.711863, stdev: 0.389081, max: 1.000000, min: 0.000000) NaN: 998, stats: (count: 102365, mean: 0.898847, stdev: 0.272720, max: 1.000000, min: 0.000000) NaN: 5625836, stats: (count: 5728201, mean: 0.313138, stdev: 0.332281, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 1989, mean: 0.162955, stdev: 0.192975, max: 1.000000, min: 0.000000) NaN: 5726212, stats: (count: 5728201, mean: 0.954883, stdev: 0.207560, max: 1.000000, min: 0.000000) NaN: 0, stats: (count: 5727412, mean: 0.221643, stdev: 0.415352, max: 1.000000, min: 0.000000) NaN: 789, stats: (count: 5727412, mean: 0.486995, stdev: 0.499831, max: 1.000000, min: 0.000000) NaN: 789, stats: (count: 5727412, mean: 0.219923, stdev: 0.414194, max: 1.000000, min: 0.00...
scala> statsm.zip(statsn).map { case(m, n) =>
     |  (m.missing + n.missing, m.stats.mean - n.stats.mean)
     | }.foreach(println)
(1007,0.285452905746686)
(5645434,0.09104268062279908)
(0,0.6838772482597568)
(5746668,0.8064147192926266)
(0,0.03240818525033473)
(795,0.7754423117834042)
(795,0.5109496938298719)
(795,0.7762059675300523)
(12843,0.9563812499852178)

scala> def naz(d: Double) = if (Double.NaN.equals(d)) 0.0 else d
naz: (d: Double)Double

scala> case class Scored(md: MatchData, score: Double)
defined class Scored

scala> val ct = parsed.map(md => {
     |  val score = Array(2, 5, 6, 7, 8).map(i => naz(md.scores(i))).sum
     |  Scored(md, score)
     | })
ct: org.apache.spark.rdd.RDD[Scored] = MappedRDD[27] at map at <console>:40
scala> ct.filter(s => s.score >= 4.0).map(s => s.md.matched).countByValue()
res34: scala.collection.Map[Boolean,Long] = Map(true -> 20871, false -> 637)                                                                                                                             

scala> ct.filter(s => s.score >= 2.0). map(s => s.md.matched).countByValue()
res35: scala.collection.Map[Boolean,Long] = Map(true -> 20931, false -> 596414)                                                                                                                          
原文地址:https://www.cnblogs.com/littlesuccess/p/5016579.html