-
[Spark] MapOutputTracker 알아보기Spark 2024. 7. 29. 05:37반응형
- 목차
MapOutputTracker 란 ?
스파크는 하둡의 MapReduce 처럼 Map 과 Reduce 라는 표현을 사용합니다.
셔플링이 발생하는 Wide Transformation 을 경계로 이전 Stage 를 Map Task, 후속 Stage 에 Reduce Task 를 배치합니다.
그리고 아래의 이미지처럼 Map Task 는 Reduce Task 에게 Shuffle Block 이라는 데이터들을 전달하게 됩니다.
Shuffle Block 은 Map Task 가 처리한 데이터의 결과물이며, BlockManager 에 의해서 관리됩니다.
MapOutputTracker 는 셔플링 과정에서 Map Task 가 생성한 결과물을 추적/관리하는 스파크의 구성요소이며,
이는 Driver 와 Executor 에 각각 존재합니다.
https://westlife0615.tistory.com/996
[Spark] Block 알아보기
- 목차 RDD Cache Block.먼저 알아볼 Block 의 종류는 RDD Cache Block 입니다.RDD 의 cache 나 persist 함수를 통해서 RDD 를 캐싱할 수 있게 됩니다. import org.apache.spark.storage.StorageLevelval baseRdd = sc.parallelize(1 to
westlife0615.tistory.com
MapOutputTrackerMaster 와 MapOutputTrackerWorker.
Map Task 의 데이터 처리 결과물을 관리하는 MapOutputTracker 는 Driver 와 Executor 에 각각 존재합니다.
Driver 에 존재하는 MapOutputTracker 는 MapOutputTrackerMaster 라고 불리고,
Executor 에 존재하는 MapOutputTracker 는 MapOutputTrackerWorker 라고 불립니다.
스파크는 MapOutput 을 관리하는 방식을 Driver 를 통한 중앙 관리 방식을 채택하였습니다.
그리하여 Map Task 가 생성한 데이터 처리 결과를 Driver 에게 보고하게 되고, 이를 Driver 는 관리합니다.
BlockManagerMaster 와 BlockManager.
BlockManager 또한 MapOutputTracker 와 같이 Driver - Executor 간의 서버/클라이언트 구조를 취합니다.
Map Task 는 Reduce Task 가 처리해야할 Shuffle Block 을 생성하게 됩니다.
그리고 이렇게 생성된 Shuffle Block 의 메타데이터를 Driver 에게 전달합니다.
Executor 의 BlockManager 는 UpdateBlockInfo RPC 를 Driver 의 BlockManagerMaster 에게 전달합니다.
Driver 의 BlockManagerMaster 는 전달받은 UpdateBlockInfo RPC 의 정보를 처리하여 내부 저장소에 저장하게 됩니다.
아래의 case class 는 UpdateBlockInfo 의 대략적인 정보를 설명합니다.
이를 간단히 요약하면
- BlockManagerId : Executor 의 IP 와 Port
- BlockId : rdd_1_0, shuffle_1_0_0 과 같은 블록의 고유한 식별값
- 그 외 블록의 사이즈
case class UpdateBlockInfo( var blockManagerId: BlockManagerId, var blockId: BlockId, var storageLevel: StorageLevel, var memSize: Long, var diskSize: Long)
이렇게 Executor 의 BlockManager 는 Driver 의 BlockManagerMasterEndpoint 에게 생성된 Block 의 정보를 제공합니다.
그리고 아래와 같이 Map 자료구조에 Block Info 가 저장되어 In-Memory 형식으로 블록 정보가 관리됩니다.
blockManagerInfo: mutable.Map[BlockManagerId, BlockManagerInfo],
UpdateBlockInfo RPC .
Worker Node 에서 실행되는 Executor 는 Driver 에게 UpdateBlockInfo RPC 요청을 전송합니다.
이는 Executor 에서 Block 이 생성되면 Driver 의 BlockManagerMasterEndpoint 로 블록이 생성됨을 알리기 위함입니다.
RDD 의 persist 나 cache 함수를 통해서 RDD Cache Block 이 만들어지거나
Wide Transformation 에 의해서 Shuffle Block 이 생성되면 UpdateBlockInfo RPC 가 전송됩니다.
아래는 Exectuor 가 Driver 에게 UpdateBlockInfo RPC 를 전송하는 TCP Packet 입니다.
생성되는 Block 의 갯수만큼 Executor 는 Driver 에게 UpdateBlockInfo RPC 를 전송하게 됩니다.
00:39:51.089350 IP worker1.spark.36934 > 8b324e9bf0d7.40683: Flags [P.], seq 20078:20250, ack 51693, win 4815, options [nop,nop,TS val 847474233 ecr 2154189235], length 172 0x0000: 4500 00e0 e695 4000 4006 fb57 ac12 0002 E.....@.@..W.... 0x0010: ac12 0004 9046 9eeb 3f89 9fd1 92a6 93ca .....F..?....... 0x0020: 8018 12cf 58fd 0000 0101 080a 3283 6e39 ....X.......2.n9 0x0030: 8066 51b3 0001 000c 3862 3332 3465 3962 .fQ.....8b324e9b 0x0040: 6630 6437 0000 9eeb 0012 426c 6f63 6b4d f0d7......BlockM 0x0050: 616e 6167 6572 4d61 7374 6572 aced 0005 anagerMaster.... 0x0060: 7372 003d 6f72 672e 6170 6163 6865 2e73 sr.=org.apache.s 0x0070: 7061 726b 2e73 746f 7261 6765 2e42 6c6f park.storage.Blo 0x0080: 636b 4d61 6e61 6765 724d 6573 7361 6765 ckManagerMessage 0x0090: 7324 5570 6461 7465 426c 6f63 6b49 6e66 s$UpdateBlockInf 0x00a0: 6f51 95eb 77f2 0f4a 640c 0000 7870 772f oQ..w..Jd...xpw/ 0x00b0: 0001 3000 0a31 3732 2e31 382e 302e 3200 ..0..172.18.0.2. 0x00c0: 0085 4900 0007 7264 645f 385f 3008 0100 ..I...rdd_8_0... 0x00d0: 0000 0000 0000 0000 0000 0000 0000 0278 ...............x
BlockManager 는 MapOutputTracker 에게 Block 정보를 공유한다.
위처럼 BlockManagerMasterEndpoint 로 UpdateBlockInfo RPC 가 요청되면 BlockManager 는 자신의 인메모리 자료구조에 Block 의 메타정보를 저장합니다.
그리고 BlockManager 는 MapOutputTracker 에게 Block 의 정보를 공유합니다.
이를 코드 레벨에서 보자면 아래와 같이 BlockManagerMasterEndpoint 를 UpdateBlockInfo RPC 요청을 전달받습니다.
그리고 Block 이 Shuffle Block 인지 아니면 RDD 나 Broadcast Block 인지 체크합니다.
( 블록의 종류는 크게 Shuffle, Broadcast, RDD 로 분류됩니다. )
class BlockManagerMasterEndpoint { // .. 생략 override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { // .. 생략 case UpdateBlockInfo => if (blockId.isShuffle) { updateShuffleBlockInfo(blockId, blockManagerId).foreach(handleResult) } else { handleResult(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)) } // .. 생략 } }
그리고 Shuffle Block 에 한하여 아래의 함수와 같이 MapOutputTracker 에게 Shuffle Block 의 정보를 제공합니다.
이 과정을 통해서 MapOutputTracker 는 셔플 블록의 위치 정보와 Map Task 의 Partition 정보 등을 저장할 수 있게 됩니다.
private def updateShuffleBlockInfo(blockId: BlockId, blockManagerId: BlockManagerId) : Future[Boolean] = { blockId match { case ShuffleIndexBlockId(shuffleId, mapId, _) => Future { mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId) true } } }
GetMapOutputStatuses RPC.
GroupByKey 나 Join 과 같은 Wide Dependency 연산을 사용하게 되면,
Map Task 는 Shuffle Block 을 생성하게 되고, 생성된 Block 은 Driver 로 전송됩니다.
이 과정에서 BlockManager 의 UpdateBlockInfo 가 사용되구요.
Driver 의 MapOutputTracker 는 Shuffle Block 의 메타 정보를 가지게 됩니다.
GetMapOutputStatuses RPC 는 Reduce Task 가 Driver 에게 요청하는 RPC 입니다.
Reduce Task 는 Map Task 가 생성한 Shuffle Block 의 위치와 크기 정보를 알 수 없기 때문에 Driver 에게 요청을 하게 됩니다.
아래의 TCP Packet 은 ShuffleID 1번에 대한 MapStatus 정보를 요청하는 RPC Packet 입니다.
11:51:05.963121 IP worker1.spark.53692 > 6d3d8756f42c.41543: Flags [P.], seq 62381:62493, ack 99525, win 3220, options [nop,nop,TS val 2883747891 ecr 752328262], length 112 0x0000: 4500 00a4 05e0 4000 4006 dc48 ac12 0003 E.....@.@..H.... 0x0010: ac12 0004 d1bc a247 f028 ce57 eb33 5d9b .......G.(.W.3]. 0x0020: 8018 0c94 58c2 0000 0101 080a abe2 8033 ....X..........3 0x0030: 2cd7 9e46 0001 000c 3664 3364 3837 3536 ,..F....6d3d8756 0x0040: 6634 3263 0000 a247 0010 4d61 704f 7574 f42c...G..MapOut 0x0050: 7075 7454 7261 636b 6572 aced 0005 7372 putTracker....sr 0x0060: 0025 6f72 672e 6170 6163 6865 2e73 7061 .%org.apache.spa 0x0070: 726b 2e47 6574 4d61 704f 7574 7075 7453 rk.GetMapOutputS 0x0080: 7461 7475 7365 73e5 1892 4d5d 5209 ad02 tatuses...M]R... 0x0090: 0001 4900 0973 6875 6666 6c65 4964 7870 ..I..shuffleIdxp 0x00a0: 0000 0001 ....
반응형'Spark' 카테고리의 다른 글
[Spark] RangePartitioner 알아보기 (0) 2024.07.29 [Spark] RDD combineByKey 알아보기 (0) 2024.07.29 [Spark] Block 알아보기 (0) 2024.07.26 [Spark] s3a Protocol 에서 Driver 가 파일 존재를 확인하는 과정 (0) 2024.07.23 [Spark] RDD GroupBy 와 Shuffle 알아보기 (0) 2024.07.09