ABOUT ME

와주셔서 감사합니다. 좋은 글을 많이 쓰겠습니다.

Today
Yesterday
Total
  • [Spark] RDD Cogroup 연산 알아보기
    Spark 2024. 7. 29. 05:38
    반응형

    - 목차

     

    Cogroup 이란 ?

    Cogroup 연산은 여러 RDD 를 key 기준으로 그룹핑하여, 동일한 key에 매핑된 value 들을 각각의 RDD에서 모아오는 연산입니다.

    Cogroup 연산의 Input 과 Output 은 아래와 같습니다.

    // 2개의 Input
    RDD1: RDD[(K, V1)]
    RDD2: RDD[(K, V2)]
    
    // Output
    RDD[(K, (Iterable[V1], Iterable[V2]))]

     

    그리하여 아래와 같은 연산은 동일한 Key 를 가지는 각 Input RDD 의 Value 들이 Iterable 형식으로 표현됩니다.

    val rdd1 = sc.parallelize(Seq(
      ("a", 1), ("b", 2), ("a", 3)
    ))
    
    val rdd2 = sc.parallelize(Seq(
      ("a", 100), ("b", 200), ("c", 300)
    ))
    
    val cogrouped = rdd1.cogroup(rdd2)
    
    cogrouped.collect().foreach(println)

     

    (a, (CompactBuffer(1, 3), CompactBuffer(100)))
    (b, (CompactBuffer(2), CompactBuffer(200)))
    (c, (CompactBuffer(), CompactBuffer(300)))

     

    Cogroup 과 Shuffle Block.

    먼저 Cogroup 연산은 Input RDD 의 Partition 의 갯수가 다르다면 셔플링을 발생시킵니다.

    아래의 예시는 2개의 RDD 에 대해서 CoGroup 연산을 적용하며,

    rdd1 은 10개의 Partition, rdd2 는 5개의 Partition 을 가집니다.

    그리고 CoGroupRDD 의 파티션 갯수는 4개로 설정하였습니다.

     

    val rdd1 = sc.parallelize(1 until 1000, 10).map { value =>
        val key = value % 10
        (key, value)
    }
    
    val rdd2 = sc.parallelize(1001 until 2000, 5).map { value =>
        val key = value % 10
        (key, value)
    }
    
    val cogrouped = rdd1.cogroup(rdd2, 4)
    
    cogrouped.collect().foreach(println)

     

    위 코드가 실행되면 아래와 같이 총 15개의 Shuffle Block 파일들이 생성됩니다.

    위 10개는 rdd1 으로부터 생성된 10개의 Shuffle Block 이구요.

    아래의 5개 파일은 rdd2 로부터 생성된 5개의 Shuffle Block 입니다.

     

                |   `-- shuffle_0_8_0.data
                |   `-- shuffle_0_7_0.data
                |   `-- shuffle_0_9_0.data
                |   |-- shuffle_0_5_0.data
                |   `-- shuffle_0_11_0.data
                |   `-- shuffle_0_14_0.data
                |   `-- shuffle_0_13_0.data
                |   `-- shuffle_0_12_0.data
                |   `-- shuffle_0_10_0.data
                |   `-- shuffle_0_6_0.data
                
                |   `-- shuffle_1_3_0.data
                |   `-- shuffle_1_1_0.data
                |   `-- shuffle_1_2_0.data
                |   `-- shuffle_1_4_0.data
                |   `-- shuffle_1_0_0.data

     

     

    아래의 DAG Visualization 과 같이 rdd1 과 rdd2 는 Stage 0, Stage 1 에서 데이터 처리 후에 각각 Shuffle Block 들을 생성하게 됩니다.

     

     

    Stage 0 의 구체적은 RDD Lineage 는 아래와 같습니다.

    ParallelCollectionRDD 에서 1 ~ 1000 까지의 데이터를 생성하며, MapPartitionsRDD 에서 PairRDD 로 변형하는 과정을 취합니다.

     

    CoGroupedRDD .

    2개의 RDD 들을 대상으로 cogroup 연산을 수행하게 되면 총 3개의 Stage 들이 생성되게 됩니다.

    Stage 0 과 Stage 1 는 각각의 RDD 에 해당하는 Stage 이구요.

    Stage 2 는 CoGroupedRDD 가 생성됩니다.

    이는 ShuffledRDD 처럼 Wide Transformation 이후에 셔플링된 결과를 취합하는 대표적인 RDD 입니다.

    위에서 만들어진 총 15개의 Shuffle Block 을 가져와 실질적인 CoGroup 연산을 수행하게 됩니다.

     

    아래의 함수는 CoGroupedRDD 의 compute 함수의 내용입니다.

    ( 다소 생략된 내용입니다. )

    아래 함수에서 중요한 포인트는 아래와 같습니다. 

    • getReader(shuffleDependency) 를 통해서 다른 Executor 또는 Worker Node 에 있는 원격의 Shuffle Block 들을 불러옵니다.
    • map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
      연산을 통하여 Key, (Iterable[V1], Iterable[V2]) 형식의 데이터를 만듭니다. 

     

      override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
        val split = s.asInstanceOf[CoGroupPartition]
        val numRdds = dependencies.length
    	// .. 중략
        val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
        for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
          case shuffleDependency: ShuffleDependency[_, _, _] =>
    
            val metrics = context.taskMetrics().createTempShuffleReadMetrics()
            val it = SparkEnv.get.shuffleManager
              .getReader(
                shuffleDependency.shuffleHandle, split.index, split.index + 1, context, metrics)
              .read()
            rddIterators += ((it, depNum))
        }
    	// .. 중략
        val map = createExternalMap(numRdds)
        for ((it, depNum) <- rddIterators) {
          map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
        }
    
      }

     

     

    Join 연산과 CoGroup 연산의 관계 알아보기.

    실제로 CoGroup 연산은 자주 활용되진 않습니다.

    하지만 Join 연산은 내부적으로 CoGroup 을 활용합니다.

    아래의 간단한 Join 연산을 수행해보도록 하겠습니다.

     

    val rdd1 = sc.parallelize(Seq(
      (1, "apple"),
      (2, "banana"),
      (3, "orange")
    ))
    
    val rdd2 = sc.parallelize(Seq(
      (1, 100),
      (2, 200),
      (4, 400)
    ))
    
    val joined = rdd1.join(rdd2)
    
    joined.collect().foreach(println)

     

    그럼 아래의 이미지와 같이 간단한 DAG Visualization 을 확인할 수 있습니다.

     

     

     

    Stage 5 인 Join Stage 를 좀 더 자세히 살펴봅니다.

    Join 연산은 내부적으로 CoGroupedRDD 를 활용하구여. 

     

    실제 join 이라는 RDD 함수 또한 아래와 같이 cogroup 연산을 수행합니다.

    그리고 flatMapValues 를 통하여 InnerJoin, LeftOuterJoin, FullOuterJoin 등의 실질적인 필터링 과정을  취하게 됩니다.

    따라서 위 DAG Visualization 에서 MapPartitionsRDD 는 flatMapValues 에 설정된 Closure Function 을 실행하는 Mapper 단계라고 생각하시면 됩니다.

     

      def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
        this.cogroup(other, partitioner).flatMapValues( pair =>
          for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
        )
      }
      
      def leftOuterJoin[W](
          other: RDD[(K, W)],
          partitioner: Partitioner): RDD[(K, (V, Option[W]))] = self.withScope {
        this.cogroup(other, partitioner).flatMapValues { pair =>
          if (pair._2.isEmpty) {
            pair._1.iterator.map(v => (v, None))
          } else {
            for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
          }
        }
      }
      
      def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
          : RDD[(K, (Option[V], W))] = self.withScope {
        this.cogroup(other, partitioner).flatMapValues { pair =>
          if (pair._1.isEmpty) {
            pair._2.iterator.map(w => (None, w))
          } else {
            for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)
          }
        }
      }

     

     

     

     

    관련 명령어들.

    docker network create spark
    
    docker run -d --name master --hostname master --network spark -p 8080:8080 -p 7077:7077 bitnami/spark:3.2.3 start-master.sh
    
    docker run -d --name worker1 --hostname worker1 --network spark bitnami/spark:3.2.3 start-worker.sh spark://master:7077
    
    docker run -it --rm --name client --network spark -p 4040:4040 bitnami/spark:3.2.3 spark-shell --master spark://master:7077 --conf spark.ui.port=4040
    
    docker exec -it --user root --privileged worker1 bash
    apt update && apt install tree -y

     

    반응형
Designed by Tistory.