ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka Connect] JdbcSourceConnector 구현해보기 (1)
    Kafka/kafka Connect 2024. 6. 17. 05:59
    반응형

    - 목차

     

    들어가며.

    이번 글에서는 Kafka ConnectJdbcSourceConnector 에 대해서 간단히 알아보는 시간을 가지겠습니다.

    JdbcSourceConnector 를 실행하기 위해서 Kafka Broker, Zookeeper, Connect WorkerMySQL 등 여러가지 요소들이 실행되어야 합니다.

    또한 Kafka Connect Worker 내부에 JdbcSourceConnector Plugin 도 추가되어야 하는데요.

    이번 글에선 Docker 환경을 활용해서 JdbcSourceConnector 를 실행해보는 간단한 실습을 진행해보려고 합니다.

     

    Kafka Broker 와 Zookeeper 실행하기.

    먼저 Kafka Broker 와 Zookeeper 를 Docker 를 통해서 실행해보도록 하겠습니다.

     

    우선 이번 실습에서 사용할 모든 Docker Container 들은 kafka 라는 이름의 Docker Network 내부에서 진행하도록 하겠습니다.

     

    docker network create kafka

     

    그리고 아래의 명령어를 실행하여 Kafka Broker 와 Zookeeper 그리고 Kafka-UI 구성요소들을 실행합니다.

     

    docker run -d --network kafka \
      --name zookeeper1 --hostname zookeeper1 \
      -e ZOOKEEPER_SERVER_ID=1 \
      -e ZOOKEEPER_CLIENT_PORT=2181 \
      --restart always \
      confluentinc/cp-zookeeper:7.8.0
    
    docker run -d --network kafka \
      --name kafka1 --hostname kafka1 \
      -e KAFKA_BROKER_ID=1 \
      -e KAFKA_ZOOKEEPER_CONNECT=zookeeper1:2181 \
      -e KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka1:9092 \
      -e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \
      -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT \
      -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
      -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
      -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \
      --restart always \
      confluentinc/cp-kafka:7.8.0
    
    docker run -d --network kafka \
      --name kafka-ui \
      -e DYNAMIC_CONFIG_ENABLED='true' \
      -e KAFKA_CLUSTERS_0_NAME=test1 \
      -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092 \
      --restart always \
      -p 8080:8080 \
      provectuslabs/kafka-ui:v0.7.2

     

    위 명령어가 실행되면 총 3개의 Docker Container 들이 실행됩니다.

    Kafka Broker 가 하나만 존재하긴 하지만, Kafka Cluster 의 중앙 저장소인 Zookeeper 를 실행하구요.

    Kafka Broker 와 모니터링을 위한 Kafka-UI 툴을 실행합니다.

     

     

    http://localhost:8080 주소를 통해서 아래의 이미지와 같은 Kafka UI 대시보드에 접속할 수 있습니다.

    이 환경에서 여러가지 KafkaAdmin 관리가 가능합니다.

     

     

     

    MySQL 실행하기.

    이제 MySQL 서버를 하나 실행합니다.

    JdbcSourceConnector 는 MySQL 에 존재하는 데이터를 읽어들여 Kafka Topic 으로 저장합니다.

    그리하여 데이터소스로써 MySQL Docker Container 가 실행되어야 합니다.

     

    아래의 2개의 코드는 MySQL Docker Container 를 초기화/실행 하기 위한 명령어입니다.

    먼저 init.sql 파일을 구성하며, init.sql 은 1만명의 Rows 를 가지는 users 테이블을 생성하는 초기화 sql 파일입니다.

    그리고 init.sql 파일을 바인딩하는 MySQL Container 를 실행합니다.

    MySQL Container 를 실행과 더불어 init.sql 파일을 실행하여 Database 와 Table 을 구성합니다.

     

    cat <<'EOF'> /tmp/init.sql
    CREATE DATABASE base;
    USE base;
    CREATE TABLE users (
        id INT AUTO_INCREMENT PRIMARY KEY,
        name VARCHAR(100) NOT NULL,
        email VARCHAR(100) NOT NULL UNIQUE
    );
    
    DELIMITER $$
    
    CREATE PROCEDURE GenerateUsers()
    BEGIN
        DECLARE i INT DEFAULT 1;
    
        WHILE i <= 10000 DO
            INSERT INTO users (name, email)
            VALUES (CONCAT('User', i), CONCAT('user', i, '@example.com'));
            SET i = i + 1;
        END WHILE;
    END$$
    
    DELIMITER ;
    
    CALL GenerateUsers();
    EOF
    docker run --platform linux/amd64 --network kafka -d \
      --name mysql --hostname mysql \
      --mount type=bind,source=/tmp/init.sql,target=/docker-entrypoint-initdb.d/init.sql \
      -e MYSQL_ROOT_PASSWORD=1234 \
      mysql:8.0.23

     

    MySQL Docker Container 가 실행된 이후의 상황은 아래의 이미지와 같이 4개의 Container 들이 생성됩니다.

     

     

    그리고 생성된 MySQL Docker Container 내부에는 아래의 이미지와 같이 Table 과 Rows 들이 생성됩니다.

     

     

     

     

    Jdbc Connector Plugin 이 추가된 Docker Image 만들기.

    Kafka Connect Worker 는 Jdbc Connector Plugin 이 추가되어 있어야 합니다.

    Worker 내부에 Jdbc Connector Plugin 을 추가하는 방법은 여러가지가 있지만, 저는 Plugin 이 추가된 Docker Image 를 생성하겠습니다.

     

    우선 Dockerfile 은 아래와 같습니다.

    confluentinc/cp-kafka-connect-base 이미지를 Base 로 하여, confluent-hub CLI 를 통해서 Plugin 을 설치합니다.

    그리고 MySQL Jdbc Driver 또한 설치합니다.

     

    cat <<'EOF'> Dockerfile
    FROM confluentinc/cp-kafka-connect-base:7.8.0
    
    ENV CONNECT_PLUGIN_PATH="/usr/share/java"
    
    RUN confluent-hub install --no-prompt --component-dir /usr/share/java confluentinc/kafka-connect-jdbc:10.7.4
    RUN curl -L -o /usr/share/java/confluentinc-kafka-connect-jdbc/lib/mysql-connector-java-8.0.23.jar \
        https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.23/mysql-connector-java-8.0.23.jar
    
    
    EXPOSE 8083
    
    CMD ["/etc/confluent/docker/run"]
    EOF

     

    그리고 이미지를 빌드합니다.

    이미지의 이름은 kafka-jdbc-connect 라고 지어보았습니다.

     

    docker build -t kafka-jdbc-connect .

     

    빌드된 kafka-jdbc-connect 를 활용하여 1개의 Connect Worker 를 실행합니다.

     

    docker run -d --network kafka \
      --name connect-1 --hostname connect-1 \
      -e CONNECT_BOOTSTRAP_SERVERS=kafka1:9092 \
      -e CONNECT_REST_ADVERTISED_HOST_NAME=connect-1 \
      -e CONNECT_GROUP_ID=connect-cluster \
      -e CONNECT_CONFIG_STORAGE_TOPIC=connect-configs \
      -e CONNECT_OFFSET_STORAGE_TOPIC=connect-offsets \
      -e CONNECT_STATUS_STORAGE_TOPIC=connect-status \
      -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \
      -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \
      -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \
      -e CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
      -e CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
      -e CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
      -e CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
      kafka-jdbc-connect:latest

     

    총 5개의 Docker Container 들을 Docker Desktop 에서 확인할 수 있습니다.

     

     

    생선된 Kafka Connect Topic 들 확인해보기.

    Kafka Connect 가 실행되면 connect-status, connect-configs, connect-offsets 토픽이 생성됩니다.

    이렇게 생성된 Connect Topic 들이 확인된다면 Kafka Connect Worker 가 정상 실행됨을 알 수 있습니다.

     

     

     

    JdbcSourceConnector 실행하기.

    이제 JdbcSourceConnector 를 실행시키기 위한 환경 설정이 마무리되었습니다.

    한번 JdbcSourceConnector 를 실행해보도록 하겠습니다.

    아래의 curl 명령어를 통해서 Connector 의 생성 및 실행이 가능합니다.

    Kafka Connect 는 HTTP Restful 형식으로 커넥터의 조회 및 수정이 가능합니다.

     

    docker exec connect-1 curl -X POST -H "Content-Type: application/json" \
      --data '{
          "name": "jdbc-source-connector",
          "config": {
              "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
              "tasks.max": "1",
              "connection.url": "jdbc:mysql://mysql:3306/base",
              "connection.user": "root",
              "connection.password": "1234",
              "table.whitelist": "users",
              "mode": "incrementing",
              "incrementing.column.name": "id",
              "topic.prefix": "mysql-",
              "poll.interval.ms": "5000"
          }
      }' \
      http://localhost:8083/connectors

     

     

    실행 중인 JdbcSourceConnector 확인하는 방법.

    JdbcSourceConnector 의 실행을 확인하는 방법들에 대해서 설명하도록 하겠습니다.

     

    connect-config Topic 확인하기.

    connect-config Topic 은 Connector 와 Task 의 정보가 기록됩니다.

    아래의 이미지는 connect-config Topic 에 삽입된 하나의 Record 의 정보를 캡쳐한 내용입니다.

    해당하는 Record 는 connector-jdbc-source-connector 라는 Key 를 가지며,

    Value 는 Connector 의 설정과 동일합니다.

     

     

    그리고 Kafka Connector 는 tasks.max 설정 값에 해당하는 값만큼 여러개의 Task 를 생성할 수 있습니다.

    저의 경우에는 1개의 Worker 만을 사용하며 tasks.max 의 값 또한 1이기 때문에 1개의 Tasks 가 생성됩니다.

    이 Task 에 대한 설정 내용 또한 connect-configs 토픽에 저장됩니다.

    아래의 내용은 Key : task-jdbc-source-connector-0 에 해당하는 Record 에 대한 정보입니다.

     

     

     

    즉, connect-config 토픽을 통해서 어떤 Connector 가 생성되었으며,  Connector 는 어떻게 Task 분배가 되었는지를 확인할 수 있습니다.

    connect-status Topic 확인하기.

    JdbcSourceConnector 가 실행된 이후에 connect-status Topic 은 현재 Task 의 진행 상태가 나열됩니다.

    connect-status 토픽의 레코드들이 Task 의 상태를 의미한다고 생각하시면 됩니다.

    처음에는 RUNNING 인 Record 가 삽입됩니다.

    그리고 MySQL Source 를 Polling 하는 과정에 대한 기록이 추가됩니다.

     

     

     

    Restful API 사용하기.

    Worker 내부에서 Connector 가 실행 중인지 확인할 수 있는 가장 정확한 방법은 Restful API 를 사용하는 것입니다.

     

    아래의 HTTP GET 요청은 생성된 Connector 목록을 확인하는 Find List API 입니다.

    docker exec connect-1 curl -X GET http://localhost:8083/connectors
    [
      "jdbc-source-connector"
    ]

     

    그리고 아래의 HTTP GET 요청은 특정 Connector 의 상세 정보를 확인할 수 있는 FindOne API 입니다.

     

    docker exec connect-1 curl -X GET http://localhost:8083/connectors/jdbc-source-connector
    {
      "name": "jdbc-source-connector",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "mode": "incrementing",
        "incrementing.column.name": "id",
        "topic.prefix": "mysql-",
        "connection.password": "1234",
        "tasks.max": "1",
        "connection.user": "root",
        "poll.interval.ms": "5000",
        "name": "jdbc-source-connector",
        "connection.url": "jdbc:mysql://mysql:3306/base",
        "table.whitelist": "users"
      },
      "tasks": [
        {
          "connector": "jdbc-source-connector",
          "task": 0
        }
      ],
      "type": "source"
    }

     

     

     

    Kafka Topic 에 생성된 데이터 확인.

    JdbcSourceConnector 는 외부 데이터 소스에서 데이터를 조회하여 Kafka Topic 으로 데이터를 삽입합니다.

    Connector  설정 중에서 topic.prefix 의 설정 값을 기준으로 Sink Topic 이 자동으로 생성됩니다.

    저는 topic.prefix 의 값을 mysql 으로 설정하였고, 따라서 mysql-users 라는 Topic 으로 데이터가 생성됩니다.

     

     

    마치며.

    이번 글에서는 JdbcSourceConnector 를 실행하는 방법에 대해서 간단히 알아보았습니다.

    이어지는 컨텐츠에서는 Jdbc Connector 에 대한 심도 있는 주제들을 다루어보도록 하겠습니다.

     

     

    반응형
Designed by Tistory.