3.1 键值对RDD
到目前为止,我们已经使用了RDD,其中每行代表一个值,例如整数或字符串。在许多用例中,需要按某个键进行分组或聚合、联结两个RDD。现在看一下另一个RDD类型:键值对RDD。键值对的数据格式可以在多种编程语言中找到。它是一组数据类型,由带有一组关联值的键标识符组成。使用分布式数据时,将数据组织成键值对是有用的,因为它允许在网络上聚合数据或重新组合数据。与MapReduce类似,Spark以RDD的形式支持键值对数据格式。
在Scala语言中,Spark键值对RDD的表示是二维元组。键值对RDD在许多Spark程序中使用。当想在分布式系统中进行值聚合或重新组合时,需要通过其中的键进行索引,例如有一个包含城市级别人口的数据集,并且想要在省级别汇总,那么就需要按省对这些行进行分组,并对每个省所有城市的人口求和;另一个例子是提取客户标识作为键,以查看所有客户的订单。要想满足键值对RDD的要求,每一行必须包含一个元组,其中第一个元素代表键,第二个元素代表值。键和值的类型可以是简单的类型,例如整数或字符串,也可以是复杂的类型,例如对象或值的集合或另一个元组。键值对RDD带有一组API,可以围绕键执行常规操作,例如分组、聚合和连接。
代码3-1
上面的代码创建了键值对RDD,每一行为一个元组,其中键是长度,值是单词。它们被包裹在一对括号内。一旦以这种方式排列了每一行,就可以通过按键分组轻松发现长度相同的单词。以下各节将介绍如何创建键值对RDD,以及如何使用关联的转换和操作。
3.1.1 创建
创建键值对最常用的方法有:使用已经存在的非键值对;加载特定数据创建键值对;通过内存中的集合创建键值对。
虽然大多数Spark操作适用于包含任何类型对象的RDD,但是几个特殊操作只能在键值对的RDD上使用,例如按键分组或聚合元素,这些操作都需要进行分布式洗牌。键值对操作在PairRDDFunctions类中自动封装在元组RDD上。键值对RDD中的键和值可以是标量值或复杂值,可以是对象、对象集合或另一个元组。当使用自定义对象作为键值对RDD中的键时,该对象的类必须同时定义自定义的equals()和hashCode()方法。
语法解释:Scala元组结合多个固定数量的元素在一起,使它们可以被作为一个整体进行数据传递。不像一个数组或列表,元组可以容纳不同类型的对象,但它们也是不可改变的。下面是一个包括整数、字符串和Console的元组:
代码3-2
这是语法方糖,是下面代码的简写方式:
代码3-3
一个元组的实际类型取决于它包含的元素和这些元素的类型和数目。因此,该类型(99,"Luftballons")是Tuple2[Int,String];而('u','r',"the",1,4,"me")的类型是Tuple6[Char,Char,String,Int,Int,String]。元组类型包括Tuple1、Tuple2、Tuple3等,至少目前的上限为22,如果需要更多,可以使用一个集合,而不是一个元组。对于每个TupleN类型,其中1<=N<=22,Scala定义了许多元素的访问方法。假定定义一个元组t为
代码3-4
要访问元组t中的元素,可以使用方法t._1访问第一个元素,使用t._2访问第二个元素,以此类推。例如,下面的表达式计算t的所有元素的总和:
代码3-5
存在许多格式的数据可以直接加载为键值对,例如sequenceFile文件是Hadoop用来存储二进制形式的键值对[Key,Value]而设计的一种平面文件。在此示例中,SequenceFile由键值对(Category,1)组成,当加载到Spark中时,会产生键值对RDD,代码如下。
代码3-6
SequenceFile可用于解决大量小文件问题,SequenceFile是Hadoop API提供的一种二进制文件支持,直接将键值对序列化到文件中,一般对小文件可以使用这种文件合并,即将文件名作为键,文件内容作为值序列化到大文件中,读取SequenceFile的示例如下。
代码3-7
def sequenceFile[K,V](path:String,keyClass:Class[K],valueClass:Class[V]):RDD[(K,V)]
使用给定的键和值类型获取Hadoop SequenceFile的RDD。
· path为输入数据文件的目录,可以是逗号分隔的路径作为输入列表。
· keyClass为与SequenceFileInputFormat关联的键类。
· valueClass为与SequenceFileInputFormat关联的值类。
可以说,键值对RDD在许多程序中起着非常有用的构建块的作用。基本上,一些操作允许我们并行操作每个键,通过这一点可以在整个网络上重新组合数据。reduceByKey()方法分别为每个键聚合数据,而join()方法通过将具有相同键的元素分组将两个RDD合并在一起。可以从RDD中提取字段,例如客户ID、事件时间或其他标识符,然后将这些字段用作键值对RDD中的键。
· Scala模式匹配
Scala提供了强大的模式匹配机制,应用也非常广泛。一个模式匹配包含一系列备选项,每个都开始于关键字case。每个备选项都包含一个模式及一到多个表达式。箭头符号=>隔开了模式和表达式。上面的代码中使用了元组匹配模式,可以使用下面的例子学习其语法。
代码3-8
定义langs序列(Seq)变量,其中包含三个三维元组。
代码3-9
在for循环中定义了case模式匹配。第一个case匹配一个三元素元组,其中第一个元素是字符串“Scala”,忽略第二个和第三个参数;第二个case匹配任何三元素元组,元素可以是任何类型,但是由于输入的是langs,因此它们被推断为字符串。将元素提取为变量lang、first和last,输出结果为
代码3-10
在上面的代码中,一个元组可以分解成其组成元素。可以匹配元组中的字面值,在任何想要的位置,可以忽略不关心的元素。
使用Scala和Python语言,可以使用SparkContext.parallelize()方法从内存中的数据集合创建一键值对,代码如下。
代码3-11
在这个例子中,首先这是在内存中创建键值对集合dist1,然后通过SparkContext.parallelize()方法应用于dist1创建键值对dist1RDD。另外,在一组小文本文件上运行sc.wholetextFiles将创建键值对,其中键是文件的名称,而值为文件中的内容。
3.1.2 转换
键值对RDD允许使用标准RDD可用的所有转换,由于键值对包含元组,因此需要在转换方法中传递可以在元组上操作的函数。下面总结了键值对常用的转换。
■ 基于一个键值对RDD的转换
创建一个键值对RDD。
代码3-12
reduceByKey(func:(V,V)⇒V,numPartitions:Int):RDD[(K,V)]
调用包含(K,V)的数据集,返回的结果也为(K,V)。数据集中的每个键对应的所有值被聚集,使用给定的汇总功能func,其类型必须为(V,V)=>V。像groupByKey,汇总任务的数量通过第二个可选的参数numPartitions配置,这个参数用于设置RDD的分区数。
代码3-13
groupByKey(numPartitions:Int):RDD[(K,Iterable[V])]
调用包含(K,V)的数据集,返回(K,Iterable<V>)。如果分组的目的是为了对每个键执行聚集,如总和或平均值,使用reduceByKey或aggregateByKey将产生更好的性能。默认情况下,输出的并行任务数取决于RDD谱系中父RDD的分区数,可以通过一个可选的参数numPartitions设置不同数量的任务。
代码3-14
combineByKey[C](createCombiner:(V)⇒C,mergeValue:(C,V)⇒C,mergeCombiners:(C,C)⇒C):RDD[(K,C)]
使用相同的键组合值,产生与输入不同的结果类型,例子和详细说明见后面的部分。
mapValues[U](f:(V)⇒U):RDD[(K,U)]
对键值对RDD的每个值应用一个方法,而不用改变键。
代码3-15
flatMapValues[U](f:(V)⇒TraversableOnce[U]):RDD[(K,U)]
与mapValues相似,将键值对中的每个值传递给函数f而不改变键,不同的是将数据的内在结构扁平化。
代码3-16
keys:RDD[K]
将键值对RDD中每个元组的键返回,产生一个RDD。
代码3-17
values:RDD[V]
将键值对RDD中每个元组的值返回,产生一个RDD。
代码3-18
sortByKey(ascending:Boolean=true,numPartitions:Int=self.partitions.length):RDD[(K,V)]
当在数据集(K,V)上被调用时,K实现了有序化,返回按照键的顺序排列的数据集(K,V),在布尔参数ascending中指定升序或降序。
代码3-19
aggregateByKey[U](zeroValue:U)(seqOp:(U,V)⇒U,combOp:(U,U)⇒U)(implicit arg0:ClassTag[U]):RDD[(K,U)]
使用给定的组合函数和中性zeroValue聚合每个键的值。该函数可以返回与输入键值对RDD中的V值类型不同的结果类型U。因此,需要一个用于将V合并到U中的操作和一个用于合并两个U的操作,如在scala.TraversableOnce中,前一个函数seqOp用于合并分区中的值,后者combOp用于在分区之间合并值。为了避免内存分配,这两个函数都允许修改并返回其第一个参数,而不是创建一个新的U。
代码3-20
上面的代码中,通过定义myfunc函数,分别打印出RDD分区中的内容。
■ 基于两个键值对RDD的转换
创建两个键值对RDD,分别为
代码3-21
subtractByKey
从RDD中删除other中存在的键元素。
代码3-22
join(otherDataset,[numTasks])
在两个RDD之间执行内部连接。
代码3-23
rightOuterJoin
在两个RDD之间执行连接,其中键必须存在于other中。
代码3-24
leftOuterJoin
在两个RDD之间执行连接,其中键必须存在于rdd中。
代码3-25
cogroup(otherDataset,[numTasks])
将两个RDD具有相同键的值组合在一起。
代码3-26
3.1.2.1 聚合
当使用键值对描述数据集时,通常需要在具有相同键的所有元素上统计数据。对于基本的RDD的fold、combine和reduce操作,在键值对RDD上也有基于键的类似操作,这些操作基于相同的键进行汇集。这些操作是转换,而不是动作。
1.reduceByKey
基本上,reduceByKey函数仅适用于包含键值对元素类型的RDD,即Tuple或Map作为数据元素。这是一个转型操作,意味着被惰性评估。我们需要传递一个关联函数作为参数,该函数将应用于键值对RDD,创建带有结果值的RDD,即新的键值对。由于分区间可能发生数据Shuffle,因此此操作是一项涉及全数据集的广泛操作。
在数学中,关联属性是一些二元运算的属性。在命题逻辑中,关联性是在逻辑证明中替换表达式的有效规则。在包含同一个关联运算符的一行中出现两次或更多次的表达式中,只要操作数序列未更改,操作的执行次序就无关紧要。也就是说,重新排列这种表达式中的括号不会改变其值。考虑下面的等式:
关联性让我们可以按顺序并行使用相同的函数。reduceByKey使用该属性计算RDD的结果,RDD是由分区组成的分布式集合。直观地说,这个函数在重复应用于具有多个分区的同一组RDD数据时会产生相同的结果,而不管元素的顺序如何。此外,它首先使用Reduce函数在本地执行合并,然后在分区之间发送记录,以准备最终结果。通过下面的代码看一看reduceByKey的执行过程。
代码3-27
在图3-1中,可以看到RDD具有多个键值对元素,如(a,1)和(b,1),以及3个分区。在对整个分区之间的数据洗牌之前,先在每个本地分区中进行相同的聚合。可以使用reduceByKey与mapValues一起计算每个键的平均值,代码和图示(见图3-2)如下。
图3-1 reduceByKey运行示意图
图3-2 每键平均值计算的数据流
实际上,reduceByKey是aggregateByKey的一个特例。aggregateByKey有两个参数:一个应用于每个分区的聚合;另一个应用于分区之间的聚合。reduceByKey在上述两种情况下都使用相同的关联函数,在每个分区上都执行一遍,然后在分区间执行一遍,将第一遍的结果合并为最终结果。
2.combineByKey
combineByKey调用是一种聚合的优化。使用combineByKey值时,每个分区合并为一个值,然后将每个分区值合并为一个值。值得注意的是,组合值的类型不必与原始值的类型相匹配,而且通常不会。combineByKey函数将3个函数作为参数,第一个函数为创建组合器的函数,在aggregateByKey函数中,第一个参数只是zeroValue,在combineByKey中提供了一个函数,它将接受当前的值作为参数,并返回将与合成值合并的新值;第二个函数是一个合并函数,它接受一个值并将它合并或组合到先前收集的值中;第三个函数将合并的值组合在一起,基本上这个函数采用在分区级别上产生的新值,并将它们结合起来,直到得到一个最后的结果。下面是一段执行combineByKey的代码。
代码3-28
参考上面的代码,combineByKey需要三个函数,分别为createCombiner、mergeValue和mergeCombiner。
■createCombiner
combineByKey()方法中的第一个函数是必选参数,用作每个键的第一个聚合步骤。当在每个分区中,如果找不到每个键的组合器,createCombiner会为分区上每个遇到的第一个键创建初始组合器。上面的代码是用在分区中遇到的第一个值和为1的键计数器初始化一个tuple,其值为(v,1),v代表第一个遇到的值,表示存储组合器的存储内容为(sum,count)。
■ mergeValue
这是下一个必需的函数,告诉combineByKey当组合器被赋予一个新值时该怎么做。该函数的参数是组合器acc和新值v。组合器的结构在上面被定义为(sum,count)形式的元组,acc._1执行累加代表组合器中的sum,acc._2执行计数代表组合器中的count。所以,通过将新值v添加到组合器元组的第一个元素,同时加1到组合器元组的第二个元素合并新值。mergeValue只有在这个分区上已经创建了初始组合器(在我们的例子中为元组)时才被触发。
■ mergeCombiner
最终一个必需的函数告诉combineByKey如何合并分区之间的两个组合器。在这个例子中,每个分区组合器元组的形式为(sum,count),需要做的是将第一个分区依次到最后一个分区中的组合器加在一起。
最终的目标是逐个计算平均值averageByKey()。combineByKey()的结果是RDD,其格式为(label,(sum,count)),因此可以通过使用map()方法,映射(sum,count)到sum/count轻松获取平均值。接下来将数据的子集分解到多个分区,并在实际中看数据的计算方式。
3.1.2.2 分组
使用键值对数据,一个常见的用例是按键分组的数据,例如一起查看客户的所有订单。如果数据已经按照想要的方式组成键值对元组,groupByKey将使用RDD中的键对数据进行分组。在由K型键和V型值构成的RDD上,分组后得到[K,Iterable[V]]类型的RDD。现在使用groupByKey实现上面reduceByKey代码的功能。
代码3-29
得到的结果与上面的代码一致,但是数据的计算过程不一样。另一方面,当调用groupByKey时所有的键值对都在洗牌,在网络中传输了大量不必要的数据。当在一个执行器上有更多的数据在内存中进行洗牌时,Spark将内存数据溢出到磁盘中。但是,一次只会将一个键数据刷新到磁盘上,因此如果单个键的值超过内存容量,则会发生内存不足的异常。这种情况应该避免。当Spark需要溢出到磁盘时,性能会受到严重影响。groupByKey运行示意图如图3-3所示。
图3-3 groupByKey运行示意图
可以尝试的一种优化方法是合并或组合值,因此最终只发送较少的键值对。另外,较少的键值对意味着Reduce不会有太多的工作要做,从而带来额外的性能提升。groupByKey()调用不会尝试进行合并或组合值,因此这是一项昂贵的操作。对于一个更大的数据集,洗牌数据量的差异在reduceByKey()和groupByKey()之间会变得更加夸张和不同。以下是比groupByKey更优化的方法。
· combineByKey():可用于组合元素,但返回类型与输入值类型不同。
· foldByKey():使用关联函数和中性zeroValue合并每个键的值。
3.1.2.3 连接
将键值对RDD与其他键值对RDD进行连接,将数据连接在一起可能是键值对中最常见的操作之一,并且有一系列选项,包括左右外连接、交叉连接和内连接。由于数据框功能的增强,这部分功能也可能通过数据框的join操作实现。
简单的join运算符是内连接,只输出两键值对RDD中共同拥有的键。当在其中一个输入RDD中具有相同键和多个值的键值对时,结果键值对RDD将具有来自两个输入键值对RDD的该键的每个可能的值对,下面的代码可以帮助理解这种操作结果。
代码3-30
有时并不需要结果键值对RDD中的键同时出现在两个输入键值对RDD中。例如,通过建议加入客户信息,如果没有任何建议,可能不想删除客户信息。leftOuterJoin(other)和rightOuterJoin(other)都通过键将两个输入键值对RDD连接在一起,其中一个RDD可能丢掉无法匹配的键,而另一个RDD保存了所有的键。
使用leftOuterJoin,结果RDD将保留所有源RDD中的每个键。在结果RDD中,与每个键关联的值是一个元组,由输入键值对源RDD的值以及来自另一输入键值对RDD的值Option组成。与join类似,每个键可以有多个条目;当这种情况发生时,得到两个值列表之间的笛卡儿乘积。rightOuterJoin与rightOuterJoin几乎相同,除了键必须存在于另一个RDD中,并且生成的元组中具有Option的为源输入键值对RDD,而不是另一个。下面使用代码3-30中的两个输入键值对departments和employees演示leftOuterJoin和rightOuterJoin的用法。
代码3-31
· Option、Some和None
强大的Scala语言可以使用Option类,定义函数返回值,其值可能为null。简单地说,如果函数成功时返回一个对象,而失败时返回null,那么可以定义函数的返回值为一个Option实例,其中Option对象是Some类的实例或None类的实例。因为Some和None都是Option的子项,所有的函数签名只是声明返回一个包含某种类型的Option(如下面显示的Int类型)。至少,这让用户知道发生了什么。以下是使用Scala Option语法的示例。
代码3-32
以下是toInt函数的工作原理:它需要一个String作为参数。如果它可以将String转换为Int,那么它将返回Some(Int);如果String不能转换为Int,则返回None。调用此函数的代码如下所示。
代码3-33
3.1.2.4 排序
对数据进行排序在很多情况下非常有用,特别是在产生后续的输出时。可以使用键值对RDD进行排序,前提是在键上定义了一个排序。一旦对数据进行了排序,对排序后的数据进行collect()或save()操作,将导致有序的数据。
sortByKey函数作用于键值对形式的RDD上,并对键进行排序。它是在org.apache.spark.rdd.OrderedRDDFunctions中实现的,具体操作如下。
sortByKey(ascending:Boolean=true,numPartitions:Int=self.partitions.length):RDD[(K,V)]
从函数的实现可以看出,它主要接受两个函数,其含义和sortBy一样,这里就不进行解释了。该函数返回的RDD一定是ShuffledRDD类型,因为对源RDD进行排序,必须进行洗牌操作,而洗牌操作的结果RDD就是ShuffledRDD。其实,这个函数的实现很优雅,里面用到了RangePartitioner,它可以使得相应的范围键数据分到同一个分区中,然后内部用到mapPartitions对每个分区中的数据进行排序,而每个分区中数据的排序用到标准的排序机制,避免了大量数据的shuffle。下面对sortByKey的使用进行说明。
代码3-34
上面对键进行了排序,sortBy()函数中可以对排序方式进行重写,sortByKey()也有这样的功能。通常在OrderedRDDFunctions类中有一个变量ordering,它是隐式的。
代码3-35
这就是默认的排序规则,可以对它进行重写,代码如下。
代码3-36
例子中的sortIntegersByString就是修改了默认顺序的排序规则。这样,将默认顺序按照Int的大小排序改成对字符串的排序,所以12会排序在3之前。
3.1.3 动作
与转换一样,所有在基础RDD上提供的传统转换操作也可用在键值对RDD上。当然,键值对RDD可以使用一些额外的操作,首先创建一个RDD。
代码3-37
countByKey():Map[K,Long]
对每个键进行计数,只有当返回的结果Map预计很小时,才使用此方法,因为整个内容都会加载到驱动程序的内存中。要处理非常大的结果,可以考虑使用:
代码3-38
map Values将返回RDD[T,Long],而不是Map。
代码3-39
collectAsMap():Map[K,V]
此函数与collect()类似,但对关键值RDD起作用并将其转换为Scala Map,以保留其键值结构,如果键值对RDD中同一个键有多个值,则每个键中只有一个值会保留在返回的Map中。因为所有的数据都加载到驱动程序的内存中,所以只有在结果数据很小时才使用此方法。
代码3-40
lookup(key:K):Seq[V]
返回与提供键关联的所有值。如果RDD具有已知的分区程序,则只搜索该键映射到的分区即可高效地执行此操作。
代码3-41