Spark大数据分析实战
上QQ阅读APP看书,第一时间看更新

1.4.2 RDD算子分类

本节将主要介绍Spark算子的作用,以及算子的分类。

Spark算子大致可以分为以下两类。

1)Transformation变换算子:这种变换并不触发提交作业,完成作业中间过程处理。

2)Action行动算子:这类算子会触发SparkContext提交Job作业。

下面分别对两类算子进行详细介绍。

1.Transformations算子

下文将介绍常用和较为重要的Transformation算子。

(1)map

将原来RDD的每个数据项通过map中的用户自定义函数f映射转变为一个新的元素。源码中map算子相当于初始化一个RDD,新RDD叫做MappedRDD(this,sc.clean(f))。

图1-7中每个方框表示一个RDD分区,左侧的分区经过用户自定义函数f:T->U映射为右侧的新RDD分区。但是,实际只有等到Action算子触发后这个f函数才会和其他函数在一个stage中对数据进行运算。在图1-6中的第一个分区,数据记录V1输入f,通过f转换输出为转换后的分区中的数据记录V'1。

(2)flatMap

将原来RDD中的每个元素通过函数f转换为新的元素,并将生成的RDD的每个集合中的元素合并为一个集合,内部创建FlatMappedRDD(this,sc.clean(f))。

图1-6 map算子对RDD转换

图1-7表示RDD的一个分区进行flatMap函数操作,flatMap中传入的函数为f:T->U,T和U可以是任意的数据类型。将分区中的数据通过用户自定义函数f转换为新的数据。外部大方框可以认为是一个RDD分区,小方框代表一个集合。V1、V2、V3在一个集合作为RDD的一个数据项,可能存储为数组或其他容器,转换为V'1、V'2、V'3后,将原来的数组或容器结合拆散,拆散的数据形成为RDD中的数据项。

图1-7 flapMap算子对RDD转换

(3)mapPartitions

mapPartitions函数获取到每个分区的迭代器,在函数中通过这个分区整体的迭代器对整个分区的元素进行操作。内部实现是生成MapPartitionsRDD。图1-8中的方框代表一个RDD分区。

图1-8中,用户通过函数f(iter)=>iter.filter(_>=3)对分区中所有数据进行过滤,大于和等于3的数据保留。一个方块代表一个RDD分区,含有1、2、3的分区过滤只剩下元素3。

图1-8 mapPartitions算子对RDD转换

(4)union

使用union函数时需要保证两个RDD元素的数据类型相同,返回的RDD数据类型和被合并的RDD元素数据类型相同。并不进行去重操作,保存所有元素,如果想去重可以使用distinct()。同时Spark还提供更为简洁的使用union的API,通过++符号相当于union函数操作。

图1-9中左侧大方框代表两个RDD,大方框内的小方框代表RDD的分区。右侧大方框代表合并后的RDD,大方框内的小方框代表分区。合并后,V1、V2、V3……V8形成一个分区,其他元素同理进行合并。

(5)cartesian

对两个RDD内的所有元素进行笛卡尔积操作。操作后,内部实现返回CartesianRDD。图1-10中左侧大方框代表两个RDD,大方框内的小方框代表RDD的分区。右侧大方框代表合并后的RDD,大方框内的小方框代表分区。

例如:V1和另一个RDD中的W1、W2、Q5进行笛卡尔积运算形成(V1,W1)、(V1,W2)、(V1,Q5)。

图1-9 union算子对RDD转换

图1-10 cartesian算子对RDD转换

(6)groupBy

groupBy:将元素通过函数生成相应的Key,数据就转化为Key-Value格式,之后将Key相同的元素分为一组。

函数实现如下:

1)将用户函数预处理:


val cleanF = sc.clean(f)

2)对数据map进行函数操作,最后再进行groupByKey分组操作。


this.map(t =>(cleanF(t), t)).groupByKey(p)

其中,p确定了分区个数和分区函数,也就决定了并行化的程度。

