ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka Connect] JdbcSourceConnector table.whitelist 알아보기
    Kafka/kafka Connect 2024. 6. 18. 05:59
    반응형

    - 목차

     

    들어가며.

    이번 글에서는 JdbcSourceConnectortable.whitelist 속성에 대해서 알아보도록 하겠습니다.

    JdbcSourceConnectorKafka ConnectSource Connector 로써 외부 저장소로부터 데이터를 추출하는 작업을 수행합니다.

    이름처럼 Jdbc Driver 와 호환되는 모든 SQL 기반의 데이터베이스를 그 대상으로 삼습니다.

    MySQL 과 같은 SQL 기반 데이터베이스를 Table 이라는 논리적은 규격을 정의하고, 그 규격대로 데이터를 생성합니다.

    JdbcSourceConnector 의 table.whitelist 속성은 JdbcSourceConnector 가 획득해야하는 모든 테이블을의 이름을 작성합니다.

    이렇게 table.whitelist 에 작성된 테이블들은 JdbcSourceConnector 에 의해서 조회 대상이 됩니다.

     

    이번 컨텐츠는 table.whitelist 의 활용과 그 내부의 세부적인 사항들에 대해서 알아보도록 하겠습니다.

     

    Kafka 환경 구축하기.

    먼저 JdbcSourceConnector 와 table.whitelist 속성에 대해서 알아보기 위해 Kafka 환경을 구축하도록 하겠습니다.

    아래의 링크의 글은 Docker 를 기반으로 Kafka 와 Zookeeper, Connect Worker 와 MySQL 를 실행하는 내용을 포함합니다.

     

    https://westlife0615.tistory.com/903

     

    [Kafka Connect] JdbcSourceConnector 구현해보기 (1)

    - 목차 들어가며.이번 글에서는 Kafka Connect 의 JdbcSourceConnector 에 대해서 간단히 알아보는 시간을 가지겠습니다.JdbcSourceConnector 를 실행하기 위해서 Kafka Broker, Zookeeper, Connect Worker 와 MySQL 등 여러

    westlife0615.tistory.com

     

    Kafka Cluster 구축.

    아래의 Docker 명령어들은 Kafka 와 Zookeeper 그리고 Kafka-UI 도커 컨테이너를 실행하는 명령어 목록입니다.

    상세한 설명은 https://westlife0615.tistory.com/903 의 내용을 참고해주시면 좋을 것 같습니다.

     

    docker network create kafka
    
    docker run -d --network kafka \
      --name zookeeper1 --hostname zookeeper1 \
      -e ZOOKEEPER_SERVER_ID=1 \
      -e ZOOKEEPER_CLIENT_PORT=2181 \
      --restart always \
      confluentinc/cp-zookeeper:7.8.0
    
    docker run -d --network kafka \
      --name kafka1 --hostname kafka1 \
      -e KAFKA_BROKER_ID=1 \
      -e KAFKA_ZOOKEEPER_CONNECT=zookeeper1:2181 \
      -e KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka1:9092 \
      -e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \
      -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT \
      -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
      -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
      -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \
      --restart always \
      confluentinc/cp-kafka:7.8.0
    
    docker run -d --network kafka \
      --name kafka-ui \
      -e DYNAMIC_CONFIG_ENABLED='true' \
      -e KAFKA_CLUSTERS_0_NAME=test1 \
      -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092 \
      --restart always \
      -p 8080:8080 \
      provectuslabs/kafka-ui:v0.7.2

     

     

    Kafka Connect 구축.

    아래의 코드들은 Kafka Connect Worker 를 실행하기 위한 예시 코드들입니다.

    JdbcSourceConnector 와 MySQL Driver 가 추가된 Docker Image 를 생성하구요.

    생성된 Docker Image 를 기반으로 Kafka Connect Worker 를 실행할 수 있습니다.

     

    cat <<'EOF'> Dockerfile
    FROM confluentinc/cp-kafka-connect-base:7.8.0
    
    ENV CONNECT_PLUGIN_PATH="/usr/share/java"
    
    RUN confluent-hub install --no-prompt --component-dir /usr/share/java confluentinc/kafka-connect-jdbc:10.7.4
    RUN curl -L -o /usr/share/java/confluentinc-kafka-connect-jdbc/lib/mysql-connector-java-8.0.23.jar \
        https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.23/mysql-connector-java-8.0.23.jar
    
    
    EXPOSE 8083
    
    CMD ["/etc/confluent/docker/run"]
    EOF
    docker build -t kafka-jdbc-connect .
    docker run -d --network kafka \
      --name connect-1 --hostname connect-1 \
      -e CONNECT_BOOTSTRAP_SERVERS=kafka1:9092 \
      -e CONNECT_REST_ADVERTISED_HOST_NAME=connect-1 \
      -e CONNECT_GROUP_ID=connect-cluster \
      -e CONNECT_CONFIG_STORAGE_TOPIC=connect-configs \
      -e CONNECT_OFFSET_STORAGE_TOPIC=connect-offsets \
      -e CONNECT_STATUS_STORAGE_TOPIC=connect-status \
      -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \
      -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \
      -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \
      -e CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
      -e CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
      -e CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
      -e CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
      kafka-jdbc-connect:latest

     

     

    MySQL 구축하기.

    MySQL 은 JdbcSourceConnector 가 데이터를 읽어들일 외부 데이터 저장소의 역할을 합니다.

    이번 글의 주제가 table.whitelist 이기 때문에 여러 개의 Table 을 생성해보도록 하겠습니다.

    아래의 코드들은 MySQL 의 초기화 스크립트와 이를 실행하기 위한 명령어입니다.

     

    cat <<'EOF'> /tmp/init.sql
    CREATE DATABASE base;
    USE base;
    
    CREATE TABLE users (
        id INT AUTO_INCREMENT PRIMARY KEY,
        name VARCHAR(100) NOT NULL,
        email VARCHAR(100) NOT NULL UNIQUE
    );
    
    CREATE TABLE products (
        id INT AUTO_INCREMENT PRIMARY KEY,
        name VARCHAR(100) NOT NULL,
        description VARCHAR(100) NOT NULL UNIQUE
    );
    
    DELIMITER $$
    
    CREATE PROCEDURE GenerateUsers()
    BEGIN
        DECLARE i INT DEFAULT 1;
    
        WHILE i <= 10000 DO
            INSERT INTO users (name, email)
            VALUES (CONCAT('User', i), CONCAT('user', i, '@example.com'));
            SET i = i + 1;
        END WHILE;
    END$$
    
    DELIMITER $$
    
    CREATE PROCEDURE GenerateProducts()
    BEGIN
        DECLARE i INT DEFAULT 1;
    
        WHILE i <= 10000 DO
            INSERT INTO products (name, description)
            VALUES (CONCAT('Product', i), CONCAT('product', '-', i));
            SET i = i + 1;
        END WHILE;
    END$$
    
    DELIMITER ;
    
    CALL GenerateUsers();
    CALL GenerateProducts();
    EOF
    docker run --platform linux/amd64 --network kafka -d \
      --name mysql --hostname mysql \
      --mount type=bind,source=/tmp/init.sql,target=/docker-entrypoint-initdb.d/init.sql \
      -e MYSQL_ROOT_PASSWORD=1234 \
      mysql:8.0.23

     

    위 코드들이 모두 실행되면, MySQL Docker Container 가 실행되고, 아래와 같이 Table 과 Rows 들이 초기화됩니다.

     

     

     

    1개의 Table 조회하는 JdbcSourceConnector 생성.

    table.whitelist 속성을 통해서 1개의 Table 을 지정하고, 특정 테이블의 Rows 들을 조회할 수 있습니다.

    아래의 명령어는 MySQL 의 products 테이블의 모든 Row 들을 mysql-products Topic 으로 전송하는 Connector 의 예시입니다.

     

    docker exec connect-1 curl -X POST -H "Content-Type: application/json" \
      --data '{
          "name": "jdbc-source-connector",
          "config": {
              "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
              "tasks.max": "1",
              "connection.url": "jdbc:mysql://mysql:3306/base",
              "connection.user": "root",
              "connection.password": "1234",
              "table.whitelist": "products",
              "mode": "incrementing",
              "incrementing.column.name": "id",
              "topic.prefix": "mysql-",
              "poll.interval.ms": "5000"
          }
      }' \
      http://localhost:8083/connectors

     

    총 1만개의 products 테이블의 Rows 들이 mysql-products Topic 으로 옮겨집니다.

     

     

     

     

    mysql.log 확인하기.

    mysql.log 파일 내부에는 Client 가 MySQL 에 요청한 Query 의 목록들이 기록됩니다.

    ( 이는 general_log 설정이 ON 으로 활성화되어 있어야 기록이 시작되는데, 이에 대한 설명은 생략하도록 하겠습니다. )

     

    아래의 결과와 같이 JdbcSourceConnector 는 Select 쿼리를 기반으로 동작하게 됩니다.

     

    Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > -1 ORDER BY `base`.`products`.`id` ASC
    Query commit
    Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > 10000 ORDER BY `base`.`products`.`id` ASC
    Query commit
    Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > 10000 ORDER BY `base`.`products`.`id` ASC
    Query commit
    Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > 10000 ORDER BY `base`.`products`.`id` ASC
    Query commit
    Quit
    Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > 10000 ORDER BY `base`.`products`.`id` ASC
    Query commit
    Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > 10000 ORDER BY `base`.`products`.`id` ASC
    Query commit
    Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > 10000 ORDER BY `base`.`products`.`id` ASC
    Query commit
    Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > 10000 ORDER BY `base`.`products`.`id` ASC
    Query commit
    Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > 10000 ORDER BY `base`.`products`.`id` ASC
    Query commit

     

     

    2개의 테이블들을 조회하는 JdbcSourceConnector 생성하기.

    table.whitelist 는 , (comma) 를 기준으로 여러개의 테이블 이름을 지정할 수 있습니다.

    아래의 명령어는 users, products 테이블들로부터 데이터를 추출하여 Kafka Topic 으로 전달하는 Connector 의 예시입니다.

    Connector 의 이름은 jdbc-source-connector-2 로 설정하였고, table.whitelist 의 값은 products,users 가 문자열로 설정됩니다.

     

    docker exec connect-1 curl -X POST -H "Content-Type: application/json" \
      --data '{
          "name": "jdbc-source-connector-2",
          "config": {
              "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
              "tasks.max": "1",
              "connection.url": "jdbc:mysql://mysql:3306/base",
              "connection.user": "root",
              "connection.password": "1234",
              "table.whitelist": "products,users",
              "mode": "incrementing",
              "incrementing.column.name": "id",
              "topic.prefix": "mysql-",
              "poll.interval.ms": "5000"
          }
      }' \
      http://localhost:8083/connectors

     

    위 JdbcSourceConnector 의 실행 결과 아래와 같이 2개의 Topic 에 데이터가 옮겨집니다.

    mysql-products Topic 은 기존의 jdbc-source-connector 가 실행했던 1만개의 Record 에서 새롭게 1만개의 Record 가 추가되었습니다.

    총 2만개의 Record 들이 확인되며, mysql-users Topic 에는 이와 같이 1만개의 Record 들이 삽입됩니다.

     

     

     

    JdbcSourceConnector 종료하는 방법.

    일반적으로 Kafka Connect 의 Connector 들은 Streaming Mode 로 동작하기에 종료되지 않습니다.

    따라서 Connector 를 삭제하기 위한 Restful API 를 요청해야합니다.

    그 예시는 아래와 같습니다.

    삭제하고자 하는 Connector 의 이름을 /connectors/{Connector Name} 의 마지막 Pathname 영역에 명시해주어야 합니다.

     

    docker exec connect-1 curl -X DELETE http://localhost:8083/connectors/jdbc-source-connector

     

    tasks.max 를 활용해서 병렬 처리하기.

    일반적으로 Kafka Connect 는 여러개의 Worker 들로 구성되며, 동시처리가 가능합니다.

    Kafka Connect 의 Distributed Mode 실행을 위해서 추가적인 Worker 를 생성하도록 하겠습니다.

     

    아래의 명령어를 통해서 2개의 Connect Worker 들을 추가합니다.

    총 3개의 Kafka Connect Worker 들이 생성되게 됩니다.

     

    docker run -d --network kafka \
      --name connect-2 --hostname connect-2 \
      -e CONNECT_BOOTSTRAP_SERVERS=kafka1:9092 \
      -e CONNECT_REST_ADVERTISED_HOST_NAME=connect-2 \
      -e CONNECT_GROUP_ID=connect-cluster \
      -e CONNECT_CONFIG_STORAGE_TOPIC=connect-configs \
      -e CONNECT_OFFSET_STORAGE_TOPIC=connect-offsets \
      -e CONNECT_STATUS_STORAGE_TOPIC=connect-status \
      -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \
      -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \
      -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \
      -e CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
      -e CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
      -e CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
      -e CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
      kafka-jdbc-connect:latest
    
    docker run -d --network kafka \
      --name connect-3 --hostname connect-3 \
      -e CONNECT_BOOTSTRAP_SERVERS=kafka1:9092 \
      -e CONNECT_REST_ADVERTISED_HOST_NAME=connect-3 \
      -e CONNECT_GROUP_ID=connect-cluster \
      -e CONNECT_CONFIG_STORAGE_TOPIC=connect-configs \
      -e CONNECT_OFFSET_STORAGE_TOPIC=connect-offsets \
      -e CONNECT_STATUS_STORAGE_TOPIC=connect-status \
      -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \
      -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \
      -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \
      -e CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
      -e CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
      -e CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
      -e CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
      kafka-jdbc-connect:latest

     

     

    이게 총 3개의 Worker 들이 실행 중이기 때문에 tasks.max 의 값을 3으로 설정하게 되면, JdbcSourceConnector 는 총 3개의 Task 로 동작하게 됩니다.

     

    그리고 아래와 같이 생성된 2개의 Table 인 users 와 products 를 table.whitelist 에 추가합니다.

    users,products 테이블을 추출하는 JdbcSourceConnector 를 실행하며, 실행을 위한 명령어는 아래와 같습니다.

     

    docker exec connect-1 curl -X POST -H "Content-Type: application/json" \
      --data '{
          "name": "jdbc-source-connector",
          "config": {
              "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
              "tasks.max": "3",
              "connection.url": "jdbc:mysql://mysql:3306/base",
              "connection.user": "root",
              "connection.password": "1234",
              "table.whitelist": "users,products",
              "mode": "incrementing",
              "incrementing.column.name": "id",
              "topic.prefix": "mysql-",
              "poll.interval.ms": "5000"
          }
      }' \
      http://localhost:8083/connectors

     

    JdbcSourceConnector 가 실행된 후에 connect-configs 토픽의 Record 들을 확인봅니다.

    아래와 같이 task-jdbc-source-connector-0, task-jdbc-source-connector-1 를 Key 로 하는 2개의 Record 가 존재합니다.

    이는 총 2개의 Task 가 실행 중임을 의미합니다.

    JdbcSourceConnector 의 Stream Mode 는 table.whitelist 에 등록된 테이블 갯수만큼 병렬 또는 동시 처리가 가능합니다.

     

     

    아래의 Query 목록은 mysql.log 파일에서 확인 가능한 Select Query 목록입니다.

    아래의 기록처럼 users 과 products 테이블을 조회하는 쿼리가 함께 수행됩니다.

     

    Query SELECT * FROM `base`.`users` WHERE `base`.`users`.`id` > 10000 ORDER BY `base`.`users`.`id` ASC
    Query commit
    Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > 10000 ORDER BY `base`.`products`.`id` ASC
    Query commit
    Query SELECT * FROM `base`.`users` WHERE `base`.`users`.`id` > 10000 ORDER BY `base`.`users`.`id` ASC
    Query commit
    Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > 10000 ORDER BY `base`.`products`.`id` ASC
    Query commit
    Query SELECT * FROM `base`.`users` WHERE `base`.`users`.`id` > 10000 ORDER BY `base`.`users`.`id` ASC
    Query commit
    Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > 10000 ORDER BY `base`.`products`.`id` ASC
    Query commit
    Query SELECT * FROM `base`.`users` WHERE `base`.`users`.`id` > 10000 ORDER BY `base`.`users`.`id` ASC
    Query commit
    Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > 10000 ORDER BY `base`.`products`.`id` ASC
    Query commit
    Query SELECT * FROM `base`.`users` WHERE `base`.`users`.`id` > 10000 ORDER BY `base`.`users`.`id` ASC
    Query commit
    Query SELECT * FROM `base`.`products` WHERE `base`.`products`.`id` > 10000 ORDER BY `base`.`products`.`id` ASC
    Query commit
    Query SELECT * FROM `base`.`users` WHERE `base`.`users`.`id` > 10000 ORDER BY `base`.`users`.`id` ASC
    Query commit

     

     

    반응형
Designed by Tistory.