-
[Spark] JDBC DataFrameReader 알아보기 (MySQL)Spark 2024. 1. 28. 07:39728x90반응형
- 목차
들어가며.
JDBC Connector 를 활용한 DataFrameReader 에 대한 글을 작성하려고 합니다.
Docker 로 MySQL 실행하기.
먼저 Docker 를 활용하여 MySQL 컨테이너를 실행해보도록 하겠습니다.
간단한 실행만으로 실습에 필요한 데이터를 생성할 수 있도록 Command 위주로 작성하겠습니다.
Docker Run MySQL.
docker run -d --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=1234 mysql:8.0
Docker Exec MySQL Container.
docker exec -it mysql sh
Create Table.
mysql -uroot -p1234; use mysql; create table users ( id bigint not null auto_increment primary key, name varchar(64) not null, age int not null, city varchar(64) ); insert into users(name, age, city) values ("Andy", 32, "Seoul"), ("Brad", 31, "Seoul"), ("Chris", 33, "Seoul"); select * from mysql.users; +----+-------+-----+-------+ | id | name | age | city | +----+-------+-----+-------+ | 1 | Andy | 32 | Seoul | | 2 | Brad | 31 | Seoul | | 3 | Chris | 33 | Seoul | +----+-------+-----+-------+ 3 rows in set (0.01 sec)
JDBC DataFrameReader 생성하기.
JDBC 타입의 Reader 를 사용하기 위해서 JDBC Connector 가 필요합니다.
저는 MySQL 를 사용하기 때문에 MySQL Connector Dependency 를 build.sbt 파일에 추가하겠습니다.
version := "0.1.0-SNAPSHOT" scalaVersion := "2.13.12" val sparkVersion = "3.2.4" lazy val root = (project in file(".")) .settings( name := "spark" ) libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % sparkVersion, "org.apache.spark" %% "spark-sql" % sparkVersion % "provided", // https://mvnrepository.com/artifact/mysql/mysql-connector-java "mysql" % "mysql-connector-java" % "8.0.32" )
Spark Driver 프로그램의 코드는 단순합니다.
MySQL 의 Connection 을 위한 정보를 작성하고 실행하면 원하시는 결과를 얻을 수 있습니다.
package test.spark.reader import org.apache.spark.sql.SparkSession object JDBCReader { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .appName("test") .master("local[*]") .config("spark.driver.bindAddress", "localhost") .getOrCreate(); val df = spark.read.format("jdbc") .option("url", "jdbc:mysql://localhost:3306/mysql") .option("driver", "com.mysql.cj.jdbc.Driver") .option("dbtable", "users") .option("user", "root") .option("password", "1234") .load() val count = df.count() print(count) df.show() spark.close() } }
+---+-----+---+-----+ | id| name|age| city| +---+-----+---+-----+ | 1| Andy| 32|Seoul| | 2| Brad| 31|Seoul| | 3|Chris| 33|Seoul| +---+-----+---+-----+
특정 쿼리로 데이터 조회하기.
Spark JDBC Reader 는 query 옵션을 사용하여 특정 쿼리를 수행할 수 있습니다.
이러한 방식을 통하여 Filter, Limit 과 같은 Transformation 과 Union, Join 을 통한 여러 테이블의 병합 또한 가능합니다.
먼저 school 과 student 테이블을 만들고, Join Query 를 수행하는 SparkSQL 프로그램을 작성해보겠습니다.
테이블 및 데이터 생성.
create table school (id int not null primary key, name varchar(64), location varchar(64)); create table student (id int not null primary key, name varchar(64), school_ref int); alter table student add foreign key (school_ref) references school(id);
show create table student;
+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | Table | Create Table | +---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | student | CREATE TABLE `student` ( `id` int NOT NULL, `name` varchar(64) DEFAULT NULL, `school_ref` int DEFAULT NULL, PRIMARY KEY (`id`), KEY `school_ref` (`school_ref`), CONSTRAINT `student_ibfk_1` FOREIGN KEY (`school_ref`) REFERENCES `school` (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci | +---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ 1 row in set (0.00 sec)
insert into school(id, name, location) values(1, "A", "Seoul"); insert into school(id, name, location) values(2, "B", "Seoul"); insert into school(id, name, location) values(3, "C", "Busan"); insert into student (id, name, school_ref) values(1, "Andy", 1); insert into student (id, name, school_ref) values(2, "Bob", 1); insert into student (id, name, school_ref) values(3, "Chris", 2); insert into student (id, name, school_ref) values(4, "Daniel", 3);
입력된 query 를 수행하는 JDBC DataFrameReader 코드.
package test.spark.reader import org.apache.spark.sql.SparkSession object JDBCReader { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .appName("test") .master("local[*]") .config("spark.driver.bindAddress", "localhost") .getOrCreate(); val df = spark.read.format("jdbc") .option("url", "jdbc:mysql://localhost:3306/mysql") .option("driver", "com.mysql.cj.jdbc.Driver") .option("user", "root") .option("password", "1234") .option("query", s""" |select student.id as student_id, student.name as student_name, school.name as school_name, location as school_location |from student |inner join school on student.id = school.id""".stripMargin) .load() val count = df.count() print(count) df.show() spark.close() } }
+----------+------------+-----------+---------------+ |student_id|student_name|school_name|school_location| +----------+------------+-----------+---------------+ | 1| Andy| A| Seoul| | 2| Bob| B| Seoul| | 3| Chris| C| Busan| +----------+------------+-----------+---------------+
마치며.
혹시나 읽어보시다가 궁금증이 생기시거나 오류를 발견해주신다면 많은 질문과 요청해주시면 좋을 것 같아요.
읽어주셔서 감사합니다.
반응형'Spark' 카테고리의 다른 글
[Spark] approxCountDistinct 알아보기 (0) 2024.02.22 [Spark] Logical Plan 알아보기 1 (Catalyst Optimizer) (2) 2024.01.28 [Spark] S3 DataFrameReader 구현하기 (s3a) (0) 2024.01.28 [Spark] groupBy, RelationalGroupedDataset 알아보기 (0) 2024.01.26 [Spark] Union 알아보기 (0) 2024.01.26