图1-11中方框代表一个RDD分区,相同key的元素合并到一个组。例如V1和V2合并为V,Value为V1,V2。形成V,Seq(V1,V2)。

图1-11 groupBy算子对RDD转换

(7)filter

filter函数功能是对元素进行过滤,对每个元素应用f函数,返回值为true的元素在RDD中保留,返回值为false的元素将被过滤掉。内部实现相当于生成FilteredRDD(this,sc.clean(f))。

下面代码为函数的本质实现:


deffilter(f:T=>Boolean):RDD[T]=newFilteredRDD(this,sc.clean(f))

图1-12中每个方框代表一个RDD分区,T可以是任意的类型。通过用户自定义的过滤函数f,对每个数据项操作,将满足条件、返回结果为true的数据项保留。例如,过滤掉V2和V3保留了V1,为区分命名为V'1。

(8)sample

sample将RDD这个集合内的元素进行采样,获取所有元素的子集。用户可以设定是否有放回的抽样、百分比、随机种子,进而决定采样方式。

内部实现是生成SampledRDD(withReplacement,fraction,seed)。

函数参数设置:

·withReplacement=true,表示有放回的抽样。

·withReplacement=false,表示无放回的抽样。

图1-13中的每个方框是一个RDD分区。通过sample函数,采样50%的数据。V1、V2、U1、U2……U4采样出数据V1和U1、U2形成新的RDD。

图1-12 filter算子对RDD转换

图1-13 sample算子对RDD转换

(9)cache

cache将RDD元素从磁盘缓存到内存。相当于persist(MEMORY_ONLY)函数的功能。

图1-14 Cache算子对RDD转换

图1-14中每个方框代表一个RDD分区,左侧相当于数据分区都存储在磁盘,通过cache算子将数据缓存在内存。

(10)persist

persist函数对RDD进行缓存操作。数据缓存在哪里依据StorageLevel这个枚举类型进行确定。有以下几种类型的组合(见图1-14),DISK代表磁盘,MEMORY代表内存,SER代表数据是否进行序列化存储。

下面为函数定义,StorageLevel是枚举类型,代表存储模式,用户可以通过图1-14按需进行选择。


persist(newLevel:StorageLevel)

图1-15中列出persist函数可以进行缓存的模式。例如,MEMORY_AND_DISK_SER代表数据可以存储在内存和磁盘,并且以序列化的方式存储,其他同理。

图1-15 persist算子对RDD转换

图1-16中方框代表RDD分区。disk代表存储在磁盘,mem代表存储在内存。数据最初全部存储在磁盘,通过persist(MEMORY_AND_DISK)将数据缓存到内存,但是有的分区无法容纳在内存,将含有V1、V2、V3的分区存储到磁盘。

(11)mapValues

mapValues:针对(Key,Value)型数据中的Value进行Map操作,而不对Key进行处理。

图1-17中的方框代表RDD分区。a=>a+2代表对(V1,1)这样的Key Value数据对,数据只对Value中的1进行加2操作,返回结果为3。

图1-16 Persist算子对RDD转换

图1-17 mapValues算子RDD对转换

(12)combineByKey

下面代码为combineByKey函数的定义:


combineByKey[C](createCombiner:(V)C,
mergeValue:(C, V)C,
mergeCombiners:(C, C)C,
partitioner:Partitioner,
mapSideCombine:Boolean=true,
serializer:Serializer=null):RDD[(K,C)]

说明:

·createCombiner:V=>C,C不存在的情况下,比如通过V创建seq C。

·mergeValue:(C,V)=>C,当C已经存在的情况下,需要merge,比如把item V加到seq C中,或者叠加。

·mergeCombiners:(C,C)=>C,合并两个C。

·partitioner:Partitioner,Shuffle时需要的Partitioner。

·mapSideCombine:Boolean=true,为了减小传输量,很多combine可以在map端先做,比如叠加,可以先在一个partition中把所有相同的key的value叠加,再shuffle。

