RabbitMQ DLX/DLQ 메세지 손실 방지
이번 글에 주제는 RabbitMQ를 재시작 했을 때 DLX(Dead Letter Exchange)와 DLQ(Dead Letter Queue)의 메세지가 손실 되지 않는 구성 및 코드를 작성하는 방법에 대해서 쓰려고 합니다. 물론 DLX/DLQ에만 국한된 이야기가 아니고 Exchange/Queue에도 관련된 이야기 입니다. DLX/DLQ도 결국에는 Exchange이면서 Queue이기도 하기 때문입니다.
이와 관련해서 사용된 기술은 Spring AMQP 기준 입니다. 그렇지만 기술만 그러할 뿐 근본적인 내용은 RabbitMQ에 의존하기 때문에 다른 언어에서도 지원 되는 기술을 찾아 동일하게 적용할 수 있습니다.
방법
RabbitMQ 서버가 재시작 되었을 때 메세지가 없어지는 현상을 방지하기 위해선
- Exchange/Queue 설정의 Durability값이 true인지와 Auto delete값이 false인지 확인
- 메세지를 발송 할 때 메세지의 Delivery mode가 2인지 확인
- 1 메모리 저장(손실 발생)
- 2 디스크 저장(손실 없음)
이와 관련해서 자세한 사항은 아래를 참고하시면 되겠습니다.
Exchange/Queue
DLX/DLQ에 대한 설명을 드리기 앞서 DLX/DLQ도 Exchange/Queue의 한 부분이기 때문에 선행으로 Exchange/Queue에 대한 설명을 하자면 Exchange/Queue 구성은 기본적으로 메모리에 저장됩니다. 따라서 RabbitMQ 서버를 재시작하게 되면 메모리가 초기화 되기 때문에 Exchange/Queue 구성 역시 사라지게 됩니다.
Exchange/Queue를 영구적으로 하기 위해선 Durability옵션을 true로 Auto delete옵션을 false로 설정하면 메모리가 아닌 디스크에 저장되기 때문에 RabbitMQ 서버를 재시작하더라도 사라지지 않습니다. 다행이 Exchange/Queue를 생성 할 때 해당 값들은 기본 값으로 설정되어 있습니다.
위 이미지와 같이 Exchange를 생성하는 RabbitMQ 콘솔 페이지를 보면 기본 설정이 영구 저장 상태임을 알 수 있습니다. 그리고 Queue도 마찬가지로 아래와 같이 기본 설정이 영구 저장 상태임을 알 수 있습니다.
그리고 Spring AMQP의 @Exchange
어노테이션으로 생성 할 때도 기본 값은 영구 저장 상태입니다.
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface Exchange {
.
.
.
// RabbitMQ의 Durability옵션 중 Durable 값을 뜻함.
String durable() default TRUE;
// RabbitMQ의 Auto delete옵션 중 No 값을 뜻함.
String autoDelete() default FALSE;
.
.
.
}
@Queue
어노테이션을 사용 했을 때는 상황이 다릅니다. @Queue
어노테이션의 durable, autoDelete 변수를 보면 Queue에 이름이 있는 경우에만 영구 저장된다고 나와 있습니다.
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface Queue {
.
.
.
/**
* Specifies if this queue should be durable.
* By default if queue name is provided it is durable.
* @return true if the queue is to be declared as durable.
* @see org.springframework.amqp.core.Queue#isDurable()
*/
String durable() default "";
/**
* Specifies if this queue should be auto deleted when not used.
* By default if queue name is provided it is not auto-deleted.
* @return true if the queue is to be declared as auto-delete.
* @see org.springframework.amqp.core.Queue#isAutoDelete()
*/
String autoDelete() default "";
.
.
.
}
즉 @Queue
어노테이션을 사용 할 땐 name 혹은 value 값에 Queue 이름을 지정하거나 durable에 true, autoDelete에 false로 값을 초기화 해야 합니다.
만약 여기서 name 혹은 value 값을 지정하지 않는 경우엔 RabbitListenerAnnotationBeanPostProcessor#declareQueue
메서드에서 durable를 false, autoDelete를 true로 초기화 하게 됩니다.
public class RabbitListenerAnnotationBeanPostProcessor
implements BeanPostProcessor, Ordered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware,
SmartInitializingSingleton {
.
.
.
private String declareQueue(org.springframework.amqp.rabbit.annotation.Queue bindingQueue,
Collection<Declarable> declarables) {
String queueName = (String) resolveExpression(bindingQueue.value());
boolean isAnonymous = false;
// Queue 이름이 없는 경우 검증
if (!StringUtils.hasText(queueName)) {
queueName = Base64UrlNamingStrategy.DEFAULT.generateName();
// 아래 코드의 주석대로 기본 값을 비영구로 설정.
// default exclusive/autodelete and non-durable when anonymous
isAnonymous = true;
}
Queue queue = new Queue(queueName,
resolveExpressionAsBoolean(bindingQueue.durable(), !isAnonymous),
resolveExpressionAsBoolean(bindingQueue.exclusive(), isAnonymous),
resolveExpressionAsBoolean(bindingQueue.autoDelete(), isAnonymous),
resolveArguments(bindingQueue.arguments()));
queue.setIgnoreDeclarationExceptions(resolveExpressionAsBoolean(bindingQueue.ignoreDeclarationExceptions()));
((ConfigurableBeanFactory) this.beanFactory).registerSingleton(queueName + ++this.increment, queue);
if (bindingQueue.admins().length > 0) {
queue.setAdminsThatShouldDeclare((Object[]) bindingQueue.admins());
}
queue.setShouldDeclare(resolveExpressionAsBoolean(bindingQueue.declare()));
declarables.add(queue);
return queueName;
}
.
.
.
}
그래서 아래 이미지와 같이 durable은 false, autoDelete는 true로 초기화 되어 Queue가 영구적으로 될 수 없으며 name을 보면 Spring AMQP에서 임의에 이름으로 초기화 합니다.
RabbitMQ 콘솔 페이지에서 확인하면 아래 이미지와 같이 비영구 상태로 생성 되어 있습니다.
영구 상태의 경우에는 아래 이미지와 같이 auto-delete 부분이 없고 durable이 true 입니다.
만약에 어노테이션이 아닌 org.springframework.amqp.core
패키지의 Queue
클래스로 Queue를 생성한다면 Queue
클래스의 점층적 생성자 패턴으로 durable은 true, autoDelete는 false로 기본 값으로 초기화 됩니다.
package org.springframework.amqp.core;
/**
* 첫번째 매개변수 : queue 이름
* 두번째 매개변수 : durable
* 세번째 매개변수 : exclusive
* 네번째 매개변수 : autoDelete
*/
public class Queue extends AbstractDeclarable implements Cloneable {
.
.
.
/**
* The queue is durable, non-exclusive and non auto-delete.
*
* @param name the name of the queue.
*/
public Queue(String name) {
// Queue 이름만 초기화 하는 생성자 메서드를 보면 durable(두번째 매개변수)은 true, autoDelete(네번째 매개변수)는 false로 초기화
this(name, true, false, false);
}
/**
* Construct a new queue, given a name and durability flag. The queue is non-exclusive and non auto-delete.
*
* @param name the name of the queue.
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
*/
public Queue(String name, boolean durable) {
this(name, durable, false, false, null);
}
/**
* Construct a new queue, given a name, durability, exclusive and auto-delete flags.
* @param name the name of the queue.
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (the queue will only be used by the declarer's
* connection)
* @param autoDelete true if the server should delete the queue when it is no longer in use
*/
public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
this(name, durable, exclusive, autoDelete, null);
}
/**
* Construct a new queue, given a name, durability flag, and auto-delete flag, and arguments.
* @param name the name of the queue - must not be null; set to "" to have the broker generate the name.
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (the queue will only be used by the declarer's
* connection)
* @param autoDelete true if the server should delete the queue when it is no longer in use
* @param arguments the arguments used to declare the queue
*/
public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete,
@Nullable Map<String, Object> arguments) {
super(arguments);
Assert.notNull(name, "'name' cannot be null");
this.name = name;
this.actualName = StringUtils.hasText(name) ? name
: (Base64UrlNamingStrategy.DEFAULT.generateName() + "_awaiting_declaration");
this.durable = durable;
this.exclusive = exclusive;
this.autoDelete = autoDelete;
}
.
.
.
}
정리
- Exchage/Queue 정보를 디스크에 저장하기 위해선 durable 은 true, autoDelete 는 false 여야한다.
- RabbitMQ 콘솔 페이지에서 Exchage/Queue 생성 시 기본 값은 디스크 저장 방식이다.
- Spring AMQP인 경우
@Exchange
어노테이션의 기본 값은 디스크 저장 방식이고@Queue
어노테이션은 Queue 이름(value 혹은 name 값)을 지정 하거나 durable 값에 true, autoDelete 값에 false로 지정해야 한다.org.springframework.amqp.core#Queue
클래스로 Queue를 생성할 경우 Queue 이름만 초기화 하는 생성자 메서드를 사용하면 durable는 true, autoDelete는 false로 기본 초기화가 된다.
DLX/DLQ
DLX/DLQ 구성은 프로세스 오류를 유연하게 처리하기 위해 사용됩니다. 프로세스에서 오류나 예외가 발생해서 메시지를 재처리 해야 하는 경우 reject을 하게 되는데 reject를 하게 되면 다시 원래 Queue로 돌아가기 때문에 재처리 해야 할 메세지와 신규 메세지가 혼합되면서 처리 효율성이 떨어질 수 있습니다. 이때 DLX/DLQ를 구성하게 되면 reject된 메세지는 원래 Queue로 돌아가지 않고 DLX/DLQ로 가게 됩니다. 재처리 메세지와 신규 메세지가 분리함으로서 메세지 처리의 효율성을 높힐 수 있습니다.
DLX(Dead Letter Exchange)는 reject된 메세지를 어느 Queue로 보낼지 결정하는 라우팅 역할을 합니다.
DLQ(Dead Letter Queue)는 DLX에서 라우팅된 메세지를 보관할 Queue를 뜻합니다.
예제
RabbitMQ 재시작 시 DLX/DLQ의 메세지가 손실 되지 않게 하는 부분을 예제로 풀어보겠습니다. 예제 DLX/DLQ 구성은 아래와 같습니다.
예제 상황은
- 리스너가 q.exchange.work의 메세지를 소비하는 도중에 예외가 발생.
- 리스너는 예외가 발생된 메세지를 q.exchange.dlx.work로 전송하고 리스너에선 현재 메세지를 ack 처리하여 정상적으로 메세지가 소비 되었음을 알림.
- q.exchange.dlx.work는 예외 메세지에 예외가 다시 발생하면 reject하여 q.exchange.dlx.wait로 이관.
- q.exchange.dlx.wait는 지정한 x-message-ttl 시간이 초과하면 reject 되어 다시 q.exchange.dlx.work로 메세지 이관.
- 예외 메세지가 정상 처리 될 때 까지 q.exchange.dlx.work와 q.exchange.dlx.wait 메세지 이관 반복.
원래 구성에선 q.exchange.dlx.work 리스너가 언젠가는 ack 처리를 해야 하지만 예제에선 RabbitMQ 서버 재시작 시 메세지가 손실 되는지 확인 여부를 위해 ack 없이 rejec만 하고 있습니다.
구성 설정
구성 설정은 RabbitMQ 콘솔 페이지에서 구성하지 않고 Spring AMQP를 사용하였습니다. 구성 코드는 아래를 참고하시면 되겠습니다.
@Component
public class ConsumerListener {
private final static Logger logger = LoggerFactory.getLogger(ConsumerListener.class);
public ConsumerListener(AmqpAdmin amqpAdmin) throws InterruptedException {
/*
wait dlx exchange/queue 생성
리스너가 있는 exchange/queue는 RabbitMQ에 자동 생성 되나 리스너가 없는 exchange/queue는 자동 생성이 되지 않기 때문에 직접 생성해줘야 함.
*/
org.springframework.amqp.core.Queue dlxWaitQueue = QueueBuilder.durable("q.exchange.dlx.wait")
.withArguments(Map.of("x-dead-letter-exchange", "x.exchange.dlx.work"
, "x-message-ttl", 20000))
.build();
org.springframework.amqp.core.Exchange dlxWaitExchange = new DirectExchange("x.exchange.dlx.wait");
amqpAdmin.declareQueue(dlxWaitQueue);
amqpAdmin.declareExchange(dlxWaitExchange);
amqpAdmin.declareBinding(BindingBuilder.bind(dlxWaitQueue)
.to((DirectExchange)dlxWaitExchange)
.with("r1"));
}
@RabbitListener(containerFactory = "simpleRabbitListenerContainerFactory"
, bindings = @QueueBinding(value = @Queue(value = "q.exchange.work")
, exchange = @Exchange("x.exchange.work")
, key = "r1")
, ackMode = "MANUAL")
public void work(Map<String, Object> value, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
logger.info("q.exchange.work : {}", value);
// 메세지 재처리가 필요하다고 판단되어 메세지를 DLX로 Publish.
Message message = MessageBuilder.withBody(new Gson().toJson(value).getBytes(StandardCharsets.UTF_8))
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.build();
channel.basicPublish("x.exchange.dlx.work", "r1", null, message.getBody());
// 메세지를 DLX로 보냈기 때문에 해당 메세지는 메세지 소비 처리.
channel.basicAck(tag, false);
}
@RabbitListener(containerFactory = "simpleRabbitListenerContainerFactory"
, bindings = @QueueBinding(value = @Queue(value = "q.exchange.dlx.work", arguments = {@Argument(name = "x-dead-letter-exchange", value = "x.exchange.dlx.wait")})
, exchange = @Exchange("x.exchange.dlx.work")
, key = "r1")
, ackMode = "MANUAL")
public void testListener(Map<String, Object> value, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException, InterruptedException {
logger.info("q.exchange.dlx.work : {}", value);
// 메세지가 해당 Queue 있는 상태에서 RabbitMQ를 재시작 했을 때 메세지가 손실 여부 확을 위해 임의로 대기시간 지정.
Thread.sleep(10000);
// 테스트를 위해 대기 성격의 DLX로 메세지 이동.
channel.basicReject(tag, false);
}
}
위 예제 코드를 실행하면 RabbitMQ 콘솔 페이지에 아래와 같이 DLX/DLQ가 구성이 됩니다.
- q.exchange.work
- q.exchange.dlx.work
- q.exchange.dlx.wait
해당 Queue의 Consumers를 보면 Consumer가 없음을 확인할 수 있다. 이는 x-message-ttl 기능을 사용해서 지연 목적으로 생성되었기 때문이다.
x-message-ttl에 지정된 시간만큼 메세지를 Queue가 보관하다 지정된 시간이 지나면 리스너가 메세지를 가져가는데 리스너가 없기 때문에 해당 Queue DLX인 x.exchange.dlx.work로 메세지가 이관 됩니다.
Queue들을 보면 Details → Features의 durable 값이 true임을 확인 할 수 있습니다. 그리고 이미지엔 없지만 관련 Queue들의 Exchage도 durable 값이 true이기에 RabbitMQ가 재시작 되더라도 Exchage/Queue가 없어지질 않아 1차 메세지 손실은 방지 할 수 있습니다.
그럼 이 상태에서 x.exchange.work에 테스트 메세지를 발행하고 RabbitMQ를 재시작 해보도록 하겠습니다.
테스트 메세지를 발행 하고 나면 아래 로그와 같이 메세지를 수신 하고 있음을 확인 할 수 있습니다.
2023-06-29T18:28:35.635+09:00 INFO 52408 --- [ main] c.rabbitmq.consumer.ConsumerApplication : Starting ConsumerApplication using Java 17.0.7 with PID 52408 (D:\\Down\\rabbitmq-cd2\\consumer\\build\\classes\\java\\main started by sinna in D:\\Down\\rabbitmq-cd2)
2023-06-29T18:28:35.639+09:00 INFO 52408 --- [ main] c.rabbitmq.consumer.ConsumerApplication : No active profile set, falling back to 1 default profile: "default"
2023-06-29T18:28:36.320+09:00 INFO 52408 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: localhost:5674
2023-06-29T18:28:36.381+09:00 INFO 52408 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: connectionFactory#26ae880a:0/SimpleConnection@5e1218b4 [delegate=amqp://admin@127.0.0.1:5674/, localPort=12381]
2023-06-29T18:28:36.813+09:00 WARN 52408 --- [ main] ocalVariableTableParameterNameDiscoverer : Using deprecated '-debug' fallback for parameter name resolution. Compile the affected code with '-parameters' instead or avoid its introspection: org.springframework.amqp.rabbit.core.RabbitAdmin
2023-06-29T18:28:36.913+09:00 INFO 52408 --- [ main] c.rabbitmq.consumer.ConsumerApplication : Started ConsumerApplication in 1.787 seconds (process running for 2.847)
2023-06-29T18:41:35.559+09:00 INFO 52408 --- [ntContainer#0-1] c.r.consumer.listener.ConsumerListener : q.exchange.work : {key=value1}
2023-06-29T18:41:35.573+09:00 INFO 52408 --- [ntContainer#1-1] c.r.consumer.listener.ConsumerListener : q.exchange.dlx.work : {key=value1}
2023-06-29T18:42:05.596+09:00 INFO 52408 --- [ntContainer#1-1] c.r.consumer.listener.ConsumerListener : q.exchange.dlx.work : {key=value1}
2023-06-29T18:42:35.607+09:00 INFO 52408 --- [ntContainer#1-1] c.r.consumer.listener.ConsumerListener : q.exchange.dlx.work : {key=value1}
2023-06-29T18:43:05.619+09:00 INFO 52408 --- [ntContainer#1-1] c.r.consumer.listener.ConsumerListener : q.exchange.dlx.work : {key=value1}
2023-06-29T18:43:35.631+09:00 INFO 52408 --- [ntContainer#1-1] c.r.consumer.listener.ConsumerListener : q.exchange.dlx.work : {key=value1}
2023-06-29T18:44:05.647+09:00 INFO 52408 --- [ntContainer#1-1] c.r.consumer.listener.ConsumerListener : q.exchange.dlx.work : {key=value1}
2023-06-29T18:44:35.654+09:00 INFO 52408 --- [ntContainer#1-1] c.r.consumer.listener.ConsumerListener : q.exchange.dlx.work : {key=value1}
테스트 메세지를 하나 더 발행하여 q.exchange.dlx.work
, q.exchange.dlx.wait
Queue에 메세지가 있는 상태로 만들어 둡니다.
현 상태에서 RabbitMQ를 재시작하게 되면 Queue가 손실 되었음을 알 수 있습니다.
Exchage/Queue 손실 부분은 설정을 통해 방지 하였으나 실질적으로 중요한 메세지의 Delivery mode 설정이 없기 때문입니다. Spring AMQP 기준 basicPublish
메서드로 메세지를 발급 할 때 Delivery mode를 설정해야 합니다. basicPublish
를 사용하는 부분의 아래 코드를 수정해야 합니다.
@RabbitListener(containerFactory = "simpleRabbitListenerContainerFactory"
, bindings = @QueueBinding(value = @Queue(value = "q.exchange.work")
, exchange = @Exchange("x.exchange.work")
, key = "r1")
, ackMode = "MANUAL")
public void work(Map<String, Object> value, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
.
.
.
channel.basicPublish("x.exchange.dlx.work", "r1", null, message.getBody());
.
.
.
}
세번째 매개변수가 null인걸 알 수 있는데 메서드의 구현체인 ChannelN
클래스에서 확인해보면 세번째 매개변수가 메세지 설정 객체임을 알 수 있습니다.
/** Public API - {@inheritDoc} */
@Override
public void basicPublish(String exchange, String routingKey,
BasicProperties props, byte[] body)
throws IOException
{
basicPublish(exchange, routingKey, false, props, body);
}
ChannelN
클래스에서 최종적으로 발급 처리하는 메소드 코드를 보면 props가 null인 경우 MessageProperties.MINIMAL_BASIC
Static 변수를 사용하는걸 확인 할 수 있습니다.
@Override
public void basicPublish(String exchange, String routingKey,
boolean mandatory, boolean immediate,
BasicProperties props, byte[] body)
throws IOException
{
if (nextPublishSeqNo > 0) {
unconfirmedSet.add(getNextPublishSeqNo());
nextPublishSeqNo++;
}
// BasicProperties 객체인 props가 Null인 경우
if (props == null) {
props = MessageProperties.MINIMAL_BASIC;
}
AMQCommand command = new AMQCommand(
new Basic.Publish.Builder()
.exchange(exchange)
.routingKey(routingKey)
.mandatory(mandatory)
.immediate(immediate)
.build(), props, body);
try {
transmit(command);
} catch (IOException | AlreadyClosedException e) {
metricsCollector.basicPublishFailure(this, e);
throw e;
}
metricsCollector.basicPublish(this);
}
MessageProperties
클래스에 MINIMAL_BASIC
Static 변수를 보면 모든 값을 null로 초기화 하기 때문에 Delivery mode 설정이 없어 디스크가 아닌 메모리에 저장되어 RabbitMQ를 재시작하면 메세지가 손실 되었던 거였습니다.
public class MessageProperties {
/** Empty basic properties, with no fields set */
public static final BasicProperties MINIMAL_BASIC =
new BasicProperties(null, null, null, null,
null, null, null, null,
null, null, null, null,
null, null);
/** Empty basic properties, with only deliveryMode set to 2 (persistent) */
public static final BasicProperties MINIMAL_PERSISTENT_BASIC =
new BasicProperties(null, null, null, 2,
null, null, null, null,
null, null, null, null,
null, null);
/** Content-type "application/octet-stream", deliveryMode 1 (nonpersistent), priority zero */
public static final BasicProperties BASIC =
new BasicProperties("application/octet-stream",
null,
null,
1,
0, null, null, null,
null, null, null, null,
null, null);
/** Content-type "application/octet-stream", deliveryMode 2 (persistent), priority zero */
public static final BasicProperties PERSISTENT_BASIC =
new BasicProperties("application/octet-stream",
null,
null,
2,
0, null, null, null,
null, null, null, null,
null, null);
/** Content-type "text/plain", deliveryMode 1 (nonpersistent), priority zero */
public static final BasicProperties TEXT_PLAIN =
new BasicProperties("text/plain",
null,
null,
1,
0, null, null, null,
null, null, null, null,
null, null);
/** Content-type "text/plain", deliveryMode 2 (persistent), priority zero */
public static final BasicProperties PERSISTENT_TEXT_PLAIN =
new BasicProperties("text/plain",
null,
null,
2,
0, null, null, null,
null, null, null, null,
null, null);
}
Delivery mode를 2로 설정하기 위해서 활용 할 수 있는 방법은 2가지가 있습니다.
BasicProperties
클래스
아래 코드와 같이 BasicProperties
클래스의 빌더의 사용 입니다.
new AMQP.BasicProperties.Builder().deliveryMode(2).build();
MessageProperties
클래스
단 MessageProperties
클래스는 Spring AMQP, RabbitMQ Lib 둘 다 있기 때문에 한 클래스에서 사용할 경우 아래와 같이 패키지명까지 작성해야 할 수도 있습니다.
com.rabbitmq.client.MessageProperties.MINIMAL_PERSISTENT_BASIC
예제에선 BasicProperties
클래스를 사용하도록 하겠습니다. BasicProperties
클래스가 적용된 수정 코드 입니다.
@RabbitListener(containerFactory = "simpleRabbitListenerContainerFactory"
, bindings = @QueueBinding(value = @Queue(value = "q.exchange.work")
, exchange = @Exchange("x.exchange.work")
, key = "r1")
, ackMode = "MANUAL")
public void work(Map<String, Object> value, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
logger.info("q.exchange.work : {}", value);
Message message = MessageBuilder.withBody(new Gson().toJson(value).getBytes(StandardCharsets.UTF_8))
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.build();
// 기존 null이던 세번째 매개변수를 BasicProperties 클래스를 이용해서 Delivery Mode(디스크 저장)를 2로 설정.
channel.basicPublish("x.exchange.dlx.work", "r1", new AMQP.BasicProperties.Builder().deliveryMode(2).build(), message.getBody());
channel.basicAck(tag, false);
}
적용 후 다시 테스트 메세지를 발송하고 RabbitMQ를 재시작 하게 되면 아래와 같이 메세지가 유지되어 있음을 확인 할 수 있습니다.
로그를 확인 했을 때도 메세지가 소실되지 않았음을 확인 할 수 있습니다.
2023-06-29T19:41:11.421+09:00 INFO 37052 --- [ntContainer#0-4] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@660e89ca: tags=[[]], channel=null, acknowledgeMode=MANUAL local queue size=0
2023-06-29T19:41:21.330+09:00 INFO 37052 --- [ntContainer#1-5] c.r.consumer.listener.ConsumerListener : q.exchange.dlx.work : {key=value1}
2023-06-29T19:41:41.334+09:00 INFO 37052 --- [ntContainer#1-5] c.r.consumer.listener.ConsumerListener : q.exchange.dlx.work : {key=value2}
2023-06-29T19:41:51.353+09:00 INFO 37052 --- [ntContainer#1-5] c.r.consumer.listener.ConsumerListener : q.exchange.dlx.work : {key=value1}
2023-06-29T19:42:11.358+09:00 INFO 37052 --- [ntContainer#1-5] c.r.consumer.listener.ConsumerListener : q.exchange.dlx.work : {key=value2}
2023-06-29T19:42:21.527+09:00 INFO 37052 --- [ntContainer#1-5] c.r.consumer.listener.ConsumerListener : q.exchange.dlx.work : {key=value1}
그리고 또다른 방법으론 Message
클래스를 사용하는 방법 입니다.
// MessageProperties는 RabbitMQ Lib가 아닌 Spring AMQP 클래스.
MessageProperties messageProperties = new MessageProperties();
// MessageProperties.DEFAULT_DELIVERY_MODE는 Delivery Mode 2를 뜻함.
messageProperties.setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE);
Message message = MessageBuilder.withBody(new Gson().toJson(value).getBytes(StandardCharsets.UTF_8))
// andProperties 메서드에 위 messageProperties 객체를 추가.
.andProperties(messageProperties)
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.build();
// 세번째 매개변수는 null이여도 상관없음.
channel.basicPublish("x.exchange.dlx.work", "r1", null, message.getBody());
부록
- Spring AMQP의 RabbitTemplate 클래스에서의 Delivery Mode는?
RabbitTemplate#convertAndSend
메서드를 사용해서 메세지를 발급할 경우 Message
클래스에 MessageProperties#setDeliveryMode
메서드를 통해 Delivery Mode 2로 설정된 객체 활용해도 되지만 기본적으로 Delivery Mode는 2로 설정 됩니다.
만약 아래와 같이 메세지 전문만 설정 했을 경우
Message message = MessageBuilder.withBody(value.getBytes(StandardCharsets.UTF_8))
.build();
Message
클래스에 아래의 생성자가 호출이 됩니다.
public Message(byte[] body) {
this(body, new MessageProperties());
}
호출 생성자의 두 번째 매개변수로 MessageProperties
객체가 생성 되는데 해당 클래스의 기본 Delivery Mode 상태를 보면
public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
// 메세지에 실제로 적용되는 Deliver mode
private MessageDeliveryMode deliveryMode = DEFAULT_DELIVERY_MODE;
PERSISTENT(디스크 모드)로 설정하고 있음을 알 수 있습니다.
그리고 RabbitTemplate
클래스 내에서 메세지 발급 메서드 순서는 doSend → observeTheSend → sendToRabbit인데 sendToRabbit 메서드 코드를 보면 아래와 같이 Delivery Mode 2로 설정된 메세지를 활용 하고 있음을 알 수 있습니다.