Spark存储管理

1.存储管理模块整体架构

从架构上看存储管理模块主要分为以下两层,

通信层:存储管理模块采用的是主从结构来实现通信层,主节点和从节点之间传输控制信息、状态信息;

存储层:存储管理模块需要把数据存储到硬盘或者内存中,必要时还需要复制到远端,这些操作由存储层来实现和提供相应接口;

从功能上看存储管理模块可以分为如下两个主要部分,

RDD缓存:整个存储管理模块主要的工作是作为RDD的缓存,包括基于内存和磁盘的缓存;

Shuffle数据的持久化:Shuffle中间结果的数据也是交由存储管理模块进行管理的。Shuffle性能的好坏直接影响了Spark应用程序整体的性能,因此存储管理模块中对于Shuffle数据的处理有别于传统的RDD缓存;

在存储管理模块中管理着各种不同的数据块,主要有以下几种,

  1. RDD数据块:用来标识所缓存的RDD数据;
  2. Shuffle数据块:用来标识所持久化的Shuffle数据;
  3. 广播变量数据块:用来标识所存储的广播变量数据;
  4. 任务返回结果数据块:用来标识存储在存储管理模块内部的任务返回结果。通常情况下,任务返回结果随任务一起通过Akka返回到Driver端。但是当任务返回结果很大时,会引起Akka帧溢出,这时的另一种方案是将返回结果以块的形式放入存储管理模块,然后在Driver端获取该数据即可,因为存储管理模块内部数据的传输是通过Socket连接的,因此就不会出现Akka帧溢出了;
  5. 流式数据块:只用在Spark Streaming中,用来标识所接收到的流式数据块;

2.RDD持久化

2.1RDD分区和数据块的关系

对于RDD的各种操作,如转换操作、执行操作,我们将操作函数施行于RDD之上,而最终这些操作都将施行于每一个分区之上,也就是在RDD上的所有运算都是基于分区的。而在存储管理模块内,我们所接触到的往往是数据块这个概念,在存储管理模块中对于数据的存取都是以数据块为单位进行的。分区是一个逻辑上的概念,而数据块是物理上数据实体。

1.在Spark中,分区和数据块是一一对应的,一个RDD中的一个分区对应着存储管理模块中的一个数据块,存储管理模块接触不到也不关心RDD,它只关心数据块,对于数据块和分区之间的映射则是通过名称上的约定进行的;

2.这种名称上的约定是按照如下方式建立的:Spark为每一个RDD在其内部维护了独立的ID号,同时,对于RDD的每一个分区也有一个独立的索引号,因此只要指导ID号和索引号,我们就能找到RDD中的相应分区,也就是说“ID号 + 索引号”就能全局唯一地确定这个分区,这样以“ID号 + 索引号”作为块的名称就自然建立起了分区和块的映射;

在显式调用函数缓存我们所需的RDD时,Spark在其内部建立了RDD分区和数据块之间的映射,而当我们需要读取缓存的RDD时,根据上面的映射关系,就能从存储管理模块中取得分区对应的数据块。

2.2内存缓存

当我们以默认或者是基于内存的持久化方式缓存RDD时,RDD中的每一个分区所对应的数据块会是被存储管理模块中的内存缓存(Memory Store)所管理的。内存缓存在其内部维护了一个以数据块名称为键,块内容为值的哈希表。

其中数据块的内容又被包装成为了结构体Entry,由于所有持久化在内存缓中的数据块是由背后的Java虚拟机进行管理的,因此内存缓存只需维护一个存储其内部引用的简单的哈希表即可。

当内存已经达到所设置的阈值时应如何处理?在Spark中,对于内存缓存可使用的内存阈值有这样一个配置,spark.storage.memoryFraction,默认情况下是0.6,也就是JVM内存的60%可被内存缓存用来存储块内容。当我们存储的数据块所占用的内存大于60%时,Spark会采取一些策略释放内存缓存空间:丢弃一些数据块,或是将一些数据块存储到磁盘上以释放内存缓存空间。是丢弃还是存储到磁盘上,依赖于进行操作的这些数据块的持久化选项,若持久化选项中包含了磁盘缓存,则会将这些块移入磁盘进行缓存,反之则直接删除。

