-
[Kafka-Connect] S3 Sink Connector 따라해보기 1Kafka 2024. 1. 8. 21:07728x90반응형
- 목차
들어가며.
이번 글에서는 Kafka Connect 의 S3 Sink 를 구현해보려고 합니다.
사용할 데이터는 Kaggle User Behavior - Advertisement 데이터를 사용할 예정이구요.
아래 링크에서 해당 데이터를 내려받으실 수 있습니다.
https://www.kaggle.com/code/srolka/user-behavior-advertisement/input?select=advertising.csv
사용할 advertisement csv 파일에 대해서 간단히 설명하도록 하겠습니다.
각 데이터를 온라인 광고에 노출된 사용자의 행동 데이터입니다.
데이터는 아래와 같이 10개의 Column 으로 구성되며,
사용자의 메타데이터와 온라인 광고의 메타데이터로 구성됩니다.
Data columns (total 10 columns): # Column Non-Null Count Dtype --- ------ -------------- ----- 0 Daily Time Spent on Site 1000 non-null float64 1 Age 1000 non-null int64 2 Area Income 1000 non-null float64 3 Daily Internet Usage 1000 non-null float64 4 Ad Topic Line 1000 non-null object 5 City 1000 non-null object 6 Male 1000 non-null int64 7 Country 1000 non-null object 8 Timestamp 1000 non-null object 9 Clicked on Ad 1000 non-null int64
아래의 예시는 Head 5 에 해당하는 5개의 데이터의 모양입니다.
Daily Time Spent on Site Age Area Income Daily Internet Usage Ad Topic Line City Male Country Timestamp Clicked on Ad 68.95 35 61833.90 256.09 Cloned 5thgeneration orchestration Wrightburgh 0 Tunisia 2016-03-27 00:53:11 0 80.23 31 68441.85 193.77 Monitored national standardization West Jodi 1 Nauru 2016-04-04 01:39:02 0 69.47 26 59785.94 236.50 Organic bottom-line service-desk Davidton 0 San Marino 2016-03-13 20:35:42 0 74.15 29 54806.18 245.89 Triple-buffered reciprocal time-frame West Terrifurt 1 Italy 2016-01-10 02:31:19 0 68.37 35 73889.99 225.58 Robust logistical utilization South Manuel 0 Iceland 2016-06-03 03:36:18 0
Topic 에 데이터 추가하기.
카프카 토픽을 생성하는 방법은 아래의 링크를 참조해주세요.
https://westlife0615.tistory.com/474
아래의 Python Code 는 advertise 토픽으로 데이터를 생성하는 Kafka Producer 프로그램입니다.
import os, json import pandas as pd df = pd.read_csv(os.path.abspath(os.path.join("../resources/advertising.csv"))) from kafka import KafkaProducer producer = KafkaProducer( acks=1, value_serializer=lambda x: x.encode("utf-8"), key_serializer=None, bootstrap_servers="localhost:29091,localhost:29092,localhost:29093") for _, value in df.iterrows(): val = value.to_json() response = producer.send(topic="advertise", value=val).get() print(response)
위 Python 코드를 실행하게 되면 아래와 같이 Topic 에 레코드가 생성됩니다.
Kafka Connect 실행하기.
아래 코드들은 Kafka Connect 클러스터를 실행하는 코드입니다.
< Kafka Connect 의 Distributed Mode 실행을 위한 Properties 파일 >
cat <<EOF> /tmp/kafka-connect.properties bootstrap.servers=host.docker.internal:29191,host.docker.internal:29192,host.docker.internal:29193 group.id=connect-cluster key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true offset.storage.topic=connect-offsets offset.storage.replication.factor=3 config.storage.topic=connect-configs config.storage.replication.factor=3 status.storage.topic=connect-status status.storage.replication.factor=3 offset.flush.interval.ms=10000 plugin.path=/usr/share/java rest.port=8083 EOF
< Docker Run Command >
S3 Sink Connector 구현을 위해서 환경변수로 AWS ACCESS KEY 값을 추가합니다.
docker run --name kafka-connect -d \ --mount type=bind,source=/tmp/kafka-connect.properties,destination=/etc/kafka/kafka-connect.properties,readonly \ -p 8083:8083 \ --mount type=bind,source=/tmp/confluentinc-kafka-connect-s3-10.5.7,destination=/usr/share/java/kafka/confluentinc-kafka-connect-s3-10.5.7,readonly \ -e AWS_ACCESS_KEY_ID=Qw0H8Gu6A2KE1PpG33gM \ -e AWS_SECRET_ACCESS_KEY=hFn1GtcNyFvaeVra7GCMNH9P7YSIdUWORNhNwsjf \ confluentinc/cp-kafka-connect:7.5.3 \ connect-distributed /etc/kafka/kafka-connect.properties
Kafka Connect Cluster 확인하기.
Kafka Connect 가 실행되면 Rest API 를 통해서 Kafka Connect Cluster 의 상태를 확인할 수 있습니다.
아래 이미지와 같이 http://localhost:8083 URL 을 통해서 Kafka Connect 의 현재 상태 정보를 알 수 있습니다.
version 과 commit 은 현재 사용중인 Kafka Connect 의 버전 정보를 의미하고,
kafka_cluster_id 는 bootstrap.servers 설정으로 연결된 카프카 클러스터를 의미합니다.
S3 Connector Plugin 추가하기.
먼저 아래 링크를 통해서 S3 Connector Plugin Jar 파일을 설치해야합니다.
https://www.confluent.io/hub/confluentinc/kafka-connect-s3
내려받은 파일은 아래와 같은 구조로 구성됩니다.
그리고 이 파일을 Kafka Connect 내부로 등록합니다.
Kafka Connect Cluster 내부로 플러그인 추가.
아래의 docker copy 명령어를 통해서 로컬호스트의 파일을 도커 컨테이너 내부로 복사합니다.
docker cp \ /tmp/confluentinc-kafka-connect-s3-10.5.7 \ kafka-connect:/usr/share/java/confluentinc-kafka-connect-s3-10.5.7
Successfully copied 46.8MB to kafka-connect:/usr/share/java/
복사가 잘 되었는지 확인해보겠습니다.
아래의 명령어를 통해서 kafka-connect 컨테이너 내부에 /usr/share/java 가 존재하는지 확인합니다.
docker exec kafka-connect ls /usr/share/java
acl confluent-common confluent-control-center confluent-hub-client confluent-telemetry confluentinc-kafka-connect-s3-10.5.7 cp-base-new kafka kafka-serde-tools monitoring-interceptors rest-utils schema-registry
Connector 등록 및 실행하기.
이제 Kafka Connect Cluster 를 Distributed Mode 로 실행시켰고,
S3 Connector Plugin 또한 추가하였습니다.
마지막으로 실제 Connector 를 실행시키겠습니다.
Connector 를 실행시키기 위해선 Kafka Connect 의 REST API 를 통해서 Connector 를 생성하는 과정이 필요합니다.
아래의 Curl 명령어는 Connector 를 생성하는 POST API 입니다.
curl -X POST http://localhost:8083/connectors \ -H 'content-type:application/json' \ -d '{ "name": "s3connector", "config": { "topics": "advertise", "tasks.max": 3, "connector.class": "io.confluent.connect.s3.S3SinkConnector", "format.class": "io.confluent.connect.s3.format.json.JsonFormat", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "flush.size": 1000, "s3.bucket.name": "advertise", "s3.region": "us-east-1", "store.url": "http://host.docker.internal:9010", "key.converter.schemas.enable": false, "value.converter.schemas.enable": false, "s3.compression.type": "gzip", "locale": "ko_KR", "timezone": "Asia/Seoul" } }'
생성된 S3 Connector 를 확인해보도록 하겠습니다.
브라우저에서 http://localhost:8083/connectors 로 접속하게 되면, 생성된 Connector 들의 리스트를 확인할 수 있습니다.
저의 경우에는 방금 생성한 s3connector 가 보이구요.
http://localhost:8083/connectors/s3connector/status 를 통해서 현재 실행 중인 상태를 확인할 수 있습니다.
최종적으로 아래와 같은 형식으로 Bucket 에 생성된 파일들을 확인할 수 있습니다.
마치며.
이번 글에서는 간단하게 Kafka Connect 로 S3 Connector 를 구현해보았습니다.
이어지는 글에서 S3 Sink Connector 의 세부적인 설정들에 대해 알아볼 예정입니다.
감사합니다.
반응형'Kafka' 카테고리의 다른 글
[ Kafka Producer ] 불안정한 네트워크에서 데이터 생성하기 ( Acks, Retries ) (0) 2024.01.09 [Kafka-Streams] mapValues 알아보기 (0) 2024.01.09 [Kafka-Streams] KStream 알아보기 (0) 2024.01.06 [Kafka] ProducerInterceptors 알아보기 (0) 2024.01.05 [Kafka] Partition Ownership 알아보기 (0) 2024.01.05