Spark权威指南(中文版)----第11章 Datasets(1)

Datasets是结构化api的基本类型。我们已经使用过DataFrames,它是Row类型的Datasets,可以跨Spark的不同语言使用。Datasets是一种严格意义上的Java虚拟机(JVM)语言特性,仅适用于Scala和Java。使用Datasets,您可以定义数据集中每行包含的对象。在Scala中,这将是一个case类对象,它本质上定义了一个您可以使用的模式,在Java中,您将定义一个Java Bean。有经验的用户经常将Datasets称为Spark中的“类型化api集”。有关更多信息,请参见第4章。在第4章中,我们讨论了Spark的类型,如StringType、BigIntType、StructType等。这些特定于Spark的类型映射到每个Spark语言(如String、Integer和Double)中可用的类型。当您使用DataFrame API时,您不创建strings 或 integers,而是通过操作Row对象来为您操作数据。事实上,如果您使用Scala或Java,所有“DataFrames”实际上都是Row类型的Datasets。为了有效地支持特定领域的对象,需要一个称为“Encoder”的特殊概念。编码器将特定于域的类型T映射到Spark的内部类型系统。例如,给定一个Person类有两个字段,name (string)和age (int),编码器指导Spark在运行时生成代码,将Person对象序列化为二进制结构。当使用DataFrames或“标准”结构化api时,这个二进制结构将是Row。当我们想要创建我们自己的特定于域的对象时,我们在Scala中指定一个case class,或者在Java中指定一个JavaBean。Spark将允许我们以分布式方式操作这个对象(代替Row)。当您使用Dataset API时,对于它所触及的每一行,域指定类型,Spark将Spark行格式转换为您指定的对象(case类或Java类)。这种转换会减慢操作速度,但可以提供更大的灵活性。您将注意到性能上的下降,但这与您在Python中看到的用户定义函数(UDF)之类的东西的量级大不相同,因为性能成本不像切换编程语言那样极端,但是要记住这一点。

11.1.   什么时候使用Datasets

您可能会想,如果我在使用Datasets时要付出性能代价,为什么还要使用它们呢?如果我们必须把原因压缩成一个标准列表,这里有几个原因:

  • 当您希望执行的操作无法使用DataFrame操作来表示时

  • 当您希望或需要类型安全,并且愿意接受性能成本来实现它时

让我们更详细地探讨这些。有一些操作不能使用我们在前几章中看到的结构化api来表示。虽然这些不是特别常见,但是您可能希望将大量的业务逻辑编码到一个特定的函数中,而不是用SQL或DataFrames。这是Datasets的适当用法。此外,Dataset API是类型安全的。对其类型无效的操作(例如两个字符串类型相减)将在编译时而不是运行时失败。如果正确性和类型安全代码是您的最高优先级,那么以牺牲一些性能为代价,这对您来说可能是一个很好的选择。这并不能保护您免受畸形数据的影响,但是可以让您更优雅地处理和组织数据。您可能希望使用Datasets的另一个潜在时间是,您希望重用单节点工作负载和Spark工作负载之间对整个行进行的各种转换。如果您对Scala有一些经验,您可能会注意到Spark的api反映了Scala序列类型,但是它们是以分布式方式运行的。事实上,Scala的发明者马丁·奥德斯基(Martin Odersky)在2015年的Spark欧洲峰会上就说过这句话。因此,使用Datasets的一个优点是,如果将所有数据和转换定义为可接受case classes,那么在分布式和本地工作模式中重用它们就很简单了。此外,当您将DataFrames收集到本地磁盘时,它们将是正确的类和类型,有时会使进一步的操作更加容易。可能最流行的用例是同时使用DataFrames和Datasets,在与工作负载最相关的性能和类型安全之间进行手动权衡。当您想要收集数据到驱动程序并使用单节点库操作它时,可能处于一个大型的、基于DataFrames的提取、转换和加载(ETL)转换的末尾,或者,当您需要在Spark SQL中执行过滤和进一步操作之前执行逐行解析时,它可能处于转换的开始。

