Spark 问题总结(二)

1. RDD如何持久化数据?

  • 有两种方法可以持久存储数据,比如持久存储 persist()和cache() 临时存储在内存中。

  • 有不同的存储级别选项,比如MEMORY_ONLY、MEMORY_AND_DISK、DISK_ONLY等等。

  • persist() 和 cache() 使用不同的选项取决于任务的要求。

2. Yarn上运行Spark时,是否需要在Yarn cluster的所有节点上都安装Spark ?

  • 因为spark运行在Yarn之上,所以它利用Yarn在集群节点上执行命令。

  • 所以,你只需要在一个节点上安装Spark。

 

3. Spark的缺点是什么?

Spark利用了内存。开发人员必须小心。一个普通的开发人员可能会犯以下错误:

  • 可能最终在本地节点上运行所有东西,而不是将工作分配到集群。

  • 通过使用多个集群,她可能会多次访问某个服务。

 

4. 什么是RDD

  • RDD的完整形式是弹性分布式数据集。

  • 它是位于网络上的数据的一种表示

  • 不可变的——您可以对rdd进行操作以生成另一个rdd,但是您不能更改它。

  • 分区/并行——位于RDD上的数据是并行操作的。RDD上的任何操作都是使用多个节点完成的。

  • 弹性——如果一个承载分区的节点发生故障,另一个节点将获取它的数据。

  • RDD提供两种操作: 转换和操作。

5. 转换是什么?

  • 转换是应用于RDD(弹性分布式数据集)的函数。转换会导致另一个RDD。

  • 直到一个动作发生,转换才会被执行。

  • 转换的例子有:

  • map()——在RDD的每个元素上应用传递给它的函数,生成一个新的RDD。

  • filter()——通过从传递函数参数的当前RDD中选择元素来创建一个新的RDD

6. Action是什么?

  • 一个Action将数据从RDD带回本地机器。

  • 一个Action的执行会导致之前创建的所有转换。

  • Action的例子是:

  • reduce()——执行一次又一次传递的函数,直到只剩下一个值。函数应该有两个参数并返回一个值。

  • take()——将所有值从RDD返回到本地节点。

7. 假设我在RDD中有一个庞大的数字列表(假设是myrdd)。我写了以下代码来计算平均值:

def myAvg(x, y):

return (x+y)/2.0;

avg = myrdd.reduce (myAvg);

它有什么问题? 你会怎么改正呢?

  • 简单地求和,然后除以计数。

  • def sum(x, y):
  • return x+y;
  •  
  • total = myrdd.reduce(sum);
  • avg = total / myrdd.count();

 

  • 上述代码的唯一问题是,总数可能会变得非常大,从而导致流溢出。所以,我宁愿每一个数除以计数,然后按照下面的方式求和。

  • cnt = myrdd.count();
  •  
  • def devideByCnd(x):
  • return x/cnt;
  •  
  • myrdd1 = myrdd.map(devideByCnd);
  • avg = myrdd.reduce(sum);

8. 假设在HDFS文件中有一个巨大的数字列表。每一行都有一个数字。我要计算这些数的平方和的平方根。你会怎么做?

  • #我们将首先从HDFSspark上以RDD的形式加载文件
  • numsAsText = sc.textFile (“hdfs: / /hadoop1.knowbigdata.com/user/student/sgiri/mynumbersfile.txt”);
  • #定义用来计算squaresdef toSqInt(str)的函数:
  • v = int(str);
  • return v*v;
  • #spark rdd上作为转换运行函数
  • nums = numsAsText.map (toSqInt);
  • #运行sum as reduce操作
  • total= nums.reduce(sum)
  • 最后计算平方根。我们需要导入math
  • import math;
  • print math.sqrt(total);

 

9. 下面的方法正确吗? sqrtOfSumOfSq是一个有效的减速器吗?

  • numsAsText =sc.textFile(“hdfs://hadoop1.knowbigdata.com/user/student/sgiri/mynumbersfile.txt”);
  •  
  • def toInt(str):
  • return int(str);
  •  
  • nums = numsAsText.map(toInt);
  •  
  • def sqrtOfSumOfSq(x, y):
  • return math.sqrt(x*x+y*y);
  •  
  • total = nums.reduce(sqrtOfSumOfSq)
  •  
  • import math;
  • print math.sqrt(total);
  •  
  • 是的。该方法是正确的,sqrtOfSumOfSq是一个有效的减速器。

10. 你能比较一下9 10方法的优缺点

  • 9map()中进行平方运算并在 action方法中对reduce求和
  • 10在执行reduce操作时做平方运算。
  • 9方法将会更快,因为10中,由于调用math.sqrt(),所以减速器代码很重,并且减速器代码通常大约执行n-1spark RDD
  • 方法9的唯一缺点是有一个巨大的整数溢出的机会,因为我是作为map的一部分计算平方和的。
  • 原则:能在map 处做的操作尽量在 map 处做。

11. 如果你要计算spark上每个单词的总数,你会怎么做?

  • 这将在spark中加载bigtextfile.txt作为RDD
  • sc.textFile (“hdfs: / /hadoop1.knowbigdata.com/user/student/sgiri/bigtextfile.txt”);
  • 定义一个函数,可以将每一行分解成单词
  • def toWords(line):
  • return line.split();

 

  • spark上的RDD的每个元素上运行toWords函数作为平面映射转换。
  • 我们将使用flatMap而不是map,因为我们的函数返回了多个值。
  • words= lines.flatMap(toWords);
  • 将每个单词转换成(键,值)对。她的键将是单词本身,值将是1
  • def toTuple(word):
  • return (word, 1);

 

  • wordsTuple = words.map(toTuple);

 

  • 现在我们可以轻松地执行reduceByKey()操作。
  • def sum(x, y):
  • return x+y;
  •  
  • counts = wordsTuple.reduceByKey(sum)
  •  
  • #现在,打印
  • counts.collect ()

12. 在一个非常大的文本文件中,您只想检查是否存在特定的关键字。你会如何使用Spark?

  • lines = sc.textFile(“hdfs://hadoop1.knowbigdata.com/user/student/sgiri/bigtextfile.txt”);
  •  
  • def isFound(line):
  • if line.find(“mykeyword”) > -1:
  • return 1;
  • return 0;
  •  
  • foundBits = lines.map(isFound);
  • sum = foundBits.reduce(sum);
  • if sum > 0:
  • print “FOUND”;
  • else:
  • print “NOT FOUND”;

 

13. 你能在之前的答案中改进这段代码的性能吗?

  • 是的。即使我们寻找的单词已经找到,搜索也没有停止。我们的map代码将在所有节点上继续执行,这是非常低效的。

  • 我们可以使用累加器来报告单词是否已经找到,然后停止工作.

原文地址:https://www.cnblogs.com/yjyyjy/p/12869636.html