ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka Connect] JdbcSourceConnector Bulk Mode 알아보기
    Kafka/kafka Connect 2024. 6. 18. 05:59
    반응형

    - 목차

     

    들어가며.

    이번 글에서는 JdbcSourceConnectorBulk Mode 에 대해서 알아보도록 하겠습니다.

    JdbcSourceConnector크게 두가지 모드로 동작합니다.

    한가지 방식인 incrementing 모드이고 다른 하나는 bulk 모드입니다.

    incrementing 모드는 특정 Table 의 모든 Row 들을 하나씩 추출하게 됩니다.

    만약 Table 의 Primary Key 를 기준으로 incrementing 모드로 데이터 추출을 하게 된다면,

    Primary Key 1번부터 Row by Row 방식으로 데이터를 처리하게 되죠.

    아래의 두 링크에서 incrementing 방식의 JdbcSourceConnector 의 동작을 확인할 수 있습니다.

     

    https://westlife0615.tistory.com/903

     

    [Kafka Connect] JdbcSourceConnector 구현해보기 (1)

    - 목차 들어가며.이번 글에서는 Kafka Connect 의 JdbcSourceConnector 에 대해서 간단히 알아보는 시간을 가지겠습니다.JdbcSourceConnector 를 실행하기 위해서 Kafka Broker, Zookeeper, Connect Worker 와 MySQL 등 여러

    westlife0615.tistory.com

    https://westlife0615.tistory.com/904

     

    [Kafka Connect] JdbcSourceConnector table.whitelist 알아보기

    - 목차 들어가며.이번 글에서는 JdbcSourceConnector 의 table.whitelist 속성에 대해서 알아보도록 하겠습니다.JdbcSourceConnector 는 Kafka Connect 의 Source Connector 로써 외부 저장소로부터 데이터를 추출하는 작

    westlife0615.tistory.com

     

     

    반면 bulk 모드로 동작하는 JdbcSourceConnector 는 그 동작 방식이 약간 다릅니다.

    Table 또는 특정 Query 에 해당하는 데이터들을 주기적으로 계속 조회하고 처리하게 됩니다.

    예를 들어, 테이블의 모든 데이터 수를 추출하는 Query 를 만들고 이 Query 를 bulk 방식으로 추출할 수 있습니다.

    아래는 테이블의 통계를 추출하는 임의의 쿼리이구요.

    JdbcSourceConnector 는 주기적으로 아래의 통계 쿼리를 실행하여 그 결과값을 Kafka Topic 으로 생성할 수 있습니다.

     

    SELECT 
        COUNT(*) AS total_rows,
        MIN(sales) AS min_sales,
        MAX(sales) AS max_sales,
        AVG(sales) AS avg_sales
    FROM sales_data;

     

    이러한 시나리오는 주기적인 통계 결과값을 노출해야하는 관리자 페이지에서 활용이 가능합니다.

     

    bulk mode 는 모든 데이터를 한번에 조회하기 때문에 table.whitelist 에 적용된 모든 테이블의 데이터를 추출하는 건 비효율적일 수 있습니다.

    따라서 bulk mode 는 query 속성과 함께 사용하곤 합니다.

     

    카프카 환경 구축하기.

    아래의 링크는 카프카 환경을 구축하기 위한 Docker Compose Yaml 파일이 존재합니다.

    이를 통해서 간단한 카프카 환경을 구축 후 JdbcSourceConnector bulk mode 에 대한 여러 테스트를 수행하겠습니다.

     

    https://westlife0615.tistory.com/91

     

    [Kafka] Kafka Cluster 구축을 위한 docker-compose 파일

    - 목차 들어가며.이번 글은 Kafka 와 관련한 여러 테스트를 손쉽게 진행하기 위해서 Kafka 관련 Docker Compose File 들을 기록합니다.  single kafka & single zookeeper & single kafka-connect & kafka-ui .1개의 Kakfa Brok

    westlife0615.tistory.com

     

    위의 링크의 내용에 포함된 docker-compose.yaml 파일을 실행하면 아래와 같이 Kafka Cluster 와 Connect 를 실행할 수 있습니다.

     

     

    MySQL 컨테이너 실행하기.

    JdbcSourceConnectorSource Database 로 사용된 MySQL 컨테이너를 실행합니다.

    아래의 코드들은 MySQL Container 를 실행하기 위한 초기화 sql 파일과 Docker 명령어입니다.

     

    cat <<'EOF'> /tmp/init.sql
    CREATE DATABASE base;
    USE base;
    
    CREATE TABLE users (
        id INT AUTO_INCREMENT PRIMARY KEY,
        name VARCHAR(100) NOT NULL,
        gender ENUM('Male', 'Female') NOT NULL,
        age INT NOT NULL
    );
    
    DELIMITER $$
    
    CREATE PROCEDURE GenerateUsers()
    BEGIN
        DECLARE i INT DEFAULT 1;
        DECLARE gender_choice ENUM('Male', 'Female');
    
        WHILE i <= 10000 DO
            SET gender_choice = IF(MOD(i, 2) = 0, 'Male', 'Female');
            
            INSERT INTO users (name, gender, age)
            VALUES (
                CONCAT('User', i),
                gender_choice,
                FLOOR(RAND() * (60 - 18 + 1)) + 18
            );
    
            SET i = i + 1;
        END WHILE;
    END$$
    
    DELIMITER ;
    
    CALL GenerateUsers();
    EOF

     

    docker run --platform linux/amd64 --network kafka -d \
      --name mysql --hostname mysql \
      --mount type=bind,source=/tmp/init.sql,target=/docker-entrypoint-initdb.d/init.sql \
      -e MYSQL_ROOT_PASSWORD=1234 \
      mysql:8.0.23

     

    위 명령어의 실행이 마무리되면 아래와 같이 users 테이블과 Row 들이 생성됩니다.

     

    select * from users order by rand() limit 10;
    +------+----------+--------+-----+
    | id   | name     | gender | age |
    +------+----------+--------+-----+
    | 7221 | User7221 | Female |  43 |
    |  944 | User944  | Male   |  18 |
    | 4843 | User4843 | Female |  59 |
    | 8160 | User8160 | Male   |  21 |
    |  373 | User373  | Female |  49 |
    | 8937 | User8937 | Female |  35 |
    | 9414 | User9414 | Male   |  48 |
    |  418 | User418  | Male   |  54 |
    | 8034 | User8034 | Male   |  34 |
    | 8439 | User8439 | Female |  36 |
    +------+----------+--------+-----+
    10 rows in set (0.02 sec)

     

    이 모든 과정이 마무리되면 아래의 Docker Desktop 과 같은 컨테이너들이 생성됩니다.

     

    query 와 table.whitelist ?

    JdbcSourceConnector 는 데이터 소스의 테이블을 지정하는 방식을 두가지가 존재합니다.

    하나는 table.whitelist 속성을 사용하여 특정 테이블의 모든 Row 들을 one-by-one 으로 조회합니다.

    그리고 query 속성을 통해서 지정된 쿼리의 결과를 조회할 수 있습니다.

     

    querytable.whitelist 는 데이터 소스를 어떻게 조회할건지 결정하는 방식이기에 함꼐 사용할 수 없습니다.

    그리고 일반적으로 table.whitelist 는 incrementing 방식과 함께 사용하며, query 속성은 bulk mode 로 사용하곤 합니다.

     

    query 속성을 통해서 복잡한 Aggregation Query 나 Join Query 를 적용할 수 있습니다.

    이어지는 내용에서 관련된 예시들을 테스트해보도록 하겠습니다.

     

    bulk mode 사용해보기.

    아래의 명령어는 실행 중인 Kafka Connect Worker 에게 "bulk-connector" 라는 이름의 Kafka Connector 를 생성하는 명령어입니다.

     

    docker exec connect-1 curl -X POST -H "Content-Type: application/json" \
      --data '{
          "name": "bulk-connector",
          "config": {
              "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
              "tasks.max": "1",
              "connection.url": "jdbc:mysql://mysql:3306/base",
              "connection.user": "root",
              "connection.password": "1234",
              "query": "select count(*) as cnt from users",
              "mode": "bulk",
              "topic.prefix": "statistics-",
              "poll.interval.ms": "5000"
          }
      }' \
      http://localhost:8083/connectors

     

    위 명령어가 실행되면 connect-status Topic 은 아래와 같은 Record 가 추가됩니다.

    생성한 bulk-connector 의 0 번 Task 가 RUNNING 상태가 됩니다.

     

     

    그리고 topic.prefix 로 지정한 "statistics-" 토픽에 아래와 같은 Query 의 결과값이 저장됩니다.

    Polling 주기를 5초로 설정하였기 때문에 5초마다 Record 가 생성됩니다.

    이렇게 Query 의 결과값을 지속적으로 추적할 수 있습니다.

     

     

     

    참고로 아래의 명령어는 생성된 "bulk-connector" 의 상세 정보를 확인하는 명령어입니다.

     

    docker exec connect-1 \
    	curl -X GET http://localhost:8083/connectors/bulk-connector
    {
      "name": "bulk-connector",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "mode": "bulk",
        "topic.prefix": "statistics-",
        "connection.password": "1234",
        "tasks.max": "1",
        "connection.user": "root",
        "query": "select count(*) as cnt from users",
        "poll.interval.ms": "5000",
        "name": "bulk-connector",
        "connection.url": "jdbc:mysql://mysql:3306/base"
      },
      "tasks": [],
      "type": "source"
    }

     

    Connector 를 종료하는 명령어는 아래와 같습니다.

     

    docker exec connect-1 \
    	curl -X DELETE http://localhost:8083/connectors/bulk-connector

     

     

    복잡한 Query 사용해보기.

    아래와 같은 성별과 연령에 대한 통계 쿼리를 주기적으로 실행하는 Kafka Connector 를 구성해보겠습니다.

     

    SELECT 
        gender,
        CASE 
            WHEN age BETWEEN 0 AND 18 THEN '0-18'
            WHEN age BETWEEN 19 AND 35 THEN '19-35'
            WHEN age BETWEEN 36 AND 60 THEN '36-60'
            ELSE '61+'
        END AS age_group,
        COUNT(*) AS total_count
    FROM 
        users
    GROUP BY 
        gender, age_group

     

    위 쿼리에 대한 Connector 를 실행하는 명령어는 아래와 같습니다.

     

    
    docker exec connect-1 curl -X POST -H "Content-Type: application/json" \
      --data '{
          "name": "gender-age-statistics-connector",
          "config": {
              "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
              "tasks.max": "1",
              "connection.url": "jdbc:mysql://mysql:3306/base",
              "connection.user": "root",
              "connection.password": "1234",
              "query": "SELECT gender, age_group, COUNT(*) as total from (SELECT gender, CASE WHEN age BETWEEN 0 AND 18 THEN '0-18' WHEN age BETWEEN 19 AND 35 THEN '19-35' WHEN age BETWEEN 36 AND 60 THEN '36-60' ELSE '61+' END AS age_group FROM users ) as t GROUP BY t.gender, t.age_group",
              "mode": "bulk",
              "topic.prefix": "gender-age-statistics",
              "poll.interval.ms": "5000"
          }
      }' \
      http://localhost:8083/connectors

     

     

    위의 커넥터의 실행 후에 "gender-age-statistics" Topic 이 생성됩니다.

     

     

    그리고 아래의 이미지와 같은 형식으로 Aggregation Query 의 결과로 조회되는 Row 들이 Topic 으로 전달됩니다.

     

     

     

    반응형
Designed by Tistory.