11.2.   创建Datasets

创建Datasets有点像手工操作,需要您提前知道并定义模式schemas。

11.2.1.   In Java: Encoders

JavaEncoders相当简单,您只需指定您的类,然后当您遇到DataFrame(Dataset<Row>)时,您将对其进行编码:

图片

11.2.2.   In Scala: Case Classes

要在Scala中创建Datasets,需要定义一个Scala case class。case class是一个常规类,它具有以下特征:

  • 不可变的

  • 可通过模式匹配进行分解

  • 允许基于结构而不是引用进行比较

  • 易于使用和操作

这些特性使得它对于数据分析非常有价值,因为对case class进行推理非常容易。可能最重要的特性是case class是不可变的,并且允许按结构而不是按值进行比较。Scala文档是这样描述case class的:

  • 不可变性使您无需跟踪事物在何时何地发生了变化

  • 按值比较允许您将实例作为原始值进行比较—不再存在关于类的实例是否按值或引用进行比较的不确定性

  • 模式匹配简化了分支逻辑,从而减少了错误,提高了代码的可读性。

这些优点也在Spark中得到了应用。

要开始创建Dataset,让我们为其中一个数据集定义一个case class:

图片

现在我们定义了一个case class,它将表示数据集中的一条记录。更简洁地说,我们现在有了一个飞行Dataset。这没有为我们定义任何方法,只是模式。当我们读取数据时,我们会得到一个DataFrame。然而,我们只是使用as方法将其转换为我们指定的行类型:

图片

11.3.   Actions

尽管我们可以看到Dataset的强大功能,但重要的是要理解像collect、take和count这样的操作是否适用于我们正在使用的数据集或数据流:

图片

您还会注意到,当我们实际访问其中一个case class时,我们不需要进行任何类型强制,我们只需要指定case class的属性名称并返回,不仅是期望值,还有期望的类型:   

图片

11.4.   Transformations

Dataset上的转换与我们在DataFrames上看到的转换相同。您在本节中读到的任何转换都适用于Dataset,我们鼓励您浏览相关聚合或连接的特定部分。除了这些转换之外,数据集还允许我们指定比单独在DataFrames上执行的更复杂和强类型的转换,因为我们要操作原始Java虚拟机(JVM)类型。为了演示这个原始对象操作,让我们过滤刚刚创建的Dataset。

11.4.1.   Filtering

让我们看一个简单的例子,创建一个简单的函数,该函数接受一个Flight并返回一个布尔值,该值描述起点和终点是否相同。这不是一个UDF(至少以Spark SQL定义UDF的方式),而是一个泛型函数。

提示

在下面的示例中,您将注意到我们将创建一个函数来定义这个过滤器。这与我们在书中迄今所做的工作有很大的不同。通过指定一个函数,我们强制Spark对Dataset中的每一行计算这个函数的值。这可能是非常资源密集型的。对于简单的过滤器,总是首选编写SQL表达式。这将大大降低过滤数据的成本,同时仍然允许您稍后将其作为Dataset进行操作:

图片

我们现在可以将这个函数传递到filter方法中,指定对于每一行,它都应该验证这个函数返回true,并且在这个过程中会相应地过滤我们的Dataset:

图片

结果是:

图片

正如我们前面看到的,这个函数根本不需要在Spark代码中执行。与udf类似,我们可以在Spark中使用它之前,使用它并在本地机器上的数据上测试它。

例如,这个数据集足够小,我们可以收集到驱动程序(作为一个Flights数组),我们可以在上面操作和执行完全相同的过滤操作:

图片

结果是:

图片

我们可以看到我们得到了和之前完全一样的答案。

11.4.2.   Mapping

