ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • kafka consumer
    Kafka 2023. 3. 2. 07:51
    728x90
    반응형

    개요

    kafka consumer 는 카프카 메세지를 소비하는 대상입니다.
    카프카는 토픽이라는 영역에 메시지를 저장하게 되는데
    토픽에 차곡차곡 저장된 메시지를 kafka consumer 는 하나씩 조회할 수 있습니다.

    토픽은 파티션이라는 단위로 나뉘어져 있습니다.
    대개 하나의 토픽은 둘 이상의 파티션으로 나뉘고,
    카프카 메시지는 각각의 파티션에 나뉘어 저장이 되죠.

    이때 kafka consumer 는 단독으로 하나의 토픽을 소비하기보단 파티션의 갯수만큼 생성을 하고 파티션 수와 consumer 의 수를 1:1로 매칭합니다.
    그래서 파티션의 수만큼 동시 처리가 가능해집니다.

    파티션의 수 만큼 카프카 소비자가 많아지면 이 소비자들을 관리하기 위한 단위가 필요해집니다.
    이를 consumer group 이라고 부릅니다.
    카프카 소비자가 카프카 브로커와 연결될 때, 여러가지 설정값들이 필요한데, 연결하는 과정에서 consumer group 이름을 설정합니다.
    그리고 카프카 브로커는 동일한 consumer group 명을 가진 소비자들을 관리합니다.

    partition

    토픽은 파티션으로 구성됩니다.
    그리고 replication 설정에 의해서 원본 파티션과 복제 파티션으로 나뉘는데요.
    이를 리더 파티션과 팔로워 파티션이라고 부릅니다.

    카프카 생산자가 메시지를 브로커로 전송하면 리더 파티션에 해당 메시지가 저장되고 팔로워 파티션에 순차적으로 저장이 됩니다.
    replication-factor 의 값, In Sync Replica 의 값, sync 또는 async 설정에 따라 메시지가 복제되는 과정을 조금씩 다르지만 결과적으로 리더 파티션에서의 데이터 생성이 우선되며 가장 중요합니다.

    이러한 이유로 데이터의 정교함을 위하여 모든 카프카 생산자와 소비자는 리더 파티션와 연결됩니다.
    팔로워 파티션으로 데이터에 직접적으로 연결되진 읺습니다.
    그리고 보통 리더 파티션은 브로커마다 하나씩 존재하게 되며, 브로커와 소비자가 1:1로 매칭될 수 있습니다.
    그래서 카프카 메시지의 소비 속도를 향상시키기 위해서는 토픽의 파티션의 수가 많을수록 속도는 향상됩니다.

    kafka consumer option

    카프카 consumer 를 생성하고 카프카 브로커와 연결학 위한 설정들입니다.

    bootstrap.servers

    카프카 클러스터에 연결하기 위한 브로커의 연결 정보입니다.
    호스트:포트 와 같이 구성됩니다.
    카프카 클러스터의 브로커가 두개 이상이라고 했을 때, 호스트:포트,호스트:포트,… 와 같은 형식으로 연결 정보를 이어 작성할 수 있습니다.
    사실상 하나의 호스트:포트 만을 사용해도 무방하지만, 클러스터의 가용성을 높이는 관점에서 여러개의 연결 정보를 사용합니다.

    group.id

    컨슈머 그룹을 지정하기 위한 옵션입니다.
    컨슈머 그룹의 이름을 명시할 수 있고, 해당 그룹명을 토대로 오프셋이 관리됩니다.

    컨슈머 그룹은 여러가지 목적으로 활용됩니다.

    1. 특정 토픽의 소비된 오프셋을 관리할 수 있습니다.
    컨슈머 그룹 A의 인스턴스들이 메시지를 소비하는 과정에서 서버가 재시작되는 경우,
    마지막으로 저장된 오프셋으로부터 메시지를 다시 컨슘할 수 있습니다.

    2. 컨슈머 그룹 내에 컨슈머가 여러개인 경우, 파티션 별로 컨슘을 나눠서 진행할 수 있다.
    토픽의 리더 파티션이 생산자&소비자와 직접적으로 연결됩니다.
    그렇기 때문에 파티션의 갯수만큼 컨슈머의 갯수를 유지한다면 효율적인 메시지 소비를 할 수 있습니다.

    auto.commit.reset

    consume 시에 offset 의 reset 여부를 결정하는 옵션입니다. 

    과거에 특정 consumer group 으로 consume 을 진행한 경우, 해당 consumer group 이름으로 offset 기록이 유지됩니다. 

    그리고 다시 consume 을 시도할 때, 기록된 offset 부터 메시지를 consume 하게 됩니다. 

    만약 기록된 offset 과 무관하게 consume 을 진행하고 싶다면 auto.commit.reset 옵션을 활용할 수 있습니다. 

    "earliest", "latest" 두개의 값이 존재하며,

    "earliest" 는 토픽의 첫 메시지부터 consume 을 진행할 경우

    "latest" 는 새로 추가될 메시지부터 consume 을 진행할 경우에 활용하면 됩니다.

    어떠한 설정도 하지 않은 경우에는 기존과 같이 offset 부터 consume 이 진행됩니다. 

    heartbeat.interval.ms

    클러스터 환경은 보통 마스터슬레이브들로 구성됩니다. 

    슬레이브들이 실질적인 태스크를 진행하며, 마스터는 슬레이브들을 관리하죠. 

    어떠한 장애가 발생한 슬레이브가 발견된다면, 마스터는 새로운 슬레이브를 만들거나 알림을 통해서 해당 장애 상황을 대처합니다. 

    이때, 슬레이브들을 마스터에 주기적인 heartbeat 신호를 보내어 자신의 상태를 보고하게 됩니다. 

     

    kafka consumer 가 consumer group 을 통해서 카프카 클러스터와 연결되면 해당 consumer 들은 카프카 클러스터의 관리 대상이 됩니다. 

    그리고 consume 작업을 제대로 수행하고 있는지 관리받게 되죠. 

    이 상황에서 consumer 들을 카프카 클러스터에게 heartbeat 를 보내어 자신의 상태를 보고합니다. 

    이 heartbeat 주기가 heartbeat.interval.ms 옵션입니다. 

     

    만약 하나의 컨슈머가 heartbeat.interval.ms 동안 장애가 발생한다면,

    장애가 발생한 컨슈머가 담당하던 파티션의 consume 은 다른 컨슈머가 담당하게 하는 일련의 대처가 진행됩니다. 

    이를 리밸런싱이라고 합니다. 

    session.timeout.ms

    session.timeout.ms 는 카프카 클러스터가 consumer 가 살아있는지 체크하는 주기입니다. 

    consumer 가 카프카 클러스터에게 session.timeout.ms 동안 heartbeat 를 보내지 않는다면,

    카프카 클러스터는 해당 consumer 가 장애가 있다고 판단합니다. 

    그리고 리밸런싱을 진행합니다. 

    이 리밸런싱이 consumer group 내부의 인스턴스에 행해지는 장애 대처 방식인데요.  

    장애가 발생한 consumer 가 담당하던 파티션을 다른 consumer 가 담당하게 됩니다. 

     

    예를 들어, 

     

    PartitionA <--> ConsumerA

    PartitionB <--> ConsumerB 

    인 상황에서 ConsumerA 가 session.timeout.ms 동안 heartbeat 가 발생하지 않는다면, 

     

    PartitionA <--> ConsumerB 

    PartitionB <--> ConsumerB 

    로 리밸런싱 됩니다. 

     

    보통 session.interval.ms 는 heartbeat.timeout.ms 와 함께 고려되며, 

    heartbeat.timeout.ms 는 session.interval.ms 값보다 작게 설정되어야 합니다. 

     

    auto.commit.interval.ms

    consumer group 은 메시지를 소비하는 과정에서 최종적으로 소비된 offset 을 카프카 클러스터에게 보고해야합니다. 

    그리고 클러스터는 consumer group 마다 소비된 offset 을 기록합니다. 

    이러한 행위를 offset 을 commit 한다고 표현합니다. 

     

    offset commit 은 자동으로 진행될 수 있는데, 

    이러한 설정이 auto.commit.interval.ms 입니다.

    설정된 주기마다 offset 이 자동으로 commit 됩니다.

     

    반응형
Designed by Tistory.