-
Flink KeyedStream 알아보기Flink 2023. 10. 11. 23:33728x90반응형
- 목차
소개.
Flink 의 데이터 스트림들 중에서 KeyedStream 에 대해서 알아보려고 합니다.
Flink 의 데이터 스트림은 아래와 같은 종류들이 있습니다.
- DataStream
- KeyedStream
- WindowedStream
KeyedStream 은 Hash Partitioning 으로 데이터들이 분산되는 형태의 데이터스트림인데요.
데이터들이 Upstream -> Downstream 으로 흐르는 과정에서 Hash Algorithm 을 통해서 적절히 분산됩니다.
KeySelector 라는 java class 를 통해서 Hash Partitioning 을 구현할 수 있구요.
보통 데이터가 가지는 Primary Data 을 기준으로 Hash Partitioning 을 진행합니다.
KeySelector 는 "데이터가 가지는 Key 를 지정한다." 는 의미를 가집니다.
예를 들어보겠습니다.
아래와 같은 형식의 사용자 데이터가 있다고 가정하겠습니다.
각 데이터은 사용자의 위치 정보가 존재하고, 사용자가 이동하거나 위치를 변경할 때마다 센서를 통해서 데이터가 유입된다고 해봅시다.
그리고 비즈니스 요구사항은 "사용자의 위치 정보를 기반으로 동선 표현하기" 입니다.
이 상황에서 KeySelector 를 이러한 데이터에 적용하면 좋을까요?
수시로 변경되는 locationCoordinator 일까요 ??
제 생각에는 name 이 좋을 것 같습니다.
왜냐하면 사용자의 locationCoordinator 를 수집하는 것이 중요하니까요.
[ { "name" : "Andy", "age" : 31, "locationCoordinator" : "12,44" }, { "name" : "Kevin", "age" : 21, "locationCoordinator" : "112,424" }, { "name" : "Duke", "age" : 42, "locationCoordinator" : "12,434" }, { "name" : "Hayden", "age" : 11, "locationCoordinator" : "123,44" } ]
만약 "나이대별로 이동 동선을 구하자!" 라는 요구사항을 충족시켜야한다면,
age 는 KeySelector 의 대상으로 지정하는 것도 좋을 것입니다.
keyBy Patitioning.
Flink 에서 KeyedStream 을 생성하기 위해서는 크게 두가지 요소가 필요합니다.
keyBy 와 KeySelector 입니다.
keyBy 는 rebalance, forward, broadcast 처럼 파티셔닝을 위한 도구이자 함수입니다.
다만 keyBy 는 Hash 파티셔닝을 위한 도구이므로 Hashing 을 위한 데이터 또한 필요합니다.
Hashing 을 정의하는 것이 바로 KeySelector 입니다.
코드로 예시를 들어보겠습니다.
사용자의 이름 기반으로 파티셔닝해주세요 !
Pseudo Code 로 작성한 예시입니다.
아래는 사용자 이름을 기반으로 Hash Partitioning 을 한 예시입니다.
userLocationStream .keyBy((user) -> { return user.getName(); })
사용자의 나이대를 기반으로 파티셔닝해주세요!
Pseudo Code 로 작성한 예시입니다.
아래는 사용자의 나이대를 기반으로 Hash Partitioning 을 한 예시입니다.
아래와 같은 형식으로 10대, 20대 등과 같이 나이대를 기준으로 파티셔닝이 가능합니다.
userLocationStream .keyBy((user) -> { return Math.floor(user.getAge() / 10); })
관련 예시.
아래는 실제 KeyedStream 의 예시 코드입니다.
java 11, Flink 1.13.6 버전에서 진행하였습니다.
아래 예시는 Source -> Map 으로 구성된 데이터스트림입니다.
Source 1초 주기로 A ~ Z 까지의 알파벳을 생성합니다.
Map 은 단순히 KeyBy Partitioning 이 잘되었는지 확인하는 용도로 로그를 남기고 있습니다.
<데이터 스트림 구조>
<코드 예시>
package org.example.job; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.taskmanager.RuntimeEnvironment; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.Random; import java.util.stream.Collectors; import java.util.stream.IntStream; public class KeyedStreamTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(3); DataStreamSource<String> source = env.addSource(new SourceFunction<String>() { @Override public void run(SourceContext<String> ctx) throws Exception { while (true) { Thread.sleep(1000); String randomAlphabet = IntStream.range(65, 91).mapToObj(a -> (char) a).collect(Collectors.toList()).get(Math.abs(new Random().nextInt()) % ((91 - 64 - 1))).toString(); ctx.collect(randomAlphabet); } } @Override public void cancel() { } }); KeyedStream<String, String> keyedStream = source.name("source").uid("source").keyBy(value -> String.valueOf(value.charAt(0))); DataStream<String> stream = keyedStream.map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { System.out.printf("value is %s, HashCode is %s \n", value, this.hashCode()); return value; } }); env.execute(); } }
<출력 결과>
3개의 Subtask들이 존재합니다.
그래서 Hash 코드는 3개 출력됩니다.
즉, 파티셔닝된 데이터들은 3개의 Subtask 로 전달됩니다.
value is A, HashCode is 258302077 value is U, HashCode is 987626891 value is W, HashCode is 258302077 value is O, HashCode is 258302077 value is B, HashCode is 987626891 value is N, HashCode is 1224977671 value is L, HashCode is 1224977671 value is S, HashCode is 1224977671 value is Q, HashCode is 987626891 value is D, HashCode is 258302077 value is W, HashCode is 258302077 value is J, HashCode is 987626891 value is Y, HashCode is 1224977671 value is U, HashCode is 987626891 value is T, HashCode is 1224977671 value is J, HashCode is 987626891 value is R, HashCode is 258302077 value is U, HashCode is 987626891 value is L, HashCode is 1224977671 value is Q, HashCode is 987626891 value is Z, HashCode is 1224977671 value is Y, HashCode is 1224977671 value is D, HashCode is 258302077 value is U, HashCode is 987626891 value is H, HashCode is 1224977671 value is Y, HashCode is 1224977671 value is O, HashCode is 258302077 value is X, HashCode is 258302077 value is W, HashCode is 258302077 value is H, HashCode is 1224977671 value is C, HashCode is 987626891 value is L, HashCode is 1224977671 value is Y, HashCode is 1224977671
아래 결과는 동일한 Character 들을 묶어서 표시해보았습니다.
"U" 는 HashCode 가 987626891 인 Subtask 로,
"L" 는 HashCode 가 1224977671 인 Subtask 로,
"W" 는 HashCode 가 258302077 인 Subtask 로 파티셔닝됩니다.
value is U, HashCode is 987626891 value is U, HashCode is 987626891 value is U, HashCode is 987626891 value is U, HashCode is 987626891 value is W, HashCode is 258302077 value is W, HashCode is 258302077 value is W, HashCode is 258302077 value is L, HashCode is 1224977671 value is L, HashCode is 1224977671 value is L, HashCode is 1224977671
<3개의 Subtask 로 파티셔닝되는 상태>
아래 이미지는 3개의 Subtask 로 파티셔닝되는 수치를 보여주는 이미지입니다.
좌측, Records Received 가 데이터를 전달받은 수치인데요.
골고루 분배된 모습을 확인할 수 있네요.
Hash Algorithm.
Hash 알고리즘은 연속적인 데이터 또는 이산적인 데이트에 상관없이
큰 규모의 데이터를 지정된 크기의 데이터로 변환합니다.
Hash 알고리즘의 핵심은 동일한 Input 에 대한 Output 이 항상 일정하도록 보장하구요.
이러한 성격을 Deterministic 이라고 부릅니다.
Flink 와 같이 내결함성을 제공하는 빅데이터를 처리하는 환경의 경우에 Crash 와 Recovery 가 빈번히 발생하는데요.
Recovery 이후에 데이터 처리 프로세스가 재시작되더라고 멱등성있게 처리되어야합니다.
(멱등성이란 몇 번을 재시도하더라고 결과가 동일해야합니다. )
Hash 의 Deterministic 한 특성은 Flink 의 멱등성에 필수적인 요소입니다.
다만, Hash 의 특성상 Input 의 사이즈와 Output 의 사이즈는 동일하지 않습니다.
그래서 다른 Input 에 대한 hashing 결과가 동일해지는 특징 또한 있습니다.
반응형'Flink' 카테고리의 다른 글
Flink State 알아보기 (0) 2024.01.10 Flink Watermark 알아보기 (0) 2024.01.10 Flink Checkpoint 알아보기 (0) 2024.01.10 [Flink] 바이너리 파일 실행하기 (Binary Execution File) (0) 2023.12.29 Flink Parquet FileSink 알아보기 (0) 2023.10.04