Implementing Kafka KSQL
Kafka, 2023. 12. 11. 06:58
Introduction.
In this post we will use Docker to set up a simple KSQL environment.
The KSQL Docker image used is confluentinc/cp-ksqldb-server:7.4.3.
https://hub.docker.com/r/confluentinc/cp-ksqldb-server/tags
1. Create the Kafka docker compose YAML file.
First, let's write the docker-compose configuration.
The docker-compose file consists of the following services.
- kafka
- zookeeper
- kafdrop
- ksql-server

cat <<EOF > /tmp/kafka-docker-compose.yaml
version: '2'
services:
  kafdrop:
    image: obsidiandynamics/kafdrop:4.0.1
    container_name: kafdrop
    restart: "no"
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka:9092"
    depends_on:
      - "kafka"
    networks:
      - kafka
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.3
    container_name: zookeeper
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
    ports:
      - "22181:2181"
    networks:
      - kafka
  kafka:
    image: confluentinc/cp-kafka:7.4.3
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "29092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_MESSAGE_MAX_BYTES: 10000000
      KAFKA_SOCKET_REQUEST_MAX_BYTES: 100001200
      KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 10000000
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    networks:
      - kafka
  ksql-server:
    image: confluentinc/cp-ksqldb-server:7.4.3
    container_name: ksql-server
    hostname: ksql-server
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: kafka:9092
      KSQL_KSQL_STREAMS_BOOTSTRAP_SERVERS: kafka:9092
    volumes:
      - /tmp/ksql-config:/etc/ksql
    networks:
      - kafka
networks:
  kafka:
    driver: bridge
EOF

2. Run the Kafka docker compose.
docker-compose -f /tmp/kafka-docker-compose.yaml --project-name kafka up -d
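
Right after the containers start, the ksqlDB server may need a moment before it accepts connections. The following is a minimal sketch of a readiness check, assuming port 8088 is published on localhost as in the compose file above and that curl is installed on the host. The function name wait_for_ksql and its arguments are hypothetical; /info is the server information endpoint of the ksqlDB REST API.

```shell
# Hypothetical helper: poll the ksqlDB REST API until it answers.
# Arguments (all optional): URL, number of attempts, delay in seconds.
wait_for_ksql() {
  url="${1:-http://localhost:8088/info}"
  attempts="${2:-30}"
  delay="${3:-2}"
  i=0
  while [ "$i" -lt "$attempts" ]; do
    # -f makes curl fail on HTTP errors, -sS hides progress but keeps errors
    if curl -fsS "$url" >/dev/null 2>&1; then
      echo "ksqlDB server is up: $url"
      return 0
    fi
    i=$((i + 1))
    sleep "$delay"
  done
  echo "ksqlDB server did not respond: $url" >&2
  return 1
}
```

Calling wait_for_ksql with no arguments polls http://localhost:8088/info up to 30 times, two seconds apart, and returns a non-zero status if the server never answers.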
Starting the ksql CLI.
Creating a Topic.
First, create a Topic through kafdrop.
The Topic is named test_topic.
Then start kafka-console-producer with the command below and enter messages in JSON format.
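
As an alternative to the kafdrop UI, the topic can probably also be created from the command line. This is a sketch only, assuming the kafka-topics tool shipped in the cp-kafka image and the container name kafka from the compose file above. The helper name create_topic and the DOCKER override variable are hypothetical, and one partition with replication factor 1 is an assumption that matches a single-broker setup.

```shell
# Hypothetical helper: create a topic with the kafka-topics CLI inside the
# "kafka" container. Set DOCKER=echo to print the command instead of running it.
create_topic() {
  topic="$1"
  "${DOCKER:-docker}" exec kafka \
    kafka-topics --bootstrap-server kafka:9092 \
    --create --if-not-exists \
    --topic "$topic" --partitions 1 --replication-factor 1
}
```

Running create_topic test_topic creates the topic used in this post. With DOCKER=echo the command is only printed, which is handy for checking it before touching the broker.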

docker exec -it kafka \
  kafka-console-producer --bootstrap-server kafka:9092 --topic test_topic

{"id": 1, "name": "Andy", "age": 31}
{"id": 2, "name": "Bob", "age": 32}
{"id": 3, "name": "Chris", "age": 33}
{"id": 4, "name": "Dennis", "age": 34}
{"id": 5, "name": "Emily", "age": 35}

The messages are then added to test_topic.
Creating a ksql Stream and Table.
The command below opens the ksql CLI.
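
Typing the records by hand works, but they can also be generated and piped in without an interactive session. The sketch below reproduces the five sample records from this post. The helper names make_messages and produce_messages are hypothetical, and the container name kafka is taken from the compose file above.

```shell
# Hypothetical helper: print the five sample records, one JSON object per line.
make_messages() {
  id=1
  for name in Andy Bob Chris Dennis Emily; do
    printf '{"id": %d, "name": "%s", "age": %d}\n' "$id" "$name" "$((30 + id))"
    id=$((id + 1))
  done
}

# Hypothetical helper: pipe the records into the console producer.
# -i without -t keeps stdin attached so the piped lines reach the producer.
produce_messages() {
  make_messages | docker exec -i kafka \
    kafka-console-producer --bootstrap-server kafka:9092 --topic test_topic
}
```

make_messages alone only prints the records, so the output can be inspected first. produce_messages sends them to test_topic and exits once the input is exhausted.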
docker run --rm --network kafka_kafka -it confluentinc/cp-ksqldb-server:7.4.3 ksql http://ksql-server:8088

                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =        The Database purpose-built       =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2022 Confluent Inc.

CLI v7.4.3, Server v7.4.3 located at http://ksql-server:8088
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

Create the Stream and the Table.

CREATE STREAM registration_stream (
  id int,
  name varchar,
  age int
) WITH (
  kafka_topic='test_topic',
  value_format='JSON'
);

CREATE TABLE user_table AS
  SELECT name, COUNT(*) AS name_count
  FROM registration_stream
  GROUP BY name;

CREATE STREAM REGISTRATION_STREAM (ID INTEGER, NAME STRING, AGE INTEGER) WITH (CLEANUP_POLICY='delete', KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------

CREATE TABLE USER_TABLE WITH (CLEANUP_POLICY='compact', KAFKA_TOPIC='USER_TABLE', PARTITIONS=1, REPLICAS=1, RETENTION_MS=604800000) AS SELECT
  REGISTRATION_STREAM.NAME NAME,
  COUNT(*) NAME_COUNT
FROM REGISTRATION_STREAM REGISTRATION_STREAM
GROUP BY REGISTRATION_STREAM.NAME
EMIT CHANGES;

 Message
-----------------------------------------
 Created query with ID CTAS_USER_TABLE_3
-----------------------------------------

select * from USER_TABLE limit 5;
+--------------------------+---------------------------+
|NAME                      |NAME_COUNT                 |
+--------------------------+---------------------------+
|Andy                      |1                          |
|Bob                       |1                          |
|Chris                     |1                          |
|Dennis                    |1                          |
|Emily                     |1                          |
Query terminated
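
The same query can presumably be issued without the CLI, through the ksqlDB REST API. The sketch below assumes port 8088 is published on localhost as in the compose file above and that curl is installed. The helper names ksql_payload and ksql_query and the KSQL_URL variable are hypothetical; the /query endpoint and the application/vnd.ksql.v1+json content type come from the ksqlDB REST API.

```shell
# Hypothetical helper: wrap a ksqlDB statement in the JSON body the REST API expects.
ksql_payload() {
  # Escape backslashes and double quotes so the statement is a valid JSON string
  stmt=$(printf '%s' "$1" | sed 's/\\/\\\\/g; s/"/\\"/g')
  printf '{"ksql": "%s", "streamsProperties": {}}\n' "$stmt"
}

# Hypothetical helper: send a query to the /query endpoint of the ksqlDB server.
ksql_query() {
  curl -sS -X POST "${KSQL_URL:-http://localhost:8088}/query" \
    -H 'Content-Type: application/vnd.ksql.v1+json' \
    -d "$(ksql_payload "$1")"
}
```

For example, ksql_query 'SELECT * FROM USER_TABLE LIMIT 5;' sends the query shown above and prints the JSON response from the server.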