[Kafka-Streams] Implementing a Custom JSON-based Serdes
Kafka | 2024. 2. 17. 17:49
Introduction.
In this post, we'll look at how to implement a custom Serdes that serializes and deserializes JSON data in Kafka Streams.
The examples use the Bank Marketing (bank client) dataset available on Kaggle:
https://www.kaggle.com/datasets/henriqueyamahata/bank-marketing
A sample of the bank-additional-full.csv file looks like this:
+---+-----------+--------+-------------------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
|age|        job| marital|          education|default|housing|loan|  contact|month|day_of_week|duration|campaign|pdays|previous|   poutcome|emp.var.rate|cons.price.idx|cons.conf.idx|euribor3m|nr.employed|  y|
+---+-----------+--------+-------------------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
| 56|  housemaid| married|           basic.4y|     no|     no|  no|telephone|  may|        mon|     261|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
| 57|   services| married|        high.school|unknown|     no|  no|telephone|  may|        mon|     149|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
| 37|   services| married|        high.school|     no|    yes|  no|telephone|  may|        mon|     226|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
| 40|     admin.| married|           basic.6y|     no|     no|  no|telephone|  may|        mon|     151|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
| 56|   services| married|        high.school|     no|     no| yes|telephone|  may|        mon|     307|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
| 45|   services| married|           basic.9y|unknown|     no|  no|telephone|  may|        mon|     198|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
| 59|     admin.| married|professional.course|     no|     no|  no|telephone|  may|        mon|     139|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
| 41|blue-collar| married|            unknown|unknown|     no|  no|telephone|  may|        mon|     217|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
| 24| technician|  single|professional.course|     no|    yes|  no|telephone|  may|        mon|     380|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
| 25|   services|  single|        high.school|     no|    yes|  no|telephone|  may|        mon|      50|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
| 41|blue-collar| married|            unknown|unknown|     no|  no|telephone|  may|        mon|      55|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
| 25|   services|  single|        high.school|     no|    yes|  no|telephone|  may|        mon|     222|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
| 29|blue-collar|  single|        high.school|     no|     no| yes|telephone|  may|        mon|     137|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
| 57|  housemaid|divorced|           basic.4y|     no|    yes|  no|telephone|  may|        mon|     293|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
| 35|blue-collar| married|           basic.6y|     no|    yes|  no|telephone|  may|        mon|     146|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
| 54|    retired| married|           basic.9y|unknown|    yes| yes|telephone|  may|        mon|     174|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
| 35|blue-collar| married|           basic.6y|     no|    yes|  no|telephone|  may|        mon|     312|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
| 46|blue-collar| married|           basic.6y|unknown|    yes| yes|telephone|  may|        mon|     440|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
| 50|blue-collar| married|           basic.9y|     no|    yes| yes|telephone|  may|        mon|     353|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
| 39| management|  single|           basic.9y|unknown|     no|  no|telephone|  may|        mon|     195|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
+---+-----------+--------+-------------------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
Since this post focuses on implementing the Serdes, I'll skip a detailed description of the dataset.
First, the link below points to a page with an example of building a Kafka cluster with Docker; you'll need a simple Kafka cluster set up along those lines:
https://westlife0615.tistory.com/474
I created a topic named bank.client and then produced the rows of bank-additional-full.csv to that topic:
import pandas as pd
import json
from kafka import KafkaProducer

data = pd.read_csv("/tmp/bank-additional-full.csv")

producer = KafkaProducer(
    bootstrap_servers=["localhost:29091", "localhost:29092", "localhost:29093"],
    key_serializer=None,
    value_serializer=lambda x: json.dumps(x).encode("UTF-8")
)

topic = "bank.client"

for index, row in data.iterrows():
    client = row.to_dict()
    producer.send(topic=topic, key=None, value=client)

producer.flush()
producer.close()
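As a quick sanity check, the value_serializer above does nothing more than JSON-encode each row dict to UTF-8 bytes. The stdlib-only sketch below shows what one record's value looks like on the wire; the row dict is a hypothetical, trimmed-down stand-in for a bank-additional-full.csv row:

```python
import json

# Same logic as the KafkaProducer's value_serializer above.
value_serializer = lambda x: json.dumps(x).encode("UTF-8")

# Hypothetical row, shaped like a trimmed record of bank-additional-full.csv.
row = {"age": 56, "job": "housemaid", "marital": "married", "default": "no"}

payload = value_serializer(row)
print(payload)
# b'{"age": 56, "job": "housemaid", "marital": "married", "default": "no"}'
```

These are exactly the bytes that the Jackson-based deserializer on the Kafka Streams side will have to parse back into a BankClient instance.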
Implementing BankClientSerde.
First, we need a BankClient POJO class (the result of serialization/deserialization) along with Serializer and Deserializer classes.
Three classes are needed in total, and each is shown below.
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

class BankClientSerde extends Serdes.WrapperSerde<BankClient> {
    public BankClientSerde() {
        super(new BankClientSerializer(), new BankClientDeserializer());
    }
}

class BankClientDeserializer implements Deserializer<BankClient> {
    ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public BankClient deserialize(String topic, byte[] data) {
        try {
            return objectMapper.readValue(data, BankClient.class);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

class BankClientSerializer implements Serializer<BankClient> {
    ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, BankClient data) {
        try {
            return objectMapper.valueToTree(data).toString().getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }
}

@Getter
@Setter
@Builder
// Jackson needs a no-args constructor to instantiate BankClient, and @Builder alone
// suppresses the default one; @AllArgsConstructor keeps the generated builder compiling.
@NoArgsConstructor
@AllArgsConstructor
class BankClient {
    @JsonProperty("age")
    private int age;
    @JsonProperty("job")
    private String job;
    @JsonProperty("y")
    private String y;
    @JsonProperty("marital")
    private String marital;
    @JsonProperty("education")
    private String education;
    @JsonProperty("housing")
    private String housing;
    @JsonProperty("loan")
    private String loan;
    @JsonProperty("contact")
    private String contact;
    @JsonProperty("month")
    private String month;
    @JsonProperty("duration")
    private int duration;
    @JsonProperty("campaign")
    private int campaign;
    @JsonProperty("pdays")
    private int pdays;
    @JsonProperty("previous")
    private String previous;
    @JsonProperty("poutcome")
    private String poutcome;
    @JsonProperty("euribor3m")
    private String euribor3m;
    @JsonProperty("day_of_week")
    private String dayOfWeek;
    @JsonProperty("emp.var.rate")
    private double empVarRate;
    @JsonProperty("cons.price.idx")
    private double consPriceIdx;
    @JsonProperty("cons.conf.idx")
    private double consConfIdx;
    @JsonProperty("nr.employed")
    private double nrEmployed;
    @JsonProperty("default")
    private String defaultValue;
}
The Serializer and Deserializer are implemented with the Jackson library, and both are passed to the constructor of the BankClientSerde class.
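A Serde simply pairs these two directions, and the property that matters is that deserialization inverts serialization. The Java classes above only run inside a Kafka project, but the round-trip contract can be sketched in stdlib Python; the client dict below is a hypothetical stand-in for the BankClient POJO, and note how a dotted CSV column name like cons.price.idx survives as a JSON field name, which is what @JsonProperty handles on the Java side:

```python
import json

def serialize(client: dict) -> bytes:
    # Mirrors BankClientSerializer: object -> JSON string -> UTF-8 bytes.
    return json.dumps(client).encode("UTF-8")

def deserialize(data: bytes) -> dict:
    # Mirrors BankClientDeserializer: UTF-8 bytes -> JSON -> object.
    return json.loads(data.decode("UTF-8"))

# Hypothetical record; "cons.price.idx" keeps its dot,
# just as @JsonProperty("cons.price.idx") allows on the Java side.
client = {"age": 56, "job": "housemaid", "cons.price.idx": 93.994, "default": "no"}

assert deserialize(serialize(client)) == client  # the round-trip contract
```

If this round trip ever fails (for example, because a field name in the POJO doesn't match the JSON), records will be dropped or the stream will crash at runtime, so it is worth verifying with a unit test.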
Setting up Kafka Streams.
The Kafka Streams setup is shown below.
The newly built BankClientSerde is attached to both the source and the sink:
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class SerdeTest {
    public static void main(String[] args) throws InterruptedException {
        String sourceTopic = "bank.client";
        String sinkTopic = "bank.client.output";

        Serde<String> stringSerde = Serdes.String();
        Serde<BankClient> bankClientSerde = new BankClientSerde();

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, BankClient> source =
                builder.stream(sourceTopic, Consumed.with(stringSerde, bankClientSerde));
        source.to(sinkTopic, Produced.with(stringSerde, bankClientSerde));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "bank-client-processor2");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091,localhost:29092,localhost:29093");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        StreamsConfig config = new StreamsConfig(props);

        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, config);
        streams.start();

        // Run the topology for 35 seconds, then shut down.
        Thread.sleep(35000);
        streams.close();
    }
}
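The topology here is a pure pass-through: every record is deserialized from bank.client by BankClientSerde and re-serialized, unchanged, to bank.client.output. Conceptually each record flows through deserialize, then serialize; a minimal stdlib-Python sketch of that flow, with illustrative stand-in records:

```python
import json

def pass_through(records):
    """Sketch of the source -> sink pipeline: the Serde decodes each
    incoming value, and the same Serde re-encodes it for the sink topic."""
    out = []
    for raw in records:
        client = json.loads(raw.decode("UTF-8"))        # source side: Consumed.with(...)
        out.append(json.dumps(client).encode("UTF-8"))  # sink side: Produced.with(...)
    return out

source_records = [b'{"age": 56, "job": "housemaid"}', b'{"age": 57, "job": "services"}']
sink_records = pass_through(source_records)

# Values survive the pass-through semantically unchanged.
assert [json.loads(r) for r in sink_records] == [json.loads(r) for r in source_records]
```

In a real topology you would put mapping, filtering, or aggregation between the source and the sink; the Serde's job is only the byte-level boundary at each end.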