ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka-Streams] mapValues 알아보기
    Kafka 2024. 1. 9. 05:22
    728x90
    반응형

     

    - 목차

     

    들어가며.

    이번 글에서는 kafka streams API 의 mapValues 의 기능에 대해서 알아보려고 합니다.

    mapValues API 는 kafka streams 의 Stateless 한 데이터 변형을 제공하는 기능입니다.

    이전 상태와 무관하가 현재 처리하는 데이터의 상태와 변형에 집중합니다.

     

    아래 링크는 Docker 로 Kafka Cluster 를 간단히 구축하는 내용을 적은 웹사이트의 링크입니다.

    https://westlife0615.tistory.com/474

     

    Docker 로 Kafka Cluster 구축해보기.

    - 목차 소개. 저는 로컬 환경에서 카프카 관련 테스트를 진행하는 경우가 많이 생기더군요. 그래서 docker-compose 를 활용하여 Kafka, ZooKeeper 클러스터를 구축하는 내용을 작성하려고 합니다. docker-com

    westlife0615.tistory.com

     

    저는 daily.climate 이라는 이름의 토픽을 생성하였습니다.

     

    테스트 데이터 생성하기.

    사용할 데이터는 Kaggle 에서 제공되는 날씨 데이터를 사용해볼 예정입니다.

    아래 링크는 일별 날씨 정보를 CSV 형식으로 제공하는 Kaggle 사이트의 주소입니다.

     

    https://www.kaggle.com/datasets/sumanthvrao/daily-climate-time-series-data

     

    Daily Climate time series data

    Daily climate data in the city of Delhi from 2013 to 2017

    www.kaggle.com

     

    daily_climate 데이터의 형식을 아래와 같습니다.

               date   meantemp   humidity  wind_speed  meanpressure
    0    2017-01-01  15.913043  85.869565    2.743478     59.000000
    1    2017-01-02  18.500000  77.222222    2.894444   1018.277778
    2    2017-01-03  17.111111  81.888889    4.016667   1018.333333
    3    2017-01-04  18.700000  70.050000    4.545000   1015.700000
    4    2017-01-05  18.388889  74.944444    3.300000   1014.333333
    ..          ...        ...        ...         ...           ...
    109  2017-04-20  34.500000  27.500000    5.562500    998.625000
    110  2017-04-21  34.250000  39.375000    6.962500    999.875000
    111  2017-04-22  32.900000  40.900000    8.890000   1001.600000
    112  2017-04-23  32.875000  27.500000    9.962500   1002.125000
    113  2017-04-24  32.000000  27.142857   12.157143   1004.142857

     

    아래의 파이썬 코드를 통해서 daily.climate 토픽으로 CSV 파일의 데이터들을 추가할 수 있습니다.

    import pandas as pd
    import json
    from kafka import KafkaProducer
    
    data = pd.read_csv(
        'https://storage.googleapis.com/kagglesdsdata/datasets/312121/636393/DailyDelhiClimateTest.csv?X-Goog-Algorithm=GOOG4-RSA-SHA256&X-Goog-Credential=gcp-kaggle-com%40kaggle-161607.iam.gserviceaccount.com%2F20240228%2Fauto%2Fstorage%2Fgoog4_request&X-Goog-Date=20240228T050249Z&X-Goog-Expires=259200&X-Goog-SignedHeaders=host&X-Goog-Signature=3f7a71155da6fd683486c013e8991d579709fff3f4d94fb935c29d2b5f7d6b1191f2f47d58fa147d6218a7c72705b2d1701d97b6906e657cc41a6781ececc1a3c98463ad84db3cae9d9deb8bf09656c67ef95a2fe500935aa17a975ae071ac88c85c078795d5e7ebafbb886c57c54ab53d0317510f1ec4fb3bf2dd90bb81befda959567e2457023123b75c805c9138df4f39a103ddb749dd144579ce0add068aa86796ccbc8dc028359cbb7c2516edcb9b82fc85ff40180f14d67d9b1fe657bc10d9705733fc35a7636c38f6f180012b138e4d17e4ce12c3c461287e67caac6498ac563b72606afc04e48eb1aeb80030dd11799f9672557838f87e48f35f36b0')
    producer = KafkaProducer(
        bootstrap_servers=["localhost:29091", "localhost:29092", "localhost:29093"],
        key_serializer=None,
        value_serializer=lambda x: json.dumps(x).encode("UTF-8")
    )
    topic = "daily.climate"
    for index, row in data.iterrows():
        climate = row.to_dict()
        producer.send(topic=topic, key=None, value=climate)
    producer.flush()
    producer.close()

     

     

    mapValues 사용하기.

    mapValues 는 KStream 의 데이터를 Stateless 한 방식으로 변형하는 기능입니다.

    mapValues 는 ValueMapper 함수형 인터페이스 또는 람다 함수를 입력으로 받으며,

    정의된 내용을 토대로 Row by Row 순서로 데이터를 변형합니다.

     

    아래 예시는 날씨 정보 중 섭씨로 표현되는 meantemp 값을 화씨 단위로 변형하는 간단한 예시입니다.

    package com.westlife.streams;
    
    import com.fasterxml.jackson.databind.*;
    import com.fasterxml.jackson.databind.node.*;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.*;
    import org.apache.kafka.streams.*;
    import org.apache.kafka.streams.kstream.*;
    
    import java.io.*;
    import java.util.*;
    
    public class MapValuesTest {
      public static void main(String[] args) throws InterruptedException {
        String sourceTopic = "daily.climate";
        Serde<String> stringSerde = Serdes.String();
        Serde<Climate> climateSerde = new ClimateSerde();
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Climate> source = builder.stream(sourceTopic, Consumed.with(stringSerde, climateSerde));
        KStream<String, Climate> stream = source.mapValues(climate -> {
          climate.updateTemperature();
          return climate;
        });
        stream.print(Printed.toSysOut());
    
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "daily-climate-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091,localhost:29092,localhost:29093");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        StreamsConfig config = new StreamsConfig(props);
        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, config);
        streams.start();
        Thread.sleep(35000);
        streams.close();
    
      }
    
      static class ClimateSerde extends Serdes.WrapperSerde<Climate> {
        public ClimateSerde() {
          super(new ClimateSerializer(), new ClimateDeserializer());
        }
      }
    
      static class ClimateSerializer implements Serializer<Climate> {
        @Override
        public byte[] serialize(String topic, Climate data) {
          try {
            return data.toJson().toString().getBytes("UTF-8");
          } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
          }
        }
      }
    
      static class ClimateDeserializer implements Deserializer<Climate> {
        private ObjectMapper mapper = new ObjectMapper();
        @Override
        public Climate deserialize(String topic, byte[] data) {
          try {
            ObjectNode node = (ObjectNode) mapper.readTree(data);
            return new Climate(node.get("date").asText(),
                    node.get("meantemp").asDouble(),
                    node.get("humidity").asDouble(),
                    node.get("wind_speed").asDouble(),
                    node.get("meanpressure").asDouble());
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      }
    
      static class Climate {
        private String date;
        private Double meanTemp;
        private Double humidity;
        private Double windSpeed;
        private Double meanPressure;
    
        Climate(String date,
                Double meanTemp,
                Double humidity,
                Double windSpeed,
                Double meanPressure) {
          this.date = date;
          this.meanTemp = meanTemp;
          this.humidity = humidity;
          this.windSpeed = windSpeed;
          this.meanPressure = meanPressure;
        }
    
        public void updateTemperature() {
          //(0°C × 9/5) + 32
          this.meanTemp = this.meanTemp * (9/5) + 32;
        }
    
        public JsonNode toJson() {
          ObjectNode node = new ObjectNode(JsonNodeFactory.instance);
          node.put("date", this.date);
          node.put("meantemp", this.meanTemp);
          node.put("humidity", this.humidity);
          node.put("wind_speed", this.windSpeed);
          node.put("meanpressure", this.meanPressure);
          return node;
        }
      }
    }

     

     

    mapValues 는 Key 를 변경하지 않는다.

    mapValues 의 중요한 특징 중 하나는 Key 를 변형하지 않고 Value 만을 Transformation 한다는 점입니다.

    만약 Key 를 변형해야한다면 mapValues 보다는 map 을 사용하여야 합니다.

     

    반응형
Designed by Tistory.