ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Kafka Ksql 구현하기
    Kafka 2023. 12. 11. 06:58
    728x90
    반응형

     

    - 목차

     

     

    소개.

    Docker 를 활용하여 간단한 KSQL 을 구현해보도록 하겠습니다.

    KSQL 도커 이미지로 confluentinc/cp-ksqldb-server:7.4.3 를 사용할 예정입니다.

     

    https://hub.docker.com/r/confluentinc/cp-ksqldb-server/tags

     

    Docker

     

    hub.docker.com

     

     

     

    1. kafka docker compose yaml 파일을 생성합니다.

    먼저 docker-compose 설정을 해보겠습니다.

    docker-compose 의 구성요소는 아래와 같습니다.

    - kafka

    - zookeeper

    - kafdrop

    - ksql-server

     

    cat <<EOF> /tmp/kafka-docker-compose.yaml
    version: '2'
    services:
      kafdrop:
        image: obsidiandynamics/kafdrop:4.0.1
        container_name: kafdrop
        restart: "no"
        ports:
          - "9000:9000"
        environment:
          KAFKA_BROKERCONNECT: "kafka:9092"
        depends_on:
          - "kafka"
        networks:
          - kafka
      zookeeper:
        image: confluentinc/cp-zookeeper:7.4.3
        container_name: zookeeper
        environment:
          ZOOKEEPER_SERVER_ID: 1
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
          ZOOKEEPER_INIT_LIMIT: 5
          ZOOKEEPER_SYNC_LIMIT: 2
        ports:
          - "22181:2181"
        networks:
          - kafka      
      kafka:
        image: confluentinc/cp-kafka:7.4.3
        container_name: kafka    
        depends_on:
          - zookeeper
        ports:
          - "29092:9092"
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_MESSAGE_MAX_BYTES: 10000000
          KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200
          KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1      
        networks:
          - kafka
      ksql-server:
        image: confluentinc/cp-ksqldb-server:7.4.3
        container_name: ksql-server
        hostname: ksql-server
        ports:
          - "8088:8088"
        environment:
          KSQL_CONFIG_DIR: "/etc/ksql"
          KSQL_BOOTSTRAP_SERVERS: kafka:9092
          KSQL_KSQL_STREAMS_BOOTSTRAP_SERVERS: kafka:9092
        volumes:
          - /tmp/ksql-config:/etc/ksql
        networks:
          - kafka
    
    networks:
      kafka:
        driver: bridge
    EOF

     

     

    2. kafka docker compose 를 실행합니다.

    docker-compose -f /tmp/kafka-docker-compose.yaml --project-name kafka up -d

     

    ksql CLI 시작하기.

    Topic 생성하기.

    먼저 kafdrop 을 통해서 Topic 을 생성합니다.

    Topic 의 이름은 test_topic 으로 설정하였습니다.

     

    아래 명령어를 통해서 kafka-console-producer 를 활성화시킨 후, json 형식의 메시지를 추가합니다.

    docker exec -it kafka \
    kafka-console-producer --bootstrap-server kafka:9092 --topic test_topic
    {"id": 1, "name": "Andy", "age": 31}
    {"id": 2, "name": "Bob", "age": 32}
    {"id": 3, "name": "Chris", "age": 33}
    {"id": 4, "name": "Dennis", "age": 34}
    {"id": 5, "name": "Emily", "age": 35}

     

    아래 이미지처럼 Message 들이 추가됩니다.

     

    ksql Stream, Table 생성.

    아래 명령어를 통해서 ksql CLI 로 진입할 수 있습니다.

    docker run --rm --network kafka_kafka -it confluentinc/cp-ksqldb-server:7.4.3 ksql http://ksql-server:8088
                      
                      ===========================================
                      =       _              _ ____  ____       =
                      =      | | _____  __ _| |  _ \| __ )      =
                      =      | |/ / __|/ _` | | | | |  _ \      =
                      =      |   <\__ \ (_| | | |_| | |_) |     =
                      =      |_|\_\___/\__, |_|____/|____/      =
                      =                   |_|                   =
                      =        The Database purpose-built       =
                      =        for stream processing apps       =
                      ===========================================
    
    Copyright 2017-2022 Confluent Inc.
    
    CLI v7.4.3, Server v7.4.3 located at http://ksql-server:8088
    Server Status: RUNNING
    
    Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

     

    Stream 과 Table 을 생성합니다.

     

    CREATE STREAM registration_stream (
      id int,
      name varchar,
      age int
    ) WITH (
      kafka_topic='test_topic',
      value_format='JSON'
    );
    
    CREATE TABLE user_table AS
    SELECT
      name,
      COUNT(*) AS name_count
    FROM
      registration_stream
    GROUP BY
      name;
    CREATE STREAM REGISTRATION_STREAM (ID INTEGER, NAME STRING, AGE INTEGER) WITH (CLEANUP_POLICY='delete', KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');
     Message        
    ----------------
     Stream created 
    ----------------
    
    CREATE TABLE USER_TABLE WITH (CLEANUP_POLICY='compact', KAFKA_TOPIC='USER_TABLE', PARTITIONS=1, REPLICAS=1, RETENTION_MS=604800000) AS SELECT
      REGISTRATION_STREAM.NAME NAME,
      COUNT(*) NAME_COUNT
    FROM REGISTRATION_STREAM REGISTRATION_STREAM
    GROUP BY REGISTRATION_STREAM.NAME
    EMIT CHANGES;
     Message                                 
    -----------------------------------------
     Created query with ID CTAS_USER_TABLE_3 
    -----------------------------------------

     

    select * from USER_TABLE limit 5;
    +--------------------------+---------------------------+
    |NAME                      |NAME_COUNT                                                                                               |
    +--------------------------+---------------------------+
    |Andy                      |1                                                                                                        |
    |Bob                       |1                                                                                                        |
    |Chris                     |1                                                                                                        |
    |Dennis                    |1                                                                                                        |
    |Emily                     |1                                                                                                        |
    Query terminated

     

    반응형
Designed by Tistory.