-
[Spark] Block 알아보기Spark 2024. 7. 26. 17:21반응형
- 목차
RDD Cache Block.
먼저 알아볼 Block 의 종류는 RDD Cache Block 입니다.
RDD 의 cache 나 persist 함수를 통해서 RDD 를 캐싱할 수 있게 됩니다.
import org.apache.spark.storage.StorageLevel val baseRdd = sc.parallelize(1 to 10, 4).map { x => x * 2 } val cachedRdd = baseRdd.persist(StorageLevel.DISK_ONLY) val sum1 = cachedRdd.sum() println(s"First sum: $sum1")
Disk 에 저장된 Cache Block.
캐싱된 RDD 는 Block 이라는 파일 형태로 Disk 에 저장됩니다.
저장되는 File Path 는 spark.local.dir 이라는 설정을 통해서 지정할 수 있구요.
기본값은 /tmp 로 설정됩니다.
구체적인 저장 위치는 아래와 같습니다.
좀 복잡하긴 한대요.
/tmp/spark-{uuid}/executor-{uuid}/blockmgr-{uuid} 와 같은 포맷으로 File Path 가 지정됩니다.
/tmp /spark-9b5ea5fe-b81a-404b-bbc9-bcd5b3a601c5 /executor-79f7cf33-bc99-45f4-ae9c-0e6ee5559dde /blockmgr-1f205f97-52f0-4863-9148-5373757cae25
그리고 그 하위에 아래와 같은 형식으로 4개의 RDD File 이 생성되게 됩니다.
rdd_1_0, rdd_1_1, rdd_1_2, rdd_1_3 인 4개의 파일이 생성됩니다.
./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/15: rdd_1_0 ./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/16: rdd_1_1 ./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/17: rdd_1_2 ./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/18: rdd_1_3
RDD Caching Block 의 이름은 RDD 와 파티션 아이디의 조합으로 구성됩니다.
새롭게 생성되는 RDD 일수록 RDD ID 는 증가하며, Partition 이 증가할 수록 Caching RDD Block 의 갯수 또한 증가합니다.
예를 들어, 10개의 Partition 을 가지는 RDD 를 캐싱하게 되면 아래와 같이 10개의 Caching Block 이 만들어집니다.
( RDD ID 가 6인 이유는 여러번 실험을 하였기 때문입니다. )
import org.apache.spark.storage.StorageLevel sc.parallelize(1 to 10, 10).persist(StorageLevel.DISK_ONLY).collect()
./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/1a: rdd_6_0 ./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/1b: rdd_6_1 ./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/1c: rdd_6_2 ./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/1d: rdd_6_3 ./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/1e: rdd_6_4 ./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/1f: rdd_6_5 ./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/20: rdd_6_6 ./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/21: rdd_6_7 ./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/22: rdd_6_8 ./spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6/executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8/blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170/23: rdd_6_9
`-- executor-fcd8cd5b-28f6-4057-a8c5-22ea4a3450a8 `-- blockmgr-2825caef-cd56-4b33-a1ec-34419fd96170 |-- 15 | `-- rdd_1_0 |-- 16 | `-- rdd_1_1 |-- 17 | `-- rdd_1_2 |-- 18 | `-- rdd_1_3 |-- 19 |-- 1a | `-- rdd_6_0 |-- 1b | `-- rdd_6_1 |-- 1c | `-- rdd_6_2 |-- 1d | `-- rdd_6_3 |-- 1e | `-- rdd_6_4 |-- 1f | `-- rdd_6_5 |-- 20 | `-- rdd_6_6 |-- 21 | `-- rdd_6_7 |-- 22 | `-- rdd_6_8 `-- 23 `-- rdd_6_9
UpdateBlockInfo RPC 에 대해서.
이렇게 Block 이 생성되면, Executor 들은 Driver 에게 생성된 Block 에 대한 정보를 전달합니다.
실제 Executor 와 Driver 사이에 교환되는 네트워크 패킷을 확인해보면 아래와 같이 Executor 가 Driver 에게 UpdateBlockInfo 요청을 하게 됩니다.
rdd_8_0 에 해당하는 캐시 블록의 생성을 Driver 의 BlockManagerMaster 에게 전달하게 됩니다.
그리고 BlockManagerMaster 를 이를 관리하죠.
00:39:51.089350 IP worker1.spark.36934 > 8b324e9bf0d7.40683: Flags [P.], seq 20078:20250, ack 51693, win 4815, options [nop,nop,TS val 847474233 ecr 2154189235], length 172 0x0000: 4500 00e0 e695 4000 4006 fb57 ac12 0002 E.....@.@..W.... 0x0010: ac12 0004 9046 9eeb 3f89 9fd1 92a6 93ca .....F..?....... 0x0020: 8018 12cf 58fd 0000 0101 080a 3283 6e39 ....X.......2.n9 0x0030: 8066 51b3 0001 000c 3862 3332 3465 3962 .fQ.....8b324e9b 0x0040: 6630 6437 0000 9eeb 0012 426c 6f63 6b4d f0d7......BlockM 0x0050: 616e 6167 6572 4d61 7374 6572 aced 0005 anagerMaster.... 0x0060: 7372 003d 6f72 672e 6170 6163 6865 2e73 sr.=org.apache.s 0x0070: 7061 726b 2e73 746f 7261 6765 2e42 6c6f park.storage.Blo 0x0080: 636b 4d61 6e61 6765 724d 6573 7361 6765 ckManagerMessage 0x0090: 7324 5570 6461 7465 426c 6f63 6b49 6e66 s$UpdateBlockInf 0x00a0: 6f51 95eb 77f2 0f4a 640c 0000 7870 772f oQ..w..Jd...xpw/ 0x00b0: 0001 3000 0a31 3732 2e31 382e 302e 3200 ..0..172.18.0.2. 0x00c0: 0085 4900 0007 7264 645f 385f 3008 0100 ..I...rdd_8_0... 0x00d0: 0000 0000 0000 0000 0000 0000 0000 0278 ...............x
이러한 정보 교환의 결과로써 Driver 의 Web UI 에서 캐싱된 RDD 의 메타정보를 확인할 수 있습니다.
Shuffle Block.
groupBy, partitionBy 등과 같은 연산을 사용하게 되면, 여러 개의 Stage 가 생성되면서 Wide Transformation 이 발생합니다.
아래와 같이 간단한 groupByKey 의 예시를 살펴보면 아래의 스크립트와 시각화된 Job 을 확인할 수 있습니다.
sc.parallelize(1 until 1000, 10) .map(entry => (entry % 10, entry)) .groupByKey() .mapValues(iter => iter.sum) .collect()
그리고 이렇게 Wide Transformation 이 발생하게 되면, Worker Node 는 아래와 같인 Shuffle Block 들이 생성되게 됩니다.
data, index, checksum 파일이 각각 생성되지만, 첫번째 Task 의 Partition 갯수만큼 Block 파일이 생성되게 됩니다.
-- spark-9241da5a-1f17-4f47-bcc7-5e215de4ede6 `-- executor-9dab617c-7da5-4817-bd39-58f0e2991073 `-- blockmgr-175d616d-a890-4db0-b589-6ce1897ca94f |-- 04 | |-- shuffle_0_8_0.checksum.ADLER32 | `-- shuffle_0_8_0.data |-- 05 |-- 06 | `-- shuffle_0_6_0.checksum.ADLER32 |-- 08 | `-- shuffle_0_8_0.index |-- 09 |-- 0a | `-- shuffle_0_6_0.index |-- 0b |-- 0c | |-- shuffle_0_0_0.data | `-- shuffle_0_4_0.index |-- 0d | `-- shuffle_0_3_0.index |-- 0e |-- 0f | `-- shuffle_0_1_0.index |-- 15 | |-- shuffle_0_1_0.checksum.ADLER32 | `-- shuffle_0_1_0.data |-- 16 |-- 17 | `-- shuffle_0_3_0.checksum.ADLER32 |-- 1a |-- 1b | `-- shuffle_0_7_0.data |-- 1c |-- 1d | `-- shuffle_0_9_0.data |-- 23 | `-- shuffle_0_9_0.checksum.ADLER32 |-- 24 |-- 25 | `-- shuffle_0_7_0.checksum.ADLER32 |-- 26 |-- 27 | |-- shuffle_0_5_0.checksum.ADLER32 | `-- shuffle_0_5_0.data |-- 28 |-- 29 | `-- shuffle_0_3_0.data |-- 30 | `-- shuffle_0_0_0.index |-- 32 | `-- shuffle_0_2_0.index |-- 34 | `-- shuffle_0_0_0.checksum.ADLER32 |-- 35 | `-- shuffle_0_5_0.index |-- 36 | |-- shuffle_0_2_0.checksum.ADLER32 | `-- shuffle_0_2_0.data |-- 37 | `-- shuffle_0_7_0.index |-- 38 | |-- shuffle_0_4_0.checksum.ADLER32 | `-- shuffle_0_4_0.data |-- 39 | `-- shuffle_0_9_0.index |-- 3a | `-- shuffle_0_6_0.data
Wide Transformation 이 발생하면 2개의 Stage 로 나뉘게 됩니다.
그리고 선행하는 Stage 의 Map Task 와 후속 Stage 인 Reduce Task 로 나뉘며, Reduce Task 는 ShuffledRDD 로 표현됩니다.
ShuffledRDD 는 이전 Stage 의 Shuffle Block 파일들은 Netty 기반의 BlockTransferService 를 통해서 전달받게 됩니다.
관련된 명령어들.
docker network create spark docker run -d --name master --hostname master --network spark -p 8080:8080 -p 7077:7077 bitnami/spark:3.2.3 start-master.sh docker run -d --name worker1 --hostname worker1 --network spark bitnami/spark:3.2.3 start-worker.sh spark://master:7077 docker run -d --name worker2 --hostname worker2 --network spark bitnami/spark:3.2.3 start-worker.sh spark://master:7077 docker run -it --rm --name client --network spark -p 4040:4040 bitnami/spark:3.2.3 spark-shell --master spark://master:7077 --conf spark.ui.port=4040 docker run -it --rm --name client --network spark -p 4040:4040 bitnami/spark:3.2.3 spark-shell --master spark://master:7077 --conf spark.ui.port=4040 --conf spark.executor.instances=2 --conf spark.executor.cores=1 --conf spark.executor.memory=1G
반응형'Spark' 카테고리의 다른 글
[Spark] RDD combineByKey 알아보기 (0) 2024.07.29 [Spark] MapOutputTracker 알아보기 (0) 2024.07.29 [Spark] s3a Protocol 에서 Driver 가 파일 존재를 확인하는 과정 (0) 2024.07.23 [Spark] RDD GroupBy 와 Shuffle 알아보기 (0) 2024.07.09 [Spark] RDD treeReduce 알아보기 (0) 2024.07.09