Spark RDD及编程接口

1.Spark RDD

RDD(Resilient Distributed Datasets),弹性分布式数据集,即一个RDD代表一个被分区的只读数据集。一个RDD的生成只有两种途径,一是来自内存集合和外部存储系统,另一种是通过转换操作来自于其它RDD,比如map、filter、join等等。

RDD没必要随时被实例化,由于RDD的接口只支持粗粒度的操作(即一个操作会被应用在RDD的所有数据上),所有只要通过记录下这些作用在RDD之上的转换操作,来构建RDD的继承关系(lineage),就可以有效地进行容错处理,而不需要将实际的RDD数据进行记录拷贝。这对于RDD来说是一项非常强大的功能,也即在一个Spark程序中,我们所用到的每一个RDD,在丢失或者操作失败后都是可以重建的。

开发者还可以对RDD进行另外两个方面的控制操作:持久化和分区。开发者可以指明它们需要重用哪些RDD,然后选择一种存储策略(如in-memory storage)将它们保存起来。开发者还可以让RDD根据记录中的键值在集群的机器之间重新分区。

抽象的RDD采用如下五个接口来表示一个分区的、高效容错的而且能够持久化的分布式数据集。

RDD接口 作用
partition 分区,一个RDD会有一个或者多个分区
preferredLocations(p) 对于分区p而言,返回数据本地化计算的节点
dependencies() RDD的依赖关系
compute(p,context) 对于分区p而言,进行迭代计算
partitioner() RDD的分区函数

1.1RDD分区(partitions)

对于RDD的分区而言,用户可以自行指定多少分区,如果没有指定,将会使用默认值。可以利用RDD的成员变量partitions所返回的partition数组的大小来查询一个RDD被划分的分区数。

scala> val rdd = sc.parallelize(1 to 100, 2)//指定分区为2
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> rdd.partitions.size
res0: Int = 2

创建RDD的时候不指定分区,系统默认的数值就是这个程序所分配到的资源的CPU核的个数。

scala> val rdd = sc.parallelize(1 to 100)//不指定分区
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:27

scala> rdd.partitions.size
res2: Int = 4

1.2RDD优先位置(preferred Locations)

RDD优先位置于Spark中的调度有关,返回的是此RDD的每个partition所存储的位置,按照“移动数据不如移动计算”的理念,在Spark进行任务调度的时候,尽可能地将任务分配到数据块所存储的位置,若以从Hadoop中读取数据生成RDD为例,preferredLocations返回每一个数据块所在的机器名或者IP地址,如果每一块数据是多份存储的,那么就会返回多个机器地址。本例中以ml-100k数据为例,返回一个空的ListBuffer。

scala> val rdd = sc.textFile("/home/zhb/Desktop/work/SparkData/ml-100k")
rdd: org.apache.spark.rdd.RDD[String] = /home/zhb/Desktop/work/SparkData/ml-100k MapPartitionsRDD[4] at textFile at <console>:27

scala> val rdd = sc.textFile("/home/zhb/Desktop/work/SparkData/ml-100k/u.user")
rdd: org.apache.spark.rdd.RDD[String] = /home/zhb/Desktop/work/SparkData/ml-100k/u.user MapPartitionsRDD[6] at textFile at <console>:27

scala> val hadoopRDD = rdd.dependencies(0).rdd
hadoopRDD: org.apache.spark.rdd.RDD[_] = /home/zhb/Desktop/work/SparkData/ml-100k/u.user HadoopRDD[5] at textFile at <console>:27

scala> hadoopRDD.partitions.size
res3: Int = 2

scala> hadoopRDD.preferredLocations(hadoopRDD.partitions(0))
res4: Seq[String] = ListBuffer()

1.3RDD依赖关系(dependencies)

RDD是粗粒度的操作数据集,每个转换操作都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系,在Spark中存在两种类型的依赖,即窄依赖(Narrow Dependencies)和宽依赖(Wide Dependencies)。

窄依赖:每一个父RDD的分区最多只被一个子RDD的一个分区所使用。

宽依赖:多个子RDD的分区会依赖同一个父RDD的分区。

scala> val rdd = sc.makeRDD(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at makeRDD at <console>:27

scala> val mapRDD = rdd.map(x => (x,x))
mapRDD: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[8] at map at <console>:29

scala> mapRDD.dependencies
res5: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@5b79ff65)

scala> val shuffleRDD = mapRDD.partitionBy(new org.apache.spark.HashPartitioner(3))
shuffleRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[9] at partitionBy at <console>:31

