개요
AMQP (advanced message queuing protocol)란 메세지를 생산하는 Producer와 메세지를 소비하는 Consumer 사이에 메세지를 보관하는 Broker를 두어, 생산자와 소비자를 연결하는 네트워크 프로토콜을 의미한다.
AMQP를 사용하는 대표적인 서비스로는 RabbitMQ가 있으며, 구성요소는 다음과 같다.
- Exchange : 메세지 생산자인 Producer는 메세지를 발행할 때 Queue가 아닌, Exchange에 메세지를 저장한다. Exchange에 저장된 메세지는 Binding Key를 사용하여 사전에 정의된 바인딩 규칙에 따라 적절한 Queue로 옮겨진다.
- Queue : Queue는 Exchange로 부터 이동해온 메세지들을 보관하는 저장소로서, 메세지 소비자인 Consumer는 Queue에 보관된 메세지를 읽고 처리한다.
- Binding : Exchange와 Queue를 연결하기 위한 Key. Binding Key를 사용하여 바인딩 규칙을 정의하고, Exchange에 저장된 메세지를 적절한 Queue로 옮기는 작업이 이루어진다.
이 글의 목적은 RabbitMQ의 기본적인 Configuration과 사용법에 있지 않다. 운영의 관점에서 바라보았을 때 중요한 것은 성공케이스가 아닌, 예상치 못한 실패 케이스이다. 실패 케이스에 대한 대응을 어떻게 하느냐에 따라서 안정적인 시스템이 될 수도 있고, 대처 능력이 부족한 미완성 소스가 될 수도 있다.
위와 같은 문제의식을 바탕으로 글의 전반에 걸쳐 다룰 내용은 Consumer에서 소비에 실패한 Queue를 DLX/DLP로 이동시키는 작업과, DLP로 이동한 메세지를 추후 어떻게 처리할 것인가에 대한 것이다.
문제상황
RabbitMQ를 사용하여 이벤트 발행과 구독 로직을 개발한 후 테스트를 진행하다 이상한 현상을 발견했다. 데이터 정합성 문제로 시스템에서 오류가 발생하였는데, 시스템이 비정상 종료되지 않고 무한으로 Queue Consume 재시도를 수행하는 것이었다.
관련 정보를 찾아보니 RabbitMQ에서는 메세지 소비에 실패하였을 때 retry 정책을 설정할 수 있는데, 현재 프레임워크에 해당 설정이 되어 있지 않다 보니 구독하는 로직에서 Queue에 남아 있는 메세지를 소비하기 위한 동작이 무한정 반복되고 있었던 것이다.
RabbitMQ DLX를 사용하여 예외발생 시 무한 Retry를 제한하자.
DLX에 대해 알아보고 직접 DLX를 생성하여 무한 Retry를 제한해봅니다.
velog.io
이슈를 해소하기 위해서는 처리에 실패한 메세지에 대해 Retry 정책을 설정하여 최대 횟수를 제한하고, 그 이후엔 실패한 메세지가 담겨 있는 Queue에서 분리하는 작업이 필요하다.
메세지를 기존 Queue에서 분리하기 위해 DLX/DLQ를 사용해야 한다.
- DLX (Dead-Letter-Exchange) : 메세지를 발행하는 공간인 Exchange를 의미한다. 다만 DLX의 경우에는 비즈니스 로직에서 참조하는 Queue와 연결되는데, 만일 Queue의 메세지 구독에 실패했을 때 Retry 정책에 따라 재시도를 수행하여도 정상적으로 처리가 되지 않은 경우 연결된 Exchange로 메세지를 이동시킨다. 구독에 실패한 메세지를 보관하여 DLQ로 전달하는 역할을 담당하는 것이다.
- DLQ (Dead-Letter-Queue) : DLX에서 보관하고 있던 메세지는 앞에서 기술한 Binding Rule에 따라 적절한 Queue로 옮겨지게 된다. DLX는 DLQ로 메세지를 바인딩하도록 규칙을 지정하게 되므로, 최종적으로 실패한 메세지는 기존 Queue에서 분리되어 DLQ로 이동하는 것이다. DLQ는 실패한 메세지를 보관하는 메세지 저장소로 보면 된다.
문제 해결 과정
문제를 해소하기 위해 필요한 과정은 다음과 같다.
- 프레임워크 전역에 RabbitMQ 실패 메세지에 대한 Retry 정책 설정
- 이벤트 구독 로직의 Catch 문에서 예외를 전파할 때 AMQP Library의 AmqpRejectAndDontRequeueException 예외 전파
- 비즈니스 로직을 처리하는 Queue에 DLX 및 Binding Rule을 정의
1. 프레임워크 전역에 RabbitMQ 실패 메세지에 대한 Retry 정책 설정
프레임워크에 Retry정책을 설정하는 방법은 간단하다. (물론 몇 초 간격으로 최대 몇 회 재시도를 진행할지는 개발팀 내부적으로 반드시 논의가 되어야 할 부분이라고 생각한다) springboot의 yaml 설정으로 아래와 같이 구성해주면 Runtime에 자동으로 설정 정보가 반영된다.
rabbitmq:
host: localhost
username: guest
password: guest
port: 5672
# 재처리 설정
listener:
simple:
retry:
enabled: true
initial-interval: 3s
max-interval: 10s
max-attempts: 5
multiplier: 2
각 옵션에 대한 세부 정보는 아래와 같다.
- enabled : Retry 정책 활성화 여부
- initial-interval : 초기 재시도 간격
- max-interval : 최대 재시도 간격
- max-attempts : 최대 재시도 횟수
- multiplier : 재시도 간격 증가율
위와 같이 실패에 대한 재시도 정책을 설정하게 되면, Queue의 메세지 구독이 실패하더라도 지정된 횟수만큼 메세지 소비 재시도가 이루어지며 재시도를 하여도 해결이 되지 않는 경우 메세지를 Queue에 연결된 DLX로 보내게 된다.
2. 이벤트 구독 로직의 Catch 문에서 예외를 전파할 때 AMQP Library의 AmqpRejectAndDontRequeueException 예외 전파
@RabbitListener 를 사용하는 구독 로직에서 Queue 메세지 구독에 실패했음을 RabbitMQ에 알리기 위해서는 AMQP의 Custom Exception을 사용해야만 한다. 따라서 기존 로직에서 사용하던 내부용 Exception을 걷어내고, AmqpRejectAndDontRequeueException 예외를 전파하도록 변경하였다.
AS-IS
...
try {
// business logic...
execute(data);
} catch (Exception e) {
log.error("event consume failed...");
throw new CustomException(e);
}
TO-BE
...
try {
// business logic...
execute(data);
} catch (Exception e) {
log.error("event consume failed...");
throw new AmqpRejectAndDontRequeueException(e);
}
3. 비즈니스 로직을 처리하는 Queue에 DLX 및 Binding Rule을 정의
Spring에서 RabbitMQ Configuration을 잡을 땐 Queue/Exchange/Binding Rule을 Bean으로 등록해야 한다. 가장 간단한 기본 설정은 다음과 같다.
// org.springframework.amqp.core.Queue
@Bean
public Queue queue() {
return new Queue(queueName);
}
/**
* 지정된 Exchange 이름으로 Direct Exchange Bean 을 생성
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange(exchangeName);
}
/**
* 주어진 Queue 와 Exchange 을 Binding 하고 Routing Key 을 이용하여 Binding Bean 생성
* Exchange 에 Queue 을 등록한다고 이해하자
**/
@Bean
public Binding binding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(routingKey);
}
만일 위와 같은 설정에서 Bean으로 등록된 Queue에 DLX를 연결해야 한다면 DLX와 DLQ를 별도의 Bean으로 등록하고 queue에 해당 Exchange와 Binding Rule을 연결해주면 된다.
// org.springframework.amqp.core.Queue
@Bean
public Queue queue() {
return QueueBuilder.durable(queue)
.deadLetterExchange(deadExchange)
.deadLetterRoutingKey(deadBinding)
.build();
}
/**
* 지정된 Exchange 이름으로 Direct Exchange Bean 을 생성
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange(exchangeName);
}
/**
* 주어진 Queue 와 Exchange 을 Binding 하고 Routing Key 을 이용하여 Binding Bean 생성
* Exchange 에 Queue 을 등록한다고 이해하자
**/
@Bean
public Binding binding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(routingKey);
}
@Bean
public Queue deadLetterQueue() {
return new Queue(deadQueueName);
}
/**
* 지정된 Exchange 이름으로 Direct Exchange Bean 을 생성
*/
@Bean
public DirectExchange deadLetterdirectExchange() {
return new DirectExchange(deadExchangeName);
}
/**
* 주어진 Queue 와 Exchange 을 Binding 하고 Routing Key 을 이용하여 Binding Bean 생성
* Exchange 에 Queue 을 등록한다고 이해하자
**/
@Bean
public Binding deadBinding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(routingKey);
}
위와 같이 설정을 완료하게 되면 소스를 배포했을 때 RabbitMQ 인스턴스에 자동으로 Queue/Exchange/Binding Rule이 적용된다.
남은 과제 : DLQ 메세지 후속처리
길게 서술한 실패 처리에 대한 대응 방안을 모두 적용하더라도 남아 있는 문제가 여전히 존재한다. 구독에 실패하여 DLQ로 이동한 메세지에 대한 후속처리 문제이다.
좋은 시스템이란 실패 케이스가 발생하였을 때 사람이 개입하지 않더라도 시스템 자동화를 통해 복구가 가능해야 한다고 생각한다. 하지만 자동화된 복구가 불가능한 경우 불가피하게 수기처리가 필요한 경우가 생길 수 있다.
구분 | 시스템 자동화 가능 | 시스템 자동화 불가능 |
데이터 정합성 문제 | √ | |
내부 로직 오류 | √ | |
네트워크 통신 오류 | √ | |
인프라 문제 | √ | |
RabbitMQ 자체 문제 | √ |
※ 시스템 자동화란 DLQ 메세지에 대한 Consumer를 재정의 하는 것과 배치 프로그램을 개발하여 주기적으로 실패 큐를 재처리 하는 과정을 포함한다.
※ 위 글에서는 '시스템 자동화 가능/불가능' 으로 표현했지만 일반적으로는 '일시적인 에러/보정이 필요한 에러' 로 표현한다.
네트워크 통신 오류, 인프라 문제, RabbitMQ 인스턴스 내의 자체 문제로 인해 메세지 구독이 실패한 경우 대게 조금 뒤에 재시도를 할 경우 별도의 조치 없이도 큐를 정상적으로 처리할 수 있다. 하지만 DB 데이터의 정합성 문제나 소비하는 로직의 문제가 발생한 경우엔 운영 담당자의 개입 없이는 정상적으로 큐를 소비할 수 없다.
후자와 같이 개발자의 수기 보정 후 재시도를 해야 하는 경우에는 기존 consume 과정 외에 배치 등 별개의 로직이 추가적으로 필요하다. 개발자가 개입하여 데이터 보정 혹은 로직 수정이 완료되고 나면 배치를 돌려 DLQ의 메세지를 처리한다. 배치는 DLQ에 담긴 메세지를 읽어 Rest 방식으로 재수행을 시도한다.
여기서 한 가지 의문이 남을 수 있다. 사람의 개입 없이 자체적으로 복구 가능한 에러의 경우 굳이 배치를 통해 처리할 필요가 있는가?
DLQ를 기존 consume 로직에서 소비하도록 한다면 일시적인 에러의 경우 정상적으로 처리가 될 수 있다. 하지만 문제는 DLQ는 비즈니스 데이터만을 담고 있기 때문에 일시적인 에러인지, 보정이 필요한 에러인지 구분할 수 있는 방법이 없다.
위와 같이 처리가 이루어질 경우 DLQ 소비 로직에 다음과 같은 절차가 필요할 것으로 보인다.
- DLQ를 구독하는 서비스 로직 개발
- 일시적인 에러의 경우 정상 처리
- 일시적인 에러 케이스의 실패 + 보정이 필요한 케이스의 실패에 대한 예외처리
일시적인 에러라 하더라도 재수행 시점에 네트워크가 정상적으로 복구되지 않아 재실패가 발생할 가능성이 있다. 또한 개발자의 개입이 필요한 에러의 경우 당연하게도 같은 로직을 수행할 경우 실패하게 된다. 위 두 경우를 묶어 catch 절에서 별도의 예외처리가 필요하다. 방법은 여러 가지가 존재할 수 있겠지만 두 가지 정도의 방안을 정리하면 다음과 같다.
- 실패한 Queue는 이유와 관계 없이 사람의 확인이 필요한 데이터로 인지하여 별도의 DLQ에 다시 저장하고, 원인을 파악하여 대응을 한 후 재수행 할 수 있도록 처리
- 실패 건을 관리하는 테이블을 생성하여 재수행 성공 여부 플래그와 메세지 json 값을 보관하여 배치로 후처리 진행
저장소의 차이만 존재할 뿐 두 가지의 성격은 일치한다. 결국 1차 재수행으로 해결할 수 없는 경우 별도의 저장소로 메세지를 이동시킨 후 사람이 개입하여 원인을 파악한 후 배치를 통해 재시도를 수행하는 것이다.
배치 작업을 진행할 때에는 @RabbitListener를 사용하지 않고 MQConnectionFactory에서 Connection을 생성한 후 별도의 처리가 필요하다. 메세지가 존재할 때 바로 consume을 하는 것이 아닌 배치의 스케줄링에 따라 동작해야 하기 때문이다.
...
try {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
ConnectionFactory factory = connectionFactory.getRabbitConnectionFactory();
Channel channel = factory.newConnection().createChannel();
// consume
channel.consume();
} catch (Exception e) {}
앞서 언급된 두 가지 저장 방법 중 테이블 방식이 좀 더 나은 것 같다고 생각한다. 왜냐하면 네트워크 문제가 바로 해결되지 않을 수 있고, 보정 작업은 사람이 하는 일이다 보니 배치 로직 내에서도 재수행 실패가 발생할 수 있기 때문이다. 테이블로 관리하되 재수행 성공 여부 컬럼에 인덱스를 적용한다면 성능 이슈도 발생하지 않을 것이고 실패한 메세지에 대해 언제든 배치가 재수행을 시도할 수 있게 된다.
또한 Queue만을 사용하는 방식과 달리 DLQ에 대한 처리를 고려하지 않아도 되는 편의를 챙길 수 있게 된다.
AWS SQS 활용
AWS SQS란, Pub-Sub 구조를 가지는 AWS의 Simple Queue Service이다. AWS SQS란?
SQS를 사용할 경우 AWS SDK의 도움을 받아 위 문제를 보다 수월하게 해결할 수 있다.
AWS SDK에서는 DLQ에 저장된 메세지를 Original Queue로 re-drive 할 수 있는 기능을 제공한다. 위의 케이스에 re-drive 기능을 적용하면 아래와 같이 절차가 간소화된다.
- original-queue에서 consume 실패가 발생할 경우 DLQ로 메세지를 이동
- 배치를 통해 주기적으로 DLQ 메세지를 original-queue로 이동
- 일시적인 에러는 재시도를 통해 해결, 보정이 필요한 에러는 다시 DLQ로 이동
- DLQ에 오랜 기간 보존된 데이터는 담당자가 확인 후 보정작업 진행 -> 이후의 flow는 모두 배치에서 re-drive를 통해 잔여 메세지 자동으로 처리
# Reference
https://velog.io/@black_han26/AMQPAdvanced-Message-Queuing-Protocol
AMQP 개념 및 Spring-RabbitMQ 적용
AMQP란 무엇인지 알아보고, RabbitMQ를 SpringBoot에서 적용해 본다.
velog.io
https://velog.io/@sdb016/RabbitMQ-%EA%B8%B0%EC%B4%88-%EA%B0%9C%EB%85%90
[RabbitMQ] 기초 개념
AMQP를 구현한 오픈소스 메세지 브로커이다.producers에서 consumers로 메세지(요청)를 전달할 때 중간에서 브로커 역할을 한다.사용하는 케이스는 다음과 같다.요청을 많은 사용자에게 전달할 때요청
velog.io
RabbitMQ DLX를 사용하여 예외발생 시 무한 Retry를 제한하자.
DLX에 대해 알아보고 직접 DLX를 생성하여 무한 Retry를 제한해봅니다.
velog.io
[서버] SpringBoot 을 이용한 RabbitMQ 구축하기
이번 포스팅에서는 이번 포스팅에서는 메세지 큐(Message Queue) 을 알아보자 에 이어서 실제로 RabbitMQ 을 Spring boot 에 적용해보겠습니다 🔥
velog.io
AWS SDK 로 DLQ 에 쌓인 메시지 처리하는 2가지 방법
관련 글AWS SQS + Spring Boot 3 + kotlin 인프라 구축하기AWS SQS Consumer 에러를 DLQ 로 처리하기message converter 를 이용한 sqs message serializerAWS SDK 로 DLQ 에 쌓인 메시지 처리하기 앞선 시간에 우리는 SQS 를 생
wonit.tistory.com
https://vprog1215.tistory.com/342
[RbbitMQ] Delay RabbitMQ Spring Consume
Delay RabbitMQ Spring Consume 앞서 발행한 메세지를 실제로 받아보자 java code package com.example.rabbitmqconsumer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; impo
vprog1215.tistory.com
https://jibinary.tistory.com/156
[AWS] Amazon SQS (Simple Queue Service) 쉽게 개념과 특징 정리 (Producer, Consumer, Queue, Short & Long Polling)
◇ 공부 기록용으로 작성하였으니 틀린점, 피드백 주시면 감사하겠습니다 ◇ Fully managed message queuing for microservices, distributed systems, and serverless applicationsSQS (Simple Queue Service) SQS
jibinary.tistory.com
'CS' 카테고리의 다른 글
Blocking/Non-Blocking(블로킹/논블로킹), Sync/Async(동기/비동기), Concurrent/Parallel(동시/병렬) (1) | 2024.10.11 |
---|