  • [Kafka-Streams] Implementing a JSON-Based Custom Serdes
    Kafka 2024. 2. 17. 17:49

     


     

    Introduction.

    In this post, we will walk through how to implement a custom Serdes that serializes and deserializes JSON data in Kafka Streams.

    The examples use the Bank Client (Bank Marketing) dataset available on Kaggle.

    https://www.kaggle.com/datasets/henriqueyamahata/bank-marketing

     

    Bank Marketing (www.kaggle.com; source: https://archive.ics.uci.edu/ml/datasets/bank+marketing)

    Attachment: bank-additional-full.csv (5.56MB)

     

    A quick look at the contents of bank-additional-full.csv:

    +---+-----------+--------+-------------------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
    |age|        job| marital|          education|default|housing|loan|  contact|month|day_of_week|duration|campaign|pdays|previous|   poutcome|emp.var.rate|cons.price.idx|cons.conf.idx|euribor3m|nr.employed|  y|
    +---+-----------+--------+-------------------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
    | 56|  housemaid| married|           basic.4y|     no|     no|  no|telephone|  may|        mon|     261|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
    | 57|   services| married|        high.school|unknown|     no|  no|telephone|  may|        mon|     149|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
    | 37|   services| married|        high.school|     no|    yes|  no|telephone|  may|        mon|     226|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
    | 40|     admin.| married|           basic.6y|     no|     no|  no|telephone|  may|        mon|     151|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
    | 56|   services| married|        high.school|     no|     no| yes|telephone|  may|        mon|     307|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
    | 45|   services| married|           basic.9y|unknown|     no|  no|telephone|  may|        mon|     198|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
    | 59|     admin.| married|professional.course|     no|     no|  no|telephone|  may|        mon|     139|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
    | 41|blue-collar| married|            unknown|unknown|     no|  no|telephone|  may|        mon|     217|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
    | 24| technician|  single|professional.course|     no|    yes|  no|telephone|  may|        mon|     380|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
    | 25|   services|  single|        high.school|     no|    yes|  no|telephone|  may|        mon|      50|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
    | 41|blue-collar| married|            unknown|unknown|     no|  no|telephone|  may|        mon|      55|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
    | 25|   services|  single|        high.school|     no|    yes|  no|telephone|  may|        mon|     222|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
    | 29|blue-collar|  single|        high.school|     no|     no| yes|telephone|  may|        mon|     137|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
    | 57|  housemaid|divorced|           basic.4y|     no|    yes|  no|telephone|  may|        mon|     293|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
    | 35|blue-collar| married|           basic.6y|     no|    yes|  no|telephone|  may|        mon|     146|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
    | 54|    retired| married|           basic.9y|unknown|    yes| yes|telephone|  may|        mon|     174|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
    | 35|blue-collar| married|           basic.6y|     no|    yes|  no|telephone|  may|        mon|     312|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
    | 46|blue-collar| married|           basic.6y|unknown|    yes| yes|telephone|  may|        mon|     440|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
    | 50|blue-collar| married|           basic.9y|     no|    yes| yes|telephone|  may|        mon|     353|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
    | 39| management|  single|           basic.9y|unknown|     no|  no|telephone|  may|        mon|     195|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
    +---+-----------+--------+-------------------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+

     

    Since this post focuses on implementing the Serdes, I will skip a detailed description of the data itself.

     

    First, the link below is a post I wrote showing how to build a Kafka Cluster with Docker.

    You will need a simple Kafka Cluster set up along the lines of that page.

     

    https://westlife0615.tistory.com/474

     

    Building a Kafka Cluster with Docker (westlife0615.tistory.com)

     

    I created a topic named bank.client and then produced the rows of bank-additional-full.csv into it.

     

    import json

    import pandas as pd
    from kafka import KafkaProducer

    # The UCI export of bank-additional-full.csv is semicolon-delimited.
    data = pd.read_csv("/tmp/bank-additional-full.csv", sep=";")

    # Keys stay null; values are serialized to UTF-8 JSON bytes.
    producer = KafkaProducer(
        bootstrap_servers=["localhost:29091", "localhost:29092", "localhost:29093"],
        key_serializer=None,
        value_serializer=lambda x: json.dumps(x).encode("UTF-8")
    )

    topic = "bank.client"
    for index, row in data.iterrows():
        client = row.to_dict()
        producer.send(topic=topic, key=None, value=client)

    producer.flush()
    producer.close()
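
    For illustration, here is what the value_serializer above emits for one row (values hand-copied from the table, so treat them as a sample). The dotted column names such as emp.var.rate are worth noticing: they are not legal Java identifiers, which is why the POJO later needs explicit @JsonProperty mappings.

```python
import json

# One row of bank-additional-full.csv as a dict, mirroring what
# row.to_dict() yields in the producer above (values copied by hand).
client = {
    "age": 56, "job": "housemaid", "marital": "married",
    "emp.var.rate": 1.1, "cons.price.idx": 93.994, "y": "no",
}

# The value_serializer turns the dict into UTF-8 JSON bytes.
payload = json.dumps(client).encode("UTF-8")

# Dotted keys like "emp.var.rate" survive the round trip unchanged.
assert json.loads(payload)["emp.var.rate"] == 1.1
```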

     

     

    Implementing BankClientSerde.

    First, we need a BankClient POJO class to hold the serialization/deserialization result, plus a Serializer and a Deserializer class.

    Three classes in total; each is shown below.

    import java.io.IOException;
    import java.io.UnsupportedEncodingException;

    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.serialization.Serializer;

    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import lombok.AllArgsConstructor;
    import lombok.Builder;
    import lombok.Getter;
    import lombok.NoArgsConstructor;
    import lombok.Setter;

    // Serde wrapping the Serializer/Deserializer pair below.
    class BankClientSerde extends Serdes.WrapperSerde<BankClient> {

      public BankClientSerde() {
        super(new BankClientSerializer(), new BankClientDeserializer());
      }
    }

    // bytes -> BankClient via Jackson.
    class BankClientDeserializer implements Deserializer<BankClient> {
      ObjectMapper objectMapper = new ObjectMapper();

      @Override
      public BankClient deserialize(String topic, byte[] data) {
        try {
          return objectMapper.readValue(data, BankClient.class);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    }

    // BankClient -> UTF-8 JSON bytes via Jackson.
    class BankClientSerializer implements Serializer<BankClient> {
      ObjectMapper objectMapper = new ObjectMapper();

      @Override
      public byte[] serialize(String topic, BankClient data) {
        try {
          return objectMapper.valueToTree(data).toString().getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
          throw new RuntimeException(e);
        }
      }
    }

    // @NoArgsConstructor is required here: Jackson needs a no-arg constructor
    // to deserialize, and Lombok's @Builder alone does not generate one.
    @Getter @Setter @Builder
    @NoArgsConstructor @AllArgsConstructor
    class BankClient {
      @JsonProperty("age") private int age;
      @JsonProperty("job") private String job;
      @JsonProperty("y") private String y;
      @JsonProperty("marital") private String marital;
      @JsonProperty("education") private String education;
      @JsonProperty("housing") private String housing;
      @JsonProperty("loan") private String loan;
      @JsonProperty("contact") private String contact;
      @JsonProperty("month") private String month;
      @JsonProperty("duration") private int duration;
      @JsonProperty("campaign") private int campaign;
      @JsonProperty("pdays") private int pdays;
      @JsonProperty("previous") private String previous;
      @JsonProperty("poutcome") private String poutcome;
      @JsonProperty("euribor3m") private String euribor3m;
      @JsonProperty("day_of_week") private String dayOfWeek;
      @JsonProperty("emp.var.rate") private double empVarRate;
      @JsonProperty("cons.price.idx") private double consPriceIdx;
      @JsonProperty("cons.conf.idx") private double consConfIdx;
      @JsonProperty("nr.employed") private double nrEmployed;
      @JsonProperty("default") private String defaultValue;
    }
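
    The contract these three classes implement is simple: serialize maps an object to bytes, deserialize maps bytes back, and a round trip must preserve the record. A minimal Python sketch of the same contract, assuming a simplified two-field BankClient:

```python
import json
from dataclasses import dataclass, asdict

# Simplified two-field stand-in for the BankClient POJO above.
@dataclass
class BankClient:
    age: int
    job: str

# serialize: object -> UTF-8 JSON bytes (the BankClientSerializer role).
def serialize(client: BankClient) -> bytes:
    return json.dumps(asdict(client)).encode("UTF-8")

# deserialize: bytes -> object (the BankClientDeserializer role).
def deserialize(data: bytes) -> BankClient:
    return BankClient(**json.loads(data))

client = BankClient(age=56, job="housemaid")
assert deserialize(serialize(client)) == client  # round trip preserves the record
```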

     

    The Serializer and Deserializer are implemented with the Jackson library,

    and they are wired into the BankClientSerde constructor.

     

    Configuring Kafka Streams.

    The Kafka Streams setup is shown below.

    The newly created BankClientSerde is attached to both the source and the sink.

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Produced;

    public class SerdeTest {

      public static void main(String[] args) throws InterruptedException {
        String sourceTopic = "bank.client";
        String sinkTopic = "bank.client.output";
        Serde<String> stringSerde = Serdes.String();
        Serde<BankClient> bankClientSerde = new BankClientSerde();

        // Pass-through topology: read with BankClientSerde, write back out unchanged.
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, BankClient> source = builder.stream(sourceTopic, Consumed.with(stringSerde, bankClientSerde));
        source.to(sinkTopic, Produced.with(stringSerde, bankClientSerde));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "bank-client-processor2");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091,localhost:29092,localhost:29093");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // enable.auto.commit is managed by Kafka Streams itself and should not be overridden.

        Topology topology = builder.build();
        // Pass the Properties directly; the StreamsConfig-based constructor is deprecated.
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
        Thread.sleep(35000); // let it run briefly, then shut down
        streams.close();
      }
    }
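
    What this pass-through topology does to each record can be mimicked with plain json: the source side deserializes the value, the sink side re-serializes it. Field order and number formatting are not guaranteed to match the producer's bytes exactly, so the output topic should be compared at the JSON level rather than byte-for-byte (the sample record below is hand-written):

```python
import json

# A value as it might arrive on bank.client (hand-written sample record).
incoming = b'{"age": 56, "job": "housemaid", "emp.var.rate": 1.1}'

value = json.loads(incoming)            # source side: Consumed.with(...) deserializes
outgoing = json.dumps(value).encode()   # sink side: Produced.with(...) re-serializes

# Bytes may differ (key order, spacing), but the parsed JSON must not.
assert json.loads(outgoing) == value
```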

     

     
