ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka Connect] SpoolDir Connector 구현해보기
    Kafka/kafka Connect 2023. 2. 24. 06:23
    728x90
    반응형

    - 목차

     

    들어가며.

    이번 글에서는 Kafka Connect 의 SpoolDir Source Connector 에 대해 알아보는 시간을 가지려고 합니다.

    SpoolDir Connector 는 Local File 을 읽어들이고 Record 를 생성하여 Kafka Topic 으로 푸시하는 구조를 가집니다.

     

    간략한 구조는 아래와 같습니다.

    SpoolDir Source Connector 는 Local File 들을 Line by Line 으로 읽어들이고,

    레코드를 생성하여 지정된 Kafka Topic 으로 레코드를 저장합니다.

     

    중요한 점은 SpoolDir 은 S3 나 Hadoop 과 같은 원격 저장소의 파일이 아닌 내부의 로컬 파일을 그 대상으로 합니다.

    그래서 SpoolDir Source Connector 의 지정된 디렉토리 하위로 파일들이 위치해야합니다.

    즉, Connector 가 실행된 이후에 외부 구성요소에 의해서 파일이 지속적으로 공급되어야합니다.

     

    이번 글에서는 Docker 를 활용하여 SpoolDir Source Connector 를 구현하는 것에 집중합니다.

    추후에 작성할 글에서 심화된 주제를 다루도록 하겠습니다.

     

    Kafka Connect Distributed Mode 구성하기.

    SpoolDir Source Connector 를 실행하기 이전에 Docker 를 활용해서 Kafka Connect Distributed Mode 를 구성합니다.

     

    Zookeeper Container 실행.

    먼저 Zookeeper 를 실행하여, Kafka Cluster 의 상태를 저장하기 위한 저장소를 생성해야합니다.

    kafka 라는 이름의 Docker Network 를 생성하고, Zookeeper 와 연결하도록 합니다.

    docker network create kafka
    docker run --platform linux/amd64 -d --name zookeeper \
    -e ZOOKEEPER_SERVER_ID=1 \
    -e ZOOKEEPER_CLIENT_PORT=2181 \
    -p 2181:2181 --network kafka \
    confluentinc/cp-zookeeper:7.4.3

     

    Kafka Cluster 실행.

    Zookeeper 가 실행되었다면 Kafka Broker 를 실행합니다.

    3개의 Kafka Broker 들을 실행하고 이들을 연결하도록 합니다.

     

    1번 Broker.

    docker run --platform linux/amd64 -d --name kafka-broker-1 \
    -e KAFKA_BROKER_ID=1 \
    -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
    -e KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092 \
    -e KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka-broker-1:9092 \
    -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT \
    -e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \
    -p 9091:9092 --network kafka \
    confluentinc/cp-kafka:7.4.3

     

    2번 Broker.

    docker run --platform linux/amd64 -d --name kafka-broker-2 \
    -e KAFKA_BROKER_ID=2 \
    -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
    -e KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092 \
    -e KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka-broker-2:9092 \
    -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT \
    -e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \
    -p 9092:9092 --network kafka \
    confluentinc/cp-kafka:7.4.3

     

    3번 Broker.

    docker run --platform linux/amd64 -d --name kafka-broker-3 \
    -e KAFKA_BROKER_ID=3 \
    -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
    -e KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092 \
    -e KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka-broker-3:9092 \
    -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT \
    -e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \
    -p 9093:9092 --network kafka \
    confluentinc/cp-kafka:7.4.3

     

     

    위의 세가지 Docker Run 명령어를 통해서 3개의 Kafka Broker 가 실행됩니다.

    지금까지 실행된 Docker Container 들의 상태는 아래의 Docker Desktop 의 캡쳐본과 같습니다.

    1개의 Zookeeper 와 3개의 Kafka Broker 가 실행되어 Kafka Cluster 를 이룹니다.

     

    여기서 Kafdrop 을 연결하여 Kafka Cluster 의 상태를 Web UI 를 통해서 확인할 수도 있습니다.

     

    Kafdrop 실행해보기.

    아래 Docker Run 명령어를 통해서 Kafdrop 을 실행시킬 수 있습니다.

    환경변수로써 Kafka Broker 들의 Advertised Listener 주소를 바인딩시켜줍니다.

    docker run --platform linux/amd64 -d --name kafdrop \
    -e KAFKA_BROKERCONNECT=kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092 \
    -p 9000:9000 --network kafka \
    obsidiandynamics/kafdrop:4.0.2

     

    그리고 http://localhost:9000 주소를 통해서 아래와 같은 UI 결과를 확인할 수 있습니다.

     

     

     

    Kafka Connect Distributed Mode 실행해보기.

    지금까지는 Kafka Connect 를 실행하기 위한 준비 과정이었습니다.

    Kafka Cluster 환경과 연결할 Kafka Connect 환경을 구축해보겠습니다.

     

    Kafka Connect 를 실행하는 Docker Command 는 아래와 같습니다.

    
    docker run -d --platform linux/amd64 --name kafka-connect -p 8083:8083 \
    -e CONNECT_BOOTSTRAP_SERVERS='kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092' \
    -e CONNECT_REST_PORT='8083' \
    -e CONNECT_GROUP_ID='connect-cluster' \
    -e CONNECT_CONFIG_STORAGE_TOPIC='connect-configs' \
    -e CONNECT_OFFSET_STORAGE_TOPIC='connect-offsets' \
    -e CONNECT_STATUS_STORAGE_TOPIC='connect-statuses' \
    -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR='3' \
    -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR='3' \
    -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR='3' \
    -e CONNECT_KEY_CONVERTER='org.apache.kafka.connect.json.JsonConverter' \
    -e CONNECT_VALUE_CONVERTER='org.apache.kafka.connect.json.JsonConverter' \
    -e CONNECT_INTERNAL_KEY_CONVERTER='org.apache.kafka.connect.json.JsonConverter' \
    -e CONNECT_INTERNAL_VALUE_CONVERTER='org.apache.kafka.connect.json.JsonConverter' \
    -e CONNECT_LOG4J_LOGGERS='org.apache.kafka=INFO,org.apache.kafka.connect=INFO' \
    -e CONNECT_REST_ADVERTISED_HOST_NAME='kafka-connect' \
    --network kafka \
    confluentinc/cp-kafka-connect:7.4.3 \
    /bin/bash -c " \
    confluent-hub install jcustenborder/kafka-connect-spooldir:2.0.65 --no-prompt && \
    mkdir -p /home/appuser/input /home/appuser/finished /home/appuser/error && \
    /etc/confluent/docker/run"

     

    위 명령어를 통해서 Kafka Connect Container 를 실행하는데에 몇 분이 소요될 수 있습니다.

    실행이 마무리된 이후에는 http://localhost:8083 주소를 통해서 Kafka Connect 의 상태를 확인할 수 있습니다.

     

    또는 CURL 명령어를 통해서 확인이 가능합니다.

     

    curl -X GET 'http://localhost:8083'
    {
      "version": "7.4.3-ccs",
      "commit": "58bc41cd9d6c788aee9a08c455ec7578fe9a71f2",
      "kafka_cluster_id": "0XxEyrZvRwiDgB0dvHn4Hg"
    }

     

     

    그리고 Kafka Connect 가 실행된 이후에 4개의 Kafka Topic 들이 생성됩니다.

    Kafka Connect 의 실행 이후에 이러한 결과들이 확인된다면 정상적으로 Kafka Connect 가 실행되었음을 의미합니다.

     

     

     

    Connector 실행하기.

    하지만 현재는 Kafka SpoolDir Connector 을 실행할 수 있는 Kafka Connect 환경만이 구축된 상태입니다.

    아래의 CURL 명령어는 생성된 Connector 들의 목록을 확인할 수 있는 GET API 요청입니다.

    curl -X GET 'http://localhost:8083/connectors'
    []

     

    또는 웹브라우저에서 확인이 가능합니다.

     

     

     

    이제 SpoolDir Source Connector 을 실행해보도록 하겠습니다.

    Kafka Connect 에서 Source/Sink Connector 를 생성하는 방식은 POST API 를 통한 HTTP 요청입니다.

    아래와 같이 Kafka Connect 의 /connectors Endpoint 로 JSON 타입의 Configuration 와 함께 POST 요청을 수행합니다.

     

    cat <<'EOF'> /tmp/spooldir-csv-source.json
    {
        "name": "spooldir-csv-source",
        "config": {
            "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
            "tasks.max": "1",
            "input.path": "/home/appuser/input",
            "finished.path": "/home/appuser/finished",
            "error.path": "/home/appuser/error",
            "input.file.pattern": ".*\\.csv",
            "topic": "spooldir-topic",
            "value.converter": "org.apache.kafka.connect.storage.StringConverter",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "schema.generation.enabled": "true",
            "csv.first.row.as.header": "true"
        }
    }
    EOF
    
    curl -X POST 'http://localhost:8083/connectors' \
    -H 'Content-Type: application/json' \
    -d @/tmp/spooldir-csv-source.json

     

     

    생성된 Connector 확인하는 법.

    아래와 같이 /connectors 로 GET 요청을 함으로써 생성된 Connector 들을 확인할 수 있습니다.

    curl -X GET 'http://localhost:8083/connectors'
    [
      "spooldir-csv-source"
    ]

     

     

    생성된 Connector 의 Configuration 확인.

    생성된 Connector 의 Configuration 을 확인하는 방법은 /connectors/{connector-name} 의 Path 로 GET 요청을 전달하면 됩니다.

     

    curl -X GET 'http://localhost:8083/connectors/spooldir-csv-source'
    {
      "name": "spooldir-csv-source",
      "config": {
        "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
        "input.file.pattern": ".*\\.csv",
        "csv.first.row.as.header": "true",
        "finished.path": "/home/appuser/finished",
        "tasks.max": "1",
        "name": "spooldir-csv-source",
        "topic": "spooldir-topic",
        "error.path": "/home/appuser/error",
        "input.path": "/home/appuser/input",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "schema.generation.enabled": "true"
      },
      "tasks": [],
      "type": "source"
    }

     

     

    Connector 상태 확인.

    /connectors/{connector-name}/status 의 Path 로 GET 요청을 전달하면,

    Connector 의 상태를 확인할 수 있습니다.

     

    저의 경우에는 Input File 들이 지정된 위치에 존재하지 않아서 아래와 같이 Failed 상태가 되었습니다.

     

    curl -X GET 'http://localhost:8083/connectors/spooldir-csv-source/status'
    {
      "name": "spooldir-csv-source",
      "connector": {
        "state": "FAILED",
        "worker_id": "kafka-connect:8083",
        "trace": "java.lang.IllegalStateException: Could not find any input file(s) to infer schema from.\n\tat com.google.common.base.Preconditions.checkState(Preconditions.java:512)\n\tat com.github.jcustenborder.kafka.connect.spooldir.AbstractSpoolDirSourceConnector.start(AbstractSpoolDirSourceConnector.java:63)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:190)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:215)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:360)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:343)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:143)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:121)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\n"
      },
      "tasks": [],
      "type": "source"
    }

     

     

    그럼 처리할 수 있는 파일을 추가한 이후에 SpoolDir Source Connector 를 실행해보도록 하겠습니다.

     

    먼저 csv 파일들을 생성하고, Kafka Connect 의 /home/appuser/input 디렉토리에 추가하겠습니다.

    자세히 설명드리진 않았지만, SpoolDir Source Connector 는 지정된 위치의 파일들을 처리하는 Source Connector 입니다.

    저는 SpoolDir Source Connector 의 설정에 Input File 위치를 /home/appuser/input 로 지정하였습니다.

     

    먼저 아래의 Script 를 통해서 csv File 을 생성합니다.

    import pandas as pd
    import numpy as np
    
    num_rows = 1000
    num_columns = 10
    
    data = {
        "user_id": np.arange(1, num_rows + 1),
        "age": np.random.randint(18, 70, num_rows),
        "gender": np.random.choice(["Male", "Female", "Other"], num_rows),
        "score": np.random.rand(num_rows) * 100,
        "city": np.random.choice(["New York", "Paris", "Tokyo", "Berlin"], num_rows),
        "purchase_amount": np.round(np.random.uniform(10, 500, num_rows), 2),
        "subscription": np.random.choice(["Basic", "Premium", "VIP"], num_rows),
        "login_count": np.random.randint(1, 50, num_rows),
        "last_login": pd.to_datetime("2023-01-01") + pd.to_timedelta(np.random.randint(0, 365, num_rows), unit="D"),
        "is_active": np.random.choice([True, False], num_rows)
    }
    
    df = pd.DataFrame(data)
    
    df.to_csv("/tmp/user_data.csv", index=False)
    
    print("CSV file 'user_data.csv' created with random user data.")

     

    그리고 생성된 /tmp/user_data.csv 파일을 kafka-connect 의 /home/appuser/input 디렉토리 내부로 옮겨줍니다.

    docker cp /tmp/user_data.csv kafka-connect:/home/appuser/input/user_data.csv

     

     

    이제 SpoolDir Source Connector 를 실행합니다.

     

    cat <<'EOF'> /tmp/spooldir-csv-source.json
    {
        "name": "spooldir-csv-source-2",
        "config": {
            "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
            "tasks.max": "1",
            "input.path": "/home/appuser/input",
            "finished.path": "/home/appuser/finished",
            "error.path": "/home/appuser/error",
            "input.file.pattern": ".*\\.csv",
            "topic": "spooldir-topic",
            "value.converter": "org.apache.kafka.connect.storage.StringConverter",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "schema.generation.enabled": "true",
            "csv.first.row.as.header": "true"
        }
    }
    EOF
    
    curl -X POST 'http://localhost:8083/connectors' \
    -H 'Content-Type: application/json' \
    -d @/tmp/spooldir-csv-source.json

     

    spooldir-csv-source-2 Connector 의 상태는 아래의 이미지처럼 RUNNING 상태가 됩니다.

     

     

     

    그리고 spooldir-topic 내부에 Kafka Record 들이 추가됩니다.

    초가된 Kafka Record 들은 SpoolDir Source Connector 가 읽어들인 csv 파일의 데이터가 됩니다.

     

     

     

     

    반응형
Designed by Tistory.