-
[Kafka Connect] JdbcSourceConnector Bulk Mode 알아보기Kafka/kafka Connect 2024. 6. 18. 05:59반응형
- 목차
들어가며.
이번 글에서는 JdbcSourceConnector 의 Bulk Mode 에 대해서 알아보도록 하겠습니다.
JdbcSourceConnector 는 크게 두가지 모드로 동작합니다.
한가지 방식인 incrementing 모드이고 다른 하나는 bulk 모드입니다.
incrementing 모드는 특정 Table 의 모든 Row 들을 하나씩 추출하게 됩니다.
만약 Table 의 Primary Key 를 기준으로 incrementing 모드로 데이터 추출을 하게 된다면,
Primary Key 1번부터 Row by Row 방식으로 데이터를 처리하게 되죠.
아래의 두 링크에서 incrementing 방식의 JdbcSourceConnector 의 동작을 확인할 수 있습니다.
https://westlife0615.tistory.com/903
https://westlife0615.tistory.com/904
반면 bulk 모드로 동작하는 JdbcSourceConnector 는 그 동작 방식이 약간 다릅니다.
Table 또는 특정 Query 에 해당하는 데이터들을 주기적으로 계속 조회하고 처리하게 됩니다.
예를 들어, 테이블의 모든 데이터 수를 추출하는 Query 를 만들고 이 Query 를 bulk 방식으로 추출할 수 있습니다.
아래는 테이블의 통계를 추출하는 임의의 쿼리이구요.
JdbcSourceConnector 는 주기적으로 아래의 통계 쿼리를 실행하여 그 결과값을 Kafka Topic 으로 생성할 수 있습니다.
SELECT COUNT(*) AS total_rows, MIN(sales) AS min_sales, MAX(sales) AS max_sales, AVG(sales) AS avg_sales FROM sales_data;
이러한 시나리오는 주기적인 통계 결과값을 노출해야하는 관리자 페이지에서 활용이 가능합니다.
bulk mode 는 모든 데이터를 한번에 조회하기 때문에 table.whitelist 에 적용된 모든 테이블의 데이터를 추출하는 건 비효율적일 수 있습니다.
따라서 bulk mode 는 query 속성과 함께 사용하곤 합니다.
카프카 환경 구축하기.
아래의 링크는 카프카 환경을 구축하기 위한 Docker Compose Yaml 파일이 존재합니다.
이를 통해서 간단한 카프카 환경을 구축 후 JdbcSourceConnector bulk mode 에 대한 여러 테스트를 수행하겠습니다.
https://westlife0615.tistory.com/91
위의 링크의 내용에 포함된 docker-compose.yaml 파일을 실행하면 아래와 같이 Kafka Cluster 와 Connect 를 실행할 수 있습니다.
MySQL 컨테이너 실행하기.
JdbcSourceConnector 의 Source Database 로 사용된 MySQL 컨테이너를 실행합니다.
아래의 코드들은 MySQL Container 를 실행하기 위한 초기화 sql 파일과 Docker 명령어입니다.
cat <<'EOF'> /tmp/init.sql CREATE DATABASE base; USE base; CREATE TABLE users ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(100) NOT NULL, gender ENUM('Male', 'Female') NOT NULL, age INT NOT NULL ); DELIMITER $$ CREATE PROCEDURE GenerateUsers() BEGIN DECLARE i INT DEFAULT 1; DECLARE gender_choice ENUM('Male', 'Female'); WHILE i <= 10000 DO SET gender_choice = IF(MOD(i, 2) = 0, 'Male', 'Female'); INSERT INTO users (name, gender, age) VALUES ( CONCAT('User', i), gender_choice, FLOOR(RAND() * (60 - 18 + 1)) + 18 ); SET i = i + 1; END WHILE; END$$ DELIMITER ; CALL GenerateUsers(); EOF
docker run --platform linux/amd64 --network kafka -d \ --name mysql --hostname mysql \ --mount type=bind,source=/tmp/init.sql,target=/docker-entrypoint-initdb.d/init.sql \ -e MYSQL_ROOT_PASSWORD=1234 \ mysql:8.0.23
위 명령어의 실행이 마무리되면 아래와 같이 users 테이블과 Row 들이 생성됩니다.
select * from users order by rand() limit 10; +------+----------+--------+-----+ | id | name | gender | age | +------+----------+--------+-----+ | 7221 | User7221 | Female | 43 | | 944 | User944 | Male | 18 | | 4843 | User4843 | Female | 59 | | 8160 | User8160 | Male | 21 | | 373 | User373 | Female | 49 | | 8937 | User8937 | Female | 35 | | 9414 | User9414 | Male | 48 | | 418 | User418 | Male | 54 | | 8034 | User8034 | Male | 34 | | 8439 | User8439 | Female | 36 | +------+----------+--------+-----+ 10 rows in set (0.02 sec)
이 모든 과정이 마무리되면 아래의 Docker Desktop 과 같은 컨테이너들이 생성됩니다.
query 와 table.whitelist ?
JdbcSourceConnector 는 데이터 소스의 테이블을 지정하는 방식을 두가지가 존재합니다.
하나는 table.whitelist 속성을 사용하여 특정 테이블의 모든 Row 들을 one-by-one 으로 조회합니다.
그리고 query 속성을 통해서 지정된 쿼리의 결과를 조회할 수 있습니다.
query 와 table.whitelist 는 데이터 소스를 어떻게 조회할건지 결정하는 방식이기에 함꼐 사용할 수 없습니다.
그리고 일반적으로 table.whitelist 는 incrementing 방식과 함께 사용하며, query 속성은 bulk mode 로 사용하곤 합니다.
query 속성을 통해서 복잡한 Aggregation Query 나 Join Query 를 적용할 수 있습니다.
이어지는 내용에서 관련된 예시들을 테스트해보도록 하겠습니다.
bulk mode 사용해보기.
아래의 명령어는 실행 중인 Kafka Connect Worker 에게 "bulk-connector" 라는 이름의 Kafka Connector 를 생성하는 명령어입니다.
docker exec connect-1 curl -X POST -H "Content-Type: application/json" \ --data '{ "name": "bulk-connector", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "1", "connection.url": "jdbc:mysql://mysql:3306/base", "connection.user": "root", "connection.password": "1234", "query": "select count(*) as cnt from users", "mode": "bulk", "topic.prefix": "statistics-", "poll.interval.ms": "5000" } }' \ http://localhost:8083/connectors
위 명령어가 실행되면 connect-status Topic 은 아래와 같은 Record 가 추가됩니다.
생성한 bulk-connector 의 0 번 Task 가 RUNNING 상태가 됩니다.
그리고 topic.prefix 로 지정한 "statistics-" 토픽에 아래와 같은 Query 의 결과값이 저장됩니다.
Polling 주기를 5초로 설정하였기 때문에 5초마다 Record 가 생성됩니다.
이렇게 Query 의 결과값을 지속적으로 추적할 수 있습니다.
참고로 아래의 명령어는 생성된 "bulk-connector" 의 상세 정보를 확인하는 명령어입니다.
docker exec connect-1 \ curl -X GET http://localhost:8083/connectors/bulk-connector
{ "name": "bulk-connector", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "mode": "bulk", "topic.prefix": "statistics-", "connection.password": "1234", "tasks.max": "1", "connection.user": "root", "query": "select count(*) as cnt from users", "poll.interval.ms": "5000", "name": "bulk-connector", "connection.url": "jdbc:mysql://mysql:3306/base" }, "tasks": [], "type": "source" }
Connector 를 종료하는 명령어는 아래와 같습니다.
docker exec connect-1 \ curl -X DELETE http://localhost:8083/connectors/bulk-connector
복잡한 Query 사용해보기.
아래와 같은 성별과 연령에 대한 통계 쿼리를 주기적으로 실행하는 Kafka Connector 를 구성해보겠습니다.
SELECT gender, CASE WHEN age BETWEEN 0 AND 18 THEN '0-18' WHEN age BETWEEN 19 AND 35 THEN '19-35' WHEN age BETWEEN 36 AND 60 THEN '36-60' ELSE '61+' END AS age_group, COUNT(*) AS total_count FROM users GROUP BY gender, age_group
위 쿼리에 대한 Connector 를 실행하는 명령어는 아래와 같습니다.
docker exec connect-1 curl -X POST -H "Content-Type: application/json" \ --data '{ "name": "gender-age-statistics-connector", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "1", "connection.url": "jdbc:mysql://mysql:3306/base", "connection.user": "root", "connection.password": "1234", "query": "SELECT gender, age_group, COUNT(*) as total from (SELECT gender, CASE WHEN age BETWEEN 0 AND 18 THEN '0-18' WHEN age BETWEEN 19 AND 35 THEN '19-35' WHEN age BETWEEN 36 AND 60 THEN '36-60' ELSE '61+' END AS age_group FROM users ) as t GROUP BY t.gender, t.age_group", "mode": "bulk", "topic.prefix": "gender-age-statistics", "poll.interval.ms": "5000" } }' \ http://localhost:8083/connectors
위의 커넥터의 실행 후에 "gender-age-statistics" Topic 이 생성됩니다.
그리고 아래의 이미지와 같은 형식으로 Aggregation Query 의 결과로 조회되는 Row 들이 Topic 으로 전달됩니다.
반응형'Kafka > kafka Connect' 카테고리의 다른 글
Debezium Connector 실습 환경 구축하기 (0) 2024.06.19 [Kafka Connect] JdbcSourceConnector table.whitelist 알아보기 (0) 2024.06.18 [Kafka Connect] JdbcSourceConnector 구현해보기 (1) (0) 2024.06.17 [Kafka-Connect] Debezium MySQL Connector 구현하기 (0) 2024.02.18 Debezium Plugin 이 포함된 Kafka Connect Docker 이미지 만들기 (0) 2023.11.21