ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Kafka-Connect & MySQL 구현하기
    Kafka 2023. 12. 9. 07:37
    728x90
    반응형

    - 목차

     

    소개.

    kafka-connect 를 활용하여 MySQL Source 또는 Sink 구현해보려고 합니다.

    먼저 실습을 위해서 Kafka Cluster 를 구축해보겠습니다.

    필요한 Application 들은 아래와 같습니다.

     

    - Kafka

    - Zookeeper

    - Kafdrop

    - Kafka-Connect

    - MySQL

     

    1. kafka-docker-compose.yaml 을 생성합니다.

    kafka, zookeeper, kafdrop 으로 구성된 Compose yaml 입니다.

    cat <<EOF> /tmp/kafka-docker-compose.yaml
    version: '2'
    services:
      kafdrop:
        image: obsidiandynamics/kafdrop:4.0.1
        container_name: kafdrop
        restart: "no"
        ports:
          - "9000:9000"
        environment:
          KAFKA_BROKERCONNECT: "kafka:9092"
        depends_on:
          - "kafka"
        networks:
          - kafka
      zookeeper:
        image: confluentinc/cp-zookeeper:7.4.3
        container_name: zookeeper
        environment:
          ZOOKEEPER_SERVER_ID: 1
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
          ZOOKEEPER_INIT_LIMIT: 5
          ZOOKEEPER_SYNC_LIMIT: 2
        ports:
          - "22181:2181"
        networks:
          - kafka      
      kafka:
        image: confluentinc/cp-kafka:7.4.3
        container_name: kafka    
        depends_on:
          - zookeeper
        ports:
          - "29092:9092"
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_MESSAGE_MAX_BYTES: 10000000
          KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200
          KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000
        networks:
          - kafka
    networks:
      kafka:
        driver: bridge
    EOF

     

    2. docker-compose 를 실행합니다.

    docker-compose -f /tmp/kafka-docker-compose.yaml --project-name kafka up -d

     

     

    3. MySQL 컨테이너를 생성합니다.

     

    MySQL 컨테이너를 하나 생성하겠습니다.

    MySQL 는 Kafka-Connect 의 Source 로 사용할 예정입니다.

     

    docker run -d --name mysql -p 3306:3306 --network kafka_kafka -e MYSQL_ROOT_PASSWORD=1234 mysql:8.0

     

    그리고 MySQL 내부에서 Database 와 Table 을 하나씩 생성하겠습니다.

    docker exec -it mysql sh
    
    mysql -uroot -p1234
    create database test;
    use test;
    create table test(id int not null primary key, name varchar(64), age int);
    
    insert into test values(1, 'Andy', 30);
    insert into test values(2, 'Bruce', 31);

     

    Kafka Connect 실행하기.

    kafka connect 의 properties 파일을 생성합니다.

     

    cat <<EOF> /tmp/kafka-connect-standalone.properties
    # connect-standalone.properties
    
    # Bootstrap server (Kafka broker) to connect to
    bootstrap.servers=kafka:9092
    
    # Key and value serializers for source and sink connectors
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=true
    
    # Specify the converters for internal Kafka Connect data
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter.schemas.enable=false
    
    # Specify the topics for source and sink connectors
    # For Debezium MySQL Connector, these topics represent MySQL database changes
    # Source connector topic
    offset.storage.file.filename=/tmp/connect.offsets
    offset.flush.interval.ms=10000
    
    # Define the converter for offsets storage
    offset.storage.converter=org.apache.kafka.connect.storage.FileOffsetBackingStore
    
    # Define the connector class for the Debezium MySQL Connector
    # Make sure to download the appropriate connector JAR and place it in the plugin.path directory
    plugin.path=/etc/kafka-connect/jars
    # Download the connector JAR from https://debezium.io/
    connector.class=io.debezium.connector.mysql.MySqlConnector
    
    # MySQL connection properties
    # Replace with your MySQL server details
    database.hostname=mysql
    database.port=3306
    database.user=root
    database.password=1234
    database.server.id=184054
    database.server.name=my-app-connector
    database.whitelist=test
    EOF

     

    kafka-connector 를 다운로드합니다.

    kafka-connector-jdbc 를 사용해야하며, 해당 jar 파일을 추가하기 위해서 다운로드합니다.

    wget http://packages.confluent.io/archive/5.5/confluent-community-5.5.2-2.12.tar.gz -O /tmp/kafka-connector.tar.gz
    tar -zxvf /tmp/kafka-connector.tar.gz
    rm /tmp/kafka-connector.tar.gz

     

    mysql-jdbc-connector 를 다운로드합니다.

    wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-j-8.2.0.tar.gz -O /tmp/mysql-connector-j-8.2.0.tar.gz
    tar -zxvf /tmp/mysql-connector-j-8.2.0.tar.gz
    rm /tmp/mysql-connector-j-8.2.0.tar.gz
    if test -f /tmp/mysql-connector-j-8.2.0/mysql-connector-j-8.2.0.jar; then echo exist; fi

     

    kafka-connect-jdbc-5.5.2.jar,  mysql-connector-j-8.2.0.jar 를 추가합니다.

    docker run -d --name kafka-connect \
    -p 8083:8083 \
    --network kafka_kafka \
    --mount type=bind,source=/tmp/kafka-connect-standalone.properties,target=/etc/kafka/connect-standalone.properties \
    --mount type=bind,source=/tmp/confluent-5.5.2/share/java/kafka-connect-jdbc/kafka-connect-jdbc-5.5.2.jar,target=/etc/kafka-connect/jars/kafka-connect-jdbc-5.5.2.jar \
    --mount type=bind,source=/tmp/mysql-connector-j-8.2.0/mysql-connector-j-8.2.0.jar,target=/etc/kafka-connect/jars/mysql-connector-j-8.2.0.jar,readonly \
    --mount type=bind,source=/tmp/mysql-connector-j-8.2.0/mysql-connector-j-8.2.0.jar,target=/usr/share/java/kafka/mysql-connector-j-8.2.0.jar,readonly \
    confluentinc/cp-kafka-connect \
    /bin/connect-standalone /etc/kafka/connect-standalone.properties

     

     

    위 과정을 마치게되면, Connector 를 등록해야합니다.

    아래의 명령어를 수행하게 되면, Connector 가 등록되구요.

    본격적으로 MySQL -> Kafka Topic 으로 이러지는 Source Connector 가 시작됩니다.

    curl -X POST http://localhost:8083/connectors \
    -H "Content-Type: application/json" \
    -d '{
      "name" : "mysql-source-connect",
      "config" : {
        "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mysql://mysql:3306/test",
        "connection.user":"root",
        "connection.password":"1234",
        "mode": "incrementing",
        "incrementing.column.name" : "id",
        "table.whitelist":"test",
        "topic.prefix" : "test-",
        "poll.interval.ms" : 2000,
        "tasks.max" : "1"
      }
    }'

     

    http://localhost:8083/connectors 를 통해서 등록된 Connector 를 확인할 수 있습니다.

     

    Kafka Connector 가 시작되면, 아래와 같이 토픽의 생성과 메시지의 추가가 시작됩니다.

    반응형

    'Kafka' 카테고리의 다른 글

    Docker 로 Kafka Cluster 구축해보기.  (0) 2023.12.16
    Kafka Ksql 구현하기  (0) 2023.12.11
    Kafka Replication (메시지 복제) 이해하기  (0) 2023.09.21
    kafka __consumer_offsets topic 이해하기  (0) 2023.09.18
    Kafka Consumer 개념  (0) 2023.09.13
Designed by Tistory.