ABOUT ME

와주셔서 감사합니다. 좋은 글을 많이 쓰겠습니다.

Today
Yesterday
Total
  • [Spark] Block 알아보기
    Spark 2024. 7. 26. 17:21
    반응형

    - 목차

     

    RDD Cache Block.

    먼저 알아볼 Block 의 종류는 RDD Cache Block 입니다.

    RDD 의 cache 나 persist 함수를 통해서 RDD 를 캐싱할 수 있게 됩니다. 

     

    import org.apache.spark.storage.StorageLevel
    
    val baseRdd = sc.parallelize(1 to 10, 4).map { x =>
      x * 2
    }
    
    val cachedRdd = baseRdd.persist(StorageLevel.DISK_ONLY)
    
    val sum1 = cachedRdd.sum()
    println(s"First sum: $sum1")

     

    Disk 에 저장된 Cache Block.

    캐싱된 RDD 는 Block 이라는 파일 형태로 Disk 에 저장됩니다.

    저장되는 File Path 는 spark.local.dir 이라는 설정을 통해서 지정할 수 있구요.

    기본값은 /tmp 로 설정됩니다.

     

    구체적인 저장 위치는 아래와 같습니다.

    좀 복잡하긴 한대요. 

    /tmp/spark-{uuid}/executor-{uuid}/blockmgr-{uuid} 와 같은 포맷으로 File Path 가 지정됩니다.

    /tmp
    /spark-9b5ea5fe-b81a-404b-bbc9-bcd5b3a601c5
    /executor-79f7cf33-bc99-45f4-ae9c-0e6ee5559dde
    /blockmgr-1f205f97-52f0-4863-9148-5373757cae25

     

     

    그리고 그 하위에 아래와 같은 형식으로 4개의 RDD File 이 생성되게 됩니다.

    rdd_1_0, rdd_1_1, rdd_1_2, rdd_1_3 인 4개의 파일이 생성됩니다.

    ./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/15:
    rdd_1_0
    
    ./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/16:
    rdd_1_1
    
    ./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/17:
    rdd_1_2
    
    ./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/18:
    rdd_1_3

     

     

    RDD Caching Block 의 이름은 RDD 와 파티션 아이디의 조합으로 구성됩니다.

    새롭게 생성되는 RDD 일수록 RDD ID 는 증가하며, Partition 이 증가할 수록 Caching RDD Block 의 갯수 또한 증가합니다.

     

    예를 들어, 10개의 Partition 을 가지는 RDD 를 캐싱하게 되면 아래와 같이 10개의 Caching Block 이 만들어집니다.

    ( RDD ID 가 6인 이유는 여러번 실험을 하였기 때문입니다. )

    import org.apache.spark.storage.StorageLevel
    sc.parallelize(1 to 10, 10).persist(StorageLevel.DISK_ONLY).collect()
    ./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/1a:
    rdd_6_0
    
    ./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/1b:
    rdd_6_1
    
    ./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/1c:
    rdd_6_2
    
    ./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/1d:
    rdd_6_3
    
    ./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/1e:
    rdd_6_4
    
    ./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/1f:
    rdd_6_5
    
    ./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/20:
    rdd_6_6
    
    ./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/21:
    rdd_6_7
    
    ./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/22:
    rdd_6_8
    
    ./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/23:
    rdd_6_9
    `-- executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8
        `-- blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170
            |-- 15
            |   `-- rdd_1_0
            |-- 16
            |   `-- rdd_1_1
            |-- 17
            |   `-- rdd_1_2
            |-- 18
            |   `-- rdd_1_3
            |-- 19
            |-- 1a
            |   `-- rdd_6_0
            |-- 1b
            |   `-- rdd_6_1
            |-- 1c
            |   `-- rdd_6_2
            |-- 1d
            |   `-- rdd_6_3
            |-- 1e
            |   `-- rdd_6_4
            |-- 1f
            |   `-- rdd_6_5
            |-- 20
            |   `-- rdd_6_6
            |-- 21
            |   `-- rdd_6_7
            |-- 22
            |   `-- rdd_6_8
            `-- 23
                `-- rdd_6_9

     

     

    UpdateBlockInfo RPC 에 대해서.

    이렇게 Block 이 생성되면, Executor 들은 Driver 에게 생성된 Block 에 대한 정보를 전달합니다.

    실제 Executor 와 Driver 사이에 교환되는 네트워크 패킷을 확인해보면 아래와 같이 Executor 가 Driver 에게 UpdateBlockInfo 요청을 하게 됩니다.

    rdd_8_0 에 해당하는 캐시 블록의 생성을 Driver 의 BlockManagerMaster 에게 전달하게 됩니다.

    그리고 BlockManagerMaster 를 이를 관리하죠.

    00:39:51.089350 IP worker1.spark.36934 > 8b324e9bf0d7.40683: Flags [P.], seq 20078:20250, ack 51693, win 4815, options [nop,nop,TS val 847474233 ecr 2154189235], length 172
    	0x0000:  4500 00e0 e695 4000 4006 fb57 ac12 0002  E.....@.@..W....
    	0x0010:  ac12 0004 9046 9eeb 3f89 9fd1 92a6 93ca  .....F..?.......
    	0x0020:  8018 12cf 58fd 0000 0101 080a 3283 6e39  ....X.......2.n9
    	0x0030:  8066 51b3 0001 000c 3862 3332 3465 3962  .fQ.....8b324e9b
    	0x0040:  6630 6437 0000 9eeb 0012 426c 6f63 6b4d  f0d7......BlockM
    	0x0050:  616e 6167 6572 4d61 7374 6572 aced 0005  anagerMaster....
    	0x0060:  7372 003d 6f72 672e 6170 6163 6865 2e73  sr.=org.apache.s
    	0x0070:  7061 726b 2e73 746f 7261 6765 2e42 6c6f  park.storage.Blo
    	0x0080:  636b 4d61 6e61 6765 724d 6573 7361 6765  ckManagerMessage
    	0x0090:  7324 5570 6461 7465 426c 6f63 6b49 6e66  s$UpdateBlockInf
    	0x00a0:  6f51 95eb 77f2 0f4a 640c 0000 7870 772f  oQ..w..Jd...xpw/
    	0x00b0:  0001 3000 0a31 3732 2e31 382e 302e 3200  ..0..172.18.0.2.
    	0x00c0:  0085 4900 0007 7264 645f 385f 3008 0100  ..I...rdd_8_0...
    	0x00d0:  0000 0000 0000 0000 0000 0000 0000 0278  ...............x

     

    이러한 정보 교환의 결과로써 Driver 의 Web UI 에서 캐싱된 RDD 의 메타정보를 확인할 수 있습니다.

     

     

     

     

    Shuffle Block.

    groupBy, partitionBy 등과 같은 연산을 사용하게 되면, 여러 개의 Stage 가 생성되면서 Wide Transformation 이 발생합니다.

    아래와 같이 간단한 groupByKey 의 예시를 살펴보면 아래의 스크립트와 시각화된 Job 을 확인할 수 있습니다.

    sc.parallelize(1 until 1000, 10)
    .map(entry => (entry % 10, entry))
    .groupByKey()
    .mapValues(iter => iter.sum)
    .collect()

     

     

    그리고 이렇게 Wide Transformation 이 발생하게 되면, Worker Node 는 아래와 같인 Shuffle Block 들이 생성되게 됩니다. 

    data, index, checksum 파일이 각각 생성되지만, 첫번째 Task 의 Partition 갯수만큼 Block 파일이 생성되게 됩니다.

     

    -- spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6
        `-- executor-9dab617c-7da5-4817-bd39-58f0e2991073
            `-- blockmgr-175d616d-a890-4db0-b589-6ce1897ca94f
                |-- 04
                |   |-- shuffle_0_8_0.checksum.ADLER32
                |   `-- shuffle_0_8_0.data
                |-- 05
                |-- 06
                |   `-- shuffle_0_6_0.checksum.ADLER32
                |-- 08
                |   `-- shuffle_0_8_0.index
                |-- 09
                |-- 0a
                |   `-- shuffle_0_6_0.index
                |-- 0b
                |-- 0c
                |   |-- shuffle_0_0_0.data
                |   `-- shuffle_0_4_0.index
                |-- 0d
                |   `-- shuffle_0_3_0.index
                |-- 0e
                |-- 0f
                |   `-- shuffle_0_1_0.index
                |-- 15
                |   |-- shuffle_0_1_0.checksum.ADLER32
                |   `-- shuffle_0_1_0.data
                |-- 16
                |-- 17
                |   `-- shuffle_0_3_0.checksum.ADLER32
                |-- 1a
                |-- 1b
                |   `-- shuffle_0_7_0.data
                |-- 1c
                |-- 1d
                |   `-- shuffle_0_9_0.data
                |-- 23
                |   `-- shuffle_0_9_0.checksum.ADLER32
                |-- 24
                |-- 25
                |   `-- shuffle_0_7_0.checksum.ADLER32
                |-- 26
                |-- 27
                |   |-- shuffle_0_5_0.checksum.ADLER32
                |   `-- shuffle_0_5_0.data
                |-- 28
                |-- 29
                |   `-- shuffle_0_3_0.data
                |-- 30
                |   `-- shuffle_0_0_0.index
                |-- 32
                |   `-- shuffle_0_2_0.index
                |-- 34
                |   `-- shuffle_0_0_0.checksum.ADLER32
                |-- 35
                |   `-- shuffle_0_5_0.index
                |-- 36
                |   |-- shuffle_0_2_0.checksum.ADLER32
                |   `-- shuffle_0_2_0.data
                |-- 37
                |   `-- shuffle_0_7_0.index
                |-- 38
                |   |-- shuffle_0_4_0.checksum.ADLER32
                |   `-- shuffle_0_4_0.data
                |-- 39
                |   `-- shuffle_0_9_0.index
                |-- 3a
                |   `-- shuffle_0_6_0.data

     

     

    Wide Transformation 이 발생하면 2개의 Stage 로 나뉘게 됩니다. 

    그리고 선행하는 Stage 의 Map Task 와 후속 Stage 인 Reduce Task 로 나뉘며, Reduce Task 는 ShuffledRDD 로 표현됩니다.

    ShuffledRDD 는 이전 Stage 의 Shuffle Block 파일들은 Netty 기반의 BlockTransferService 를 통해서 전달받게 됩니다.

     

     

     

     

    관련된 명령어들.

    docker network create spark
    
    docker run -d --name master --hostname master --network spark -p 8080:8080 -p 7077:7077 bitnami/spark:3.2.3 start-master.sh
    
    docker run -d --name worker1 --hostname worker1 --network spark bitnami/spark:3.2.3 start-worker.sh spark://master:7077
    docker run -d --name worker2 --hostname worker2 --network spark bitnami/spark:3.2.3 start-worker.sh spark://master:7077
    
    docker run -it --rm --name client --network spark -p 4040:4040 bitnami/spark:3.2.3 spark-shell --master spark://master:7077 --conf spark.ui.port=4040
    
    docker run -it --rm --name client --network spark -p 4040:4040 bitnami/spark:3.2.3 spark-shell --master spark://master:7077 --conf spark.ui.port=4040 --conf spark.executor.instances=2 --conf spark.executor.cores=1 --conf spark.executor.memory=1G

     

     

    반응형
Designed by Tistory.