scala> shuffleRDD.dependencies
res6: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@17d42714)
scala> val rdd = sc.makeRDD(collect)

rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[1] at makeRDD at :29

1.4RDD分区计算(compute)

Spark中每个RDD的计算都是以partition(分区)为单位的,而且RDD中的compute函数都是在迭代器进行复合,不需要保存每次计算的结果。

1.5RDD分区函数(partitioner)

partitioner就是RDD分区函数,目前Spark实现了两种类型的分区函数,即HashPartitioner(哈希分区)和RangePartitioner(区域分区),且partitioner这个属性只存在(K,V)类型的RDD中,对于非(K,V)类型的partitioner的值就是None。partitioner函数既决定了RDD本身的分区数量,也可作为其父RDD Shuffle输出(MapOutput)中每个分区进行数据切割的依据。

2.创建操作

2.1集合创建操作

RDD的形成可以由内部集合类型来生成,Spark中提供了parallelize和makeRDD两类函数来实现从集合生成RDD,两个函数接口功能类似,不同的是makeRDD还提供了一个可以指定每一个分区preferredLocations参数的实现版本。

scala> val rdd = sc.makeRDD(1 to 10,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:27

scala> val collect = Seq((1 to 10,Seq("host1","host3")),(11 to 20,Seq("host2")))
collect: Seq[(scala.collection.immutable.Range.Inclusive, Seq[String])] = List((Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),List(host1, host3)), (Range(11, 12, 13, 14, 15, 16, 17, 18, 19, 20),List(host2)))

scala> val rdd = sc.makeRDD(collect)
rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[1] at makeRDD at <console>:29

scala> rdd.preferredLocations(rdd.partitions(0))
res1: Seq[String] = List(host1, host3)

scala> rdd.preferredLocations(rdd.partitions(1))
res2: Seq[String] = List(host2)

2.2存储创建操作

Spark的整个生态系统与Hadoop是完全兼容的,对于Hadoop所支持的文件类型或者数据库类型,Spark也同样支持。hadoopRDD和newhadoopRDD是最为抽象的两个函数,主要包括以下四个参数:

输入格式:指定数据输入的类型,如TextInputFormat;

键类型:指定[K,V]键值对中K的类型;

值类型:指定[K,V]键值对中V的类型;

分区值:指定由外部存储生成的RDD的partition数量的最小值,如果没有指定,系统会使用默认值defaultMinSplits;

兼容旧版本Hadoop API的创建操作,

文件路径 输入格式 键类型 值类型 分区值
textFile path TextInputFormat LongWritable Text minSplits
hadoopFile path F K V minSplits
hadoopFile path F K V DefaultMinSplits
sequenceFile path SequenceFileInputFormat K V minSplits
sequenceFile path SequenceFileInputFormat K V DefaultMinSplits
objectFile path SequenceFileInputFormat NullWritable BytesWritable mminSplits
hadoopRDD n/a inpurformatClass keyClass valueClass minSplits

兼容新版本Hadoop API的创建操作,

文件路径 输入格式 键类型 值类型 分区值
newAPIHadoopFile path F K V n/a
newAPIHadoopFile path F K V n/a
newAPIHadoopRDD path F K V n/a

3.转换操作

3.1RDD基本转换操作

map:将RDD中类型为T的元素,一对一映射为类型为U的元素。

distinct:返回RDD中所有不一样的元素。

flatMap:将RDD中的每一个元素进行一对多转换。

scala> val rdd = sc.makeRDD(1 to 5,1)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at <console>:27

scala> val mapRDD = rdd.map(x => x.toFloat)
mapRDD: org.apache.spark.rdd.RDD[Float] = MapPartitionsRDD[3] at map at <console>:29

scala> mapRDD.collect()
res3: Array[Float] = Array(1.0, 2.0, 3.0, 4.0, 5.0)

scala> val flatMapRDD = rdd.flatMap(x => (1 to x))
flatMapRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at flatMap at <console>:29

scala> flatMapRDD.collect()
res4: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)

scala> val distinctRDD = flatMapRDD.distinct()
distinctRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at distinct at <console>:31

scala> distinctRDD.collect()
res6: Array[Int] = Array(4, 1, 3, 5, 2)

repartition:repartition只是coalesce接口中shuffle为true的简易实现。

