ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Spark] JDBC DataFrameReader 알아보기 (MySQL)
    Spark 2024. 1. 28. 07:39
    728x90
    반응형

    - 목차

     

    들어가며.

    JDBC Connector 를 활용한 DataFrameReader 에 대한 글을 작성하려고 합니다.

     

    Docker 로 MySQL 실행하기.

    먼저 Docker 를 활용하여 MySQL 컨테이너를 실행해보도록 하겠습니다.

    간단한 실행만으로 실습에 필요한 데이터를 생성할 수 있도록 Command 위주로 작성하겠습니다.

     

    Docker Run MySQL.

    docker run -d --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=1234 mysql:8.0

     

    Docker Exec MySQL Container.

    docker exec -it mysql sh

     

    Create Table.

    mysql -uroot -p1234;
    
    use mysql;
    
    create table users (
      id bigint not null auto_increment primary key,
      name varchar(64) not null,
      age int not null,
      city varchar(64)
    );
    
    insert into users(name, age, city) 
    values ("Andy", 32, "Seoul"), ("Brad", 31, "Seoul"), ("Chris", 33, "Seoul");
    
    select * from mysql.users;
    +----+-------+-----+-------+
    | id | name  | age | city  |
    +----+-------+-----+-------+
    |  1 | Andy  |  32 | Seoul |
    |  2 | Brad  |  31 | Seoul |
    |  3 | Chris |  33 | Seoul |
    +----+-------+-----+-------+
    3 rows in set (0.01 sec)

     

     

    JDBC DataFrameReader 생성하기.

    JDBC 타입의 Reader 를 사용하기 위해서 JDBC Connector 가 필요합니다.

    저는 MySQL 를 사용하기 때문에 MySQL Connector Dependency 를 build.sbt 파일에 추가하겠습니다.

     

    version := "0.1.0-SNAPSHOT"
    scalaVersion := "2.13.12"
    val sparkVersion = "3.2.4"
    
    lazy val root = (project in file("."))
      .settings(
        name := "spark"
      )
    
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % sparkVersion,
      "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
      // https://mvnrepository.com/artifact/mysql/mysql-connector-java
      "mysql" % "mysql-connector-java" % "8.0.32"
    )

     

     

    Spark Driver 프로그램의 코드는 단순합니다.

    MySQL 의 Connection 을 위한 정보를 작성하고 실행하면 원하시는 결과를 얻을 수 있습니다.

     

    package test.spark.reader
    
    import org.apache.spark.sql.SparkSession
    
    object JDBCReader {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("test")
          .master("local[*]")
          .config("spark.driver.bindAddress", "localhost")
          .getOrCreate();
    
        val df = spark.read.format("jdbc")
          .option("url", "jdbc:mysql://localhost:3306/mysql")
          .option("driver", "com.mysql.cj.jdbc.Driver")
          .option("dbtable", "users")
          .option("user", "root")
          .option("password", "1234")
          .load()
    
        val count = df.count()
        print(count)
        df.show()
        spark.close()
    
      }
    }
    +---+-----+---+-----+
    | id| name|age| city|
    +---+-----+---+-----+
    |  1| Andy| 32|Seoul|
    |  2| Brad| 31|Seoul|
    |  3|Chris| 33|Seoul|
    +---+-----+---+-----+

     

     

    특정 쿼리로 데이터 조회하기.

    Spark JDBC Reader 는 query 옵션을 사용하여 특정 쿼리를 수행할 수 있습니다.

    이러한 방식을 통하여 Filter, Limit 과 같은 Transformation 과 Union, Join 을 통한 여러 테이블의 병합 또한 가능합니다.

     

    먼저 school 과 student 테이블을 만들고, Join Query 를 수행하는 SparkSQL 프로그램을 작성해보겠습니다.

     

    테이블 및 데이터 생성.

    create table school (id int not null primary key, name varchar(64), location varchar(64));
    create table student (id int not null primary key, name varchar(64), school_ref int);
    alter table student add foreign key (school_ref) references school(id);
    show create table student;
    +---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    | Table   | Create Table                                                                                                                                                                                                                                                                                                                      |
    +---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    | student | CREATE TABLE `student` (
      `id` int NOT NULL,
      `name` varchar(64) DEFAULT NULL,
      `school_ref` int DEFAULT NULL,
      PRIMARY KEY (`id`),
      KEY `school_ref` (`school_ref`),
      CONSTRAINT `student_ibfk_1` FOREIGN KEY (`school_ref`) REFERENCES `school` (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci |
    +---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    1 row in set (0.00 sec)
    insert into school(id, name, location) values(1, "A", "Seoul");
    insert into school(id, name, location) values(2, "B", "Seoul");
    insert into school(id, name, location) values(3, "C", "Busan");
    
    insert into student (id, name, school_ref) values(1, "Andy", 1);
    insert into student (id, name, school_ref) values(2, "Bob", 1);
    insert into student (id, name, school_ref) values(3, "Chris", 2);
    insert into student (id, name, school_ref) values(4, "Daniel", 3);

     

    입력된 query 를 수행하는 JDBC DataFrameReader 코드.

    package test.spark.reader
    
    import org.apache.spark.sql.SparkSession
    
    object JDBCReader {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("test")
          .master("local[*]")
          .config("spark.driver.bindAddress", "localhost")
          .getOrCreate();
    
        val df = spark.read.format("jdbc")
          .option("url", "jdbc:mysql://localhost:3306/mysql")
          .option("driver", "com.mysql.cj.jdbc.Driver")
          .option("user", "root")
          .option("password", "1234")
          .option("query",
            s"""
               |select student.id as student_id, student.name as student_name, school.name as school_name, location as school_location
               |from student
               |inner join school on student.id = school.id""".stripMargin)
          .load()
    
        val count = df.count()
        print(count)
        df.show()
        spark.close()
    
      }
    }
    +----------+------------+-----------+---------------+
    |student_id|student_name|school_name|school_location|
    +----------+------------+-----------+---------------+
    |         1|        Andy|          A|          Seoul|
    |         2|         Bob|          B|          Seoul|
    |         3|       Chris|          C|          Busan|
    +----------+------------+-----------+---------------+

     

    마치며.

    혹시나 읽어보시다가 궁금증이 생기시거나 오류를 발견해주신다면 많은 질문과 요청해주시면 좋을 것 같아요.

    읽어주셔서 감사합니다.

     

     

    반응형
Designed by Tistory.