spark(三):blockManager、broadcast、cache、checkpoint
发表于:2024-11-23 作者:热门IT资讯网编辑
编辑最后更新 2024年11月23日,blockManagerDriver和executor上分别都会启动blockManager,其中driver上拥有所有executor上的blockManager的引用;所有executor上的bl
blockManager
- Driver和executor上分别都会启动blockManager,其中driver上拥有所有executor上的blockManager的引用;所有executor上的blockManager都持有driver上的blockManager的引用;
- blockManagerSlave会不断向blockManagerMaster发送心跳,更新block信息等;
- BlockManager对象被创建的时候会创建出MemoryStore和DiskStore对象用以存取block,如果内存中拥有足够的内存, 就 使用 MemoryStore存储, 如果 不够, 就 spill 到 磁盘中, 通过 DiskStore进行存储。
- DiskStore 有一个DiskBlockManager,DiskBlockManager 主要用来创建并持有逻辑 blocks 与磁盘上的 blocks之间的映射,一个逻辑 block 通过 BlockId 映射到一个磁盘上的文件。 在 DiskStore 中会调用 diskManager.getFile 方法, 如果子文件夹不存在,会进行创建, 文件夹的命名方式为(spark-local-yyyyMMddHHmmss-xxxx, xxxx是一个随机数), 所有的block都会存储在所创建的folder里面。
- MemoryStore 相对于DiskStore需要根据block id hash计算出文件路径并将block存放到对应的文件里面,MemoryStore管理block就显得非常简单:MemoryStore内部维护了一个hash map来管理所有的block,以block id为key将block存放到hash map中。而从MemoryStore中取得block则非常简单,只需从hash map中取出block id对应的value即可。
- GET操作 如果 local 中存在就直接返回, 从本地获取一个Block, 会先判断如果是 useMemory, 直接从内存中取出, 如果是 useDisk, 会从磁盘中取出返回, 然后根据useMemory判断是否在内存中缓存一下,方便下次获取, 如果local 不存在, 从其他节点上获取, 当然元信息是存在 drive上的,要根据我们上文中提到的 GETlocation 协议获取 Block 所在节点位置, 然后到其他节点上获取。
- PUT操作 操作之前会加锁来避免多线程的问题, 存储的时候会根据 存储级别, 调用对应的是 memoryStore 还是 diskStore, 然后在具体存储器上面调用 存储接口。 如果有 replication 需求, 会把数据备份到其他的机器上面。
cache、persist、checkpoint
- 如果要对一个RDD进行持久化,只要对这个RDD调用cache()和persist()即可。
- cache()方法表示:使用非序列化的方式将RDD中的数据全部尝试持久化到内存中。
- persist()方法表示:手动选择持久化级别,并使用指定的方式进行持久化。默认缓存级别是StorageLevel.MEMORY_ONLY,也就是cache就是这个默认级别的。
- checkpoint是将数据持久化到HDFS或者硬盘。
- rdd.persist(StorageLevel.DISK_ONLY) 与 checkpoint 也有区别。前者虽然可以将 RDD 的 partition 持久化到磁盘,但该 partition 由 blockManager 管理。一旦 driver program 执行结束,也就是 executor 所在进程 CoarseGrainedExecutorBackend stop,blockManager 也会 stop,被 cache 到磁盘上的 RDD 也会被清空(整个 blockManager 使用的 local 文件夹被删除)。而 checkpoint 将 RDD 持久化到 HDFS 或本地文件夹,如果不被手动 remove 掉( 话说怎么 remove checkpoint 过的 RDD? ),是一直存在的,也就是说可以被下一个 driver program 使用,而 cached RDD 不能被其他 dirver program 使用。
broadcast、accumulator
- 广播变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。(注意是一个较大的只读变量,不能修改)
- Accumulator是spark提供的累加器,顾名思义,该变量只能够增加。
- 只有driver能获取到Accumulator的值(使用value方法),Task只能对其做增加操作(使用 +=)
- 使用累加器的过程中只能使用一次action的操作才能保证结果的准确性。如果需要使用多次则使用cache或persist操作切断依赖。