ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka-Connect] Debezium MySQL Connector 구현하기
    Kafka 2024. 2. 18. 07:04
    728x90
    반응형

     

    - 목차

     

    들어가며.

    이번 글에서는 간단하게 Debezium MySQL Connector 을 활용하여 MySQL CDC 를 구현하는 방법에 대해서 알아보려고 합니다.

     

    MySQL Docker Container 실행하기.

    먼저 MySQL Docker Container 를 실행합니다.

    test_db 데이터베이스와 test_table 테이블을 생성합니다.

    그리고 Replication 을 수행할 user 인 kafkaconnect 를 생성합니다.

     

    cat <<EOF> /tmp/init.sql
    create database test_db;
    use test_db;
    create table test_db.test_table (id int not null auto_increment, name varchar(32), PRIMARY KEY (id));
    insert test_db.test_table(name) values ('Andy'), ('Bob'), ('Chris'), ('Daniel');
    CREATE USER 'kafkaconnect'@'%' IDENTIFIED WITH mysql_native_password BY 'debezium';
    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'kafkaconnect';
    FLUSH PRIVILEGES;
    EOF

     

    < MySQL Docker Run >

    docker run -d --name mysql -e MYSQL_ROOT_PASSWORD=1234 -p 3306:3306 \
    --mount type=bind,source=/tmp/init.sql,target=/docker-entrypoint-initdb.d/init.sql \
    mysql:8.0.23

     

    < MySQL Shell 에서 Row 조회 >

    docker exec -it mysql mysql -uroot -p1234
    select * from test_db.test_table;
    +----+--------+
    | id | name   |
    +----+--------+
    |  1 | Andy   |
    |  2 | Bob    |
    |  3 | Chris  |
    |  4 | Daniel |
    +----+--------+
    4 rows in set (0.06 sec)

     

    Debezium Kafka Connect 구현하기.

    먼저 Kafka Cluster 가 구축되어야합니다.

    아래 링크는 Docker 를 사용하여 Kafka Cluster 를 구축하는 간단한 내용을 담고 있습니다.

     

    https://westlife0615.tistory.com/474

     

    Docker 로 Kafka Cluster 구축해보기.

    - 목차 소개. 저는 로컬 환경에서 카프카 관련 테스트를 진행하는 경우가 많이 생기더군요. 그래서 docker-compose 를 활용하여 Kafka, ZooKeeper 클러스터를 구축하는 내용을 작성하려고 합니다. docker-com

    westlife0615.tistory.com

     

    Kafka Cluster 가 구축된다면, Kafka Connect 를 실행해야합니다.

     

    < kafka-connect.properties >

    cat <<EOF> /tmp/kafka-connect.properties
    bootstrap.servers=host.docker.internal:29191,host.docker.internal:29192,host.docker.internal:29193
    group.id=mysql-connect-cluster
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    offset.storage.topic=connect-offsets
    offset.storage.replication.factor=3
    config.storage.topic=connect-configs
    config.storage.replication.factor=3
    status.storage.topic=connect-status
    status.storage.replication.factor=3
    offset.flush.interval.ms=10000
    plugin.path=/usr/share/java,/usr/share/confluent-hub-components
    rest.port=8083
    EOF

     

    < Connector Config >

    cat <<EOF> /tmp/config.json
    {
      "name": "test-connector",
      "config": {
        "name": "test-connector",
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "host.docker.internal",
        "database.port": "3306",
        "database.user": "kafkaconnect",
        "database.password": "debezium",
        "database.server.id": "12345",
        "database.server.name": "dbserver1",
        "database.whitelist": "test_db",
        "database.allowPublicKeyRetrieval":"true",    
        "snapshot.mode": "when_needed",
        "include.schema.changes": "false",
        "topic.prefix": "test_db.test_table",
        "table.include.list": "test_db.test_table",
        "skip.messages.without.change": "true",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "snapshot.locking.mode": "none"
      }
    }
    EOF

     

    < Kafka Connect 실행하기 >

    docker run -d --name kafka-connect \
    -p 8083:8083 \
    --mount type=bind,source=/tmp/kafka-connect.properties,target=/etc/kafka/kafka-connect.properties,readonly \
    --mount type=bind,source=/tmp/config.json,target=/tmp/config.json \
    confluentinc/cp-kafka-connect-base:7.6.0 \
    connect-distributed /etc/kafka/kafka-connect.properties

     

    < Debezium Connector 설치하기 >

    docker exec -it kafka-connect confluent-hub install --no-prompt debezium/debezium-connector-mysql:2.2.1

     

     

    위 과정을 마치게 되면 Kafka Connect 컨테이너에 Debezium Connector 가 설치됩니다.

    이제 kafka-connect 를 실행합니다.

    docker restart kafka-connect

     

     

    Kafka Connect Restful APIs.

     

    < 플러그인 조회하기 >

    docker exec -it kafka-connect curl -X GET --location 'http://localhost:8083/connector-plugins'
    [
      {
        "class": "io.debezium.connector.mysql.MySqlConnector",
        "type": "source",
        "version": "2.2.1.Final"
      },
      {
        "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
        "type": "source",
        "version": "7.6.0-ccs"
      },
      {
        "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
        "type": "source",
        "version": "7.6.0-ccs"
      },
      {
        "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "type": "source",
        "version": "7.6.0-ccs"
      }
    ]

     

     

    < Kafka Connector 실행하기 >

    docker exec -it kafka-connect curl -X POST --location 'http://localhost:8083/connectors?expand=status&expand=info' \
    --header 'Content-Type: application/json' \
    --header 'Accept: application/json' \
    --data '@/tmp/config.json'

     

    < Kafka Connector 재실행하기 >

    docker exec -it kafka-connect curl -X POST --location 'http://localhost:8083/connectors/test-connector/restart' \
    --header 'Content-Type: application/json' \
    --header 'Accept: application/json'

     

    < Kafka Connector 재실행하기 >

    docker exec -it kafka-connect curl -X GET --location 'http://localhost:8083/connectors/test-connector/status'

     

    < Kafka Connector 삭제하기 >

    docker exec -it kafka-connect curl -X DELETE --location 'http://localhost:8083/connectors/test-connector'

     

     

    Debezium Connector 의 설정.

    tasks.max

    tasks.max 설정은 Kafka Connect Cluster 의 인스턴스의 갯수를 지정하는 설정입니다.

     

     

    database.server.id

    MySQL 을 Primary - Replica 구조로 Replication 을 수행하게 될 때에 각 MySQL 인스턴스에 server-id 를 설정합니다.

    예를 들어 Primary MySQL 의 my.cnf 에 server-id = 1, Replica MySQL 의 my.cnf 에 server-id = 2 를 설정하여

    각 MySQL 인스턴스에 서버 아이디를 부여할 수 있습니다.

    이처럼 Debezium Connector 는 MySQL 입장에서 하나의 Replication 을 수행하는 Slave 처럼 동작하게 되는데,

    유니크한 database.server.id 를 부여하여 정상적인 Replication 을 수행할 수 있도록 합니다.

     

    database.user

    Debezium Connector 는 CDC 를 수행하기 위해서 MySQL 의 binlog 에 접근해야합니다.

    이러한 접근을 위해서 user & password 기반의 인증 절차가 필요하며, 이 과정에서 사용할 user 정보를 입력합니다.

     

    Debezium Kafka Connector 가 MySQL 에 연결된 이후에 processlist 를 조회하게 되면

    아래와 같이 database.user 에 설정된 database.user 정보가 확인됩니다.

    show processlist;
    +----+-----------------+--------------------+---------+-------------+------+---------------------------------------------------------------+------------------+
    | Id | User            | Host               | db      | Command     | Time | State                                                         | Info             |
    +----+-----------------+--------------------+---------+-------------+------+---------------------------------------------------------------+------------------+
    |  5 | event_scheduler | localhost          | NULL    | Daemon      | 1935 | Waiting on empty queue                                        | NULL             |
    |  8 | root            | localhost          | test_db | Query       |    0 | init                                                          | show processlist |
    | 22 | kafkaconnect    | 192.168.65.1:27552 | test_db | Sleep       |  909 |                                                               | NULL             |
    | 23 | kafkaconnect    | 192.168.65.1:27553 | NULL    | Binlog Dump |  909 | Master has sent all binlog to slave; waiting for more updates | NULL             |
    +----+-----------------+--------------------+---------+-------------+------+---------------------------------------------------------------+------------------+
    4 rows in set (0.00 sec)

     

    database.password

    MySQL 접근 인증을 위해서 database.user 와 함께 사용할 password 정보를 입력할 수 있습니다.

     

    database.hostname

    database.hostname 옵션은 Debezium Connector 와 연결할 MySQL 서버의 IP 또는 도메인 이름을 지정하는 설정입니다.

     

    database.server.name

    database.server.name 은 Kafka Connect 에서 사용할 고유한 이름으로,

    Debezium Connector 와 연결할 MySQL 서버를 지칭하는 논리적 이름이 사용됩니다.

    database.hostname 으로 사용된 IP 값은 인간친화적인 표현법이 아니므로 이를 대체합니다.

     

    database.server.name 은 Debezium Kafka Connector 와 관련된 Topic 의 Prefix 로 사용됩니다.

    그래서 일종의 Namespace 로 사용됩니다.

     

     

    database.include.list

    Change Data Capture (CDC) 를 수행할 MySQL 의 database 목록을 설정할 수 있습니다.

    MySQL 에 db1, db2, db3, ... , dbN 인 database 들이 존재할 때,

    CDC 를 수행할 database 가 db1, db2 라면 database.include.list 는 db1 과 db2 가 됩니다.

     

    database.exclude.list

    database.include.list 의 반대 개념으로 CDC 모니터링의 대상에서 제외시키는 설정입니다.

    모니터링해야할 database 의 수가 많고 제외시켜야할 수가 적은 경우에 database.exclude.list 를 사용하는 것이 효율적입니다.

     

    table.include.list

    table.include.list 는 이름처럼 CDC 모니터링 대상인 table 을 지정하는 설정입니다.

     

    table.exclude.list

    table.include.list 와 반대되는 개념으로 CDC 모니터링에서 제외시킬 Table 목록을 설정할 수 있습니다.

     

     

    반응형
Designed by Tistory.