ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spark RDD Storage 알아보기 (Persist, Cache)
    Spark 2023. 12. 15. 06:40
    728x90
    반응형

     

    - 목차

     

    소개.

    RDD 는 persist 와 cache Function 을 가집니다.

    persist, cache 기능을 통해서 RDD 의 중간 상태를 Storage 에 임시 저장을 할 수 있습니다.

    예를 들어, RDD = SparkContext.parellelize([1,2,3,4,5]) 와 같은 CollectionRDD 가 존재할 때에

    RDD.persist(), RDD.cache() 와 같은 형태로 RDD 의 데이터 상태를 저장할 수 있습니다.

    그리고 저장소의 유형 또한 설정이 가능한데요.

    Memory, Disk 로 설정이 가능합니다.

    이번 글에서는 RDD 의 Persist 와 Storage 에 대해서 알아보려고 합니다.

     

    persist and cache.

    RDD 는 persist 와 cache function 가집니다.

    사용법은 아래와 같습니다.

     

    package org.example;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    
    import java.util.Arrays;
    import java.util.List;
    
    public class SparkTest {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SparkRDDExample")
                .set("spark.driver.bindAddress", "127.0.0.1")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        JavaRDD<Integer> numbersRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 20);
        numbersRDD.cache();
        
        JavaRDD<Integer> squaredRDD = numbersRDD.map(num -> num * num);
        squaredRDD.cache();
        
        JavaRDD<Integer> evenNumbersRDD = squaredRDD.filter(num -> num % 2 == 0);
        evenNumbersRDD.cache();
    
        JavaRDD<Integer> delayedRDD = evenNumbersRDD.map(num -> {
          Thread.sleep(10000000);
          return num;
        });
        
        List<Integer> results = delayedRDD.collect();
        sc.stop();
      }
    
    }

     

     

    총 4개의 RDD 로 구성되는데요.

    - numbersRDD
    - squaredRDD
    - evenNumbersRDD
    - delayedRDD

     

    numbersRDD, squaredRDD, evenNumbersRDD 3개의 RDD 는 cache() 에 의해서

    Cached 된 상태를 보여줍니다.

     

    Cached 상태는 메모리에 저장되었다는 의미와 동일합니다.

    그래서 cache() 와 persist(MEMORY_ONLY) 는 동일하게 동작합니다.

     

    뿐만 아니라 persist 로 저장소를 다양하게 선택할 수 있습니다.

    아래와 같이 Storage 를 지정할 수 있구요.

    JavaRDD<Integer> numbersRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 20);
    numbersRDD.persist(StorageLevel.MEMORY_ONLY());
    
    JavaRDD<Integer> squaredRDD = numbersRDD.map(num -> num * num);
    squaredRDD.persist(StorageLevel.MEMORY_AND_DISK());
    
    JavaRDD<Integer> evenNumbersRDD = squaredRDD.filter(num -> num % 2 == 0);
    evenNumbersRDD.persist(StorageLevel.DISK_ONLY());

     

    아래와 같이 각 RDD 의 Storage Level 을 확인할 수 있습니다.

     

    참고로 DISK_ONLY 로 설정된 데이터는 /tmp/ Directory 하위에서 확인할 수 있습니다.

    tree /tmp/blockmgr-fcbf53de-efaf-437b-8e59-ba5fdc4a164b 
    
    /tmp/blockmgr-fcbf53de-efaf-437b-8e59-ba5fdc4a164b
    ├── 0e
    ├── 11
    ├── 15
    ├── 16
    │   └── rdd_2_0
    ├── 17
    │   └── rdd_2_1
    ├── 18
    │   └── rdd_2_2
    └── 19
        └── rdd_2_3
    
    8 directories, 4 files

     

     

    Storage.

    Storage Level 은 크게 2가지입니다.

    Memory 또는 Disk 입니다.

    여기서 Replication 까지 조합하여 여러가지 형태의 Storage Level 설정이 가능해지는데요.

    각 Storage Level 에 대해서 알아보겠습니다.

     

    MEMORY_ONLY.

    MEMORY_ONLY 설정은 RDD 의 중간 데이터를 메모리에만 저장합니다.

    여기서 고려해야하는 점은 Storage 가 어떻게 사용되는가 입니다.

    Storage 는 보통 Retry 과정에서 퍼포먼스를 향상시킬 수 있는 하나의 방식입니다.

    RDD Lineage 에 의해서 RDD 를 생성하기 위한 Transformation 들이 기록이 됩니다.

    예를 들어, 위의 예시를 참고해보면

     

    각 RDD 는 아래와 같이 표현됩니다.

    PrevRDD 와 Transformtion 를 활용하여 Retry 가 가능하죠.

    numbersRDD => Source : [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    
    squaredRDD => PrevRDD : numbersRDD & Transform : num^2
    
    evenNumbersRDD => PrevRDD : squaredRDD & Transform : num % 2 == 0

     

     

    반면, Storage 를 사용하게 되면 이전 단계 RDD 의 값이 이미 저장되어있기 때문에 불필요한 Computation 과정을 건너뛰게 됩니다.

    이러한 방식으로 Optimization 이 가능합니다.

    MEMORY_ONLY 는 Worker Node 가 종료되어 메모리가 휘발되었다면 Retry 시에 다시 Computation 을 수행해야합니다.

    즉, MEMORY_ONLY 는 크게 이점이 없을 수도 있습니다.

     

    MEMORY_ONLY_2.

    MEMORY_ONLY_N 과 같은 방식으로 Worker Node 의 수만큼 Replication 을 수행할 수 있습니다.

    N 은 Replication Factor 라고 생각하시면 됩니다.

    이러한 방식으로 Worker Node 의 종료로 메모리가 휘발되는 케이스에 대한 염려를 줄어듭니다.

    다른 Worker Node 의 메모리에 복제되어 있던 RDD 를 사용하면 되기 때문이죠.

     

     

    MEMORY_AND_DISK.

    이는 메모리와 디스크를 모두 사용하는 방식입니다.

    단, 메모리의 크기가 부족하게 되면 Spill 방식으로 디스크를 사용하게 됩니다.

    이 방식 또한 Replication 이 가능합니다.

     

     

    DISK_ONLY.

    DISK_ONLY 는 RDD 의 중간 상태를 디스크에 저장합니다.

    디스크에 저장함으로써 Worker Node 의 종료에 대한 데이터 영구성 문제가 해결됩니다.

    DISK_ONLY 시에 RDD 가 저장되는 물리적인 위치는 spark.local.dir 에 설정한 위치를 따르게 됩니다.

     

    아래와 같이 설정하게 되면, /tmp/test 하위에 RDD 상태가 저장됩니다.

    SparkConf conf = new SparkConf().setAppName("SparkRDDExample")
                .set("spark.driver.bindAddress", "127.0.0.1")
                .set("spark.local.dir", "/tmp/test")
                .setMaster("local");

     

    tree /tmp/test
    /tmp/test
    ├── blockmgr-9c65f074-24f9-4562-a5af-333f1626affd
    │   ├── 0e
    │   ├── 11
    │   ├── 15
    │   ├── 16
    │   │   └── rdd_2_0
    │   ├── 17
    │   │   └── rdd_2_1
    │   ├── 18
    │   │   └── rdd_2_2
    │   └── 19
    │       └── rdd_2_3
    └── spark-f7c54072-5625-4758-99bb-b67a7ab6b5b9
        └── userFiles-671ddda7-2d0e-48ff-b252-54ffa4ce13e7
    
    11 directories, 4 files

    반응형

    'Spark' 카테고리의 다른 글

    [Spark] Unresolved Logical Plan 알아보기  (0) 2023.12.20
    Spark RDD Lineage 알아보기  (0) 2023.12.16
    [Spark] parallelize 알아보기  (0) 2023.12.15
    Spark DataFrame 알아보기  (0) 2023.12.15
    Helm 으로 Spark 구축하기  (0) 2023.12.15
Designed by Tistory.