-
[Kafka-Streams] mapValues 알아보기Kafka 2024. 1. 9. 05:22728x90반응형
- 목차
들어가며.
이번 글에서는 kafka streams API 의 mapValues 의 기능에 대해서 알아보려고 합니다.
mapValues API 는 kafka streams 의 Stateless 한 데이터 변형을 제공하는 기능입니다.
이전 상태와 무관하가 현재 처리하는 데이터의 상태와 변형에 집중합니다.
아래 링크는 Docker 로 Kafka Cluster 를 간단히 구축하는 내용을 적은 웹사이트의 링크입니다.
https://westlife0615.tistory.com/474
저는 daily.climate 이라는 이름의 토픽을 생성하였습니다.
테스트 데이터 생성하기.
사용할 데이터는 Kaggle 에서 제공되는 날씨 데이터를 사용해볼 예정입니다.
아래 링크는 일별 날씨 정보를 CSV 형식으로 제공하는 Kaggle 사이트의 주소입니다.
https://www.kaggle.com/datasets/sumanthvrao/daily-climate-time-series-data
daily_climate 데이터의 형식을 아래와 같습니다.
date meantemp humidity wind_speed meanpressure 0 2017-01-01 15.913043 85.869565 2.743478 59.000000 1 2017-01-02 18.500000 77.222222 2.894444 1018.277778 2 2017-01-03 17.111111 81.888889 4.016667 1018.333333 3 2017-01-04 18.700000 70.050000 4.545000 1015.700000 4 2017-01-05 18.388889 74.944444 3.300000 1014.333333 .. ... ... ... ... ... 109 2017-04-20 34.500000 27.500000 5.562500 998.625000 110 2017-04-21 34.250000 39.375000 6.962500 999.875000 111 2017-04-22 32.900000 40.900000 8.890000 1001.600000 112 2017-04-23 32.875000 27.500000 9.962500 1002.125000 113 2017-04-24 32.000000 27.142857 12.157143 1004.142857
아래의 파이썬 코드를 통해서 daily.climate 토픽으로 CSV 파일의 데이터들을 추가할 수 있습니다.
import pandas as pd import json from kafka import KafkaProducer data = pd.read_csv( 'https://storage.googleapis.com/kagglesdsdata/datasets/312121/636393/DailyDelhiClimateTest.csv?X-Goog-Algorithm=GOOG4-RSA-SHA256&X-Goog-Credential=gcp-kaggle-com%40kaggle-161607.iam.gserviceaccount.com%2F20240228%2Fauto%2Fstorage%2Fgoog4_request&X-Goog-Date=20240228T050249Z&X-Goog-Expires=259200&X-Goog-SignedHeaders=host&X-Goog-Signature=3f7a71155da6fd683486c013e8991d579709fff3f4d94fb935c29d2b5f7d6b1191f2f47d58fa147d6218a7c72705b2d1701d97b6906e657cc41a6781ececc1a3c98463ad84db3cae9d9deb8bf09656c67ef95a2fe500935aa17a975ae071ac88c85c078795d5e7ebafbb886c57c54ab53d0317510f1ec4fb3bf2dd90bb81befda959567e2457023123b75c805c9138df4f39a103ddb749dd144579ce0add068aa86796ccbc8dc028359cbb7c2516edcb9b82fc85ff40180f14d67d9b1fe657bc10d9705733fc35a7636c38f6f180012b138e4d17e4ce12c3c461287e67caac6498ac563b72606afc04e48eb1aeb80030dd11799f9672557838f87e48f35f36b0') producer = KafkaProducer( bootstrap_servers=["localhost:29091", "localhost:29092", "localhost:29093"], key_serializer=None, value_serializer=lambda x: json.dumps(x).encode("UTF-8") ) topic = "daily.climate" for index, row in data.iterrows(): climate = row.to_dict() producer.send(topic=topic, key=None, value=climate) producer.flush() producer.close()
mapValues 사용하기.
mapValues 는 KStream 의 데이터를 Stateless 한 방식으로 변형하는 기능입니다.
mapValues 는 ValueMapper 함수형 인터페이스 또는 람다 함수를 입력으로 받으며,
정의된 내용을 토대로 Row by Row 순서로 데이터를 변형합니다.
아래 예시는 날씨 정보 중 섭씨로 표현되는 meantemp 값을 화씨 단위로 변형하는 간단한 예시입니다.
package com.westlife.streams; import com.fasterxml.jackson.databind.*; import com.fasterxml.jackson.databind.node.*; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.*; import org.apache.kafka.streams.*; import org.apache.kafka.streams.kstream.*; import java.io.*; import java.util.*; public class MapValuesTest { public static void main(String[] args) throws InterruptedException { String sourceTopic = "daily.climate"; Serde<String> stringSerde = Serdes.String(); Serde<Climate> climateSerde = new ClimateSerde(); StreamsBuilder builder = new StreamsBuilder(); KStream<String, Climate> source = builder.stream(sourceTopic, Consumed.with(stringSerde, climateSerde)); KStream<String, Climate> stream = source.mapValues(climate -> { climate.updateTemperature(); return climate; }); stream.print(Printed.toSysOut()); Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "daily-climate-processor"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091,localhost:29092,localhost:29093"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); StreamsConfig config = new StreamsConfig(props); Topology topology = builder.build(); KafkaStreams streams = new KafkaStreams(topology, config); streams.start(); Thread.sleep(35000); streams.close(); } static class ClimateSerde extends Serdes.WrapperSerde<Climate> { public ClimateSerde() { super(new ClimateSerializer(), new ClimateDeserializer()); } } static class ClimateSerializer implements Serializer<Climate> { @Override public byte[] serialize(String topic, Climate data) { try { return data.toJson().toString().getBytes("UTF-8"); } catch (UnsupportedEncodingException e) { throw new RuntimeException(e); } } } static class ClimateDeserializer implements Deserializer<Climate> { private ObjectMapper mapper = new ObjectMapper(); @Override public Climate deserialize(String topic, byte[] data) { try { ObjectNode node = (ObjectNode) mapper.readTree(data); return new Climate(node.get("date").asText(), node.get("meantemp").asDouble(), node.get("humidity").asDouble(), node.get("wind_speed").asDouble(), node.get("meanpressure").asDouble()); } catch (IOException e) { throw new RuntimeException(e); } } } static class Climate { private String date; private Double meanTemp; private Double humidity; private Double windSpeed; private Double meanPressure; Climate(String date, Double meanTemp, Double humidity, Double windSpeed, Double meanPressure) { this.date = date; this.meanTemp = meanTemp; this.humidity = humidity; this.windSpeed = windSpeed; this.meanPressure = meanPressure; } public void updateTemperature() { //(0°C × 9/5) + 32 this.meanTemp = this.meanTemp * (9/5) + 32; } public JsonNode toJson() { ObjectNode node = new ObjectNode(JsonNodeFactory.instance); node.put("date", this.date); node.put("meantemp", this.meanTemp); node.put("humidity", this.humidity); node.put("wind_speed", this.windSpeed); node.put("meanpressure", this.meanPressure); return node; } } }
mapValues 는 Key 를 변경하지 않는다.
mapValues 의 중요한 특징 중 하나는 Key 를 변형하지 않고 Value 만을 Transformation 한다는 점입니다.
만약 Key 를 변형해야한다면 mapValues 보다는 map 을 사용하여야 합니다.
반응형'Kafka' 카테고리의 다른 글
Kafka Log Compaction 알아보기 (0) 2024.01.12 [ Kafka Producer ] 불안정한 네트워크에서 데이터 생성하기 ( Acks, Retries ) (0) 2024.01.09 [Kafka-Connect] S3 Sink Connector 따라해보기 1 (0) 2024.01.08 [Kafka-Streams] KStream 알아보기 (0) 2024.01.06 [Kafka] Key Partitioner 알아보기 (0) 2024.01.05