Implementing Kafka Connect & MySQL
Introduction.
In this post, I'm going to use Kafka Connect to implement a MySQL Source (or Sink) connector.
First, let's build a Kafka cluster for the hands-on exercise.
The required applications are as follows:
- Kafka
- Zookeeper
- Kafdrop
- Kafka-Connect
- MySQL
1. Create kafka-docker-compose.yaml.
This Compose yaml is made up of kafka, zookeeper, and kafdrop.
cat <<EOF> /tmp/kafka-docker-compose.yaml
version: '2'
services:
  kafdrop:
    image: obsidiandynamics/kafdrop:4.0.1
    container_name: kafdrop
    restart: "no"
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka:9092"
    depends_on:
      - "kafka"
    networks:
      - kafka
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.3
    container_name: zookeeper
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
    ports:
      - "22181:2181"
    networks:
      - kafka
  kafka:
    image: confluentinc/cp-kafka:7.4.3
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      # publish the PLAINTEXT_HOST listener, which is advertised as localhost:29092
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_MESSAGE_MAX_BYTES: 10000000
      KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200
      KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000
    networks:
      - kafka
networks:
  kafka:
    driver: bridge
EOF
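One detail worth calling out: the broker advertises two listeners. Clients inside the kafka Docker network reach it at kafka:9092 (PLAINTEXT), while clients on the host use localhost:29092 (PLAINTEXT_HOST). As a minimal sketch, once the stack is running (step 2 below), each path can be exercised like this:

# Inside the Docker network: the internal listener
docker exec kafka kafka-topics --bootstrap-server kafka:9092 --list

# From the host (assuming Kafka CLI tools are installed locally): the published listener
kafka-topics --bootstrap-server localhost:29092 --list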
2. Run docker-compose.
docker-compose -f /tmp/kafka-docker-compose.yaml --project-name kafka up -d
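Before moving on, it's worth checking that all three containers actually came up:

docker ps --format "table {{.Names}}\t{{.Status}}" | grep -E "kafka|zookeeper|kafdrop"

# Kafdrop's web UI should now be reachable at http://localhost:9000

If any container is restarting, check its logs with docker logs <name> before continuing.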
3. Create the MySQL container.
Let's spin up a single MySQL container.
MySQL will be used as the Source for Kafka Connect.
docker run -d --name mysql -p 3306:3306 --network kafka_kafka -e MYSQL_ROOT_PASSWORD=1234 mysql:8.0
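MySQL takes a few seconds to initialize on first start, so wait until it answers before creating the schema. A minimal readiness check:

# Prints "mysqld is alive" once the server is ready to accept connections
docker exec mysql mysqladmin ping -uroot -p1234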
Next, create a database and a table inside MySQL.
docker exec -it mysql sh
mysql -uroot -p1234

create database test;
use test;
create table test(id int not null primary key, name varchar(64), age int);
insert into test values(1, 'Andy', 30);
insert into test values(2, 'Bruce', 31);
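To confirm the table was populated, the same query can also be run non-interactively from the host:

docker exec mysql mysql -uroot -p1234 -e "select * from test.test;"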
Running Kafka Connect.
Create the properties file for Kafka Connect.
cat <<EOF> /tmp/kafka-connect-standalone.properties
# connect-standalone.properties

# Bootstrap server (Kafka broker) to connect to
bootstrap.servers=kafka:9092

# Key and value converters for source and sink connectors
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

# Converters for Kafka Connect's internal bookkeeping data
# (deprecated since Kafka 2.0, but harmless to leave in)
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false

# Offset storage for standalone mode
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

# Directory the worker scans for connector plugin JARs
plugin.path=/etc/kafka-connect/jars
EOF
Note that no connector-specific settings (connector class, database connection info) go into this worker file; we'll register the source connector through the REST API in a later step.
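Because value.converter is the JsonConverter with schemas.enable=true, every record the source connector writes to the topic is wrapped in a schema/payload envelope. For the test table above, a message will look roughly like this (illustrative; the exact schema name and optionality flags come from the connector):

{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "int32",  "optional": false, "field": "id"   },
      { "type": "string", "optional": true,  "field": "name" },
      { "type": "int32",  "optional": true,  "field": "age"  }
    ],
    "optional": false,
    "name": "test"
  },
  "payload": { "id": 1, "name": "Andy", "age": 30 }
}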
Download the Kafka connector.
We'll use kafka-connect-jdbc, so download the Confluent community archive that contains that jar file.
wget http://packages.confluent.io/archive/5.5/confluent-community-5.5.2-2.12.tar.gz -O /tmp/kafka-connector.tar.gz
tar -zxvf /tmp/kafka-connector.tar.gz -C /tmp
rm /tmp/kafka-connector.tar.gz
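Assuming the archive extracts to /tmp/confluent-5.5.2, the jar we need should now exist at the path below (this is the path the docker run command later mounts):

ls /tmp/confluent-5.5.2/share/java/kafka-connect-jdbc/kafka-connect-jdbc-5.5.2.jar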
Download the MySQL JDBC driver (mysql-connector-j).
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-j-8.2.0.tar.gz -O /tmp/mysql-connector-j-8.2.0.tar.gz
tar -zxvf /tmp/mysql-connector-j-8.2.0.tar.gz -C /tmp
rm /tmp/mysql-connector-j-8.2.0.tar.gz
if test -f /tmp/mysql-connector-j-8.2.0/mysql-connector-j-8.2.0.jar; then echo exist; fi
Mount kafka-connect-jdbc-5.5.2.jar and mysql-connector-j-8.2.0.jar into the Kafka Connect container.
docker run -d --name kafka-connect \
  -p 8083:8083 \
  --network kafka_kafka \
  --mount type=bind,source=/tmp/kafka-connect-standalone.properties,target=/etc/kafka/connect-standalone.properties \
  --mount type=bind,source=/tmp/confluent-5.5.2/share/java/kafka-connect-jdbc/kafka-connect-jdbc-5.5.2.jar,target=/etc/kafka-connect/jars/kafka-connect-jdbc-5.5.2.jar \
  --mount type=bind,source=/tmp/mysql-connector-j-8.2.0/mysql-connector-j-8.2.0.jar,target=/etc/kafka-connect/jars/mysql-connector-j-8.2.0.jar,readonly \
  --mount type=bind,source=/tmp/mysql-connector-j-8.2.0/mysql-connector-j-8.2.0.jar,target=/usr/share/java/kafka/mysql-connector-j-8.2.0.jar,readonly \
  confluentinc/cp-kafka-connect \
  /bin/connect-standalone /etc/kafka/connect-standalone.properties
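The worker takes a little while to start. Once its REST API answers on port 8083, you can confirm the JDBC connector plugin was picked up from plugin.path:

# Worker info (version, commit, Kafka cluster id)
curl -s http://localhost:8083/

# The returned list should include io.confluent.connect.jdbc.JdbcSourceConnector
curl -s http://localhost:8083/connector-plugins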
Once the above is done, the connector needs to be registered.
Running the command below registers the connector, and the Source Connector that streams data from MySQL into a Kafka topic starts in earnest.
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "mysql-source-connect",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "connection.url": "jdbc:mysql://mysql:3306/test",
      "connection.user": "root",
      "connection.password": "1234",
      "mode": "incrementing",
      "incrementing.column.name": "id",
      "table.whitelist": "test",
      "topic.prefix": "test-",
      "poll.interval.ms": 2000,
      "tasks.max": "1"
    }
  }'
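To check that the connector and its task actually went into the RUNNING state (rather than failing silently), query the status endpoint:

curl -s http://localhost:8083/connectors/mysql-source-connect/status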
You can list the registered connectors at http://localhost:8083/connectors.
Once the Kafka connector starts, the topic is created and messages begin to accumulate. With topic.prefix "test-" and table test, the topic is named test-test, and you can watch it fill up in Kafdrop (http://localhost:9000).
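As a final check, you can consume the topic directly and watch incrementing mode at work: inserting a row with a higher id should appear on the topic within poll.interval.ms (2 seconds here).

# Read the first two records (the rows inserted earlier)
docker exec kafka kafka-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic test-test \
  --from-beginning --max-messages 2

# Insert a new row; the connector picks it up on its next poll
docker exec mysql mysql -uroot -p1234 -e "insert into test.test values(3, 'Carol', 32);"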