coalesce:主要讨论如何设置shuffle参数,这里分三种情况(假设RDD有N个分区,需要重新划分成M个分区)

  1. 如果N < M,一般情况下,N个分区有数据分布不均的状况,利用HashPartitioner函数将数据重新分区为M个,这时需要将shuffle参数设置为true;

  2. 如果N > M且N和M差不多(比如说N是1000,M是100),那么就可以将N个分区中的若干个分区合并成一个新的分区,最终合并成M个分区,这时可以将shuffle参数设置为false(在shuffle为false的情况下,设置M > N,coalesce是不起作用的),不进行shuffle过程,父RDD和子RDD之间是窄依赖关系;

  3. 如果N > M且N和M差距悬殊(比如说N是1000,M是1),这个时候如果把shuffle参数设置为false,由于父子RDD是窄依赖,它们同处在一个Stage中,就可能会造成Spark程序运行的并行度不够,从而影响性能。比如在M为1时,由于只有1个分区,所以只会有一个任务在运行,为了使coalesce之前的操作有更好的并行度,可以将shuffle参数设置为true;

randomSplit:根据weights权重将一个RDD切分成多个RDD;

glom:将RDD中每一个分区中类型为T的元素转换成数组Array[T],这样每一个分区就只有一个数组元素;

union:将两个RDD集合中的数据进行合并,返回两个RDD的并集(包含两个RDD中相同的元素,不会去重);

intersection:返回两个RDD集合的交集,且交集中不会包含相同的元素;

subtract:如果subtract针对的是A和B两个集合,即操作是val result = A.subtract(B),那么result中将会包含A中出现且不在B中出现的元素;

intersection和subtract一般情况下都会有shuffle的过程;

mapPartitions:与map转换操作类似,只不过映射函数的输入参数由RDD中的每一个元素变成了RDD中每一个分区的迭代器;

mapPartitionsWithIndex:与mapPartitions功能类似,只是输入参数多了一个分区的ID;

zip:将两个RDD组合成Key/Value(键/值)形式的RDD,默认两个RDD的partition数量以及元素数量都相同,否则相同系统将会抛出异常。

zipPartitinos:将多个RDD按照partition组合成为新的RDD,zipPartitinos需要相互组合的RDD具有相同的分区数,但是对于每个分区中的元素没有要求。

zipWithIndex:是将RDD中的元素和这个元素的ID组合成键/值对,需要启动一个Spark作业来计算每一个分区的开始索引号,以便能顺序索引。

zipWithUniqueId:是将RDD中的元素和一个唯一ID组合成键/值对,不需要这样一个额外的作业。

3.2键值RDD转换操作

partitionBy:与基本转换操作中的repartition功能类似,根据partitioner函数生成新的ShuffledRDD,将原RDD重新分区(其实在repartition中也是先将RDD[T]转化成RDD[K,V],这里的V是null,然后使用RDD[K,V]作为参数生成ShuffledRDD)。

mapValues:针对[K,V]中的值进行map操作。

flatMapValues:针对[K,V]中的值进行flatMap操作。

scala> val rdd = sc.parallelize(Array((1,1),(1,2),(2,1),(3,1)),1)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> val partitionByRDD = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
partitionByRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:29

scala> partitionByRDD.collect()
res0: Array[(Int, Int)] = Array((2,1), (1,1), (1,2), (3,1))

scala> val mapValuesRDD = rdd.mapValues(x => x+1)
mapValuesRDD: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[2] at mapValues at <console>:29

scala> mapValuesRDD.collect()
res4: Array[(Int, Int)] = Array((1,2), (1,3), (2,2), (3,2))

scala> val flatMapValuesRDD = rdd.flatMapValues(x => Seq(x,"a"))
flatMapValuesRDD: org.apache.spark.rdd.RDD[(Int, Any)] = MapPartitionsRDD[3] at flatMapValues at <console>:29

scala> flatMapValuesRDD.collect()
res5: Array[(Int, Any)] = Array((1,1), (1,a), (1,2), (1,a), (2,1), (2,a), (3,1), (3,a))

combineByKey,foldByKey,reduceByKey,groupByKey:四种键值对转换操作都是针对RDD[K,V]本身,不涉及与其它RDD的组合操作,四种操作类型最终都会归结为对combineByKey的调用。combineByKey接口是将RDD[K,V]转化成返回类型RDD[K,C],这里V类型与C类型可以相同也可以不相同,combineByKey抽象接口一般需要需要传入以下5个典型参数:

createCombiner:创建组合器函数,将V类型值转换成C类型值;

mergeValue:合并值函数,将一个V类型值和一个C类型值合并成一个C类型值;

