주식 분석 스트리밍 4

RabbitMQ(AMQP) 설치 편

[Message Queue]

Aibees가 마스크 판매업을 시작했습니다.
뭣도 모르고 시작한 장사이기에 직접 배달을 하려고 합니다.
A가 첫번째 주문을 넣었군요. Aibees는 직접 차를 타고가서 물건을 건네줍니다.
물건이 좋다고 단골이 되겠다고 하네요. 
시간이 지나고, 새로운 B, C, D들이 나타나 마스크를 주문했고 이들 역시 단골이 되어주었습니다.
이에 힘입어 마스크 외에도 여러 물품들을 판매하게 되었죠.

그런데 문제가 생겼어요.
직접 배달을 하다보니 너무 할일이 많아지게 되고 문어발식 동선이 되다보니 업무가 점점 복잡해지게 되었습니다.

그래서 Aibees는 M이라는 매니저를 고용하게 됩니다. M은 Aibees가 *생산하는 물건들을 정리하고 배달해주는 역할*을 맡게 됩니다.
Aibees는 이제 누구와 계약을 맺었는지, 어디로 배달을 가야하는지 알 필요가 없게 되었습니다. (현실에선 큰일날 소리이지만요)

위 예시에서의 M이 이번에 소개하고 설치해 볼 RabbitMQ가 속한 Message Queue 오픈소스 툴이다.
데이터를 생산(Producer)하는 시스템과 전달받는(Consumer) 시스템 사이 비동기적으로 데이터 송/수신이 가능하도록 중개하는 것을 MOM(Message Oriented Middleware)이라고 하며, MOM을 오픈소스 툴로 만든 것이 Message Queue 시스템이다.

MOM통신을 위해 규정된 ISO Application Layer 프로토콜이 AMQP(Advanced Message Queuing protocol) 이며 프로토콜인 만큼 동일한 프레임워크, 언어가 아니더라도 Octet-Stream 규격으로 데이터 통신이 가능하다.

특징
채팅 서비스나 비동기 데이터 기반 시스템, 배치작업에서 주로 사용하게 되며 프로세스 단위로 처리하는 시스템에서 사용자/데이터가 많아지면 요청에 대한 응답을 기다리는 수가 증가하고 지연되어 정상적인 서비스가 불가능해진다.

예시로, 쇼핑 웹페이지에 주문처리가 들어오면 주문내역 저장, 결제처리, 예산/재고계산, 관련 담당자 PUSH 등 다양한 곳에 동시다발적으로 데이터를 보내야한다. 여기서 발생하는 자원을 한 모듈에서 감당하기에는 참 부담스러운데 여기에 통계부서를 만들었다고 주문데이터를 일일히 보내달라고 한다.

이러면 웹 페이지는 이런 API 전송 때문에 처리속도가 점점 지연되고 유지보수는 더더욱 복잡해진다.
로딩이 허용?되는 시스템은 그럭저럭 유지된다 해도 게임같은 딜레이가 치명적인 서비스에서는 안될 일인 것이다.

이런 문제들을 해소할 수 있는게 Message Queue 이다.

MQ종류

  1. RabbitMQ
    • AMQT 프로토콜을 사용하는 대표적인 MQ시스템
    • 유연한 라우팅/클러스터링이 가능 → 확장성이 높다고 평가할 수 있다.
    • 거의 모든 언어와 운영체제를 지원(적용하기 쉽다.)
  2. ActiveMQ
    • 아파치에서 관리하는 오픈소스로서 AMQP가 아닌 JMS를 사용한다.
    • JMS : Java Messaging Service로 Java기반 어플리케이션 사이에서 효과적으로 통신할 수 있는 API - Rest API를 기반으로 통신한다. - Spring에서 쉽게 사용할 수 있다. - C, Python, PHP등에서도 사용가능(하나 Stomp 프로토콜이 개입되어야 한다.)
  3. Kafka
    • 대용량/실시간 로그 처리에 특화된 메시징 시스템으로 TPS가 다른 MQ시스템보다 TPS가 월등하다.
    • 특화되었다는 말은 범용 MQ시스템들이 가지는 다양한 기능을 제공하지 않는다는 이야기도 된다.
    • AMQP 프로토콜이 아닌 단순한 메시지 Header를 가진 TCP 통신으로 Overhead를 감소시켰다. - 분산처리 시스템을 위해서 설계되었기 때문에 분산/복제 구성이 쉽다. - 범용 MQ시스템은 Broker가 Consumer에게 PUSH하는 방식이지만 Kafka는 Consumer가 Broker로부터 PULL하여 가져가는 방식으로 동작하기 때문에 Consumer는 자신의 처리능력만큼의 메시지만 처리할 수 있다.
    • 앞 포스트에서 설명한 Spark Streaming의 Windowed DStream과 연계하면 왜 Spark와 Kafka 연동을 추천하는지 이해할 수 있다. - PULL하여 가져갈 수 있다는 얘기는 곧 영속성(Durability)를 보장한다는 이야기이며 데이터를 따로 쌓아두어 Batch에서도 사용 가능하다는 이야기이다.

[RabbitMQ]

그럼 이제 본격적으로 RabbitMQ에 대해 이야기하고자 한다.

  1. 왜 RabbitMQ인가

