-
[Kafka-Connect] Debezium MySQL Connector 구현하기Kafka 2024. 2. 18. 07:04728x90반응형
- 목차
들어가며.
이번 글에서는 간단하게 Debezium MySQL Connector 을 활용하여 MySQL CDC 를 구현하는 방법에 대해서 알아보려고 합니다.
MySQL Docker Container 실행하기.
먼저 MySQL Docker Container 를 실행합니다.
test_db 데이터베이스와 test_table 테이블을 생성합니다.
그리고 Replication 을 수행할 user 인 kafkaconnect 를 생성합니다.
cat <<EOF> /tmp/init.sql create database test_db; use test_db; create table test_db.test_table (id int not null auto_increment, name varchar(32), PRIMARY KEY (id)); insert test_db.test_table(name) values ('Andy'), ('Bob'), ('Chris'), ('Daniel'); CREATE USER 'kafkaconnect'@'%' IDENTIFIED WITH mysql_native_password BY 'debezium'; GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'kafkaconnect'; FLUSH PRIVILEGES; EOF
< MySQL Docker Run >
docker run -d --name mysql -e MYSQL_ROOT_PASSWORD=1234 -p 3306:3306 \ --mount type=bind,source=/tmp/init.sql,target=/docker-entrypoint-initdb.d/init.sql \ mysql:8.0.23
< MySQL Shell 에서 Row 조회 >
docker exec -it mysql mysql -uroot -p1234
select * from test_db.test_table;
+----+--------+ | id | name | +----+--------+ | 1 | Andy | | 2 | Bob | | 3 | Chris | | 4 | Daniel | +----+--------+ 4 rows in set (0.06 sec)
Debezium Kafka Connect 구현하기.
먼저 Kafka Cluster 가 구축되어야합니다.
아래 링크는 Docker 를 사용하여 Kafka Cluster 를 구축하는 간단한 내용을 담고 있습니다.
https://westlife0615.tistory.com/474
Kafka Cluster 가 구축된다면, Kafka Connect 를 실행해야합니다.
< kafka-connect.properties >
cat <<EOF> /tmp/kafka-connect.properties bootstrap.servers=host.docker.internal:29191,host.docker.internal:29192,host.docker.internal:29193 group.id=mysql-connect-cluster key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter.schemas.enable=false offset.storage.topic=connect-offsets offset.storage.replication.factor=3 config.storage.topic=connect-configs config.storage.replication.factor=3 status.storage.topic=connect-status status.storage.replication.factor=3 offset.flush.interval.ms=10000 plugin.path=/usr/share/java,/usr/share/confluent-hub-components rest.port=8083 EOF
< Connector Config >
cat <<EOF> /tmp/config.json { "name": "test-connector", "config": { "name": "test-connector", "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "host.docker.internal", "database.port": "3306", "database.user": "kafkaconnect", "database.password": "debezium", "database.server.id": "12345", "database.server.name": "dbserver1", "database.whitelist": "test_db", "database.allowPublicKeyRetrieval":"true", "snapshot.mode": "when_needed", "include.schema.changes": "false", "topic.prefix": "test_db.test_table", "table.include.list": "test_db.test_table", "skip.messages.without.change": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "false", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false", "snapshot.locking.mode": "none" } } EOF
< Kafka Connect 실행하기 >
docker run -d --name kafka-connect \ -p 8083:8083 \ --mount type=bind,source=/tmp/kafka-connect.properties,target=/etc/kafka/kafka-connect.properties,readonly \ --mount type=bind,source=/tmp/config.json,target=/tmp/config.json \ confluentinc/cp-kafka-connect-base:7.6.0 \ connect-distributed /etc/kafka/kafka-connect.properties
< Debezium Connector 설치하기 >
docker exec -it kafka-connect confluent-hub install --no-prompt debezium/debezium-connector-mysql:2.2.1
위 과정을 마치게 되면 Kafka Connect 컨테이너에 Debezium Connector 가 설치됩니다.
이제 kafka-connect 를 실행합니다.
docker restart kafka-connect
Kafka Connect Restful APIs.
< 플러그인 조회하기 >
docker exec -it kafka-connect curl -X GET --location 'http://localhost:8083/connector-plugins'
[ { "class": "io.debezium.connector.mysql.MySqlConnector", "type": "source", "version": "2.2.1.Final" }, { "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector", "type": "source", "version": "7.6.0-ccs" }, { "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector", "type": "source", "version": "7.6.0-ccs" }, { "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector", "type": "source", "version": "7.6.0-ccs" } ]
< Kafka Connector 실행하기 >
docker exec -it kafka-connect curl -X POST --location 'http://localhost:8083/connectors?expand=status&expand=info' \ --header 'Content-Type: application/json' \ --header 'Accept: application/json' \ --data '@/tmp/config.json'
< Kafka Connector 재실행하기 >
docker exec -it kafka-connect curl -X POST --location 'http://localhost:8083/connectors/test-connector/restart' \ --header 'Content-Type: application/json' \ --header 'Accept: application/json'
< Kafka Connector 재실행하기 >
docker exec -it kafka-connect curl -X GET --location 'http://localhost:8083/connectors/test-connector/status'
< Kafka Connector 삭제하기 >
docker exec -it kafka-connect curl -X DELETE --location 'http://localhost:8083/connectors/test-connector'
Debezium Connector 의 설정.
tasks.max
tasks.max 설정은 Kafka Connect Cluster 의 인스턴스의 갯수를 지정하는 설정입니다.
database.server.id
MySQL 을 Primary - Replica 구조로 Replication 을 수행하게 될 때에 각 MySQL 인스턴스에 server-id 를 설정합니다.
예를 들어 Primary MySQL 의 my.cnf 에 server-id = 1, Replica MySQL 의 my.cnf 에 server-id = 2 를 설정하여
각 MySQL 인스턴스에 서버 아이디를 부여할 수 있습니다.
이처럼 Debezium Connector 는 MySQL 입장에서 하나의 Replication 을 수행하는 Slave 처럼 동작하게 되는데,
유니크한 database.server.id 를 부여하여 정상적인 Replication 을 수행할 수 있도록 합니다.
database.user
Debezium Connector 는 CDC 를 수행하기 위해서 MySQL 의 binlog 에 접근해야합니다.
이러한 접근을 위해서 user & password 기반의 인증 절차가 필요하며, 이 과정에서 사용할 user 정보를 입력합니다.
Debezium Kafka Connector 가 MySQL 에 연결된 이후에 processlist 를 조회하게 되면
아래와 같이 database.user 에 설정된 database.user 정보가 확인됩니다.
show processlist; +----+-----------------+--------------------+---------+-------------+------+---------------------------------------------------------------+------------------+ | Id | User | Host | db | Command | Time | State | Info | +----+-----------------+--------------------+---------+-------------+------+---------------------------------------------------------------+------------------+ | 5 | event_scheduler | localhost | NULL | Daemon | 1935 | Waiting on empty queue | NULL | | 8 | root | localhost | test_db | Query | 0 | init | show processlist | | 22 | kafkaconnect | 192.168.65.1:27552 | test_db | Sleep | 909 | | NULL | | 23 | kafkaconnect | 192.168.65.1:27553 | NULL | Binlog Dump | 909 | Master has sent all binlog to slave; waiting for more updates | NULL | +----+-----------------+--------------------+---------+-------------+------+---------------------------------------------------------------+------------------+ 4 rows in set (0.00 sec)
database.password
MySQL 접근 인증을 위해서 database.user 와 함께 사용할 password 정보를 입력할 수 있습니다.
database.hostname
database.hostname 옵션은 Debezium Connector 와 연결할 MySQL 서버의 IP 또는 도메인 이름을 지정하는 설정입니다.
database.server.name
database.server.name 은 Kafka Connect 에서 사용할 고유한 이름으로,
Debezium Connector 와 연결할 MySQL 서버를 지칭하는 논리적 이름이 사용됩니다.
database.hostname 으로 사용된 IP 값은 인간친화적인 표현법이 아니므로 이를 대체합니다.
database.server.name 은 Debezium Kafka Connector 와 관련된 Topic 의 Prefix 로 사용됩니다.
그래서 일종의 Namespace 로 사용됩니다.
database.include.list
Change Data Capture (CDC) 를 수행할 MySQL 의 database 목록을 설정할 수 있습니다.
MySQL 에 db1, db2, db3, ... , dbN 인 database 들이 존재할 때,
CDC 를 수행할 database 가 db1, db2 라면 database.include.list 는 db1 과 db2 가 됩니다.
database.exclude.list
database.include.list 의 반대 개념으로 CDC 모니터링의 대상에서 제외시키는 설정입니다.
모니터링해야할 database 의 수가 많고 제외시켜야할 수가 적은 경우에 database.exclude.list 를 사용하는 것이 효율적입니다.
table.include.list
table.include.list 는 이름처럼 CDC 모니터링 대상인 table 을 지정하는 설정입니다.
table.exclude.list
table.include.list 와 반대되는 개념으로 CDC 모니터링에서 제외시킬 Table 목록을 설정할 수 있습니다.
반응형'Kafka' 카테고리의 다른 글
[Kafka] Partition 알아보기 (0) 2024.05.12 [Kafka] max.block.ms 알아보기 (0) 2024.02.18 [Kafka-Streams] Json 기반 Custom Serdes 구현하기 (0) 2024.02.17 [Kafka] Transaction Coordinator 알아보기 (0) 2024.02.07 [Kafka] Rebalance 가 발생하는 경우들 알아보기 ( Rebalance Scenario ) (0) 2024.02.04