ABOUT ME

와주셔서 감사합니다. 좋은 글을 많이 쓰겠습니다.

Today
Yesterday
Total
  • [Spark] MapOutputTracker 알아보기
    Spark 2024. 7. 29. 05:37
    반응형

    - 목차

     

    MapOutputTracker 란 ?

    스파크는 하둡의 MapReduce 처럼 Map 과 Reduce 라는 표현을 사용합니다.

    셔플링이 발생하는 Wide Transformation 을 경계로 이전 Stage 를 Map Task, 후속 Stage 에 Reduce Task 를 배치합니다.

    그리고 아래의 이미지처럼 Map Task 는 Reduce Task 에게 Shuffle Block 이라는 데이터들을 전달하게 됩니다.

     

     

    Shuffle Block 은 Map Task 가 처리한 데이터의 결과물이며, BlockManager 에 의해서 관리됩니다.

    MapOutputTracker 는 셔플링 과정에서 Map Task 가 생성한 결과물을 추적/관리하는 스파크의 구성요소이며,

    이는 Driver 와 Executor 에 각각 존재합니다.

     

    https://westlife0615.tistory.com/996

     

    [Spark] Block 알아보기

    - 목차 RDD Cache Block.먼저 알아볼 Block 의 종류는 RDD Cache Block 입니다.RDD 의 cache 나 persist 함수를 통해서 RDD 를 캐싱할 수 있게 됩니다.  import org.apache.spark.storage.StorageLevelval baseRdd = sc.parallelize(1 to

    westlife0615.tistory.com

     

     

     

    MapOutputTrackerMaster 와 MapOutputTrackerWorker.

    Map Task 의 데이터 처리 결과물을 관리하는 MapOutputTracker 는 Driver 와 Executor 에 각각 존재합니다.

    Driver 에 존재하는 MapOutputTracker 는 MapOutputTrackerMaster 라고 불리고,

    Executor 에 존재하는 MapOutputTracker 는 MapOutputTrackerWorker 라고 불립니다.

    스파크는 MapOutput 을 관리하는 방식을 Driver 를 통한 중앙 관리 방식을 채택하였습니다.

    그리하여 Map Task 가 생성한 데이터 처리 결과를 Driver 에게 보고하게 되고, 이를 Driver 는 관리합니다.

     

     

    BlockManagerMaster 와 BlockManager.

    BlockManager 또한 MapOutputTracker 와 같이 Driver - Executor 간의 서버/클라이언트 구조를 취합니다.

    Map Task 는 Reduce Task 가 처리해야할 Shuffle Block 을 생성하게 됩니다.

    그리고 이렇게 생성된 Shuffle Block 의 메타데이터를 Driver 에게 전달합니다.

    Executor 의 BlockManager 는 UpdateBlockInfo RPC 를 Driver 의 BlockManagerMaster 에게 전달합니다.

    Driver 의 BlockManagerMaster 는 전달받은 UpdateBlockInfo RPC 의 정보를 처리하여 내부 저장소에 저장하게 됩니다.

     

    아래의 case class 는 UpdateBlockInfo 의 대략적인 정보를 설명합니다.

    이를 간단히 요약하면

    • BlockManagerId : Executor 의 IP 와 Port
    • BlockId : rdd_1_0, shuffle_1_0_0 과 같은 블록의 고유한 식별값
    • 그 외 블록의 사이즈
    case class UpdateBlockInfo(
      var blockManagerId: BlockManagerId,
      var blockId: BlockId,
      var storageLevel: StorageLevel,
      var memSize: Long,
      var diskSize: Long)

     

    이렇게 Executor 의 BlockManager 는 Driver 의 BlockManagerMasterEndpoint 에게 생성된 Block 의 정보를 제공합니다.

    그리고 아래와 같이 Map 자료구조에 Block Info 가 저장되어 In-Memory 형식으로 블록 정보가 관리됩니다.

    blockManagerInfo: mutable.Map[BlockManagerId, BlockManagerInfo],

     

     

    UpdateBlockInfo RPC .

     

    Worker Node 에서 실행되는 Executor 는 Driver 에게 UpdateBlockInfo RPC 요청을 전송합니다.

    이는 Executor 에서 Block 이 생성되면 Driver 의 BlockManagerMasterEndpoint 로 블록이 생성됨을 알리기 위함입니다.

    RDD 의 persist 나 cache 함수를 통해서 RDD Cache Block 이 만들어지거나

    Wide Transformation 에 의해서 Shuffle Block 이 생성되면 UpdateBlockInfo RPC 가 전송됩니다.

     

    아래는 Exectuor 가 Driver 에게 UpdateBlockInfo RPC 를 전송하는 TCP Packet 입니다.

    생성되는 Block 의 갯수만큼 Executor 는 Driver 에게 UpdateBlockInfo RPC 를 전송하게 됩니다.

     

    00:39:51.089350 IP worker1.spark.36934 > 8b324e9bf0d7.40683: Flags [P.], seq 20078:20250, ack 51693, win 4815, options [nop,nop,TS val 847474233 ecr 2154189235], length 172
    	0x0000:  4500 00e0 e695 4000 4006 fb57 ac12 0002  E.....@.@..W....
    	0x0010:  ac12 0004 9046 9eeb 3f89 9fd1 92a6 93ca  .....F..?.......
    	0x0020:  8018 12cf 58fd 0000 0101 080a 3283 6e39  ....X.......2.n9
    	0x0030:  8066 51b3 0001 000c 3862 3332 3465 3962  .fQ.....8b324e9b
    	0x0040:  6630 6437 0000 9eeb 0012 426c 6f63 6b4d  f0d7......BlockM
    	0x0050:  616e 6167 6572 4d61 7374 6572 aced 0005  anagerMaster....
    	0x0060:  7372 003d 6f72 672e 6170 6163 6865 2e73  sr.=org.apache.s
    	0x0070:  7061 726b 2e73 746f 7261 6765 2e42 6c6f  park.storage.Blo
    	0x0080:  636b 4d61 6e61 6765 724d 6573 7361 6765  ckManagerMessage
    	0x0090:  7324 5570 6461 7465 426c 6f63 6b49 6e66  s$UpdateBlockInf
    	0x00a0:  6f51 95eb 77f2 0f4a 640c 0000 7870 772f  oQ..w..Jd...xpw/
    	0x00b0:  0001 3000 0a31 3732 2e31 382e 302e 3200  ..0..172.18.0.2.
    	0x00c0:  0085 4900 0007 7264 645f 385f 3008 0100  ..I...rdd_8_0...
    	0x00d0:  0000 0000 0000 0000 0000 0000 0000 0278  ...............x

     

     

    BlockManager 는 MapOutputTracker 에게 Block 정보를 공유한다.

    위처럼 BlockManagerMasterEndpoint 로 UpdateBlockInfo RPC 가 요청되면 BlockManager 는 자신의 인메모리 자료구조에 Block 의 메타정보를 저장합니다.

     

     

    그리고 BlockManager 는 MapOutputTracker 에게 Block 의 정보를 공유합니다.

    이를 코드 레벨에서 보자면 아래와 같이 BlockManagerMasterEndpoint 를 UpdateBlockInfo RPC 요청을 전달받습니다.

    그리고 Block 이 Shuffle Block 인지 아니면 RDD 나 Broadcast Block 인지 체크합니다.

    ( 블록의 종류는 크게 Shuffle, Broadcast, RDD 로 분류됩니다. )

    class BlockManagerMasterEndpoint {
    	// .. 생략
      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    	// .. 생략
    
        case UpdateBlockInfo =>
    
          if (blockId.isShuffle) {
            updateShuffleBlockInfo(blockId, blockManagerId).foreach(handleResult)
          } else {
            handleResult(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size))
          }
    	// .. 생략
      }
    }

     

    그리고 Shuffle Block 에 한하여 아래의 함수와 같이 MapOutputTracker 에게 Shuffle Block 의 정보를 제공합니다.

    이 과정을 통해서 MapOutputTracker 는 셔플 블록의 위치 정보와 Map Task 의 Partition 정보 등을 저장할 수 있게 됩니다.

     

     private def updateShuffleBlockInfo(blockId: BlockId, blockManagerId: BlockManagerId)
        : Future[Boolean] = {
       blockId match {
         case ShuffleIndexBlockId(shuffleId, mapId, _) =>
           Future {
             mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId)
             true
           }
       }
     }

     

     

    GetMapOutputStatuses RPC.

    GroupByKey 나 Join 과 같은 Wide Dependency 연산을 사용하게 되면,

    Map Task 는 Shuffle Block 을 생성하게 되고, 생성된 Block 은 Driver 로 전송됩니다.

    이 과정에서 BlockManager 의 UpdateBlockInfo 가 사용되구요.

    Driver 의 MapOutputTracker 는 Shuffle Block 의 메타 정보를 가지게 됩니다.

     

    GetMapOutputStatuses RPC 는 Reduce Task 가 Driver 에게 요청하는 RPC 입니다.

    Reduce Task 는 Map Task 가 생성한 Shuffle Block 의 위치와 크기 정보를 알 수 없기 때문에 Driver 에게 요청을 하게 됩니다.

     

    아래의 TCP Packet 은 ShuffleID 1번에 대한 MapStatus 정보를 요청하는 RPC Packet 입니다.

    11:51:05.963121 IP worker1.spark.53692 > 6d3d8756f42c.41543: Flags [P.], seq 62381:62493, ack 99525, win 3220, options [nop,nop,TS val 2883747891 ecr 752328262], length 112
    	0x0000:  4500 00a4 05e0 4000 4006 dc48 ac12 0003  E.....@.@..H....
    	0x0010:  ac12 0004 d1bc a247 f028 ce57 eb33 5d9b  .......G.(.W.3].
    	0x0020:  8018 0c94 58c2 0000 0101 080a abe2 8033  ....X..........3
    	0x0030:  2cd7 9e46 0001 000c 3664 3364 3837 3536  ,..F....6d3d8756
    	0x0040:  6634 3263 0000 a247 0010 4d61 704f 7574  f42c...G..MapOut
    	0x0050:  7075 7454 7261 636b 6572 aced 0005 7372  putTracker....sr
    	0x0060:  0025 6f72 672e 6170 6163 6865 2e73 7061  .%org.apache.spa
    	0x0070:  726b 2e47 6574 4d61 704f 7574 7075 7453  rk.GetMapOutputS
    	0x0080:  7461 7475 7365 73e5 1892 4d5d 5209 ad02  tatuses...M]R...
    	0x0090:  0001 4900 0973 6875 6666 6c65 4964 7870  ..I..shuffleIdxp
    	0x00a0:  0000 0001                                ....

     

     

    반응형
Designed by Tistory.