mergeCombiners:合并组合器函数,将两个C类型值合并成一个C类型值;

partitioner:指定分区函数;

mapSideCombine:布尔类型值,指定是否需要在Map端进行combine操作,类似于MapReduce中进行的combine操作;

combineByKey内部实现是通过三步来实现,1)根据是否需要在Map端进行combine操作决定是否对RDD先进行一次mapPartitions操作(利用createCombiner,mergeValue,mergeCombiners三个函数)来达到减少shuffle数据量的操作;2)根据partitioner函数对MapPartitionsRDD进行shuffle操作;3)对于shuffle的结果再进行一次combine操作;

scala> import scala.collection.mutable.HashSet
import scala.collection.mutable.HashSet

scala> val bufs = pairs.mapValues(v => HashSet(v))
bufs: org.apache.spark.rdd.RDD[(Int, scala.collection.mutable.HashSet[Int])] = MapPartitionsRDD[5] at mapValues at <console>:30

scala> import scala.collection.mutable.HashSet
import scala.collection.mutable.HashSet

scala> val pairs = sc.parallelize(Array((1,1),(1,2),(1,3),(1,1),(2,1)),2)
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:30

scala> val bufs = pairs.mapValues(v => HashSet(v))
bufs: org.apache.spark.rdd.RDD[(Int, scala.collection.mutable.HashSet[Int])] = MapPartitionsRDD[7] at mapValues at <console>:32

scala> val sums = bufs.foldByKey(new HashSet[Int])(_ ++= _)
sums: org.apache.spark.rdd.RDD[(Int, scala.collection.mutable.HashSet[Int])] = ShuffledRDD[8] at foldByKey at <console>:34

scala> sums.collect()
res7: Array[(Int, scala.collection.mutable.HashSet[Int])] = Array((2,Set(1)), (1,Set(1, 2, 3)))

scala> val reduceByKeyRDD = pairs.reduceByKey(_+_)
reduceByKeyRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[9] at reduceByKey at <console>:32

scala> reduceByKeyRDD.collect()
res8: Array[(Int, Int)] = Array((2,1), (1,7))

scala> val groupByKeyRDD = pairs.groupByKey()
groupByKeyRDD: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[10] at groupByKey at <console>:32

scala> groupByKeyRDD.collect()
res9: Array[(Int, Iterable[Int])] = Array((2,CompactBuffer(1)), (1,CompactBuffer(1, 2, 3, 1)))

join、leftOuterJoin、rightOuterJoin都是针对RDD[K,V]中K值相等的连接操作,分别对应内连接、左外连接、右外连接,最终都会调用cogroup来实现。而subtractByKey和基本转换操作subtract类似,只是针对RDD[K,V]中的K值来进行操作。

scala> val rdd1 = sc.parallelize(Array((1,1),(1,2),(2,1),(3,1)),1)
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[18] at parallelize at <console>:30

scala> val rdd2 = sc.parallelize(Array((1,'x'),(2,'y'),(2,'z'),(4,'w')),1)
rdd2: org.apache.spark.rdd.RDD[(Int, Char)] = ParallelCollectionRDD[19] at parallelize at <console>:30

scala> val cogroupRDD = rdd1.cogroup(rdd2)
cogroupRDD: org.apache.spark.rdd.RDD[(Int, (Iterable[Int], Iterable[Char]))] = MapPartitionsRDD[21] at cogroup at <console>:34

scala> cogroupRDD.collect()
res13: Array[(Int, (Iterable[Int], Iterable[Char]))] = Array((4,(CompactBuffer(),CompactBuffer(w))), (1,(CompactBuffer(1, 2),CompactBuffer(x))), (3,(CompactBuffer(1),CompactBuffer())), (2,(CompactBuffer(1),CompactBuffer(y, z))))

scala> val joinRDD = rdd1.join(rdd2)
joinRDD: org.apache.spark.rdd.RDD[(Int, (Int, Char))] = MapPartitionsRDD[24] at join at <console>:34

scala> joinRDD.collect()
res14: Array[(Int, (Int, Char))] = Array((1,(1,x)), (1,(2,x)), (2,(1,y)), (2,(1,z)))

scala> val leftOuterJoinRDD = rdd1.leftOuterJoin(rdd2)
leftOuterJoinRDD: org.apache.spark.rdd.RDD[(Int, (Int, Option[Char]))] = MapPartitionsRDD[27] at leftOuterJoin at <console>:34