위에서 각 MQ별로 설명하는 내용을 보면 Spark까지 도입하는 입장에서 Kafka를 선택하지 않는게 이상할 정도로 Spark와 Kafka는 참 짝이 잘 맞는다.

그러나 Kafka는 생각보다 굉장히 무거운 오픈소스이다.
공식문서에서 권장하는 설치는 기본적으로 3개의 인스턴스를 올려야 하며(각각 다른 서버에) ZooKeeper라는 공개 분산형 레지스트리 구성 서비스 시스템을 같이 띄워야 한다. 주키퍼 안에 각 인스턴스 상태를 보존하므로써 영속성을 보장한다고 말하는 것이다.

이런 권장사항으로 인해 규모가 작고 일개 개발자가 진행하는 토이프로젝트에는 맞지 않다.

그럼 ActiveMQ는 어떠한가.
ActiveMQ는 JMS를 기반으로 올려진 MQ시스템이기 때문에 Spring Boot위주의 MSA 등에서는 효과적이지만 Python, Node.js등 다양한 언어의 프레임워크들로 시스템을 구성한다면 다소 난해해진다.

RabbitMQ는 언어장벽에 구애받지 않고 쉽게 통신할 수 있도록 개발되었기 때문에 보다 쉬운개발을 할 수 있다는 큰 장점이 있어서 선택한 것이다.

특히 Python에서 PIKA라는 Package를 제공하여 Publish하기 정말 쉽다.
반대로 Spark에서는 공식적으로 RabbitMQ 관련 Dependency를 제공하지 않고, 다른 단체에서 만든 라이브러리마저 2017, 2019년도 이후 전혀 관리가 되고있지 않아 쉽게 적용하지는 못했다. (이건 다음 포스트에서 설명하겠다.)

  1. RabbitMQ 구조

RabbitMQ는 Erlang이라는 언어로 구현된 시스템으로 서버에 자체적으로 녹여내기엔 부수적으로 설치해야 한다. → Docker Container로 분리해서 띄우는게 더 쉽다.

                            RabbitMQ
Producers -> [ Exchange --> Binding --> Queue ] -> Consumers

Publish(Producer) 시스템은 Broker(RabbitMQ)의 Exchange에 메시지를 전달하면 그 안에서 Administrator가 정한 Binding 규칙에 의해 적절한 Queue를 선택하여 메세지를 Push한다.

※ Binding: Exchange에서 적절한 Queue로 전달하기 위해 설정한 임의의 규칙을 의미한다. Queue 이름을 직접적으로 추가할 수도 있고 밑에서 얘기할 Topic Type의 경우 Key의 특정규칙이 보이면 그에 따른 Routing도 가능하다.

※ Routing Key: 메세지는 Key와 Payload로 이루어져 있다. Key는 Queue 이름일 수도 있고 Binding을 위한 Routing 규칙일 수도 있다.

메세지 전달방식(Exchange Type)은 총 4개가 존재한다.

  1. Direct Exchange
    • Routing Key가 Queue의 이름으로 지정해서 사용하는 방식이다.
    • Queue를 생성하고 특정 Key, 예를 들어 ‘AAA’라는 Key가 들어오면 받겠다라고 명시를 한다면 ‘AAA’ Key로 지정된 메세지는 모조리 해당 Queue로 들어간다.
    • Key가 일치하지 않으면 메시지는 무시된다.
  2. Fanout Exchange
    • Routing Key 관계없이 Exchange에 연결된 모든 Queue에 메세지를 Push한다.
    • 채팅 서비스에서 전체공지같은 메세지를 보낼 때 유용하다.\
  3. Topic Exchange
    • Routing Key가 Binding 패턴에 해당된다면 메세지를 Push하는 방식이다.
    • WildCard 방식을 이용한다.
    • *: 단어 1개 일치
    • #: 0~1개 이상 단어 일치
    • ex) Q1의 Binding이 *.A.# 일 경우 ‘zz.A.xx.yy’, ‘xy.A.XY’ 같은 Routing Key는 허용이 되지만 ‘XY.xy.A’나 ‘AB.A’같은 Key는 허용되지 않는다.
  4. Headers Exchange
    • Binding은 무시된 채 헤더값이 지정된 값과 같은 경우 일치로 간주하고 메세지를 Push한다.
    • Spring에서 Header에 USER-KEY를 넣어 검증하는 경우와 동일한 원리라 생각하면 쉽다.

RabbitMQ 설치 Docker로 설치하면 정말 쉽다.

왜냐면, 공식 Docker Image가 이미 만들어져 있기 때문에 Image를 Pull 받아서 설치하면 되기 때문이다.

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

port 5672번은 AMQP Socket통신용이며 15672는 Management Web용으로 열게 된다.

설치가 끝나고 Management UI를 접속하면 다음과 같은 화면이 나온다. (초기 로그인/비밀번호는 구글링하세요^^)

rabbitmq

참고로 Nginx에 Reverse Proxy를 3일정도 연구했는데 결국 실패해서 Port를 그대로 열어두었다.(물론 5672, 15672 그대로 열지는 않았지만…)

Hits

Written on February 4, 2022