本章展示了Apache Spark( https://spark.apache.org )的一些例子,这是一个用于大规模数据处理的统一数据分析引擎。
Spark网站将其描述为一个“用于大规模数据处理的统一分析引擎”。这意味着它是一个大数据框架,可以让你用不同的技术分析你的数据 —— 比如把数据当作电子表格或数据库 —— 并在分布式集群上运行。你可以用Spark来分析那些大到跨越数千台计算机的数据集。 虽然Spark是为在计算机集群上处理巨大的数据集而设计的,但它的一个伟大之处在于,你只需用几个例子文件就可以学会如何在自己的计算机上使用Spark。
本章的例子使用Spark 3.1.1,该版本于2021年3月发布,也是本文写作时的最新版本。Spark目前只能与Scala 2.12一起工作,所以本章的例子也使用Scala 2.12。然而,由于使用Spark的工作通常涉及到使用 map 和 filter 等集合方法,或者SQL查询,所以在这些例子中你几乎不会注意到Scala 2和Scala 3之间的区别。
本章的例子展示了如何在你自己的计算机上使用Spark,同时展示了在跨越数千台计算机的数据集上工作的关键概念。20.1小节展示了如何开始使用Spark,并深入研究其基本概念之一,即 弹性分布式数据集(Resilient Distributed Dataset),或称RDD。RDD让你把数据当作一个大型的分布式电子表格。
第一个例子展示了如何在REPL内部,从一个集合中创建一个RDD,然后20.2小节展示了如何将一个数据文件读入RDD。它展示了几种读取数据文件的方法,使用Scala和Spark方法操作该数据,并将数据写入其他文件中。
接下来,使用MovieLens( https://movielens.org )的样本数据集,20.3小节展示了如何用Scala样例类对CSV文件中的一行进行建模,然后将该文件中的数据读入RDD并按需要进行操作。
然后,20.4小节展示了如何将RDD转换成 DataFrame。这个过程让你可以使用类似SQL的API来查询你的数据。这些例子展示了如何使用该查询语法,以及如何定义一个模式(schema)来表示你的数据。
从20.5小节开始,展示了如何将MovieLens CSV文件读入一个 DataFrame,并再次查询该数据。本示例将这一过程向前推进了一步,展示了如何在该数据上创建一个SQL视图,这让你可以使用常规的SQL查询字符串来查询该数据。
随着例子继续使用MovieLens数据集,20.6小节展示了如何将多个CSV文件读入 DataFrames,在这些文件上创建视图,然后用传统的SQL查询(包括Join)来查询数据,比如这样:
val query = """
select * from movies, ratings
where movies.movieId == ratings.movieId
and rating == 5.0
"""
最后,20.7小节展示了创建Spark应用程序的完整过程,该程序可以使用sbt打包成JAR文件并从命令行运行。
Spark是一个巨大的主题,关于它的书已经写了很多。本章内容的目的是让你迅速掌握一些主要的技术,并使之运行。有关Spark的更多细节,请看Bill Chambers和Matei Zaharia(O'Reilly)写的 Spark: The Definitive Guide 一书。
本章的例子使用MovieLens数据集( https://oreil.ly/3fSf0 )。该数据集包括以CSV文件形式存在的真实世界的电影评分,该文件大小为数百兆字节,包含了超过62,000部电影的评分和明细。关于该数据集的明细,请参阅F.Maxwell Harper和Joseph A. Konstan的开创性论文,“The MovieLens Datasets: History and Context,” ACM Transactions on Interactive Intelligent Systems 5, no. 4 (2016): 1– 19, https://doi.org/10.1145/2827872 .
你以前从未使用过Spark,并想开始使用它。
当你专业地使用Spark时,你的数据可能会分布在多台电脑上 —— 也许是成千上万台电脑 —— 但要开始使用Spark,你可以在一台电脑系统上以本地模式完成所有的工作。最简单的方法是启动Spark shell,创建一个数组,然后从那里开始。
Spark的安装说明可能会随着时间的推移而变化,但目前你只需从其下载页面( https://oreil.ly/O63Qu )下载Spark。下载的文件是一个 tgz 文件,所以只要解压并将其移动到你的 bin 目录,就像你手动安装其他下载的应用程序一样。例如,我把Spark安装在我的 /Users/al/bin 目录下。
一旦你安装了Spark,就可以像这样启动Scala Spark shell:
$ spark-shell
Spark shell是你通过 scala 命令得到的普通Scala shell的修改版,所以在Scala shell中能做的任何事情,在Spark shell中也能做,比如创建一个数组:
val nums = Array.range(0, 100)
一旦你有了像数组(array)或映射(map)这样的东西,就可以通过调用Spark Context的 parallelize 方法创建一个Spark Resilient Distributed Dataset(RDD):
scala> val rdd = spark.sparkContext.parallelize(nums)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize
at <console>:25
从输出中注意到,rdd的类型为 RDD[Int] 。正如你将在讨论中看到的,RDD是Spark的基本构建模块之一。现在你可以把它看作是一个像列表或数组一样的集合类,但它的数据可以分布在集群中的所有计算机上。它也有额外的方法可以被调用。下面是一些看起来很熟悉的Scala集合类的方法的例子:
rdd.count // Long = 100
rdd.first // Int = 0
rdd.min // Int = 0
rdd.max // Int = 99
rdd.take(3) // Array[Int] = Array(0, 1, 2)
rdd.take(2).foreach(println) // prints 0 and 1 on separate lines
这里有几个RDD方法,可能看起来不那么熟悉:
// “sample” methods return random values from the RDD
rdd.sample(false, 0.05).collect // Array[Int] = Array(0, 16, 22, 27, 60, 73)
rdd.takeSample(false, 5) // Array[Int] = Array(35, 65, 31, 27, 1)
rdd.mean // Double = 49.5
rdd.stdev // Double = 28.866070047722115
rdd.getNumPartitions // Int = 8
rdd.stats // StatCounter = (count: 100, mean: 49.500000,
// stdev: 28.866070, max: 99.000000,
// min: 0.000000)
你也可以使用熟悉的集合方法,如 map 和 filter:
scala> rdd.map(_ + 5).filter(_ < 8)
res0: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[10] at filter at
<console>:26
scala> rdd.filter(_ > 10).filter(_ < 20)
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[12] at filter at
<console>:26
然而,注意到这些方法并没有返回一个结果,至少不是你所期望的那个结果。在Spark中,像这样的转换方法是被惰性地评估的,所以我们把它们称为惰性或非严格的。为了从它们那里得到一个结果,你必须添加一个action方法。比如RDD有一个 collect 方法,这是一个action方法,可以强制运行所有之前的转换方法,然后把结果带回你的代码正在运行的计算机上。在这些例子中,添加 collect 会导致一个结果被计算出来:
scala> rdd.map(_ + 5).filter(_ < 8).collect
res0: Array[Int] = Array(5, 6, 7)
scala> rdd.filter(_ > 10).filter(_ < 20).collect
res1: Array[Int] = Array(11, 12, 13, 14, 15, 16, 17, 18, 19)
在生产环境下,Spark会处理分布在计算机集群中的数据,但上面的示例所示,在Spark的 本地模式 下,所有的处理都在本地计算机上完成。
在解决方案中,我在Spark shell中用这两行代码创建了一个RDD:
val nums = Array.range(0, 100)
val rdd = spark.sparkContext.parallelize(nums)
正如你可能猜到的,spark 是一个在shell中预先构建的可用对象。你可以用shell的 :type 命令查看 spark 和 spark.sparkContext 的类型:
scala> :type spark
org.apache.spark.sql.SparkSession
scala> :type spark.sparkContext
org.apache.spark.SparkContext
在你的Spark编程中,你会经常使用这两个对象。因为你经常使用 SparkContext,所以在shell中有一个名为 sc 的快捷对象可用,而不用再输入这个:
val rdd = spark.sparkContext.parallelize(nums)
你可以直接这样写:
val rdd = sc.parallelize(nums)
虽然在Spark中有更高层次的方法来处理数据 —— 你会在下面的章节中看到 —— 但RDD是一个基本的数据结构。Spark RDD专业编程指南( https://oreil.ly/Lmf0r )将其描述为“在集群的各个节点上分区的元素集合,可以并行操作”。Spark的创建者建议将RDD视为一个大型的分布式电子表格。
从技术上讲,RDD是一种不可改变的、具有容错性的、并行的数据结构。Hien Luu(Apress)撰写的 Beginning Apache Spark 2 一书指出,一个RDD由五项信息代表:
- 由一组分区构成数据集,每个分区表示一块数据。
- 一组对父RDDs的依赖关系。
- 一个用于计算数据集中所有行的函数。
- 关于分区方案的元数据(可选)。
- 数据在集群中的位置(可选)。如果数据在HDFS上,那么它将是文件块所在的位置。
当我们说RDD是一种并行数据结构时,这意味着一个大文件可以被分割到许多计算机上。在典型的使用案例中,一个数据集非常大,以至于无法容纳在一个节点上,所以它最终被分割到一个集群中的多个节点。关于分区,需要了解的一些基本情况是:
- 分区是Spark中并行的主要单位。
- Spark集群中的每个节点都包含一个或多个分区。
- 分区的数量是可以配置的。
- Spark提供了一个默认值,但你可以对它进行调整。
- 分区不跨越多个节点。
- Spark为集群的每个分区运行一个任务。
当你使用一个RDD时,数据中的每一行通常被表示为一个Java/Scala对象。这个对象的结构对Spark来说是未知的,所以除了提供 filter 和 map 这样的方法外,它不能帮助你进行处理,这些方法只知道如何使用从文件(如20.2小节中讨论的)创建的RDD,该文件被分解成块并分布在多台计算机上。
当我说Spark不能帮助你处理时,这意味着Spark也提供了更高层次的技术,你会在后面的展示Spark DataFrame 和 Dataset 的示例中看到。当使用这些数据结构时,你将能够使用SQL查询,而且Spark有一个优化引擎,可以帮助你执行查询。
有三种方法来创建RDD:
- 在一个集合上调用 parallelize。
- 从一个或多个文件中读取数据到RDD中(正如你将在下一章看到的例子)。
- 在现有的RDD上调用转换方法来创建一个新的RDD。
解决方案中展示了 parallelize 方法。这种方法一般只用于测试,根据RDD Programming Guide ( https://oreil.ly/X5BFx )的“Parallelized Collections”一节,parallelize 方法复制了一个集合中的内容,“以创建一个可以并行操作的分布式数据集”。
parallelize 方法需要一个可选的参数,以让你指定数据集可以被分成的分区数量:
val rdd = spark.sparkContext.parallelize(nums, 20)
rdd.getNumPartitions // Int = 20
RDD上有几十种方法可用。你可以像往常一样,通过创建一个RDD,然后输入一个小数,在小数后面按Tab键,在REPL中看到这些方法。这包括常见的Scala集合方法的实现,如 distinct、filter、foreach、map 和 take,以及Spark特有的其他方法。有几十种方法,所以请看RDD类Scaladoc( https://oreil.ly/kl3rg )和 RDD Programming Guide( https://oreil.ly/7FwWA )以了解更多可用方法的细节。
- 关于Spark独立模式的页面 spark.apache.org ( https://oreil.ly/AXtug )
- The Spark RDD Programming Guide( https://oreil.ly/7FwWA )
这些文章提供了关于Spark分区的更多细节:
- “How Data Partitioning in Spark Helps Achieve More Parallelism?”( https://oreil.ly/Qxter )
- “An Intro to Apache Spark Partitioning” ( https://oreil.ly/67uLx )
- “Spark Partitions” ( https://oreil.ly/d7ymD )
你想开始读取数据文件到Spark的RDD中。
展示如何将数据文件读入RDD的典型例子是单词统计应用,因此,为了不让人失望,本示例展示了如何读取Abraham Lincoln的Gettysburg Address演讲文本,并找出文本中每个单词的使用次数。
启动Spark shell后,第一步是使用前一小节中介绍的 SparkContext 变量 sc 的 textFile 方法,来读取一个名为 Gettysburg-Address.txt 的文件:
scala> val fileRdd = sc.textFile("Gettysburg-Address.txt")
fileRdd: org.apache.spark.rdd.RDD[String] = Gettysburg-Address.txt
MapPartitionsRDD[1] at textFile at <console>:24
这个例子假设 Gettysburg-Address.txt 在当前目录下。
textFile 方法用于读取纯文本文件,它返回一个 RDD[String] 类型的对象。它也是惰性的,这意味着什么也不会发生。事实上,如果你拼错了文件名,你要等到一段时间后调用一个非惰性的action方法时才会发现。
剩下的字数计算的解决方案是纯Scala代码。你只需在RDD上调用一系列的转换方法,就能得到你想要的解决方案:
val counts = fileRdd.map(_.replaceAll("[.,]", ""))
.map(_.replace("—", " "))
.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
.sortBy(_._2)
.collect
在这个解决方案中,只有Spark独有的东西是:
- 在 fileRdd 上调用 map 方法(其类型为 RDD[String])
- 在结尾处调用 collect 方法
正如前面提到的,因为所有的Spark转换方法都是惰性的,所以你必须调用一个急切的action方法,如 collect,以使action开始执行。
在Spark 3.1中,当你有一个像这样的多行表达式时,你必须用它的 :paste 命令将其粘贴到Spark shell中。其步骤是:
- 在shell中输入 :paste
- 从文本编辑器中复制你的表达式,并使用Command+V或类似的方法将其粘贴到REPL中
- 输入Control+D来结束过去的操作
然后,shell会解释你的表达式。
下面解释一下这段代码是如何工作的。首先,我用这段代码创建为一个类型为 **RDD[String]**的 fileRdd:
val fileRdd = sc.textFile("Gettysburg-Address.txt")
因为 textFile 是惰性的,所以实际上还没有发生什么。
接下来,因为我想计算单词次数,所以我用这两个 map 转换调用去除了文本中的小数、逗号和连字符:
.map(_.replaceAll("[.,]", ""))
.map(_.replace("—", " "))
然后,我使用这个 flatMap 表达式将文本转换为一个词的数组:
.flatMap(line => line.split(" "))
如果你观察一下这两个 map 表达式和这个 flatMap 表达式的结果,你将会看到一个 Array[String]:
Array(Four, score, and, seven, years, ago, our, fathers ...
为了从这个 Array[String] 中得到一个包含所有单词和它们出现次数的集合 —— Map[String, Int] —— 的解决方案,下一步是将每个单词变成一个元组:
.map(word => (word, 1))
在这一点上,中间的数据看起来像这样:
Array[(String, Int)] = Array((Four,1), (score,1), (and,1), (seven,1) ...
接下来,我使用了 reduceByKey:
.reduceByKey(_ + _)
这就把中间数据转变成了这个数据结构:
Array[(String, Int)] = Array((nobly,1), (under,1), (this,4), (have,5) ...
如上所示,这是一个元素是二元组的数组,每个元组的第一个值是演讲中的一个词,第二个值是该词出现次数的计数。这就是 reduceByKey 为我们做的事情。
接下来,你可以通过元组的第二个元素对该数据进行排序:
.sortBy(_._2)
最后,你调用 collect 方法来强制运行所有的转换:
.collect
因为这个数据结构的类型是 Array[(String, Int)],你可以对它调用 counts.foreach(println)。其输出的结尾展示了演讲中最常见的词:
scala> counts.foreach(println) [data omitted here]
(here,8)
(we,8)
(to,8)
(the,9)
(that,13)
有两种主要的方法可以将文本文件读入RDD:
- sparkContext.textFile
- sparkContext.wholeTextFiles
textFile 方法将一个文件作为一个行的集合来读取。它可以从本地文件系统中读取文件,也可以分别使用 “hdfs://” 和 “s3a://” URL从Hadoop或Amazon S3文件系统中读取。
根据 RDD Programming Guide( https://oreil.ly/7FwWA )所述,textFile “需要一个可选的第二个参数来控制文件的分区数量。默认情况下,Spark为文件的每个区块创建一个分区(HDFS中的区块默认为128MB),但你也可以通过传递一个更大的值来要求更多的分区数量。”
下面是一些使用 textFile 的例子:
textFile("/foo/file.txt") // read a file, using the default
// number of partitions
textFile("/foo/file.txt", 8) // same, but with 8 partitions
textFile("/foo/bar.txt", "/foo/baz.txt") // read multiple files
textFile("/foo/ba*.txt") // read multiple files
textFile("/foo/*") // read all files in 'foo'
textFile("/a/1.txt", "/b/2.txt") // multiple files in different
// directories
textFile("hdfs://.../myfile.csv") // use a Hadoop URL
textFile("s3a://.../myfile.csv") // use an Amazon S3 URL
请注意,s3a URL 前缀是Hadoop S3连接器的名称,之前被命名为 s3 和 s3n,所以你可能会在旧的文档中看到这些用法。
wholeTextFiles 方法将整个文件读成一个 String,并返回一个包含元组的RDD,所以其返回类型是 RDD[(String, String)]。元组中的第一个字符串是被读取的文件名,第二个字符串是文件的全部内容。这个例子展示了它的工作原理:
scala> val rdd = sc.wholeTextFiles("Gettysburg-Address.txt")
rdd: org.apache.spark.rdd.RDD[(String, String)] = Gettysburg-Address.txt
MapPartitionsRDD[5] at wholeTextFiles at <console>:24
scala> rdd.foreach(t => println(s"${t._1} | ${t._2.take(15)}"))
file:/Users/al/Projects/Books/ScalaCookbook2Examples/16_Ecosystem/Spark/ ↵
SparkApp1/Gettysburg-Address.txt | Four score and
第二个表达式的输出显示该元组包含文件名和文件内容。
Spark还包含其他方法,用于将文件读入 DataFrame 或 Dataset:
- spark.read.text() 用于将文本文件读入 DataFrame。
- spark.read.textFile() 用于将文本文件读入 Dataset[String]。
- spark.read.csv() 和 spark.read.format("csv").load("") 用于将CSV文件读入 DataFrame。
这些方法在下面的示例中都有展示。
当你使用RDD的transformation和action方法获得最终结果时,你可能想保存结果。此时你可以使用 saveAsTextFile 方法将RDD保存到磁盘。下面这条命令会将RDD保存到 /tmp 目录下名为 MyRddOutput 的目录中:
myRdd.saveAsTextFile("/tmp/MyRddOutput")
这样做之后,你会在 /tmp/MyRddOutput 下发现一系列文件,它们代表了名为 myRdd 的RDD。注意,如果该目录已经存在,该操作将失败,并出现异常。
在macOS系统中,/etc/passwd 文件包含7个字段,以 : 字符为界,但它也包含初始注释行,每个注释行以 # 字符开头。要将该文件读入RDD,你需要跳过这些初始注释行。
一种方法是像往常一样将该文件读入RDD:
val fileRdd = spark.sparkContext.textFile("/etc/passwd")
接下来,创建一个样例类来模拟七列格式:
case class PasswordRecord (
username: String,
password: String,
userId: Int,
groupId: Int,
comment: String,
homeDirectory: String,
shell: String
)
现在你可以将 fileRdd 转换成一个新的RDD,首先过滤掉所有以 # 字符开头的记录,然后将剩下的七列字段转换成一个 PasswordRecord:
val rdd = fileRdd
.filter(! _.startsWith("#"))
.map { line =>
val row = line.split(":")
PasswordRecord(
row(0), row(1), row(2).toInt, row(3).toInt, row(4), row(5), row(6)
)
}
这样做之后,你会看到 rdd 只包含文件中以冒号分隔的行:
scala> rdd.take(3)
res1: Array[PasswordRecord] = Array(
PasswordRecord(nobody,*,-2,-2,Unprivileged User,/var/empty,/usr/bin/false),
PasswordRecord(root,*,0,0,System Administrator,/var/root,/bin/sh),
PasswordRecord(daemon,*,1,1,System Services,/var/root,/usr/bin/false)
)
现在你可以像以前一样使用这个RDD工作了。
- SparkByExamples.com 关于读取文本文件的页面( https://oreil.ly/m6MOR )是一个很好的资源。
你想把一个CSV文件读到一个RDD里。
要将一个格式良好的CSV文件读入RDD:
- 创建一个样例类来模拟文件数据。
- 使用 sc.textFile 读取文件。
- 通过将数据中的每一行映射到你的样例类的一个实例来创建一个RDD。
- 根据需要操作数据。
下面的例子使用一个名为 TomHanksMoviesNoHeader.csv 的文件展示了这些步骤,该文件有以下内容:
1995, Toy Story, 8.3
2000, Cast Away, 7.8
2006, The Da Vinci Code, 6.6
2012, Cloud Atlas, 7.4
1994, Forrest Gump, 8.8
首先,创建一个与文件中的数据相匹配的样例类:
case class Movie(year: Int, name: String, rating: Double)
接下来,将文件读入一个初始的RDD中:
val fileRdd = sc.textFile("TomHanksMoviesNoHeader.csv")
然后在 fileRdd 上调用 map,创建一个名为 movies 的新RDD:
val movies = fileRdd.map { row =>
val fields = row.split(",").map(_.trim)
Movie(fields(0).toInt, fields(1), fields(2).toDouble)
}
代码块中的第一行在逗号字符上分割每一行,然后修剪该行中的每个结果字段。当该代码块完成后,movies 的类型为 org.apache.spark.rdd.RDD[Movie]。
现在你可以根据需要使用前面示例中展示的算子来处理电影RDD:
scala> :type movies
org.apache.spark.rdd.RDD[Movie]
scala> movies.first
res0: Movie = Movie(1995,Toy Story,8.3)
scala> movies.take(2)
res1: Array[Movie] = Array(Movie(1995,Toy Story,8.3), Movie(2000,Cast Away,7.8))
scala> movies.filter(_.rating > 7).filter(_.year > 2000).collect
res2: Array[Movie] = Array(Movie(2012,Cloud Atlas,7.4))
这个小节展示了如何将CSV文件读入RDD。你也可以使用SQL来处理CSV文件 —— 来自Spark SQL模块 —— 这将在20.5小节和20.6小节中演示。
你可以在不使用样例类的情况下将CSV文件读入RDD,但这个过程会比较麻烦。如果你想使用这种方法,就像以前一样从读取文件开始:
// RDD[String]
val fileRdd = sc.textFile("TomHanksMoviesNoHeader.csv")
然后在逗号字符上分割每一行,并修剪每个产生的字段:
// movies: RDD[Array[String]]
val movies = rdd.map(row => row.split(",")
.map(field => field.trim))
如注释所示,movies 具有 RDD[Array[String]] 类型。现在你可以根据需要处理这些数据,但必须将每一行当作一个字符串数组:
scala> movies.take(2)
Array[Array[String]] = Array(Array(Year, Name, IMDB),
Array(1995, Toy Story, 8.3))
scala> movies.filter(row => row(0).toInt > 2000).collect
res0: Array[Array[String]] = Array(
Array(2006, The Da Vinci Code, 6.6),
Array(2012, Cloud Atlas, 7.4)
)
与其说是使用RDD,不如说是像使用SQL数据库一样使用Spark,这样你就可以用SQL命令查询你的数据。
如果希望像使用数据库一样使用Spark,应当使用 DataFrame 而不是RDD。一个 DataFrame 大致相当于一个数据库表,当然,在Spark中,DataFrame 可能分布在成千上万的计算机上。
首先,创建一个数组,然后从数组中创建一个RDD,就像你在20.1小节中所做的那样:
val nums = Array.range(1, 10) // Array[Int] = Array(1,2,3,4,5,6,7,8,9)
val rdd = sc.parallelize(nums) // RDD[Int]
接下来,使用RDD的 toDF 方法将该RDD转换为 DataFrame。当使用数组时,产生的数据集只有一列数据,这个命令在创建 DataFrame 时给这一列命名为“num”:
val df = rdd.toDF("num") // org.apache.spark.sql.DataFrame
如注释所示,变量 df 的类型为 org.apache.spark.sql.DataFrame。现在你可以开始使用 DataFrame 的一些类似SQL的功能。首先,这里有一些类似数据库的元数据方法:
scala> df.printSchema
root
|-- num: integer (nullable = false)
scala> df.show(2)
+---+
|num|
+---+
| 1|
| 2|
+---+
only showing top 2 rows
scala> df.columns
res0: Array[String] = Array(num)
scala> df.describe("num").show
+-------+------------------+
|summary| num|
+-------+------------------+
| count | 9|
| mean | 5.0|
| stddev|2.7386127875258306|
| min | 1|
| max | 9|
+-------+------------------+
再次强调,转换方法是惰性的,所以我使用 show 方法在Spark shell中查看结果。
一旦你有了一个 DataFrame,就可以用Spark SQL API执行类似SQL的查询:
scala> df.select('num).show(2)
+---+
|num|
+---+
| 1|
| 2|
+---+
only showing top 2 rows
scala> df.select('num).where("num > 6").show
+---+
|num|
+---+
| 7|
| 8|
| 9|
+---+
scala> df.select('num)
.where("num > 6")
.orderBy(col("num").desc)
.show
+---+
|num|
+---+
| 9|
| 8|
| 7|
+---+
这些例子表明,有几种不同的方法可以用来引用列名。下面的查询展示了不同的方法,而且它们都返回相同的结果:
df.select('num)
df.select(col("num"))
df.select(column("num"))
df.select($"num")
df.select(expr("num"))
正如你所见的,使用 DataFrame 查询与使用SQL查询和数据库类似,尽管在Spark中你是使用Spark SQL API( https://spark.apache.org/sql )来进行查询。
在解决方案中展示的例子中,我让数据源 —— 我的数组 —— 隐式地定义了模式。通过将数组转换为RDD,然后用 toDF 转换为 DataFrame,Spark能够推断出数据的类型。这种让Spark推断数据类型的方法被称为 schema-on-read,这通常是很有用。
像电子表格或数据库表一样,DataFrame 由一系列记录组成,这些记录由Spark SQL Row 建模。使用 Row、StructType 和 StructField 类型,你也可以 显式地 定义模式,类似于你与传统SQL数据库和模式的工作方式:
import org.apache.spark.sql.Row
val rdd = sc.parallelize(
Array(
Row(1L, 0L, "zero", "cero", "zéro"),
Row(2L, 1L, "one", "uno", "un"),
Row(3L, 2L, "two", "dos", "deux"),
Row(4L, 3L, "three", "tres", "trois"),
Row(5L, 4L, "four", "cuatro", "quatre")
)
)
如上所示,数据中的每一行都有五列。接下来,你使用Spark SQL的 StructType 和 StructField 类定义一个描述这些列的模式:
import org.apache.spark.sql.types._
// 'schema' has the type org.apache.spark.sql.types.StructType
val schema = StructType(
Array(
StructField("id", LongType, false), // not nullable
StructField("value", LongType, false), // not nullable
StructField("name", StringType, true), // nullable (English)
StructField("nombre", StringType, true), // nullable (Spanish)
StructField("nom", StringType, true) // nullable (French)
)
)
这段代码声明,数据由五列组成,有这些名称和属性:
- id 是一个长整型,不可为空。
- value 是一个长整型,不可为空。
- name 是一个字符串,可以包含空值(这些是数字的英语名称)。
- nombre 也是一个字符串,可以包含空值(这些是西班牙语名字)。
- nom 也是一个字符串,可以包含空值(这些是法语名称)。
一切准备就绪后,你可以使用RDD和模式创建一个 DataFrame:
scala> val df = spark.createDataFrame(rdd, schema)
df: org.apache.spark.sql.DataFrame = [id: bigint, name: string ... 1 more field]
scala> df.printSchema
root
|-- id: long (nullable = false)
|-- value: long (nullable = false)
|-- name: string (nullable = true)
|-- nombre: string (nullable = true)
|-- nom: string (nullable = true)
现在你可以像以前一样查询 DataFrame:
scala> df.select('id, 'name, 'nombre)
.limit(2)
.show
+---+----+------+
| id|name|nombre|
+---+----+------+
| 1|zero| cero|
| 2| one| uno|
+---+----+------+
scala> df.where("value > 1").show(2)
+---+-----+-----+------+-----+
| id|value| name|nombre| nom |
+---+-----+-----+------+-----+
| 3| 2| two| dos| deux|
| 4| 3|three| tres|trois|
+---+-----+-----+------+-----+
only showing top 2 rows
scala> df.select('value, 'name, 'nombre, 'nom)
.where("id > 1")
.where("id < 4")
.show
+-----+----+------+----+
|value|name|nombre| nom|
+-----+----+------+----+
| 1| one| uno| un|
| 2| two| dos|deux|
+-----+----+------+----+
scala> df.where('id > 1)
.orderBy('name)
.show
+---+-----+-----+------+------+
| id|value| name|nombre| nom|
+---+-----+-----+------+------+
| 5| 4| four|cuatro|quatre|
| 2| 1| one| uno| un|
| 4| 3|three| tres| trois|
| 3| 2| two| dos| deux|
+---+-----+-----+------+------+
scala> df.filter('nom.like("tro%")).show
+---+-----+-----+------+-----+
| id|value| name|nombre| nom |
+---+-----+-----+------+-----+
| 4| 3|three| tres|trois|
+---+-----+-----+------+-----+
除了类似SQL的查询外,当你在处理巨大的数据集时,查看一小部分数据的子集也会有帮助。Spark SQL有一个 sample 方法来帮助解决这种情况。sample 需要三个参数:
val withReplacement = false
val fraction = 0.2
val seed = 31
当你调用 sample 时,它会返回一个数据子集,其行数大致与 fraction 参数相对应:
scala> df.sample(withReplacement, fraction, seed).show
+---+-----+-----+------+-----+
| id|value| name|nombre| nom |
+---+-----+-----+------+-----+
| 3| 2| two| dos| deux|
| 4| 3|three| tres|trois|
+---+-----+-----+------+-----+
关于 sample 的更多细节,请参阅Spark Dataset 文档( https://oreil.ly/2hSCr )。
当你需要在Spark文档中搜索 sample 之类的 DataFrame 方法时,搜索“Dataset”而不是“DataFrame”会更好。这是因为DataFrame只是 Dataset 的一个特定实例。具体来说,DataFrame 是一个 Row 类型的 Dataset 。
虽然我倾向于使用 'foo 或 col("foo") 来指代一个名为“foo”的列,但另一种方法是将其称为 expr("foo")。使用这种语法的查询看起来像这样:
df.select(expr("value")).show
df.select(expr("value"), expr("value")).show
因为这是一种常见的编程模式,所以也有一个名为 selectExpr 的方法,它将 select 和 expr 合并为一个方法。下面是一些关于它的用法的例子:
scala> df.selectExpr("value").show(2)
+-----+
|value|
+-----+
| 0|
| 1|
+-----+
only showing top 2 rows
scala> df.selectExpr("avg(value)").show
+----------+
|avg(value)|
+----------+
| 2.0|
+----------+
scala> df.selectExpr("count(distinct(value))").show
+---------------------+
|count(DISTINCT value)|
+---------------------+
| 5|
+---------------------+
scala> df.selectExpr("count(distinct(nombre))").show
+----------------------+
|count(DISTINCT nombre)|
+----------------------+
| 5|
+----------------------+
- 在使用“schema-on-read”和其对应的“schema-on-write”时,有一些性能方面的考虑。搜索这些术语可以获得关于性能问题的最新文章。
- Spark DataTypes Javadoc( https://oreil.ly/ULt9t )显示了定义 StructField 时可用的不同 DataType 值。这些类型是:BinaryType、BooleanType、ByteType、CalendarIntervalType、DateType、Double Type、FloatType、IntegerType、LongType、NullType、ShortType、StringType和 TimestampType。
- Spark Dataset Javadoc( https://oreil.ly/2hSCr )显示了可以在 DataFrame(或 Dataset )上调用的所有方法。
你想把一个CSV文件读入到一个 DataFrame中。
给出一个名为 TomHanksMoviesNoHeader.csv 的文件:
1995, Toy Story, 8.3
2000, Cast Away, 7.8
2006, The Da Vinci Code, 6.6
2012, Cloud Atlas, 7.4
1994, Forrest Gump, 8.8
指定一个模式,使用这两个选项中的任何一个来读取文件:
import org.apache.spark.sql.types._
// syntax option 1
val schema = StructType(
Array(
StructField("year", IntegerType, false),
StructField("name", StringType, false),
StructField("rating", DoubleType, false)
)
)
// syntax option 2
val schema = new StructType()
.add("year", IntegerType, false)
.add("name", StringType, false)
.add("rating", DoubleType, false)
然后像这样把文件读到一个 DataFrame 中:
// paste this into the spark shell with ":paste"
val df = spark.read
.format("csv")
.schema(schema)
.option("delimiter", ",")
.load("TomHanksMoviesNoHeader.csv")
对于像这样一个真正的CSV文件,delimiter 的设置是没有必要的,但我要表明的是,当你的文件有不同的分隔符,如tab、|、或其他字符时,你可以使用它。
一旦你加载了这样的文件,你就可以像前面的示例所示的那样处理它:
scala> df.printSchema
root
|-- year: long (nullable = true)
|-- name: string (nullable = true)
|-- rating: double (nullable = true)
scala> df.show(2)
+----+----------+------+
|year| name|rating |
+----+----------+------+
|1995| Toy Story| 8.3|
|2000| Cast Away| 7.8|
+----+----------+------+
only showing top 2 rows
scala> df.select('year, 'name, 'rating).limit(2).show
+----+----------+------+
|year| name|rating|
+----+----------+------+
|1995| Toy Story| 8.3|
|2000| Cast Away| 7.8|
+----+----------+------+
scala> df.where('year > 1998).where('rating > 7.5).show
+----+----------+------+
|year| name|rating|
+----+----------+------+
|2000| Cast Away| 7.8|
+----+----------+------+
scala> df.where('rating > 7.5).orderBy('rating.desc).show
+----+-------------+------+
|year| name|rating|
+----+-------------+------+
|1994| Forrest Gump| 8.8|
|1995| Toy Story| 8.3|
|2000| Cast Away| 7.8|
+----+-------------+------+
scala> df.filter('name.like("%T%")).show
+----+------------------+------+
|year| name|rating|
+----+------------------+------+
|1995| Toy Story| 8.3|
|2006| The Da Vinci Code| 6.6|
+----+------------------+------+
根据Spark DataTypes Javadoc( https://oreil.ly/ULt9t ),在创建 StructField 时,你可以使用以下数据类型。BinaryType、BooleanType、ByteType、CalendarIntervalType、DateType、DoubleType、FloatType、IntegerType、LongType、Null Type、ShortType、StringType 和 TimestampType。这些类型的名称大多是自解释的,但更多的细节请看Javadoc页面,也请看MungingData这篇文章( https://oreil.ly/gsVIr ),以了解更多关于处理空值的细节。
读取CSV文件时,如果你愿意,你可以把选项指定为 Map:
val df = spark.read
.options(
Map(
"header" -> "true",
"inferSchema" -> "true",
"delimiter" -> ","
)
)
.csv("TomHanksMoviesNoHeader.csv")
另外,正如20.1小节中所讨论的,spark 变量是 SparkSession 的一个实例。它的 read 方法返回一个 DataFrameReader( https://oreil.ly/Qs2Yq ),它被描述为一个“用于从外部存储系统加载 Dataset 的接口”。该Javadoc页面显示了二十多个用于处理CSV文件的选项。
一旦你在 DataFrame 中得到了想要的结果,你可以把它们写到一个目录中。这个例子将结果写到当前目录下一个名为 DataFrameResults 的目录中:
df.write
.option("header","true")
.csv("DataFrameResults")
该目录有一个CSV文件,包含了 DataFrame 中的结果。
下一个示例展示了如何像数据库表一样使用 DataFrame,但作为预览,当你调用 DataFrame 的 createOrReplaceTempView 方法时,你创建了一个类似数据库的视图:
val df = spark.read
.format("csv")
.schema(schema)
.option("delimiter", ",")
.load("TomHanksMoviesNoHeader.csv")
df.createOrReplaceTempView("movies")
createOrReplaceTempView 会创建一个本地临时视图,就像11.4小节“在集合上创建惰性视图”中详述的那样,一旦你有了这个视图,就可以使用普通的SQL来查询你的数据:
val query = "select * from movies where rating > 7.5 order by rating desc"
scala> spark.sql(query).show
+----+-------------+------+
|year| name|rating|
+----+-------------+------+
|1994| Forrest Gump| 8.8|
|1995| Toy Story| 8.3|
|2000| Cast Away| 7.8|
+----+-------------+------+
下一小节有更多关于这种语法的例子。
你想看看如何对一个数据集使用真正的SQL查询,包括对多个文件使用连接(join)。
这个示例使用MovieLens数据集( https://oreil.ly/3fSf0 ),其中包含超过62,000部电影的数据。要遵循这个示例,请参阅该链接,以了解如何下载数据集的信息。
在你下载了这些数据之后,你可以看一下CSV文件的第一行,看看它们的格式。这些Unix head 命令显示了本示例中使用的三个文件的第一行,包括每个文件顶部的标题行:
$ head -3 movies.csv
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
$ head -3 ratings.csv
userId,movieId,rating,timestamp
1,296,5.0,1147880044
1,306,3.5,1147868817
$ head -3 tags.csv
userId,movieId,tag,timestamp
3,260,classic,1439472355
3,260,sci-fi,1439472256
将这些数据文件放在当前目录下,用 spark-shell 命令启动Spark shell。然后将每个文件读入一个 DataFrame,如前面的例子中所示:
// paste these into the REPL with ":paste"
val moviesDf = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("movies.csv")
val ratingsDf = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("ratings.csv")
val tagsDf = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("tags.csv")
这些数据将需要一段时间才能读入。接下来,在每个文件上创建临时视图,并给每个视图起一个名字,如下面方法调用所示:
moviesDf.createOrReplaceTempView("movies")
ratingsDf.createOrReplaceTempView("ratings")
tagsDf.createOrReplaceTempView("tags")
现在你可以根据需要使用真正的SQL查询来处理这些数据。例如,要查看所有的电影(movies)和它们的评级(ratings),可以连接 movies 和 ratings 视图:
val query = """
select * from movies, ratings
where movies.movieId == ratings.movieId
and rating == 5.0
"""
spark.sql(query).limit(4).show
在没有 limit 的情况下,该查询将返回超过360万行,但在有 limit 的情况下,输出看起来是这样的:
+-------+--------------------+----------------+------+-------+------+----------+
|movieId| title| genres|userId|movieId|rating| timestamp|
+-------+--------------------+----------------+------+-------+------+----------+
| 296| Pulp Fiction (1994)|Comedy|Crime|Dra| 1| 296| 5.0|1147880044|
| 307|Three Colors: Blu...| Drama| 1| 307| 5.0|1147868828|
| 665| Underground (1995)|Comedy|Drama|War| 1| 665| 5.0|1147878820|
| 1237|Seventh Seal, The...| Drama| 1| 1237| 5.0|1147868839|
+-------+--------------------+----------------+------+-------+------+----------+
作为另一个例子,这个查询显示了找到所有五星评级的喜剧/爱情电影的一种方法:
val query = """
select distinct(m.title), r.rating, m.genres
from movies m, ratings r
where m.movieId == r.movieId
and m.genres like \'%Comedy%Romance%'
and r.rating == 5.0
order by m.title"""
scala> spark.sql(query).show
+--------------------+------+--------------------+
| title|rating| genres|
+--------------------+------+--------------------+
|(500) Days of Sum...| 5.0|Comedy|Drama|Romance|
| (Girl)Friend (2018)| 5.0| Comedy|Romance|
| 10 (1979)| 5.0| Comedy|Romance|
|10 Items or Less ...| 5.0|Comedy|Drama|Romance|
output continues ...
当你知道SQL和数据的组织结构,便可以运行其他的查询,比如这个,它提供了一个所有喜剧/爱情电影的五星评级的排序计数:
val query = """
select m.title, count(1) as the_count
from movies m, ratings r
where m.movieId == r.movieId
and m.genres like \'%Comedy%Romance%'
and r.rating == 5.0
group by m.title
order by the_count desc
"""
// using 'false' in 'show' tells spark to print the full column widths
spark> spark.sql(query).show(100, false)
+------------------------------------------------------+---------+
|title |the_count|
+------------------------------------------------------+---------+
|Forrest Gump (1994) |25918 |
|Princess Bride, The (1987) |13311 |
|Amelie (Fabuleux destin d'Amélie Poulain, Le) (2001) |10395 |
|Life Is Beautiful (La Vita è bella) (1997) |8466 |
(the output continues ...)
如下所示,视图是通过在 DataFrame 或 Dataset 上调用 createOrReplaceTempView 创建的:
moviesDf.createOrReplaceTempView("movies")
该命令创建(或替换)一个名为 movies 的临时表。该视图的生命周期与用于创建 DataFrame 的 SparkSession 相关联。如果你想放弃该视图,请使用此命令:
spark.catalog.dropTempView("movies")
Spark的性能是一个庞大的话题,但要知道的一个重要概念是窄转换和宽转换之间的区别。窄转换( narrow transformation 也叫窄依赖)是指计算一个分区的结果所需的所有数据都在该分区中。在 宽转换( wide transformation 或宽依赖)中,计算一个结果所需的所有元素可能存在于多个不同的分区中。
宽转换会导致 洗牌( shuffle ),例如,当数据必须在分区之间重新排列时就会发生洗牌。数据的移动大大降低了处理速度,所以洗牌对性能不利。窄转换往往涉及像 filter 和 map 这样的方法,而宽转换涉及像 groupByKey 和 reduceByKey 这样的方法。
对于窄转换,Spark可以对多个转换进行 流水线 处理,这意味着它们将全部在内存中执行。同样的情况不能发生在宽转换中,当必须进行洗牌时,结果会被写入磁盘。
谷歌目前为搜索短语“Spark minimize data shuffling”返回了几十万个结果,所以如你所见,这确实是一个很大话题。
随着你对Spark SQL的进一步了解,使用它的 explain 方法来理解查询的工作原理有时是有帮助的。当你在一个 DataFrame 上调用 explain时,会看到这样的输出:
scala> moviesDf.where('movieId > 100).explain
== Physical Plan ==
*(1) Project [movieId#16, title#17, genres#18]
+- *(1) Filter (isnotnull(movieId#16) AND (movieId#16 > 100))
+- FileScan csv [movieId#16,title#17,genres#18] Batched: false,
DataFilters: [isnotnull(movieId#16), (movieId#16 > 100)],
Format: CSV, Location: InMemoryFileIndex[file:/Users/al/...,
PushedFilters: [IsNotNull(movieId), GreaterThan(movieId,100)],
ReadSchema: struct<movieId:int,title:string,genres:string>
还有这个:
scala> moviesDf.sort("title").explain
== Physical Plan ==
*(1) Sort [title#17 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(title#17 ASC NULLS FIRST, 200), true, [id=#667]
+- FileScan csv [movieId#16,title#17,genres#18] Batched: false,
DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/al...,
PartitionFilters: [], PushedFilters: [],
ReadSchema: struct<movieId:int,title:string,genres:string>
关于 explain 输出的一些说明:
- 输出的第一部分显示了最终结果。
- 输出的其他部分解释了数据来源。
- 每行开头的关键词往往很重要,在这些例子中,你看到了 Filter、FileScan、Sort 和 Exchange 等词。
关于 explain 操作符的更多细节,David Vrba的这篇关于掌握查询计划的Medium文章( https://oreil.ly/VHJ36 )是一个很好的资源,他的视频“Physical Plans in Spark SQL”( https://oreil.ly/89JUF )也是。
Spark SQL包括几种处理时间戳的方法。from_unixtime 展示了显示时间戳字段的一种方法:
val query = """
select userId,movieId,tag,from_unixtime(timestamp) as time
from tags
limit 3
"""
scala> spark.sql(query).show
+------+-------+--------------------+-------------------+
|userId|movieId| tag| time|
+------+-------+--------------------+-------------------+
| 3| 260| classic|2015-08-13 07:25:55|
| 3| 260| sci-fi|2015-08-13 07:24:16|
| 4| 1732| dark comedy|2019-11-16 15:33:18|
+------+-------+--------------------+-------------------+
- 如果你想在读取数据文件时使用模式(schema),请参阅20.4小节和20.5小节中有关如何使用MovieLens dataset( https://oreil.ly/3fSf0 )的部分。关于该数据集的细节,请参阅F.Maxwell Harper和Joseph A. Konstan的开创性论文( https://doi.org/10.1145/2827872 )。
你想写一个Spark应用程序,它将以批处理模式定期运行。
虽然之前的所有示例都展示了如何在命令行shell(REPL)中使用Spark,但在本小节中,你将创建20.2小节中所示的单词统计应用程序的批处理命令行版本。
创建Spark应用程序就像创建其他Scala应用程序一样,你使用像sbt这样的构建工具并导入所需要的Spark依赖。
首先创建一个 build.sbt 文件:
name := "SparkApp1"
version := "0.1"
scalaVersion := "2.12.12"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % "3.1.1"
)
在写这篇文章的时候,Spark 3.1目前可以和Scala 2.12一起使用,这就是我指定该Scala版本的原因。
接下来,创建你的应用程序。对于这样一个单文件的小程序,可以把下面的代码放在sbt项目根目录下的一个名为 WordCount.scala 的文件中:
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
object WordCount {
def main(args: Array[String]) {
val file = "Gettysburg-Address.txt"
val spark: SparkSession = SparkSession.builder
.appName("Word Count")
.config("spark.master", "local")
.getOrCreate()
val fileRdd: RDD[String] = spark.sparkContext.textFile(file)
val counts = fileRdd.map(_.replaceAll("[.,]", ""))
.map(_.replace("—", " "))
.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
.sortBy(_._2)
.collect
println( "------------------------------------------")
counts.foreach(println)
println( "------------------------------------------")
spark.stop()
}
}
这个解决方案的关键是知道如何用 SparkSession.builder 创建一个 SparkSession,如何配置该会话(session),以及如何将一个文件读取为 RDD。该应用的其余部分是正常的Scala代码。
现在,为你的应用程序构建JAR文件:
$ sbt package
一旦该JAR文件被创建,就能用 spark-submit 命令运行它:
$ spark-submit --class WordCount target/scala-2.12/sparkapp1_2.12-0.1.jar
如果一切顺利,你会看到 counts.foreach 语句的 大量 的输出。
对于较大的实际应用,你需要使用像sbt-assembly( https://oreil.ly/3LVap )这样的sbt插件来创建一个单一的JAR文件,其中包含所有的应用代码和依赖。参阅17.11小节,“部署一个可执行的JAR文件”,以了解关于sbt-assembly的讨论。
当你的应用程序使用不同的Spark功能时,你还需要包括其他的Spark依赖项。最新的Spark JAR文件,包括Spark机器学习库和Streaming库,可以在 org.apache.spark Maven列表( https://oreil.ly/SwPDe )中找到。在写这篇文章的时候,当前的依赖URL是这样的:
"org.apache.spark" %% "spark-mllib" % "3.1.1" % Provided
"org.apache.spark" %% "spark-streaming" % "3.1.1" % Provided
另外,这些依赖关系末尾的 Provided 关键字是sbt的方式,说明它们只在编译和测试等任务中需要,之后会通过一些其他方式提供。在这个例子中,生产中不需要这些依赖项,因为Spark的运行环境已经包含了它们。
在实际应用中,你还需要包括 -master 选项,以指定集群的主URL。除此以外,还有许多其他的 spark-submit 选项可以指定。这个命令的例子来自Spark的“提交应用程序”页面( https://oreil.ly/z1s5e ),它展示了 spark-submit 在实际使用时的选项的例子:
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master mesos://207.184.161.138:7077 \
--deploy-mode cluster \
--supervise \
--executor-memory 20G \
--total-executor-cores 100 \
http://path/to/examples.jar \
1000
- Spark“提交应用程序”页面( https://oreil.ly/z1s5e )展示了如何使用 spark-submit 运行应用程序。
- Spark的“入门指南”页面( https://oreil.ly/CW2O1 )展示了如何创建和配置 SparkSession。
- Spark是一个很庞大的话题,仅靠一章很难去详细的介绍它。要想了解更深入的内容,请参阅 Spark: The Definitive Guide。