直接删除是否会影响Spark程序的错误恢复机制么?这取决于依赖关系的可回溯性,若该RDD所依赖的祖先RDD是可被回溯并可用的,那么该RDD所对应的块被删除是不会影响错误恢复的;反之,若该RDD已经是祖先RDD,且数据无法被回溯到,那么程序就会出错。

2.3硬盘缓存

首先,这些数据块会被放到磁盘中的特定目录下。当我们配置spark.local.dir时,我们就配置了存储管理模块磁盘缓存存放数据的目录。磁盘缓存初始化时会在这些目录下创建Spark磁盘缓存文件夹,文件夹的命名方式是:spark-local-yyyyMMddHHmmss-xxxx,其中xxxx是随机数,此后所有的块内容都将存储到这些创建的目录中。

其次,在磁盘缓存中,一个数据块对应着文件系统的一个文件,文件名和块名称的映射关系是通过哈希算法计算所得的。首先对数据块名称计算哈希值,并将哈希值取模获得dirId和subDirId,这就获得了该块所需存储的路径。其次,在该路径上若还没有建立相应的文件夹,就会创建文件夹。最后在上述获得的路径以块名称作为文件名,就建立了数据块和相应文件路径及文件名的映射了,数据块对应的文件路径为:dirId/subDirId/block_id,这样就建立了块和文件之间的对应关系,而存取块内容就变成了写入和读取相应的文件了。

2.4持久化选项

需要对某一个RDD持久化时,可以调用RDD所提供的persist()和cache()函数进行操作。当我们提交作业进行首次计算时,会把该RDD的所有数据实体化并存储到内存和磁盘中,再次使用该RDD的时候,数据就可以直接从之前的缓存区中获得而无须再次进行计算。同时,被缓存的数据块是可容错恢复的,若RDD的某一个分区丢失,它会通过继承关系自动重算获得。

对于RDD的持久化,Spark提供了不同的选项,使我们能将RDD持久化到内存、磁盘,或是以序列化的方式持久化到内存中,甚至可以在集群的不同节点之间存储多份拷贝。所有这些不同的存储策略都是通过不同的持久化选项来决定的。当用户使用persist()函数进行持久化时,需要选择一种持久化方式,这种方式决定了RDD最后的持久化策略。如果用户调用cache()函数进行持久化,则以默认MEMORY_ONLY(内存)的方式持久化。所有这些持久化策略都是由类org.apache.spark.storage.StroageLevel决定的,用户只需选择所需的策略而无须涉及背后的与存储管理模块的交互。

Spark内部支持的持久化选项。 |持久化选项|说明| |--|--| |MEMORY_ONLY|以非序列化的方式将RDD以Java对象的方式存储到JVM内存堆中。如果内存缓存无法容纳该RDD,则一些分区将不会被缓存到内存中,而会在每次需要时重新计算。需要注意的是,这是默认的持久化选项| |MEMORY_AND_DISK|以非序列化的方式将RDD以Java对象的方式存储到JVM中。如果内存缓存无法容纳该RDD,则将无法容纳下的分区存储到磁盘上,当再次需要使用时,从其所在的缓存中读取| |MEMORY_ONLY_SER|以序列化的方式将序列化后的RDD数据存储到JVM中。这种持久化方式比非序列化的方式所占用的空间更小,但是读取的时候需要耗费更多的CPU资源进行反序列化| |MEMORY_AND_DISK_SER|与MEMORY_AND_DISK非常相似,不同之处在于会把内存无法容下的分区写入磁盘进行缓存,而无须每次需要时重新计算| |DISK_ONLY|将RDD的分区只缓存在磁盘中| |MEMORY_ONLY_2,MEMORY_AND_DISK_2等|与上述的持久化方式相同,不同的是这些持久化方式会将分区复制到另一个不同的节点上以保持有两份拷贝|

2.5如何选择不同的持久化选项

