-
[Kafka Connect] JdbcSourceConnector table.whitelist 알아보기Kafka/kafka Connect 2024. 6. 18. 05:59반응형
- 목차
들어가며.
이번 글에서는 JdbcSourceConnector 의 table.whitelist 속성에 대해서 알아보도록 하겠습니다.
JdbcSourceConnector 는 Kafka Connect 의 Source Connector 로써 외부 저장소로부터 데이터를 추출하는 작업을 수행합니다.
이름처럼 Jdbc Driver 와 호환되는 모든 SQL 기반의 데이터베이스를 그 대상으로 삼습니다.
MySQL 과 같은 SQL 기반 데이터베이스를 Table 이라는 논리적은 규격을 정의하고, 그 규격대로 데이터를 생성합니다.
JdbcSourceConnector 의 table.whitelist 속성은 JdbcSourceConnector 가 획득해야하는 모든 테이블을의 이름을 작성합니다.
이렇게 table.whitelist 에 작성된 테이블들은 JdbcSourceConnector 에 의해서 조회 대상이 됩니다.
이번 컨텐츠는 table.whitelist 의 활용과 그 내부의 세부적인 사항들에 대해서 알아보도록 하겠습니다.
Kafka 환경 구축하기.
먼저 JdbcSourceConnector 와 table.whitelist 속성에 대해서 알아보기 위해 Kafka 환경을 구축하도록 하겠습니다.
아래의 링크의 글은 Docker 를 기반으로 Kafka 와 Zookeeper, Connect Worker 와 MySQL 를 실행하는 내용을 포함합니다.
https://westlife0615.tistory.com/903
Kafka Cluster 구축.
아래의 Docker 명령어들은 Kafka 와 Zookeeper 그리고 Kafka-UI 도커 컨테이너를 실행하는 명령어 목록입니다.
상세한 설명은 https://westlife0615.tistory.com/903 의 내용을 참고해주시면 좋을 것 같습니다.
docker network create kafka docker run -d --network kafka \ --name zookeeper1 --hostname zookeeper1 \ -e ZOOKEEPER_SERVER_ID=1 \ -e ZOOKEEPER_CLIENT_PORT=2181 \ --restart always \ confluentinc/cp-zookeeper:7.8.0 docker run -d --network kafka \ --name kafka1 --hostname kafka1 \ -e KAFKA_BROKER_ID=1 \ -e KAFKA_ZOOKEEPER_CONNECT=zookeeper1:2181 \ -e KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka1:9092 \ -e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \ -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT \ -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \ -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \ -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \ --restart always \ confluentinc/cp-kafka:7.8.0 docker run -d --network kafka \ --name kafka-ui \ -e DYNAMIC_CONFIG_ENABLED='true' \ -e KAFKA_CLUSTERS_0_NAME=test1 \ -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092 \ --restart always \ -p 8080:8080 \ provectuslabs/kafka-ui:v0.7.2
Kafka Connect 구축.
아래의 코드들은 Kafka Connect Worker 를 실행하기 위한 예시 코드들입니다.
JdbcSourceConnector 와 MySQL Driver 가 추가된 Docker Image 를 생성하구요.
생성된 Docker Image 를 기반으로 Kafka Connect Worker 를 실행할 수 있습니다.
cat <<'EOF'> Dockerfile FROM confluentinc/cp-kafka-connect-base:7.8.0 ENV CONNECT_PLUGIN_PATH="/usr/share/java" RUN confluent-hub install --no-prompt --component-dir /usr/share/java confluentinc/kafka-connect-jdbc:10.7.4 RUN curl -L -o /usr/share/java/confluentinc-kafka-connect-jdbc/lib/mysql-connector-java-8.0.23.jar \ https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.23/mysql-connector-java-8.0.23.jar EXPOSE 8083 CMD ["/etc/confluent/docker/run"] EOF
docker build -t kafka-jdbc-connect .
docker run -d --network kafka \ --name connect-1 --hostname connect-1 \ -e CONNECT_BOOTSTRAP_SERVERS=kafka1:9092 \ -e CONNECT_REST_ADVERTISED_HOST_NAME=connect-1 \ -e CONNECT_GROUP_ID=connect-cluster \ -e CONNECT_CONFIG_STORAGE_TOPIC=connect-configs \ -e CONNECT_OFFSET_STORAGE_TOPIC=connect-offsets \ -e CONNECT_STATUS_STORAGE_TOPIC=connect-status \ -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \ -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \ -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \ -e CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \ -e CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \ -e CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \ -e CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \ kafka-jdbc-connect:latest
MySQL 구축하기.
MySQL 은 JdbcSourceConnector 가 데이터를 읽어들일 외부 데이터 저장소의 역할을 합니다.
이번 글의 주제가 table.whitelist 이기 때문에 여러 개의 Table 을 생성해보도록 하겠습니다.
아래의 코드들은 MySQL 의 초기화 스크립트와 이를 실행하기 위한 명령어입니다.
cat <<'EOF'> /tmp/init.sql CREATE DATABASE base; USE base; CREATE TABLE users ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(100) NOT NULL, email VARCHAR(100) NOT NULL UNIQUE ); CREATE TABLE products ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(100) NOT NULL, description VARCHAR(100) NOT NULL UNIQUE ); DELIMITER $$ CREATE PROCEDURE GenerateUsers() BEGIN DECLARE i INT DEFAULT 1; WHILE i <= 10000 DO INSERT INTO users (name, email) VALUES (CONCAT('User', i), CONCAT('user', i, '@example.com')); SET i = i + 1; END WHILE; END$$ DELIMITER $$ CREATE PROCEDURE GenerateProducts() BEGIN DECLARE i INT DEFAULT 1; WHILE i <= 10000 DO INSERT INTO products (name, description) VALUES (CONCAT('Product', i), CONCAT('product', '-', i)); SET i = i + 1; END WHILE; END$$ DELIMITER ; CALL GenerateUsers(); CALL GenerateProducts(); EOF
docker run --platform linux/amd64 --network kafka -d \ --name mysql --hostname mysql \ --mount type=bind,source=/tmp/init.sql,target=/docker-entrypoint-initdb.d/init.sql \ -e MYSQL_ROOT_PASSWORD=1234 \ mysql:8.0.23
위 코드들이 모두 실행되면, MySQL Docker Container 가 실행되고, 아래와 같이 Table 과 Rows 들이 초기화됩니다.
1개의 Table 조회하는 JdbcSourceConnector 생성.
table.whitelist 속성을 통해서 1개의 Table 을 지정하고, 특정 테이블의 Rows 들을 조회할 수 있습니다.
아래의 명령어는 MySQL 의 products 테이블의 모든 Row 들을 mysql-products Topic 으로 전송하는 Connector 의 예시입니다.
docker exec connect-1 curl -X POST -H "Content-Type: application/json" \ --data '{ "name": "jdbc-source-connector", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "1", "connection.url": "jdbc:mysql://mysql:3306/base", "connection.user": "root", "connection.password": "1234", "table.whitelist": "products", "mode": "incrementing", "incrementing.column.name": "id", "topic.prefix": "mysql-", "poll.interval.ms": "5000" } }' \ http://localhost:8083/connectors
총 1만개의 products 테이블의 Rows 들이 mysql-products Topic 으로 옮겨집니다.
mysql.log 확인하기.
mysql.log 파일 내부에는 Client 가 MySQL 에 요청한 Query 의 목록들이 기록됩니다.
( 이는 general_log 설정이 ON 으로 활성화되어 있어야 기록이 시작되는데, 이에 대한 설명은 생략하도록 하겠습니다. )
아래의 결과와 같이 JdbcSourceConnector 는 Select 쿼리를 기반으로 동작하게 됩니다.
Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > -1 ORDER BY `base`.`products`.`id` ASC Query commit Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > 10000 ORDER BY `base`.`products`.`id` ASC Query commit Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > 10000 ORDER BY `base`.`products`.`id` ASC Query commit Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > 10000 ORDER BY `base`.`products`.`id` ASC Query commit Quit Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > 10000 ORDER BY `base`.`products`.`id` ASC Query commit Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > 10000 ORDER BY `base`.`products`.`id` ASC Query commit Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > 10000 ORDER BY `base`.`products`.`id` ASC Query commit Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > 10000 ORDER BY `base`.`products`.`id` ASC Query commit Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > 10000 ORDER BY `base`.`products`.`id` ASC Query commit
2개의 테이블들을 조회하는 JdbcSourceConnector 생성하기.
table.whitelist 는 , (comma) 를 기준으로 여러개의 테이블 이름을 지정할 수 있습니다.
아래의 명령어는 users, products 테이블들로부터 데이터를 추출하여 Kafka Topic 으로 전달하는 Connector 의 예시입니다.
Connector 의 이름은 jdbc-source-connector-2 로 설정하였고, table.whitelist 의 값은 products,users 가 문자열로 설정됩니다.
docker exec connect-1 curl -X POST -H "Content-Type: application/json" \ --data '{ "name": "jdbc-source-connector-2", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "1", "connection.url": "jdbc:mysql://mysql:3306/base", "connection.user": "root", "connection.password": "1234", "table.whitelist": "products,users", "mode": "incrementing", "incrementing.column.name": "id", "topic.prefix": "mysql-", "poll.interval.ms": "5000" } }' \ http://localhost:8083/connectors
위 JdbcSourceConnector 의 실행 결과 아래와 같이 2개의 Topic 에 데이터가 옮겨집니다.
mysql-products Topic 은 기존의 jdbc-source-connector 가 실행했던 1만개의 Record 에서 새롭게 1만개의 Record 가 추가되었습니다.
총 2만개의 Record 들이 확인되며, mysql-users Topic 에는 이와 같이 1만개의 Record 들이 삽입됩니다.
JdbcSourceConnector 종료하는 방법.
일반적으로 Kafka Connect 의 Connector 들은 Streaming Mode 로 동작하기에 종료되지 않습니다.
따라서 Connector 를 삭제하기 위한 Restful API 를 요청해야합니다.
그 예시는 아래와 같습니다.
삭제하고자 하는 Connector 의 이름을 /connectors/{Connector Name} 의 마지막 Pathname 영역에 명시해주어야 합니다.
docker exec connect-1 curl -X DELETE http://localhost:8083/connectors/jdbc-source-connector
tasks.max 를 활용해서 병렬 처리하기.
일반적으로 Kafka Connect 는 여러개의 Worker 들로 구성되며, 동시처리가 가능합니다.
Kafka Connect 의 Distributed Mode 실행을 위해서 추가적인 Worker 를 생성하도록 하겠습니다.
아래의 명령어를 통해서 2개의 Connect Worker 들을 추가합니다.
총 3개의 Kafka Connect Worker 들이 생성되게 됩니다.
docker run -d --network kafka \ --name connect-2 --hostname connect-2 \ -e CONNECT_BOOTSTRAP_SERVERS=kafka1:9092 \ -e CONNECT_REST_ADVERTISED_HOST_NAME=connect-2 \ -e CONNECT_GROUP_ID=connect-cluster \ -e CONNECT_CONFIG_STORAGE_TOPIC=connect-configs \ -e CONNECT_OFFSET_STORAGE_TOPIC=connect-offsets \ -e CONNECT_STATUS_STORAGE_TOPIC=connect-status \ -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \ -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \ -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \ -e CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \ -e CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \ -e CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \ -e CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \ kafka-jdbc-connect:latest docker run -d --network kafka \ --name connect-3 --hostname connect-3 \ -e CONNECT_BOOTSTRAP_SERVERS=kafka1:9092 \ -e CONNECT_REST_ADVERTISED_HOST_NAME=connect-3 \ -e CONNECT_GROUP_ID=connect-cluster \ -e CONNECT_CONFIG_STORAGE_TOPIC=connect-configs \ -e CONNECT_OFFSET_STORAGE_TOPIC=connect-offsets \ -e CONNECT_STATUS_STORAGE_TOPIC=connect-status \ -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \ -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \ -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \ -e CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \ -e CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \ -e CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \ -e CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \ kafka-jdbc-connect:latest
이게 총 3개의 Worker 들이 실행 중이기 때문에 tasks.max 의 값을 3으로 설정하게 되면, JdbcSourceConnector 는 총 3개의 Task 로 동작하게 됩니다.
그리고 아래와 같이 생성된 2개의 Table 인 users 와 products 를 table.whitelist 에 추가합니다.
users,products 테이블을 추출하는 JdbcSourceConnector 를 실행하며, 실행을 위한 명령어는 아래와 같습니다.
docker exec connect-1 curl -X POST -H "Content-Type: application/json" \ --data '{ "name": "jdbc-source-connector", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "3", "connection.url": "jdbc:mysql://mysql:3306/base", "connection.user": "root", "connection.password": "1234", "table.whitelist": "users,products", "mode": "incrementing", "incrementing.column.name": "id", "topic.prefix": "mysql-", "poll.interval.ms": "5000" } }' \ http://localhost:8083/connectors
JdbcSourceConnector 가 실행된 후에 connect-configs 토픽의 Record 들을 확인봅니다.
아래와 같이 task-jdbc-source-connector-0, task-jdbc-source-connector-1 를 Key 로 하는 2개의 Record 가 존재합니다.
이는 총 2개의 Task 가 실행 중임을 의미합니다.
JdbcSourceConnector 의 Stream Mode 는 table.whitelist 에 등록된 테이블 갯수만큼 병렬 또는 동시 처리가 가능합니다.
아래의 Query 목록은 mysql.log 파일에서 확인 가능한 Select Query 목록입니다.
아래의 기록처럼 users 과 products 테이블을 조회하는 쿼리가 함께 수행됩니다.
Query SELECT * FROM `base`.`users` WHERE `base`.`users`.`id` > 10000 ORDER BY `base`.`users`.`id` ASC Query commit Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > 10000 ORDER BY `base`.`products`.`id` ASC Query commit Query SELECT * FROM `base`.`users` WHERE `base`.`users`.`id` > 10000 ORDER BY `base`.`users`.`id` ASC Query commit Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > 10000 ORDER BY `base`.`products`.`id` ASC Query commit Query SELECT * FROM `base`.`users` WHERE `base`.`users`.`id` > 10000 ORDER BY `base`.`users`.`id` ASC Query commit Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > 10000 ORDER BY `base`.`products`.`id` ASC Query commit Query SELECT * FROM `base`.`users` WHERE `base`.`users`.`id` > 10000 ORDER BY `base`.`users`.`id` ASC Query commit Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > 10000 ORDER BY `base`.`products`.`id` ASC Query commit Query SELECT * FROM `base`.`users` WHERE `base`.`users`.`id` > 10000 ORDER BY `base`.`users`.`id` ASC Query commit Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > 10000 ORDER BY `base`.`products`.`id` ASC Query commit Query SELECT * FROM `base`.`users` WHERE `base`.`users`.`id` > 10000 ORDER BY `base`.`users`.`id` ASC Query commit
반응형'Kafka > kafka Connect' 카테고리의 다른 글
Debezium Connector 실습 환경 구축하기 (0) 2024.06.19 [Kafka Connect] JdbcSourceConnector Bulk Mode 알아보기 (0) 2024.06.18 [Kafka Connect] JdbcSourceConnector 구현해보기 (1) (0) 2024.06.17 [Kafka-Connect] Debezium MySQL Connector 구현하기 (0) 2024.02.18 Debezium Plugin 이 포함된 Kafka Connect Docker 이미지 만들기 (0) 2023.11.21