ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [RabbitMQ] Exchange 알아보기
    RabbitMQ 2024. 1. 28. 12:24
    728x90
    반응형

     

    - 목차

     

    들어가며.

    이번 글에서는 RabbitMQ 의 구성요소인 Exchange 에 대해서 알아보려고 합니다.

    Exchange 는 RabbitMQ Publisher 가 메시지를 전달할 때에 메시지를 적절한 Queue 로 라우팅하는 역할을 담당합니다.

    우선 RabbitMQ 와 Client 사이의 관계를 간단하게 살펴보도록 하겠습니다.

    아래 이미지를 보시면 Publisher 와 Broker 사이의 관계가 표현됩니다.

    Publisher 는 메시지를 RabbitMQ Broker 에게 전달합니다.

    메지시가 RabbitMQ Broker 로 전달되면, 이제 RabbitMQ Broker 의 내부를 살펴봐야합니다.

     

    출처 : https://medium.com/trendyol-tech/rabbitmq-exchange-types-d7e1f51ec825

     

     

    Broker 내부에는 Message Delivery 를 위한 구성요소인 Exchange, Queue 가 존재합니다.

    Queue 는 실질적인 메시지의 저장소로써 동작하며, 오늘의 주제인 Exchange 는 Client 로부터 전달받은 메시지를 적절한 Queue 로 분배합니다.

    Exchange 라는 이름에서 알 수 있듯이, 메시지를 교환 및 분배하는 역할을 합니다. ( 일종의 라우터 )

     

    출처 : https://www.cloudamqp.com/blog/part4-rabbitmq-for-beginners-exchanges-routing-keys-bindings.html

     

     

    이번 글에서는 RabbitMQ 의 Exchange 가 어떤 방식으로 동작하며, 관련한 예제 코드들을 작성해보려고 합니다.

     

     

    RabbitMQ Docker Container.

    실습 및 예제 코드들과 함께 설명을 이어가려고 합니다.

    그래서 RabbitMQ 환경 구축을 위한 Docker 명령어들과 RabbitMQ 세팅을 선행하겠습니다.

     

    https://hub.docker.com/_/rabbitmq

     

    rabbitmq - Official Image | Docker Hub

    Quick reference Supported tags and respective Dockerfile links 4.0.0-beta.5, 4.0-rc⁠4.0.0-beta.5-management, 4.0-rc-management⁠4.0.0-beta.5-alpine, 4.0-rc-alpine⁠4.0.0-beta.5-management-alpine, 4.0-rc-management-alpine⁠3.13.6, 3.13, 3, latest⁠3.1

    hub.docker.com

     

    아래 명령어를 통해서 RabbitMQ Container 를 실행할 수 있습니다.

    docker run -d --hostname my-rabbit --name some-rabbit \
    -p 8080:15672 \
    -p 5672:5672 \
    -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password \
    rabbitmq:3-management

     

    Container 실행 이후에 http://localhost:8080 페이지에서 로그인 이후에 Admin 접속이 가능합니다.

    계정은 Docker Container 실행 시에 입력한 환경변수에 해당합니다.

     

     

    마지막으로 RabbitMQ Admin 에서 hello 라는 이름의 Queue 를 생성합니다.

     

     

    Exchange 종류.

    Exchange 는 RabbitMQ Broker 내부에 존재하는 구성요소 (Entity) 입니다.

    Exchange 는 Publisher 에 의해서 전송된 메시지를 연결된 Queue 로 전달하는 역할을 수행합니다.

    정의된 규칙에 따라서 Exchange 는 0개 이상의 Queue 로 메시지를 전달할 수 있습니다.

    일반적으로는 1개의 Queue 로 메시지를 전달하지만, 경우에 따라서 어떠한 Queue 로도 메시지를 전달하지 않고 Drop 하거나

    모든 Queue 들로 메시지를 전달할 수 있습니다.

     

    Direct Exchange ( Unicast ).

    Exchange 에는 여러가지 방식들이 존재합니다.

    먼저 Direct Exchange 방식은 Direct 라는 단어에서 알 수 있듯이, Exchange 와 Queue 사이의 1:1 매칭을 지원합니다.

    그래서 Publisher 가 Direct Exchange 방식으로 메시지를 Publish 한다면,

    이는 지정된 1개의 Queue 로 메시지가 전달됩니다.

     

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost',
        port=5672,
        credentials=pika.PlainCredentials('user', 'password'))
    )
    channel = connection.channel()
    channel.queue_declare(queue='hello', durable=True)
    channel.basic_publish(exchange='amq.direct',
                          routing_key='hello',
                          body='Hello World!')
    print(" [x] Sent 'Hello World!'")
    
    channel.close()
    connection.close()

     

    아래의 결과 이미지처럼 1개의 Connection 과 Channel 이 생성된 모습을 확인할 수 있으며,

    amq.direct Exchange 를 거쳐 hello Queue 로 메시지가 전달됩니다.

     

     

     

    queue_declare.

    생성된 Channel 을 통해서 queue_declare 함수를 사용할 수 있습니다.

    queue_declare 함수는 Channel 과 Queue 를 연결할 수 있는 함수입니다.

    queue_declare 의 활용법은 크게 2가지인데요.

    - Queue 의 존재 유무를 체크하거나

    - 존재하지 않는 Queue 라면 이를 생성합니다.

     

    passive 인자를 통해서 존재 유무를 확인할 수 있으며, 사용법은 아래와 같습니다.

    존재하지 않는 Queue 를 대상으로 queue_declare 함수를 사용하게 되면, 404 Exception 을 Raise 하게 됩니다.

    channel.queue_declare(queue='no-existent', passive=True)
    
    # pika.exceptions.ChannelClosedByBroker: (404, "NOT_FOUND - no queue 'hello123' in vhost '/'")

     

    그리고 passive 옵션이 False 상태일 때에는 Queue 를 생성하는 역할을 수행합니다.

    channel.queue_declare(queue='hello2', passive=False, durable=True)

     

     

    Fanout Exchange ( Broadcast ).

    Fanout Exchange 방식은 흔히 말하는 Broadcast 방식입니다.

    Fanout Exchange 와 연결된 모든 Queue 로 메시지를 전파합니다.

    먼저 관련한 예시부터 확인해보시죠.

     

    아래의 예시는 queue_declare 함수를 통해서 새로운 Queue 4개를 생성합니다.

    그리고 queue_bind 함수를 통해서 Queue 와 Fanout Exchange 를 연결합니다.

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost',
        port=5672,
        credentials=pika.PlainCredentials('user', 'password'))
    )
    channel = connection.channel()
    channel.queue_declare(queue='new_queue1', passive=False)
    channel.queue_declare(queue='new_queue2', passive=False)
    channel.queue_declare(queue='new_queue3', passive=False)
    channel.queue_declare(queue='new_queue4', passive=False)
    channel.queue_bind(exchange='amq.fanout', queue='new_queue1')
    channel.queue_bind(exchange='amq.fanout', queue='new_queue2')
    channel.queue_bind(exchange='amq.fanout', queue='new_queue3')
    channel.queue_bind(exchange='amq.fanout', queue='new_queue4')
    channel.basic_publish(exchange='amq.fanout',
                          routing_key='',
                          body='Hello World!')
    print(" [x] Sent 'Hello World!'")
    
    channel.close()
    connection.close()

     

     

    queue_bind.

    queue_bind 함수를 통해서 아래와 같이 Exchange 와 Queue 를 연결할 수 있습니다.

    이렇게 연결된 모든 Queue 들은 Fanout Exchange 를 통해서 Broadcast 전송이 가능해집니다.

     

    또한 queue_unbind 함수를 통해서 Binding 을 제거할 수 있습니다.

     

    channel.queue_unbind(exchange='amq.fanout', queue='new_queue4')

     

     

    Fanout Exchange 를 통해서 연결된 모든 Queue 로 메시지를 전달할 수 있습니다.

     

     

     

    그 외의 Topic Exchange, Header Exchange 방식이 더 존재하지만,

    실용적으로 주로 사용되는 두가지 Exchange ( Direct, Fanout ) 만을 살펴보았습니다.

     

     

     

    반응형
Designed by Tistory.