-
[Kafka Connect] SpoolDir Connector 구현해보기Kafka/kafka Connect 2023. 2. 24. 06:23728x90반응형
- 목차
들어가며.
이번 글에서는 Kafka Connect 의 SpoolDir Source Connector 에 대해 알아보는 시간을 가지려고 합니다.
SpoolDir Connector 는 Local File 을 읽어들이고 Record 를 생성하여 Kafka Topic 으로 푸시하는 구조를 가집니다.
간략한 구조는 아래와 같습니다.
SpoolDir Source Connector 는 Local File 들을 Line by Line 으로 읽어들이고,
레코드를 생성하여 지정된 Kafka Topic 으로 레코드를 저장합니다.
중요한 점은 SpoolDir 은 S3 나 Hadoop 과 같은 원격 저장소의 파일이 아닌 내부의 로컬 파일을 그 대상으로 합니다.
그래서 SpoolDir Source Connector 의 지정된 디렉토리 하위로 파일들이 위치해야합니다.
즉, Connector 가 실행된 이후에 외부 구성요소에 의해서 파일이 지속적으로 공급되어야합니다.
이번 글에서는 Docker 를 활용하여 SpoolDir Source Connector 를 구현하는 것에 집중합니다.
추후에 작성할 글에서 심화된 주제를 다루도록 하겠습니다.
Kafka Connect Distributed Mode 구성하기.
SpoolDir Source Connector 를 실행하기 이전에 Docker 를 활용해서 Kafka Connect Distributed Mode 를 구성합니다.
Zookeeper Container 실행.
먼저 Zookeeper 를 실행하여, Kafka Cluster 의 상태를 저장하기 위한 저장소를 생성해야합니다.
kafka 라는 이름의 Docker Network 를 생성하고, Zookeeper 와 연결하도록 합니다.
docker network create kafka
docker run --platform linux/amd64 -d --name zookeeper \ -e ZOOKEEPER_SERVER_ID=1 \ -e ZOOKEEPER_CLIENT_PORT=2181 \ -p 2181:2181 --network kafka \ confluentinc/cp-zookeeper:7.4.3
Kafka Cluster 실행.
Zookeeper 가 실행되었다면 Kafka Broker 를 실행합니다.
3개의 Kafka Broker 들을 실행하고 이들을 연결하도록 합니다.
1번 Broker.
docker run --platform linux/amd64 -d --name kafka-broker-1 \ -e KAFKA_BROKER_ID=1 \ -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092 \ -e KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka-broker-1:9092 \ -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT \ -e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \ -p 9091:9092 --network kafka \ confluentinc/cp-kafka:7.4.3
2번 Broker.
docker run --platform linux/amd64 -d --name kafka-broker-2 \ -e KAFKA_BROKER_ID=2 \ -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092 \ -e KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka-broker-2:9092 \ -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT \ -e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \ -p 9092:9092 --network kafka \ confluentinc/cp-kafka:7.4.3
3번 Broker.
docker run --platform linux/amd64 -d --name kafka-broker-3 \ -e KAFKA_BROKER_ID=3 \ -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \ -e KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092 \ -e KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka-broker-3:9092 \ -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT \ -e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \ -p 9093:9092 --network kafka \ confluentinc/cp-kafka:7.4.3
위의 세가지 Docker Run 명령어를 통해서 3개의 Kafka Broker 가 실행됩니다.
지금까지 실행된 Docker Container 들의 상태는 아래의 Docker Desktop 의 캡쳐본과 같습니다.
1개의 Zookeeper 와 3개의 Kafka Broker 가 실행되어 Kafka Cluster 를 이룹니다.
여기서 Kafdrop 을 연결하여 Kafka Cluster 의 상태를 Web UI 를 통해서 확인할 수도 있습니다.
Kafdrop 실행해보기.
아래 Docker Run 명령어를 통해서 Kafdrop 을 실행시킬 수 있습니다.
환경변수로써 Kafka Broker 들의 Advertised Listener 주소를 바인딩시켜줍니다.
docker run --platform linux/amd64 -d --name kafdrop \ -e KAFKA_BROKERCONNECT=kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092 \ -p 9000:9000 --network kafka \ obsidiandynamics/kafdrop:4.0.2
그리고 http://localhost:9000 주소를 통해서 아래와 같은 UI 결과를 확인할 수 있습니다.
Kafka Connect Distributed Mode 실행해보기.
지금까지는 Kafka Connect 를 실행하기 위한 준비 과정이었습니다.
Kafka Cluster 환경과 연결할 Kafka Connect 환경을 구축해보겠습니다.
Kafka Connect 를 실행하는 Docker Command 는 아래와 같습니다.
docker run -d --platform linux/amd64 --name kafka-connect -p 8083:8083 \ -e CONNECT_BOOTSTRAP_SERVERS='kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092' \ -e CONNECT_REST_PORT='8083' \ -e CONNECT_GROUP_ID='connect-cluster' \ -e CONNECT_CONFIG_STORAGE_TOPIC='connect-configs' \ -e CONNECT_OFFSET_STORAGE_TOPIC='connect-offsets' \ -e CONNECT_STATUS_STORAGE_TOPIC='connect-statuses' \ -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR='3' \ -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR='3' \ -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR='3' \ -e CONNECT_KEY_CONVERTER='org.apache.kafka.connect.json.JsonConverter' \ -e CONNECT_VALUE_CONVERTER='org.apache.kafka.connect.json.JsonConverter' \ -e CONNECT_INTERNAL_KEY_CONVERTER='org.apache.kafka.connect.json.JsonConverter' \ -e CONNECT_INTERNAL_VALUE_CONVERTER='org.apache.kafka.connect.json.JsonConverter' \ -e CONNECT_LOG4J_LOGGERS='org.apache.kafka=INFO,org.apache.kafka.connect=INFO' \ -e CONNECT_REST_ADVERTISED_HOST_NAME='kafka-connect' \ --network kafka \ confluentinc/cp-kafka-connect:7.4.3 \ /bin/bash -c " \ confluent-hub install jcustenborder/kafka-connect-spooldir:2.0.65 --no-prompt && \ mkdir -p /home/appuser/input /home/appuser/finished /home/appuser/error && \ /etc/confluent/docker/run"
위 명령어를 통해서 Kafka Connect Container 를 실행하는데에 몇 분이 소요될 수 있습니다.
실행이 마무리된 이후에는 http://localhost:8083 주소를 통해서 Kafka Connect 의 상태를 확인할 수 있습니다.
또는 CURL 명령어를 통해서 확인이 가능합니다.
curl -X GET 'http://localhost:8083'
{ "version": "7.4.3-ccs", "commit": "58bc41cd9d6c788aee9a08c455ec7578fe9a71f2", "kafka_cluster_id": "0XxEyrZvRwiDgB0dvHn4Hg" }
그리고 Kafka Connect 가 실행된 이후에 4개의 Kafka Topic 들이 생성됩니다.
Kafka Connect 의 실행 이후에 이러한 결과들이 확인된다면 정상적으로 Kafka Connect 가 실행되었음을 의미합니다.
Connector 실행하기.
하지만 현재는 Kafka SpoolDir Connector 을 실행할 수 있는 Kafka Connect 환경만이 구축된 상태입니다.
아래의 CURL 명령어는 생성된 Connector 들의 목록을 확인할 수 있는 GET API 요청입니다.
curl -X GET 'http://localhost:8083/connectors'
[]
또는 웹브라우저에서 확인이 가능합니다.
이제 SpoolDir Source Connector 을 실행해보도록 하겠습니다.
Kafka Connect 에서 Source/Sink Connector 를 생성하는 방식은 POST API 를 통한 HTTP 요청입니다.
아래와 같이 Kafka Connect 의 /connectors Endpoint 로 JSON 타입의 Configuration 와 함께 POST 요청을 수행합니다.
cat <<'EOF'> /tmp/spooldir-csv-source.json { "name": "spooldir-csv-source", "config": { "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector", "tasks.max": "1", "input.path": "/home/appuser/input", "finished.path": "/home/appuser/finished", "error.path": "/home/appuser/error", "input.file.pattern": ".*\\.csv", "topic": "spooldir-topic", "value.converter": "org.apache.kafka.connect.storage.StringConverter", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "schema.generation.enabled": "true", "csv.first.row.as.header": "true" } } EOF curl -X POST 'http://localhost:8083/connectors' \ -H 'Content-Type: application/json' \ -d @/tmp/spooldir-csv-source.json
생성된 Connector 확인하는 법.
아래와 같이 /connectors 로 GET 요청을 함으로써 생성된 Connector 들을 확인할 수 있습니다.
curl -X GET 'http://localhost:8083/connectors'
[ "spooldir-csv-source" ]
생성된 Connector 의 Configuration 확인.
생성된 Connector 의 Configuration 을 확인하는 방법은 /connectors/{connector-name} 의 Path 로 GET 요청을 전달하면 됩니다.
curl -X GET 'http://localhost:8083/connectors/spooldir-csv-source'
{ "name": "spooldir-csv-source", "config": { "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector", "input.file.pattern": ".*\\.csv", "csv.first.row.as.header": "true", "finished.path": "/home/appuser/finished", "tasks.max": "1", "name": "spooldir-csv-source", "topic": "spooldir-topic", "error.path": "/home/appuser/error", "input.path": "/home/appuser/input", "value.converter": "org.apache.kafka.connect.storage.StringConverter", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "schema.generation.enabled": "true" }, "tasks": [], "type": "source" }
Connector 상태 확인.
/connectors/{connector-name}/status 의 Path 로 GET 요청을 전달하면,
Connector 의 상태를 확인할 수 있습니다.
저의 경우에는 Input File 들이 지정된 위치에 존재하지 않아서 아래와 같이 Failed 상태가 되었습니다.
curl -X GET 'http://localhost:8083/connectors/spooldir-csv-source/status'
{ "name": "spooldir-csv-source", "connector": { "state": "FAILED", "worker_id": "kafka-connect:8083", "trace": "java.lang.IllegalStateException: Could not find any input file(s) to infer schema from.\n\tat com.google.common.base.Preconditions.checkState(Preconditions.java:512)\n\tat com.github.jcustenborder.kafka.connect.spooldir.AbstractSpoolDirSourceConnector.start(AbstractSpoolDirSourceConnector.java:63)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:190)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:215)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:360)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:343)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:143)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:121)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\n" }, "tasks": [], "type": "source" }
그럼 처리할 수 있는 파일을 추가한 이후에 SpoolDir Source Connector 를 실행해보도록 하겠습니다.
먼저 csv 파일들을 생성하고, Kafka Connect 의 /home/appuser/input 디렉토리에 추가하겠습니다.
자세히 설명드리진 않았지만, SpoolDir Source Connector 는 지정된 위치의 파일들을 처리하는 Source Connector 입니다.
저는 SpoolDir Source Connector 의 설정에 Input File 위치를 /home/appuser/input 로 지정하였습니다.
먼저 아래의 Script 를 통해서 csv File 을 생성합니다.
import pandas as pd import numpy as np num_rows = 1000 num_columns = 10 data = { "user_id": np.arange(1, num_rows + 1), "age": np.random.randint(18, 70, num_rows), "gender": np.random.choice(["Male", "Female", "Other"], num_rows), "score": np.random.rand(num_rows) * 100, "city": np.random.choice(["New York", "Paris", "Tokyo", "Berlin"], num_rows), "purchase_amount": np.round(np.random.uniform(10, 500, num_rows), 2), "subscription": np.random.choice(["Basic", "Premium", "VIP"], num_rows), "login_count": np.random.randint(1, 50, num_rows), "last_login": pd.to_datetime("2023-01-01") + pd.to_timedelta(np.random.randint(0, 365, num_rows), unit="D"), "is_active": np.random.choice([True, False], num_rows) } df = pd.DataFrame(data) df.to_csv("/tmp/user_data.csv", index=False) print("CSV file 'user_data.csv' created with random user data.")
그리고 생성된 /tmp/user_data.csv 파일을 kafka-connect 의 /home/appuser/input 디렉토리 내부로 옮겨줍니다.
docker cp /tmp/user_data.csv kafka-connect:/home/appuser/input/user_data.csv
이제 SpoolDir Source Connector 를 실행합니다.
cat <<'EOF'> /tmp/spooldir-csv-source.json { "name": "spooldir-csv-source-2", "config": { "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector", "tasks.max": "1", "input.path": "/home/appuser/input", "finished.path": "/home/appuser/finished", "error.path": "/home/appuser/error", "input.file.pattern": ".*\\.csv", "topic": "spooldir-topic", "value.converter": "org.apache.kafka.connect.storage.StringConverter", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "schema.generation.enabled": "true", "csv.first.row.as.header": "true" } } EOF curl -X POST 'http://localhost:8083/connectors' \ -H 'Content-Type: application/json' \ -d @/tmp/spooldir-csv-source.json
spooldir-csv-source-2 Connector 의 상태는 아래의 이미지처럼 RUNNING 상태가 됩니다.
그리고 spooldir-topic 내부에 Kafka Record 들이 추가됩니다.
초가된 Kafka Record 들은 SpoolDir Source Connector 가 읽어들인 csv 파일의 데이터가 됩니다.
반응형