scala> leftOuterJoinRDD.collect()
res15: Array[(Int, (Int, Option[Char]))] = Array((1,(1,Some(x))), (1,(2,Some(x))), (3,(1,None)), (2,(1,Some(y))), (2,(1,Some(z))))

scala> val rightOuterJoinRDD = rdd1.rightOuterJoin(rdd2)
rightOuterJoinRDD: org.apache.spark.rdd.RDD[(Int, (Option[Int], Char))] = MapPartitionsRDD[30] at rightOuterJoin at <console>:34

scala> rightOuterJoinRDD.collect()
res16: Array[(Int, (Option[Int], Char))] = Array((4,(None,w)), (1,(Some(1),x)), (1,(Some(2),x)), (2,(Some(1),y)), (2,(Some(1),z)))

scala> val subtractByKeyRDD = rdd1.subtractByKey(rdd2)
subtractByKeyRDD: org.apache.spark.rdd.RDD[(Int, Int)] = SubtractedRDD[31] at subtractByKey at <console>:34

scala> subtractByKeyRDD.collect()
res17: Array[(Int, Int)] = Array((3,1))

3.3RDD依赖关系

转换操作构建了RDD之间的大部分依赖关系,但是Spark内部生成的RDD对象数量一般多于用户书写的Spark应用程序包含的RDD,根本原因就是Spark的一些操作与RDD不是一一对应的。

4.控制操作(control operation)

cache、persist:在Spark中对RDD进行持久化操作是一项非常重要的功能,可以将RDD持久化在不同层次的存储介质中,以便后续的操作能够重复使用,这对iterative和interactive的应用来说会极大地提高性能。

checkpoint:将RDD持久化在HDFS中,其与persist(如果也持久化在磁盘上)的一个区别是checkpoint将会切断此RDD之前的依赖关系,而persist接口依然保留着RDD的依赖关系。

checkpoint的主要作用有如下两点:

1)如果一个Spark程序会长时间驻留运行(如Spark Streaming一般会7*24小时运行),过长的依赖将会占用很多系统资源,那么定期地将RDD进行checkpoint操作,能够有效地节省系统资源;

2)维护过长的依赖关系还会出现一个问题,如果Spark在运行过程中出现节点失败的情况,那么RDD进行容错重算的成本会非常高;

scala> val rdd = sc.makeRDD(1 to 4,1)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:27

scala> val flatMapRDD = rdd.flatMap(x => Seq(x,x))
flatMapRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at flatMap at <console>:29

scala> sc.setCheckpointDir("/home/zhb/")

scala> flatMapRDD.checkpoint()

scala> flatMapRDD.dependencies.head.rdd
res2: org.apache.spark.rdd.RDD[_] = ParallelCollectionRDD[0] at makeRDD at <console>:27

scala> flatMapRDD.collect()
res3: Array[Int] = Array(1, 1, 2, 2, 3, 3, 4, 4)

scala> flatMapRDD.dependencies.head.rdd
res4: org.apache.spark.rdd.RDD[_] = ReliableCheckpointRDD[2] at collect at <console>:32

5.行动操作(action operation)

行动操作是和转换操作相对应的一种对RDD的操作类型,在Spark的程序中,每调用一次行动操作,都会触发一次Spark的调度并返回相应的结果,行动操作可以分为两类,

行动操作将标量或者集合返回给Spark的客户端程序,比如返回RDD中数据集的数量或者是返回RDD中的一部分符合条件的数据;

行动操作将RDD直接保存到外部文件系统或者数据库中,比如将RDD保存到HDFS文件系统中;

5.1集合标量行动操作

first:返回RDD中的第一个元素;

count:返回RDD中元素的个数;

reduce:对RDD中的元素进行二元计算,返回计算结果;

scala> val rdd = sc.makeRDD(1 to 10,1)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at makeRDD at <console>:27

scala> rdd.first
res5: Int = 1

scala> rdd.count
res6: Long = 10

scala> rdd.reduce(_ + _)
res7: Int = 55

scala> rdd.reduce(_ * _)
res8: Int = 3628800

scala> rdd.reduce(_ - _)
res9: Int = -53

collect/toArray:以集合形式返回RDD的元素;

take:将RDD作为集合,返回集合中[0,num-1]下标的元素;

top:按照默认的或者是指定的排序规则,返回前num个元素;

takeOrdered:以与top相反的排序规则,返回前num个元素;

