-
[RabbitMQ] Exchange 알아보기RabbitMQ 2024. 1. 28. 12:24728x90반응형
- 목차
들어가며.
이번 글에서는 RabbitMQ 의 구성요소인 Exchange 에 대해서 알아보려고 합니다.
Exchange 는 RabbitMQ Publisher 가 메시지를 전달할 때에 메시지를 적절한 Queue 로 라우팅하는 역할을 담당합니다.
우선 RabbitMQ 와 Client 사이의 관계를 간단하게 살펴보도록 하겠습니다.
아래 이미지를 보시면 Publisher 와 Broker 사이의 관계가 표현됩니다.
Publisher 는 메시지를 RabbitMQ Broker 에게 전달합니다.
메지시가 RabbitMQ Broker 로 전달되면, 이제 RabbitMQ Broker 의 내부를 살펴봐야합니다.
Broker 내부에는 Message Delivery 를 위한 구성요소인 Exchange, Queue 가 존재합니다.
Queue 는 실질적인 메시지의 저장소로써 동작하며, 오늘의 주제인 Exchange 는 Client 로부터 전달받은 메시지를 적절한 Queue 로 분배합니다.
Exchange 라는 이름에서 알 수 있듯이, 메시지를 교환 및 분배하는 역할을 합니다. ( 일종의 라우터 )
이번 글에서는 RabbitMQ 의 Exchange 가 어떤 방식으로 동작하며, 관련한 예제 코드들을 작성해보려고 합니다.
RabbitMQ Docker Container.
실습 및 예제 코드들과 함께 설명을 이어가려고 합니다.
그래서 RabbitMQ 환경 구축을 위한 Docker 명령어들과 RabbitMQ 세팅을 선행하겠습니다.
https://hub.docker.com/_/rabbitmq
아래 명령어를 통해서 RabbitMQ Container 를 실행할 수 있습니다.
docker run -d --hostname my-rabbit --name some-rabbit \ -p 8080:15672 \ -p 5672:5672 \ -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password \ rabbitmq:3-management
Container 실행 이후에 http://localhost:8080 페이지에서 로그인 이후에 Admin 접속이 가능합니다.
계정은 Docker Container 실행 시에 입력한 환경변수에 해당합니다.
마지막으로 RabbitMQ Admin 에서 hello 라는 이름의 Queue 를 생성합니다.
Exchange 종류.
Exchange 는 RabbitMQ Broker 내부에 존재하는 구성요소 (Entity) 입니다.
Exchange 는 Publisher 에 의해서 전송된 메시지를 연결된 Queue 로 전달하는 역할을 수행합니다.
정의된 규칙에 따라서 Exchange 는 0개 이상의 Queue 로 메시지를 전달할 수 있습니다.
일반적으로는 1개의 Queue 로 메시지를 전달하지만, 경우에 따라서 어떠한 Queue 로도 메시지를 전달하지 않고 Drop 하거나
모든 Queue 들로 메시지를 전달할 수 있습니다.
Direct Exchange ( Unicast ).
Exchange 에는 여러가지 방식들이 존재합니다.
먼저 Direct Exchange 방식은 Direct 라는 단어에서 알 수 있듯이, Exchange 와 Queue 사이의 1:1 매칭을 지원합니다.
그래서 Publisher 가 Direct Exchange 방식으로 메시지를 Publish 한다면,
이는 지정된 1개의 Queue 로 메시지가 전달됩니다.
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost', port=5672, credentials=pika.PlainCredentials('user', 'password')) ) channel = connection.channel() channel.queue_declare(queue='hello', durable=True) channel.basic_publish(exchange='amq.direct', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") channel.close() connection.close()
아래의 결과 이미지처럼 1개의 Connection 과 Channel 이 생성된 모습을 확인할 수 있으며,
amq.direct Exchange 를 거쳐 hello Queue 로 메시지가 전달됩니다.
queue_declare.
생성된 Channel 을 통해서 queue_declare 함수를 사용할 수 있습니다.
queue_declare 함수는 Channel 과 Queue 를 연결할 수 있는 함수입니다.
queue_declare 의 활용법은 크게 2가지인데요.
- Queue 의 존재 유무를 체크하거나
- 존재하지 않는 Queue 라면 이를 생성합니다.
passive 인자를 통해서 존재 유무를 확인할 수 있으며, 사용법은 아래와 같습니다.
존재하지 않는 Queue 를 대상으로 queue_declare 함수를 사용하게 되면, 404 Exception 을 Raise 하게 됩니다.
channel.queue_declare(queue='no-existent', passive=True) # pika.exceptions.ChannelClosedByBroker: (404, "NOT_FOUND - no queue 'hello123' in vhost '/'")
그리고 passive 옵션이 False 상태일 때에는 Queue 를 생성하는 역할을 수행합니다.
channel.queue_declare(queue='hello2', passive=False, durable=True)
Fanout Exchange ( Broadcast ).
Fanout Exchange 방식은 흔히 말하는 Broadcast 방식입니다.
Fanout Exchange 와 연결된 모든 Queue 로 메시지를 전파합니다.
먼저 관련한 예시부터 확인해보시죠.
아래의 예시는 queue_declare 함수를 통해서 새로운 Queue 4개를 생성합니다.
그리고 queue_bind 함수를 통해서 Queue 와 Fanout Exchange 를 연결합니다.
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost', port=5672, credentials=pika.PlainCredentials('user', 'password')) ) channel = connection.channel() channel.queue_declare(queue='new_queue1', passive=False) channel.queue_declare(queue='new_queue2', passive=False) channel.queue_declare(queue='new_queue3', passive=False) channel.queue_declare(queue='new_queue4', passive=False) channel.queue_bind(exchange='amq.fanout', queue='new_queue1') channel.queue_bind(exchange='amq.fanout', queue='new_queue2') channel.queue_bind(exchange='amq.fanout', queue='new_queue3') channel.queue_bind(exchange='amq.fanout', queue='new_queue4') channel.basic_publish(exchange='amq.fanout', routing_key='', body='Hello World!') print(" [x] Sent 'Hello World!'") channel.close() connection.close()
queue_bind.
queue_bind 함수를 통해서 아래와 같이 Exchange 와 Queue 를 연결할 수 있습니다.
이렇게 연결된 모든 Queue 들은 Fanout Exchange 를 통해서 Broadcast 전송이 가능해집니다.
또한 queue_unbind 함수를 통해서 Binding 을 제거할 수 있습니다.
channel.queue_unbind(exchange='amq.fanout', queue='new_queue4')
Fanout Exchange 를 통해서 연결된 모든 Queue 로 메시지를 전달할 수 있습니다.
그 외의 Topic Exchange, Header Exchange 방식이 더 존재하지만,
실용적으로 주로 사용되는 두가지 Exchange ( Direct, Fanout ) 만을 살펴보았습니다.
반응형