ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka-Connect] S3 Sink Connector 따라해보기 1
    Kafka 2024. 1. 8. 21:07
    728x90
    반응형

    - 목차

     

    들어가며.

    이번 글에서는 Kafka Connect 의 S3 Sink 를 구현해보려고 합니다.

    사용할 데이터는 Kaggle User Behavior - Advertisement 데이터를 사용할 예정이구요.

     

    아래 링크에서 해당 데이터를 내려받으실 수 있습니다.

    https://www.kaggle.com/code/srolka/user-behavior-advertisement/input?select=advertising.csv

     

    User Behavior - Advertisement

    Explore and run machine learning code with Kaggle Notebooks | Using data from Advertisement

    www.kaggle.com

     

    사용할 advertisement csv 파일에 대해서 간단히 설명하도록 하겠습니다.

    각 데이터를 온라인 광고에 노출된 사용자의 행동 데이터입니다.

    데이터는 아래와 같이 10개의 Column 으로 구성되며,

    사용자의 메타데이터와 온라인 광고의 메타데이터로 구성됩니다.

     

    Data columns (total 10 columns):
     #   Column                    Non-Null Count  Dtype  
    ---  ------                    --------------  -----  
     0   Daily Time Spent on Site  1000 non-null   float64
     1   Age                       1000 non-null   int64  
     2   Area Income               1000 non-null   float64
     3   Daily Internet Usage      1000 non-null   float64
     4   Ad Topic Line             1000 non-null   object 
     5   City                      1000 non-null   object 
     6   Male                      1000 non-null   int64  
     7   Country                   1000 non-null   object 
     8   Timestamp                 1000 non-null   object 
     9   Clicked on Ad             1000 non-null   int64

     

    아래의 예시는 Head 5 에 해당하는 5개의 데이터의 모양입니다.

    Daily Time Spent on Site  Age  Area Income  Daily Internet Usage   Ad Topic Line                           City               Male     Country        Timestamp             Clicked on Ad
    68.95                     35   61833.90     256.09                 Cloned 5thgeneration orchestration      Wrightburgh        0        Tunisia        2016-03-27 00:53:11   0
    80.23                     31   68441.85     193.77                 Monitored national standardization      West Jodi          1        Nauru          2016-04-04 01:39:02   0
    69.47                     26   59785.94     236.50                 Organic bottom-line service-desk        Davidton           0        San Marino     2016-03-13 20:35:42   0
    74.15                     29   54806.18     245.89                 Triple-buffered reciprocal time-frame   West Terrifurt     1        Italy          2016-01-10 02:31:19   0
    68.37                     35   73889.99     225.58                 Robust logistical utilization           South Manuel       0        Iceland        2016-06-03 03:36:18   0

     

    Topic 에 데이터 추가하기.

    카프카 토픽을 생성하는 방법은 아래의 링크를 참조해주세요.

    https://westlife0615.tistory.com/474

     

    Docker 로 Kafka Cluster 구축해보기.

    - 목차 소개. 저는 로컬 환경에서 카프카 관련 테스트를 진행하는 경우가 많이 생기더군요. 그래서 docker-compose 를 활용하여 Kafka, ZooKeeper 클러스터를 구축하는 내용을 작성하려고 합니다. docker-com

    westlife0615.tistory.com

     

     

    아래의 Python Code 는 advertise 토픽으로 데이터를 생성하는 Kafka Producer 프로그램입니다.

     

    import os, json
    import pandas as pd
    
    df = pd.read_csv(os.path.abspath(os.path.join("../resources/advertising.csv")))
    
    from kafka import KafkaProducer
    producer = KafkaProducer(
        acks=1,
        value_serializer=lambda x: x.encode("utf-8"),
        key_serializer=None,
        bootstrap_servers="localhost:29091,localhost:29092,localhost:29093")
    
    for _, value in df.iterrows():
        val = value.to_json()
        response = producer.send(topic="advertise", value=val).get()
        print(response)

     

    위 Python 코드를 실행하게 되면 아래와 같이 Topic 에 레코드가 생성됩니다.

     

     

     

    Kafka Connect 실행하기.

    아래 코드들은 Kafka Connect 클러스터를 실행하는 코드입니다.

     

    < Kafka Connect 의 Distributed Mode 실행을 위한 Properties 파일 >

    cat <<EOF> /tmp/kafka-connect.properties
    bootstrap.servers=host.docker.internal:29191,host.docker.internal:29192,host.docker.internal:29193
    group.id=connect-cluster
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    offset.storage.topic=connect-offsets
    offset.storage.replication.factor=3
    config.storage.topic=connect-configs
    config.storage.replication.factor=3
    status.storage.topic=connect-status
    status.storage.replication.factor=3
    offset.flush.interval.ms=10000
    plugin.path=/usr/share/java
    rest.port=8083
    EOF

     

    < Docker Run Command >

    S3 Sink Connector 구현을 위해서 환경변수로 AWS ACCESS KEY 값을 추가합니다.

    docker run --name kafka-connect -d \
    --mount type=bind,source=/tmp/kafka-connect.properties,destination=/etc/kafka/kafka-connect.properties,readonly \
    -p 8083:8083 \
    --mount type=bind,source=/tmp/confluentinc-kafka-connect-s3-10.5.7,destination=/usr/share/java/kafka/confluentinc-kafka-connect-s3-10.5.7,readonly \
    -e AWS_ACCESS_KEY_ID=Qw0H8Gu6A2KE1PpG33gM \
    -e AWS_SECRET_ACCESS_KEY=hFn1GtcNyFvaeVra7GCMNH9P7YSIdUWORNhNwsjf \
    confluentinc/cp-kafka-connect:7.5.3 \
    connect-distributed /etc/kafka/kafka-connect.properties

     

    Kafka Connect Cluster 확인하기.

    Kafka Connect 가 실행되면 Rest API 를 통해서 Kafka Connect Cluster 의 상태를 확인할 수 있습니다.

    아래 이미지와 같이 http://localhost:8083 URL 을 통해서 Kafka Connect 의 현재 상태 정보를 알 수 있습니다.

     

     

    version 과 commit 은 현재 사용중인 Kafka Connect 의 버전 정보를 의미하고,

    kafka_cluster_id 는 bootstrap.servers 설정으로 연결된 카프카 클러스터를 의미합니다.

     

    S3 Connector Plugin 추가하기.

    먼저 아래 링크를 통해서 S3 Connector Plugin Jar 파일을 설치해야합니다.

     

    https://www.confluent.io/hub/confluentinc/kafka-connect-s3

     

    Amazon S3 Sink Connector

    Confluent, founded by the original creators of Apache Kafka®, delivers a complete execution of Kafka for the Enterprise, to help you run your business in real-time.

    www.confluent.io

     

    내려받은 파일은 아래와 같은 구조로 구성됩니다.

    그리고 이 파일을 Kafka Connect 내부로 등록합니다.

     

    Kafka Connect Cluster 내부로 플러그인 추가.

    아래의 docker copy 명령어를 통해서 로컬호스트의 파일을 도커 컨테이너 내부로 복사합니다.

    docker cp \
    /tmp/confluentinc-kafka-connect-s3-10.5.7 \
    kafka-connect:/usr/share/java/confluentinc-kafka-connect-s3-10.5.7
    Successfully copied 46.8MB to kafka-connect:/usr/share/java/

     

     

    복사가 잘 되었는지 확인해보겠습니다.

    아래의 명령어를 통해서 kafka-connect 컨테이너 내부에 /usr/share/java 가 존재하는지 확인합니다.

    docker exec kafka-connect ls /usr/share/java
    acl
    confluent-common
    confluent-control-center
    confluent-hub-client
    confluent-telemetry
    confluentinc-kafka-connect-s3-10.5.7
    cp-base-new
    kafka
    kafka-serde-tools
    monitoring-interceptors
    rest-utils
    schema-registry

     

    Connector 등록 및 실행하기.

    이제 Kafka Connect Cluster 를 Distributed Mode 로 실행시켰고,

    S3 Connector Plugin 또한 추가하였습니다.

    마지막으로 실제 Connector 를 실행시키겠습니다.

    Connector 를 실행시키기 위해선 Kafka Connect 의 REST API 를 통해서 Connector 를 생성하는 과정이 필요합니다.

    아래의 Curl 명령어는 Connector 를 생성하는 POST API 입니다.

     

    curl -X POST http://localhost:8083/connectors \
    -H 'content-type:application/json' \
    -d '{
        "name": "s3connector",
        "config": {
            "topics": "advertise",
            "tasks.max": 3,
            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
            "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
            "storage.class": "io.confluent.connect.s3.storage.S3Storage",
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "flush.size": 1000,
            "s3.bucket.name": "advertise",
            "s3.region": "us-east-1",
            "store.url": "http://host.docker.internal:9010",
            "key.converter.schemas.enable": false,
            "value.converter.schemas.enable": false,
            "s3.compression.type": "gzip",
            "locale": "ko_KR",
            "timezone": "Asia/Seoul"
        }
    }'

     

    생성된 S3 Connector 를 확인해보도록 하겠습니다.

    브라우저에서 http://localhost:8083/connectors 로 접속하게 되면, 생성된 Connector 들의 리스트를 확인할 수 있습니다.

    저의 경우에는 방금 생성한 s3connector 가 보이구요.

     

     

    http://localhost:8083/connectors/s3connector/status 를 통해서 현재 실행 중인 상태를 확인할 수 있습니다.

     

     

    최종적으로 아래와 같은 형식으로 Bucket 에 생성된 파일들을 확인할 수 있습니다.

     

     

    마치며.

    이번 글에서는 간단하게 Kafka Connect 로 S3 Connector 를 구현해보았습니다.

    이어지는 글에서 S3 Sink Connector 의 세부적인 설정들에 대해 알아볼 예정입니다.

    감사합니다.

     

     

    반응형
Designed by Tistory.