Spark提供了不同的持久化选项,意味着在CPU利用率和内存的使用量上提供了不同的选择方法,用户可以根据程序的情况下和机器的配置选择合理的持久化方式,使得在增加CPU利用率的同时兼顾内存的使用量,以下是对持久化方式选择的几条建议: 1. 如果使用默认的持久化方式(MEMORY_ONLY)完全能够缓存RDD,那么无需选择其它的持久化方式,因为这是CPU利用率最高的一种方式,能够确保程序跑得尽可能快; 2. 如果默认的持久化方式无法完全缓存所需的RDD,可以尝试使用MEMORY_ONLY_SER这种持久化方式,并选用更快速度的序列化方式,这能够更高效地使用内存,同时确保程序跑得相对较快。 3. 除非对于RDD的重算所带来的开销比缓存到磁盘来得更大,一般情况下 不要选择将RDD缓存到磁盘上。通常情况下,对于某一分区的重算会比从磁盘中读取要快。 4. 如果想要确保快速的错误恢复机制,应尽可能地选用带有备份的持久化方式。虽然上面也说道所有的持久化方式都能通过重算丢失数据从错误中恢复,但是备份可以使得任务继续进行而无须等待丢失分区的重新计算。

同时,Spark的持久化选项也提供了一些接口供用户扩展,用户可以通过StorageLevel的apply函数来实现。

最适合用户的持久化方式需要用户结合自身应用的特点以及机器的硬件性能做出权衡,用户可以通过实验和测试选择自身所需的持久化方式。

3.Shuffle数据持久化

与RDD持久化的不同之处在于:首先Shuffle数据块必须是在磁盘上进行缓存的,而不能选择在内存中缓存;其次,在RDD基于磁盘的持久化中,每一个数据块对应着一个文件,而在Shuffle数据块持久化中,Shuffle数据块表示的只是逻辑上的概念,不同的实现方式可以决定Shuffle数据块的不同存储方式。

在Spark存储管理模块中,Shuffle数据块的存储有两种方式:一种是将Shuffle数据块映射成文件,这是默认的方式;另一种是将Shuffle数据块映射成文件中的一段,这种方式需要将spark.shuffle.consolidateFiles设置为True。默认的方式会产生大量的文件,这会对磁盘和文件系统的性能造成极大的影响。将分时运行的Map任务所产生的Shuffle数据块合并到同一个文件中,以减少Shuffle文件的总数。

Shuffle数据块的读取和传输,Shuffle是将一组任务的输出结果重新组合作为下一组任务的输入这样的一个过程,由于任务分布在不同的节点上,因此为了将重组结果作为输入,必然涉及Shuffle数据的读取和传输。在Spark存储管理模块中,Shuffle数据的读取和传输有两种方式,一种是基于NIO以socket连接去获取数据,另一种是基于OIO通过Netty服务端获取数据。前者是默认的获取方式,通过配置spark.shuffle.use.netty为ture,可以启用第二种获取方式。之所以有两种Shuffle数据的获取方式,是由于默认的的方式在一些情况下无法充分利用网络带宽,用户可以通过比较两种方式在性能上的差异来自行决定选用哪种Shuffle数据获取方式。

总的来说,Spark存储管理模块为Shuffle数据的持久化做了许多有别于RDD持久化的工作,包括存取Shuffle数据块的方式,以及读取和传输Shuffle数据块的方式,所有这些实现都是为了使Shuffle获得更好的性能和容错。

4.广播变量持久化

在编写Spark应用程序时,为了加速对一些小块数据的读取,我们往往希望这些数据在所有节点上都有一份拷贝,每个任务都能从本节点的拷贝中读取数据而无须通过远程传输获取数据。Spark为我们提供了广播变量这样一个接口,使我们能把本地的数据广播到所有的执行节点上,从而加速程序的运行。在Spark中,广播变量也是由存储管理模块进行管理的。

首先,在存储管理模块中,对于广播变量数据块的描述,由专门的块进行;

其次,广播变量数据块是以MEMORY_AND_DISK的持久化方式存入本节点的存储管理模块中的,当广播变量在远端被反序列化创建时,该变量会将自身与数据块关联起来,当用户的应用程序通过接口value()获取数据时,该变量对应的数据块就会返回给用户进行处理;

最后,通过设置过期清理机制,Spark内部会清理过期的广播变量,包括本地持久化的数据和存储在远程存储管理模块中的数据;