Filtering是一个简单的转换,但有时需要将一个值映射到另一个值。在前面的示例中,我们对函数执行了这个操作:它接受一个flight并返回一个Boolean值,但在其他时候,我们可能需要执行一些更复杂的操作,比如提取一个值、比较一组值或类似的操作。

最简单的例子是操作Dataset,以便从每一行中提取一个值。这可以在DataFrame上有效地执行,比如对Dataset进行select。让我们提取目的地:

图片

注意,最后得到的数据集类型为String。这是因为Spark已经知道这个结果应该返回的JVM类型,并且允许我们从编译时检查中获益,如果由于某种原因,它是无效的。我们可以收集这个,并得到一个字符串数组到Driver程序:

图片

这可能感觉微不足道,没有必要;我们可以在DataFrames上正确地完成大部分工作。事实上,我们建议您这样做,因为这样做可以获得很多好处。您将获得一些优势,比如代码生成,这对于任意用户定义的函数都是不可能的。然而,这对于更复杂的逐行操作非常有用。

11.5.   Joins

如前所述,连接的应用与它们对DataFrames的应用相同。然而,Datasets还提供了更复杂的方法joinWith。joinWith大致相当于一个co-group(在RDD术语中),您基本上会在一个Datasets中得到两个嵌套Datasets。每一列表示一个Datasets,可以相应地操作这些数据集。当您需要在连接中维护更多信息或对整个结果执行一些更复杂的操作(如高级映射或过滤器)时,这将非常有用。

让我们创建一个航班元数据集来演示joinWith:

图片

注意,我们最终得到了一个键值对的数据集,其中每一行表示一个航班和航班元数据。当然,我们可以将它们作为Datasets或具有复杂类型的DataFrame查询:

图片

我们可以像以前一样收集它们:

图片

当然,“常规”连接也可以很好地工作,尽管您会注意到,在本例中,我们最终得到了一个DataFrame(因此丢失了JVM类型信息)。

图片

我们总是可以定义另一个Dataset来重新获得它。同样重要的是,DataFrame和Dataset进行join没有任何问题——我们最终得到了相同的结果:

图片

11.6.   Grouping和 Aggregations

分组和聚合遵循与我们在前面的聚合章节中看到的相同的基本标准,所以groupBy rollup和cube仍然适用,但是这些返回的是DataFrame,而不是Dataset (您丢失了类型信息):

图片

这通常没什么大不了的,但是如果您想保留类型信息,可以执行其他分组和聚合。groupByKey方法就是一个很好的例子。这允许您根据数据集中的Dataset进行分组,并返回一个类型化Dataset。但是,这个函数不接受特定的列名,而是接受一个函数。这使得您可以指定更复杂的分组函数,这些函数更类似于以下内容:

图片

尽管这提供了灵活性,但这是一种折衷,因为现在我们引入了JVM类型和不能由Spark优化的函数。这意味着您将看到性能差异,我们可以在检查explain计划时看到这一点。在下面的代码中,你可以看到我们在DataFrame中添加了一个新的列(我们函数的结果),然后对它执行分组:

图片

当我们对一个Dataset执行一个key值的分组后,我们可以对key Value Dataset进行操作,key Value Dataset的函数会将分组操作为原始对象:

图片

图片

我们甚至可以创建新的操作,并定义如何减少组:

图片

这应该是足够直接的理解,这是一个更昂贵的过程后立即聚合扫描,特别是因为它最终在相同的最终结果:

图片

这应该只在用户定义编码和有意义的地方使用Datasets。这可能是大数据管道的起点,也可能是终点。

11.7.   结束语

在本章中,我们介绍了数据集的基础知识,并提供了一些令人鼓舞的示例。虽然篇幅很短,但这一章实际上教会了你所有你需要知道的关于数据集的基本知识,以及如何使用它们。将它们看作是高级结构化api和底层RDD api之间的混合是有帮助的,这是第12章的主题。

原文地址:https://www.cnblogs.com/lanblogs/p/15162353.html