-
[Kafka Connect] JdbcSourceConnector 구현해보기 (1)Kafka/kafka Connect 2024. 6. 17. 05:59반응형
- 목차
들어가며.
이번 글에서는 Kafka Connect 의 JdbcSourceConnector 에 대해서 간단히 알아보는 시간을 가지겠습니다.
JdbcSourceConnector 를 실행하기 위해서 Kafka Broker, Zookeeper, Connect Worker 와 MySQL 등 여러가지 요소들이 실행되어야 합니다.
또한 Kafka Connect Worker 내부에 JdbcSourceConnector Plugin 도 추가되어야 하는데요.
이번 글에선 Docker 환경을 활용해서 JdbcSourceConnector 를 실행해보는 간단한 실습을 진행해보려고 합니다.
Kafka Broker 와 Zookeeper 실행하기.
먼저 Kafka Broker 와 Zookeeper 를 Docker 를 통해서 실행해보도록 하겠습니다.
우선 이번 실습에서 사용할 모든 Docker Container 들은 kafka 라는 이름의 Docker Network 내부에서 진행하도록 하겠습니다.
docker network create kafka
그리고 아래의 명령어를 실행하여 Kafka Broker 와 Zookeeper 그리고 Kafka-UI 구성요소들을 실행합니다.
docker run -d --network kafka \ --name zookeeper1 --hostname zookeeper1 \ -e ZOOKEEPER_SERVER_ID=1 \ -e ZOOKEEPER_CLIENT_PORT=2181 \ --restart always \ confluentinc/cp-zookeeper:7.8.0 docker run -d --network kafka \ --name kafka1 --hostname kafka1 \ -e KAFKA_BROKER_ID=1 \ -e KAFKA_ZOOKEEPER_CONNECT=zookeeper1:2181 \ -e KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka1:9092 \ -e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \ -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT \ -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \ -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \ -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \ --restart always \ confluentinc/cp-kafka:7.8.0 docker run -d --network kafka \ --name kafka-ui \ -e DYNAMIC_CONFIG_ENABLED='true' \ -e KAFKA_CLUSTERS_0_NAME=test1 \ -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092 \ --restart always \ -p 8080:8080 \ provectuslabs/kafka-ui:v0.7.2
위 명령어가 실행되면 총 3개의 Docker Container 들이 실행됩니다.
Kafka Broker 가 하나만 존재하긴 하지만, Kafka Cluster 의 중앙 저장소인 Zookeeper 를 실행하구요.
Kafka Broker 와 모니터링을 위한 Kafka-UI 툴을 실행합니다.
http://localhost:8080 주소를 통해서 아래의 이미지와 같은 Kafka UI 대시보드에 접속할 수 있습니다.
이 환경에서 여러가지 KafkaAdmin 관리가 가능합니다.
MySQL 실행하기.
이제 MySQL 서버를 하나 실행합니다.
JdbcSourceConnector 는 MySQL 에 존재하는 데이터를 읽어들여 Kafka Topic 으로 저장합니다.
그리하여 데이터소스로써 MySQL Docker Container 가 실행되어야 합니다.
아래의 2개의 코드는 MySQL Docker Container 를 초기화/실행 하기 위한 명령어입니다.
먼저 init.sql 파일을 구성하며, init.sql 은 1만명의 Rows 를 가지는 users 테이블을 생성하는 초기화 sql 파일입니다.
그리고 init.sql 파일을 바인딩하는 MySQL Container 를 실행합니다.
MySQL Container 를 실행과 더불어 init.sql 파일을 실행하여 Database 와 Table 을 구성합니다.
cat <<'EOF'> /tmp/init.sql CREATE DATABASE base; USE base; CREATE TABLE users ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(100) NOT NULL, email VARCHAR(100) NOT NULL UNIQUE ); DELIMITER $$ CREATE PROCEDURE GenerateUsers() BEGIN DECLARE i INT DEFAULT 1; WHILE i <= 10000 DO INSERT INTO users (name, email) VALUES (CONCAT('User', i), CONCAT('user', i, '@example.com')); SET i = i + 1; END WHILE; END$$ DELIMITER ; CALL GenerateUsers(); EOF
docker run --platform linux/amd64 --network kafka -d \ --name mysql --hostname mysql \ --mount type=bind,source=/tmp/init.sql,target=/docker-entrypoint-initdb.d/init.sql \ -e MYSQL_ROOT_PASSWORD=1234 \ mysql:8.0.23
MySQL Docker Container 가 실행된 이후의 상황은 아래의 이미지와 같이 4개의 Container 들이 생성됩니다.
그리고 생성된 MySQL Docker Container 내부에는 아래의 이미지와 같이 Table 과 Rows 들이 생성됩니다.
Jdbc Connector Plugin 이 추가된 Docker Image 만들기.
Kafka Connect Worker 는 Jdbc Connector Plugin 이 추가되어 있어야 합니다.
Worker 내부에 Jdbc Connector Plugin 을 추가하는 방법은 여러가지가 있지만, 저는 Plugin 이 추가된 Docker Image 를 생성하겠습니다.
우선 Dockerfile 은 아래와 같습니다.
confluentinc/cp-kafka-connect-base 이미지를 Base 로 하여, confluent-hub CLI 를 통해서 Plugin 을 설치합니다.
그리고 MySQL Jdbc Driver 또한 설치합니다.
cat <<'EOF'> Dockerfile FROM confluentinc/cp-kafka-connect-base:7.8.0 ENV CONNECT_PLUGIN_PATH="/usr/share/java" RUN confluent-hub install --no-prompt --component-dir /usr/share/java confluentinc/kafka-connect-jdbc:10.7.4 RUN curl -L -o /usr/share/java/confluentinc-kafka-connect-jdbc/lib/mysql-connector-java-8.0.23.jar \ https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.23/mysql-connector-java-8.0.23.jar EXPOSE 8083 CMD ["/etc/confluent/docker/run"] EOF
그리고 이미지를 빌드합니다.
이미지의 이름은 kafka-jdbc-connect 라고 지어보았습니다.
docker build -t kafka-jdbc-connect .
빌드된 kafka-jdbc-connect 를 활용하여 1개의 Connect Worker 를 실행합니다.
docker run -d --network kafka \ --name connect-1 --hostname connect-1 \ -e CONNECT_BOOTSTRAP_SERVERS=kafka1:9092 \ -e CONNECT_REST_ADVERTISED_HOST_NAME=connect-1 \ -e CONNECT_GROUP_ID=connect-cluster \ -e CONNECT_CONFIG_STORAGE_TOPIC=connect-configs \ -e CONNECT_OFFSET_STORAGE_TOPIC=connect-offsets \ -e CONNECT_STATUS_STORAGE_TOPIC=connect-status \ -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \ -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \ -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \ -e CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \ -e CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \ -e CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \ -e CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \ kafka-jdbc-connect:latest
총 5개의 Docker Container 들을 Docker Desktop 에서 확인할 수 있습니다.
생선된 Kafka Connect Topic 들 확인해보기.
Kafka Connect 가 실행되면 connect-status, connect-configs, connect-offsets 토픽이 생성됩니다.
이렇게 생성된 Connect Topic 들이 확인된다면 Kafka Connect Worker 가 정상 실행됨을 알 수 있습니다.
JdbcSourceConnector 실행하기.
이제 JdbcSourceConnector 를 실행시키기 위한 환경 설정이 마무리되었습니다.
한번 JdbcSourceConnector 를 실행해보도록 하겠습니다.
아래의 curl 명령어를 통해서 Connector 의 생성 및 실행이 가능합니다.
Kafka Connect 는 HTTP Restful 형식으로 커넥터의 조회 및 수정이 가능합니다.
docker exec connect-1 curl -X POST -H "Content-Type: application/json" \ --data '{ "name": "jdbc-source-connector", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "1", "connection.url": "jdbc:mysql://mysql:3306/base", "connection.user": "root", "connection.password": "1234", "table.whitelist": "users", "mode": "incrementing", "incrementing.column.name": "id", "topic.prefix": "mysql-", "poll.interval.ms": "5000" } }' \ http://localhost:8083/connectors
실행 중인 JdbcSourceConnector 확인하는 방법.
JdbcSourceConnector 의 실행을 확인하는 방법들에 대해서 설명하도록 하겠습니다.
connect-config Topic 확인하기.
connect-config Topic 은 Connector 와 Task 의 정보가 기록됩니다.
아래의 이미지는 connect-config Topic 에 삽입된 하나의 Record 의 정보를 캡쳐한 내용입니다.
해당하는 Record 는 connector-jdbc-source-connector 라는 Key 를 가지며,
Value 는 Connector 의 설정과 동일합니다.
그리고 Kafka Connector 는 tasks.max 설정 값에 해당하는 값만큼 여러개의 Task 를 생성할 수 있습니다.
저의 경우에는 1개의 Worker 만을 사용하며 tasks.max 의 값 또한 1이기 때문에 1개의 Tasks 가 생성됩니다.
이 Task 에 대한 설정 내용 또한 connect-configs 토픽에 저장됩니다.
아래의 내용은 Key : task-jdbc-source-connector-0 에 해당하는 Record 에 대한 정보입니다.
즉, connect-config 토픽을 통해서 어떤 Connector 가 생성되었으며, Connector 는 어떻게 Task 분배가 되었는지를 확인할 수 있습니다.
connect-status Topic 확인하기.
JdbcSourceConnector 가 실행된 이후에 connect-status Topic 은 현재 Task 의 진행 상태가 나열됩니다.
connect-status 토픽의 레코드들이 Task 의 상태를 의미한다고 생각하시면 됩니다.
처음에는 RUNNING 인 Record 가 삽입됩니다.
그리고 MySQL Source 를 Polling 하는 과정에 대한 기록이 추가됩니다.
Restful API 사용하기.
Worker 내부에서 Connector 가 실행 중인지 확인할 수 있는 가장 정확한 방법은 Restful API 를 사용하는 것입니다.
아래의 HTTP GET 요청은 생성된 Connector 목록을 확인하는 Find List API 입니다.
docker exec connect-1 curl -X GET http://localhost:8083/connectors
[ "jdbc-source-connector" ]
그리고 아래의 HTTP GET 요청은 특정 Connector 의 상세 정보를 확인할 수 있는 FindOne API 입니다.
docker exec connect-1 curl -X GET http://localhost:8083/connectors/jdbc-source-connector
{ "name": "jdbc-source-connector", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "mode": "incrementing", "incrementing.column.name": "id", "topic.prefix": "mysql-", "connection.password": "1234", "tasks.max": "1", "connection.user": "root", "poll.interval.ms": "5000", "name": "jdbc-source-connector", "connection.url": "jdbc:mysql://mysql:3306/base", "table.whitelist": "users" }, "tasks": [ { "connector": "jdbc-source-connector", "task": 0 } ], "type": "source" }
Kafka Topic 에 생성된 데이터 확인.
JdbcSourceConnector 는 외부 데이터 소스에서 데이터를 조회하여 Kafka Topic 으로 데이터를 삽입합니다.
Connector 설정 중에서 topic.prefix 의 설정 값을 기준으로 Sink Topic 이 자동으로 생성됩니다.
저는 topic.prefix 의 값을 mysql 으로 설정하였고, 따라서 mysql-users 라는 Topic 으로 데이터가 생성됩니다.
마치며.
이번 글에서는 JdbcSourceConnector 를 실행하는 방법에 대해서 간단히 알아보았습니다.
이어지는 컨텐츠에서는 Jdbc Connector 에 대한 심도 있는 주제들을 다루어보도록 하겠습니다.
반응형'Kafka > kafka Connect' 카테고리의 다른 글
[Kafka Connect] JdbcSourceConnector Bulk Mode 알아보기 (0) 2024.06.18 [Kafka Connect] JdbcSourceConnector table.whitelist 알아보기 (0) 2024.06.18 [Kafka-Connect] Debezium MySQL Connector 구현하기 (0) 2024.02.18 Debezium Plugin 이 포함된 Kafka Connect Docker 이미지 만들기 (0) 2023.11.21 [Kafka Connect] SpoolDir Connector 구현해보기 (0) 2023.02.24