ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] API Version 알아보기 ( Protocol )
    Kafka 2024. 1. 31. 06:11
    728x90
    반응형

    - 목차

     

    들어가며.

    이번 글에서는 Kafka 에서 클라이언트와 브로커가 통신을 수행하기 위해서 사용되는 API Version 에 대해서 알아보려고 합니다.

    일반적으로 Broker 의 버전과 API Version 은 동일하게 사용하는 것이 권장됩니다.

    하지만 API Version 을 명시하지 않아도 Kafka Wire Protocol 상에서 적절한 API Version 을 추론할 수 있습니다.

    그래서 Kafka Client 모듈을 사용할 때에 API Version 을 명시하지 않아도 Consumer 와 Producer 를 쉽게 사용할 수 있죠.

     

    Confluent Version & Kafka Version.

    아래 페이지는 Confluent 의 버전과 카프카 버전에 대한 정보를 제공합니다.

    https://docs.confluent.io/platform/current/installation/versions-interoperability.html

     

    Supported Versions and Interoperability for Confluent Platform | Confluent Documentation

    Each version of Confluent Platform includes several component services which are only compatible in that specific Confluent Platform version. There are exceptions, including clients and Confluent Control Center, which can be used across versions. All servi

    docs.confluent.io

     

    위 페이지는 아래의 테이블과 같이 Confluent 버전과 Kafka 버전의 상응 관계를 제공합니다.

    그래서 Confluent 기반의 소스파일이나 도커이미지를 사용한다면 위 페이지를 통해서 Kafka Version 을 확인할 수 있죠.

    Confluent Kafka Release Date Standard End of Support Platinum End of Supprt
    7.6.x 3.6.x February 9, 2024 February 9, 2026 February 9, 2027
    7.5.x 3.5.x August 25, 2023 August 25, 2025 August 25, 2026
    7.4.x 3.4.x May 3, 2023 May 3, 2025 May 3, 2026
    7.3.x 3.3.x November 4, 2022 November 4, 2024 November 4, 2025

     

     

    API Version Compatibility 테스트.

    Docker 로 카프카 실행하기.

    아래 페이지의 내용을 통해서 Docker 를 통해 Kafka 브로커들을 실행할 수 있습니다.

    저는 API Version 과 관련된 테스트를 위해서 Confluent 7.4.3 버전 & Kafka 3.3.X 버전을 실행합니다.

     

    https://westlife0615.tistory.com/474

     

    Docker 로 Kafka Cluster 구축해보기.

    - 목차 소개. 저는 로컬 환경에서 카프카 관련 테스트를 진행하는 경우가 많이 생기더군요. 그래서 docker-compose 를 활용하여 Kafka, ZooKeeper 클러스터를 구축하는 내용을 작성하려고 합니다. docker-com

    westlife0615.tistory.com

     

    위 링크의 가이드를 따라하시면, 아래와 같이 카프카 클러스터를 실행됩니다.

     

    그리고 http://localhost:9000 으로 접속하여 Kafdrop 에서 "test_topic" 이름의 토픽을 생성합니다.

     

     

    Kafka Client 실행하기.

    아래의 파이썬 프로그램은 기본적인 KafkaProducer 예시 코드입니다.

    KafkaProducer 는 기본적으로 어떤 버전의 API_VERSION 을 사용하면 좋을지 자동적으로 추론합니다.

    from kafka import KafkaProducer
    import json
    
    bootstrap_servers = ['localhost:29091', 'localhost:29092', 'localhost:29093']
    key_serializer = None
    value_serializer = lambda v: json.dumps(v).encode("utf-8")
    
    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        key_serializer=key_serializer,
        value_serializer=value_serializer,
    )
    
    print(producer.config["api_version"])

     

    저는 버전 3.3.0 의 Kafka Broker 를 사용하고 있구요.

    2.5.0 버전의 api_version 이 자동적으로 설정됩니다.

    (2, 5, 0)

     

    kafka-python 모듈의 경우에는 conn.py 파일 내부에 check_version 함수를 활용합니다.

    이 과정에서 Client 는 Broker 에게 사용가능한 API VERSION 리스트를 얻을 수 있습니다.

    그리고 API Version 목록들 중에서 적합한 하나의 API Version 을 결정하고 그 API 를 사용합니다.

     

    < conn.py  >

    if f.succeeded():
        if isinstance(request, ApiVersionRequest[0]):
            # Starting from 0.10 kafka broker we determine version
            # by looking at ApiVersionResponse
            api_versions = self._handle_api_version_response(f.value)
            version = self._infer_broker_version_from_api_versions(api_versions)
        log.info('Broker version identified as %s', '.'.join(map(str, version)))
        log.info('Set configuration api_version=%s to skip auto'
                 ' check_version requests on startup', version)
        break

     

     

    반응형
Designed by Tistory.