·serializerClass:String=null,传输需要序列化,用户可以自定义序列化类:

例如,相当于将元素为(Int,Int)的RDD转变为了(Int,Seq[Int])类型元素的RDD。

图1-18中的方框代表RDD分区。如图,通过combineByKey,将(V1,2),(V1,1)数据合并为(V1,Seq(2,1))。

(13)reduceByKey

reduceByKey是比combineByKey更简单的一种情况,只是两个值合并成一个值,(Int,Int V)to(Int,Int C),比如叠加。所以createCombiner reduceBykey很简单,就是直接返回v,而mergeValue和mergeCombiners逻辑是相同的,没有区别。

图1-18 comBineByKey算子对RDD转换

函数实现:


def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
  combineByKey[V]((v: V) => v, func, func, partitioner)
}

图1-19中的方框代表RDD分区。通过用户自定义函数(A,B)=>(A+B)函数,将相同key的数据(V1,2)和(V1,1)的value相加运算,结果为(V1,3)。

图1-19 reduceByKey算子对RDD转换

(14)join

join对两个需要连接的RDD进行cogroup函数操作,将相同key的数据能够放到一个分区,在cogroup操作之后形成的新RDD对每个key下的元素进行笛卡尔积的操作,返回的结果再展平,对应key下的所有元组形成一个集合。最后返回RDD[(K,(V,W))]。

下面代码为join的函数实现,本质是通过cogroup算子先进行协同划分,再通过flatMapValues将合并的数据打散。


this.cogroup(other,partitioner).f?latMapValues{case(vs,ws)=>
  for(v<-vs;w>-ws)yield(v,w) }

图1-20是对两个RDD的join操作示意图。大方框代表RDD,小方框代表RDD中的分区。函数对相同key的元素,如V1为key做连接后结果为(V1,(1,1))和(V1,(1,2))。

2.Actions算子

本质上在Action算子中通过SparkContext进行了提交作业的runJob操作,触发了RDD DAG的执行。

图1-20 join算子对RDD转换

例如,Action算子collect函数的代码如下,感兴趣的读者可以顺着这个入口进行源码剖析:


  /**
   * Return an array that contains all of the elements in this RDD.
   */
  def collect(): Array[T] = {
/*提交Job*/
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

下面将介绍常用和较为重要的Action算子。

(1)foreach

foreach对RDD中的每个元素都应用f函数操作,不返回RDD和Array,而是返回Uint。

图1-21表示foreach算子通过用户自定义函数对每个数据项进行操作。本例中自定义函数为println(),控制台打印所有数据项。

图1-21 foreach算子对RDD转换

(2)saveAsTextFile

函数将数据输出,存储到HDFS的指定目录。

下面为saveAsTextFile函数的内部实现,其内部通过调用saveAsHadoopFile进行实现:


this.map(x => (NullWritable.get(), new Text(x.toString)))
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path) 

将RDD中的每个元素映射转变为(null,x.toString),然后再将其写入HDFS。

图1-22中左侧方框代表RDD分区,右侧方框代表HDFS的Block。通过函数将RDD的每个分区存储为HDFS中的一个Block。

(3)collect

collect相当于toArray,toArray已经过时不推荐使用,collect将分布式的RDD返回为一个单机的scala Array数组。在这个数组上运用scala的函数式操作。

图1-23中左侧方框代表RDD分区,右侧方框代表单机内存中的数组。通过函数操作,将结果返回到Driver程序所在的节点,以数组形式存储。

图1-22 saveAsHadoopFile算子对RDD转换

图1-23 Collect算子对RDD转换

(4)count

count返回整个RDD的元素个数。

内部函数实现为:


defcount():Long=sc.runJob(this,Utils.getIteratorSize_).sum

图1-24中,返回数据的个数为5。一个方块代表一个RDD分区。

图1-24 count对RDD算子转换