scala> val nums = Array.range(1,10)
nums: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> val rdd = sc.makeRDD(scala.util.Random.shuffle(nums),2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at makeRDD at <console>:29

scala> rdd.collect()
res10: Array[Int] = Array(7, 6, 5, 1, 8, 9, 3, 4, 2)

scala> rdd.toArray()
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
res11: Array[Int] = Array(7, 6, 5, 1, 8, 9, 3, 4, 2)

scala> rdd.take(3)
res12: Array[Int] = Array(7, 6, 5)

scala> rdd.top(3)
res13: Array[Int] = Array(9, 8, 7)

scala> rdd.takeOrdered(3)
res14: Array[Int] = Array(1, 2, 3)

scala> implicit val ord = implicitly[Ordering[Int]].reverse
ord: scala.math.Ordering[Int] = scala.math.Ordering$$anon$4@133550ed

scala> rdd.take(3)
res15: Array[Int] = Array(7, 6, 5)

scala> rdd.top(3)
res16: Array[Int] = Array(1, 2, 3)

scala> rdd.takeOrdered(3)
res17: Array[Int] = Array(9, 8, 7)

aggregate:主要提供两个函数,一个是seqOp,其将RDD中的每一个分区的数据聚合成类型为U的值;另一个函数combOp,将各个分区聚合起来的值合并在一起得到最终类型为U的返回值。

scala> import scala.collection.mutable.HashMap
import scala.collection.mutable.HashMap

scala> val pairs = sc.makeRDD(Array(("a",1),("b",2),("a",4),("c",5),("a",3)))
pairs: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[10] at makeRDD at <console>:32

scala> type StringMap = HashMap[String,Int]
defined type alias StringMap

scala> val emptyMap = new StringMap{
     |     override def default(key:String):Int = 0
     | }
emptyMap: StringMap = Map()

scala> val mergeElement:(StringMap,(String,Int)) => StringMap = (map,pair) =>{
     |     map(pair._1) += pair._2
     |     map
     | }
mergeElement: (StringMap, (String, Int)) => StringMap = <function2>

scala> val mergeMaps:(StringMap,StringMap) => StringMap = (map1,map2) => {
     |     for((key,value) <- map2){
     |         map1(key) += value
     |     }
     |     map1
     | }
mergeMaps: (StringMap, StringMap) => StringMap = <function2>

scala> val aggregateResult = pairs.aggregate(emptyMap)(mergeElement,mergeMaps)
aggregateResult: StringMap = Map(b -> 2, a -> 8, c -> 5)

fold:是aggregate的便利接口,其中op操作既是seqOp操作,也是combOp操作,最终的返回类型也是T,即与RDD中的每一个元素的类型是一样的;

scala> val pairs = sc.makeRDD(Array(("a",1),("b",2),("a",4),("c",5),("a",3)))
pairs: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[12] at makeRDD at <console>:32

scala> val compareElement:((String,Int),(String,Int)) => (String,Int) = (val1,val2) => {
     |     if (val1._2 >= val2._2){
     |         val1
     |     }else{
     |         val2
     |     }
     | }
compareElement: ((String, Int), (String, Int)) => (String, Int) = <function2>

scala> val foldResult = pairs.fold(("0",0))(compareElement)
foldResult: (String, Int) = (c,5)

lookup:是针对(K,V)类型RDD的行动操作,对于给定的键值,返回与此键值相对应的所有值。

scala> val rdd = sc.makeRDD(Array(("a",1),("b",2),("a",4),("c",5),("a",3)),1)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[13] at makeRDD at <console>:32

scala> rdd.lookup("a")
res19: Seq[Int] = WrappedArray(1, 4, 3)

5.2存储行动操作

对RDD最后的归宿处了返回为集合和标量,也可以将RDD存储到外部文件系统或者数据库中,Spark系统与Hadoop是完全兼容的,所以对于MapReduce所支持的读写文件或者数据库类型,Spark是完全兼容的。

saveAsTextFile、saveAsObjectFile、saveAsHadoopFile、outputFormatClass、saveAsHadoopDataset,前面4个API是saveAsHadoopDataset的简易实现版本,仅仅支持将RDD存储到HDFS中,而saveAsHadoopDataset的参数类型是JobConf,所以其不仅能将RDD存储到HDFS中,也可以将RDD存储到其它数据库中,如Hbase、MongoDB、Cassandra等。将RDD保存到HDFS中,通常需要关注或者设置五个参数,即文件保存的路径、key值的class类型、Value值的class类型、RDD输出格式,以及最后一个相关的参数codec。