ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • RabbitMQ DLX/DLQ 메세지 손실 방지
    MQ 2023. 7. 3. 11:24
    반응형

    RabbitMQ DLX/DLQ 메세지 손실 방지

    이번 글에 주제는 RabbitMQ를 재시작 했을 때 DLX(Dead Letter Exchange)와 DLQ(Dead Letter Queue)의 메세지가 손실 되지 않는 구성 및 코드를 작성하는 방법에 대해서 쓰려고 합니다. 물론 DLX/DLQ에만 국한된 이야기가 아니고 Exchange/Queue에도 관련된 이야기 입니다. DLX/DLQ도 결국에는 Exchange이면서 Queue이기도 하기 때문입니다.

    이와 관련해서 사용된 기술은 Spring AMQP 기준 입니다. 그렇지만 기술만 그러할 뿐 근본적인 내용은 RabbitMQ에 의존하기 때문에 다른 언어에서도 지원 되는 기술을 찾아 동일하게 적용할 수 있습니다.

    방법

    RabbitMQ 서버가 재시작 되었을 때 메세지가 없어지는 현상을 방지하기 위해선

    • Exchange/Queue 설정의 Durability값이 true인지와 Auto delete값이 false인지 확인
    • 메세지를 발송 할 때 메세지의 Delivery mode가 2인지 확인
      • 1 메모리 저장(손실 발생)
      • 2 디스크 저장(손실 없음)

    이와 관련해서 자세한 사항은 아래를 참고하시면 되겠습니다.

    Exchange/Queue

    DLX/DLQ에 대한 설명을 드리기 앞서 DLX/DLQ도 Exchange/Queue의 한 부분이기 때문에 선행으로 Exchange/Queue에 대한 설명을 하자면 Exchange/Queue 구성은 기본적으로 메모리에 저장됩니다. 따라서 RabbitMQ 서버를 재시작하게 되면 메모리가 초기화 되기 때문에 Exchange/Queue 구성 역시 사라지게 됩니다.

    Exchange/Queue를 영구적으로 하기 위해선 Durability옵션을 true로 Auto delete옵션을 false로 설정하면 메모리가 아닌 디스크에 저장되기 때문에 RabbitMQ 서버를 재시작하더라도 사라지지 않습니다. 다행이 Exchange/Queue를 생성 할 때 해당 값들은 기본 값으로 설정되어 있습니다.

    Untitled

    위 이미지와 같이 Exchange를 생성하는 RabbitMQ 콘솔 페이지를 보면 기본 설정이 영구 저장 상태임을 알 수 있습니다. 그리고 Queue도 마찬가지로 아래와 같이 기본 설정이 영구 저장 상태임을 알 수 있습니다.

    Untitled

    그리고 Spring AMQP의 @Exchange 어노테이션으로 생성 할 때도 기본 값은 영구 저장 상태입니다.

    @Target({})
    @Retention(RetentionPolicy.RUNTIME)
    public @interface Exchange {
    
        .
        .
        .
    
        // RabbitMQ의 Durability옵션 중 Durable 값을 뜻함.
        String durable() default TRUE;
    
        // RabbitMQ의 Auto delete옵션 중 No 값을 뜻함.
        String autoDelete() default FALSE;
    
        .
        .
        .
    
    }

    @Queue 어노테이션을 사용 했을 때는 상황이 다릅니다. @Queue 어노테이션의 durable, autoDelete 변수를 보면 Queue에 이름이 있는 경우에만 영구 저장된다고 나와 있습니다.

    @Target({})
    @Retention(RetentionPolicy.RUNTIME)
    public @interface Queue {
    
        .
        .
        .
    
        /**
         * Specifies if this queue should be durable.
         * By default if queue name is provided it is durable.
         * @return true if the queue is to be declared as durable.
         * @see org.springframework.amqp.core.Queue#isDurable()
         */
        String durable() default "";
    
        /**
         * Specifies if this queue should be auto deleted when not used.
         * By default if queue name is provided it is not auto-deleted.
         * @return true if the queue is to be declared as auto-delete.
         * @see org.springframework.amqp.core.Queue#isAutoDelete()
         */
        String autoDelete() default "";
    
        .
        .
        .
    }

    @Queue 어노테이션을 사용 할 땐 name 혹은 value 값에 Queue 이름을 지정하거나 durable에 true, autoDelete에 false로 값을 초기화 해야 합니다.

    만약 여기서 name 혹은 value 값을 지정하지 않는 경우엔 RabbitListenerAnnotationBeanPostProcessor#declareQueue 메서드에서 durable를 false, autoDelete를 true로 초기화 하게 됩니다.

    public class RabbitListenerAnnotationBeanPostProcessor
            implements BeanPostProcessor, Ordered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware,
            SmartInitializingSingleton {
    
        .
        .
        .
    
        private String declareQueue(org.springframework.amqp.rabbit.annotation.Queue bindingQueue,
                    Collection<Declarable> declarables) {
                String queueName = (String) resolveExpression(bindingQueue.value());
                boolean isAnonymous = false;
    
                // Queue 이름이 없는 경우 검증
                if (!StringUtils.hasText(queueName)) {
                    queueName = Base64UrlNamingStrategy.DEFAULT.generateName();
    
                    // 아래 코드의 주석대로 기본 값을 비영구로 설정.
                    // default exclusive/autodelete and non-durable when anonymous
                    isAnonymous = true;
                }
                Queue queue = new Queue(queueName,
                        resolveExpressionAsBoolean(bindingQueue.durable(), !isAnonymous),
                        resolveExpressionAsBoolean(bindingQueue.exclusive(), isAnonymous),
                        resolveExpressionAsBoolean(bindingQueue.autoDelete(), isAnonymous),
                        resolveArguments(bindingQueue.arguments()));
                queue.setIgnoreDeclarationExceptions(resolveExpressionAsBoolean(bindingQueue.ignoreDeclarationExceptions()));
                ((ConfigurableBeanFactory) this.beanFactory).registerSingleton(queueName + ++this.increment, queue);
                if (bindingQueue.admins().length > 0) {
                    queue.setAdminsThatShouldDeclare((Object[]) bindingQueue.admins());
                }
                queue.setShouldDeclare(resolveExpressionAsBoolean(bindingQueue.declare()));
                declarables.add(queue);
                return queueName;
            }
    
        .
        .
        .
    }

    그래서 아래 이미지와 같이 durable은 false, autoDelete는 true로 초기화 되어 Queue가 영구적으로 될 수 없으며 name을 보면 Spring AMQP에서 임의에 이름으로 초기화 합니다.

    Untitled

    RabbitMQ 콘솔 페이지에서 확인하면 아래 이미지와 같이 비영구 상태로 생성 되어 있습니다.

    Untitled

    영구 상태의 경우에는 아래 이미지와 같이 auto-delete 부분이 없고 durable이 true 입니다.

    Untitled

    만약에 어노테이션이 아닌 org.springframework.amqp.core 패키지의 Queue 클래스로 Queue를 생성한다면 Queue 클래스의 점층적 생성자 패턴으로 durable은 true, autoDelete는 false로 기본 값으로 초기화 됩니다.

    package org.springframework.amqp.core;
    
    /**
    * 첫번째 매개변수 : queue 이름
    * 두번째 매개변수 : durable
    * 세번째 매개변수 : exclusive
    * 네번째 매개변수 : autoDelete
    */
    public class Queue extends AbstractDeclarable implements Cloneable {
    
        .
        .
        .
    
        /**
         * The queue is durable, non-exclusive and non auto-delete.
         *
         * @param name the name of the queue.
         */
        public Queue(String name) {
            // Queue 이름만 초기화 하는 생성자 메서드를 보면 durable(두번째 매개변수)은 true, autoDelete(네번째 매개변수)는 false로 초기화
            this(name, true, false, false);
        }
    
        /**
         * Construct a new queue, given a name and durability flag. The queue is non-exclusive and non auto-delete.
         *
         * @param name the name of the queue.
         * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
         */
        public Queue(String name, boolean durable) {
            this(name, durable, false, false, null);
        }
    
        /**
         * Construct a new queue, given a name, durability, exclusive and auto-delete flags.
         * @param name the name of the queue.
         * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
         * @param exclusive true if we are declaring an exclusive queue (the queue will only be used by the declarer's
         * connection)
         * @param autoDelete true if the server should delete the queue when it is no longer in use
         */
        public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
            this(name, durable, exclusive, autoDelete, null);
        }
    
        /**
         * Construct a new queue, given a name, durability flag, and auto-delete flag, and arguments.
         * @param name the name of the queue - must not be null; set to "" to have the broker generate the name.
         * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
         * @param exclusive true if we are declaring an exclusive queue (the queue will only be used by the declarer's
         * connection)
         * @param autoDelete true if the server should delete the queue when it is no longer in use
         * @param arguments the arguments used to declare the queue
         */
        public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete,
                @Nullable Map<String, Object> arguments) {
    
            super(arguments);
            Assert.notNull(name, "'name' cannot be null");
            this.name = name;
            this.actualName = StringUtils.hasText(name) ? name
                    : (Base64UrlNamingStrategy.DEFAULT.generateName() + "_awaiting_declaration");
            this.durable = durable;
            this.exclusive = exclusive;
            this.autoDelete = autoDelete;
        }
    
        .
        .
        .
    
    }

    정리

    • Exchage/Queue 정보를 디스크에 저장하기 위해선 durable 은 true, autoDelete 는 false 여야한다.
    • RabbitMQ 콘솔 페이지에서 Exchage/Queue 생성 시 기본 값은 디스크 저장 방식이다.
    • Spring AMQP인 경우
      • @Exchange 어노테이션의 기본 값은 디스크 저장 방식이고 @Queue 어노테이션은 Queue 이름(value 혹은 name 값)을 지정 하거나 durable 값에 true, autoDelete 값에 false로 지정해야 한다.
      • org.springframework.amqp.core#Queue 클래스로 Queue를 생성할 경우 Queue 이름만 초기화 하는 생성자 메서드를 사용하면 durable는 true, autoDelete는 false로 기본 초기화가 된다.

    DLX/DLQ

    DLX/DLQ 구성은 프로세스 오류를 유연하게 처리하기 위해 사용됩니다. 프로세스에서 오류나 예외가 발생해서 메시지를 재처리 해야 하는 경우 reject을 하게 되는데 reject를 하게 되면 다시 원래 Queue로 돌아가기 때문에 재처리 해야 할 메세지와 신규 메세지가 혼합되면서 처리 효율성이 떨어질 수 있습니다. 이때 DLX/DLQ를 구성하게 되면 reject된 메세지는 원래 Queue로 돌아가지 않고 DLX/DLQ로 가게 됩니다. 재처리 메세지와 신규 메세지가 분리함으로서 메세지 처리의 효율성을 높힐 수 있습니다.

    DLX(Dead Letter Exchange)는 reject된 메세지를 어느 Queue로 보낼지 결정하는 라우팅 역할을 합니다.

    DLQ(Dead Letter Queue)는 DLX에서 라우팅된 메세지를 보관할 Queue를 뜻합니다.

    예제

    RabbitMQ 재시작 시 DLX/DLQ의 메세지가 손실 되지 않게 하는 부분을 예제로 풀어보겠습니다. 예제 DLX/DLQ 구성은 아래와 같습니다.

    Untitled

    예제 상황은

    • 리스너가 q.exchange.work의 메세지를 소비하는 도중에 예외가 발생.
    • 리스너는 예외가 발생된 메세지를 q.exchange.dlx.work로 전송하고 리스너에선 현재 메세지를 ack 처리하여 정상적으로 메세지가 소비 되었음을 알림.
    • q.exchange.dlx.work는 예외 메세지에 예외가 다시 발생하면 reject하여 q.exchange.dlx.wait로 이관.
    • q.exchange.dlx.wait는 지정한 x-message-ttl 시간이 초과하면 reject 되어 다시 q.exchange.dlx.work로 메세지 이관.
    • 예외 메세지가 정상 처리 될 때 까지 q.exchange.dlx.work와 q.exchange.dlx.wait 메세지 이관 반복.

    원래 구성에선 q.exchange.dlx.work 리스너가 언젠가는 ack 처리를 해야 하지만 예제에선 RabbitMQ 서버 재시작 시 메세지가 손실 되는지 확인 여부를 위해 ack 없이 rejec만 하고 있습니다.

    구성 설정

    구성 설정은 RabbitMQ 콘솔 페이지에서 구성하지 않고 Spring AMQP를 사용하였습니다. 구성 코드는 아래를 참고하시면 되겠습니다.

    @Component
    public class ConsumerListener {
        private final static Logger logger = LoggerFactory.getLogger(ConsumerListener.class);
    
        public ConsumerListener(AmqpAdmin amqpAdmin) throws InterruptedException {
            /*
            wait dlx exchange/queue 생성
            리스너가 있는 exchange/queue는 RabbitMQ에 자동 생성 되나 리스너가 없는 exchange/queue는 자동 생성이 되지 않기 때문에 직접 생성해줘야 함.
             */
            org.springframework.amqp.core.Queue dlxWaitQueue = QueueBuilder.durable("q.exchange.dlx.wait")
                    .withArguments(Map.of("x-dead-letter-exchange", "x.exchange.dlx.work"
                            , "x-message-ttl", 20000))
                    .build();
    
            org.springframework.amqp.core.Exchange dlxWaitExchange = new DirectExchange("x.exchange.dlx.wait");
    
            amqpAdmin.declareQueue(dlxWaitQueue);
            amqpAdmin.declareExchange(dlxWaitExchange);
    
            amqpAdmin.declareBinding(BindingBuilder.bind(dlxWaitQueue)
                    .to((DirectExchange)dlxWaitExchange)
                    .with("r1"));
        }
    
        @RabbitListener(containerFactory = "simpleRabbitListenerContainerFactory"
                , bindings = @QueueBinding(value = @Queue(value = "q.exchange.work")
                    , exchange = @Exchange("x.exchange.work")
                    , key = "r1")
                , ackMode = "MANUAL")
        public void work(Map<String, Object> value, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    
            logger.info("q.exchange.work : {}", value);
    
            // 메세지 재처리가 필요하다고 판단되어 메세지를 DLX로 Publish.
            Message message = MessageBuilder.withBody(new Gson().toJson(value).getBytes(StandardCharsets.UTF_8))
                    .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                    .build();
    
            channel.basicPublish("x.exchange.dlx.work", "r1", null, message.getBody());
    
            // 메세지를 DLX로 보냈기 때문에 해당 메세지는 메세지 소비 처리.
            channel.basicAck(tag, false);
        }
    
        @RabbitListener(containerFactory = "simpleRabbitListenerContainerFactory"
                , bindings = @QueueBinding(value = @Queue(value = "q.exchange.dlx.work", arguments = {@Argument(name = "x-dead-letter-exchange", value = "x.exchange.dlx.wait")})
                    , exchange = @Exchange("x.exchange.dlx.work")
                    , key = "r1")
                , ackMode = "MANUAL")
        public void testListener(Map<String, Object> value, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException, InterruptedException {
    
            logger.info("q.exchange.dlx.work : {}", value);
    
            // 메세지가 해당 Queue 있는 상태에서 RabbitMQ를 재시작 했을 때 메세지가 손실 여부 확을 위해 임의로 대기시간 지정.
            Thread.sleep(10000);
    
            // 테스트를 위해 대기 성격의 DLX로 메세지 이동.
            channel.basicReject(tag, false);
        }
    }

    위 예제 코드를 실행하면 RabbitMQ 콘솔 페이지에 아래와 같이 DLX/DLQ가 구성이 됩니다.

    • q.exchange.work

    Untitled

    • q.exchange.dlx.work

    Untitled

    • q.exchange.dlx.wait

    해당 Queue의 Consumers를 보면 Consumer가 없음을 확인할 수 있다. 이는 x-message-ttl 기능을 사용해서 지연 목적으로 생성되었기 때문이다.

    x-message-ttl에 지정된 시간만큼 메세지를 Queue가 보관하다 지정된 시간이 지나면 리스너가 메세지를 가져가는데 리스너가 없기 때문에 해당 Queue DLX인 x.exchange.dlx.work로 메세지가 이관 됩니다.

    Untitled

    Queue들을 보면 Details → Features의 durable 값이 true임을 확인 할 수 있습니다. 그리고 이미지엔 없지만 관련 Queue들의 Exchage도 durable 값이 true이기에 RabbitMQ가 재시작 되더라도 Exchage/Queue가 없어지질 않아 1차 메세지 손실은 방지 할 수 있습니다.

    그럼 이 상태에서 x.exchange.work에 테스트 메세지를 발행하고 RabbitMQ를 재시작 해보도록 하겠습니다.

    Untitled

    테스트 메세지를 발행 하고 나면 아래 로그와 같이 메세지를 수신 하고 있음을 확인 할 수 있습니다.

    2023-06-29T18:28:35.635+09:00  INFO 52408 --- [           main] c.rabbitmq.consumer.ConsumerApplication  : Starting ConsumerApplication using Java 17.0.7 with PID 52408 (D:\\Down\\rabbitmq-cd2\\consumer\\build\\classes\\java\\main started by sinna in D:\\Down\\rabbitmq-cd2)
    2023-06-29T18:28:35.639+09:00  INFO 52408 --- [           main] c.rabbitmq.consumer.ConsumerApplication  : No active profile set, falling back to 1 default profile: "default"
    2023-06-29T18:28:36.320+09:00  INFO 52408 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: localhost:5674
    2023-06-29T18:28:36.381+09:00  INFO 52408 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: connectionFactory#26ae880a:0/SimpleConnection@5e1218b4 [delegate=amqp://admin@127.0.0.1:5674/, localPort=12381]
    2023-06-29T18:28:36.813+09:00  WARN 52408 --- [           main] ocalVariableTableParameterNameDiscoverer : Using deprecated '-debug' fallback for parameter name resolution. Compile the affected code with '-parameters' instead or avoid its introspection: org.springframework.amqp.rabbit.core.RabbitAdmin
    2023-06-29T18:28:36.913+09:00  INFO 52408 --- [           main] c.rabbitmq.consumer.ConsumerApplication  : Started ConsumerApplication in 1.787 seconds (process running for 2.847)
    2023-06-29T18:41:35.559+09:00  INFO 52408 --- [ntContainer#0-1] c.r.consumer.listener.ConsumerListener   : q.exchange.work : {key=value1}
    2023-06-29T18:41:35.573+09:00  INFO 52408 --- [ntContainer#1-1] c.r.consumer.listener.ConsumerListener   : q.exchange.dlx.work : {key=value1}
    2023-06-29T18:42:05.596+09:00  INFO 52408 --- [ntContainer#1-1] c.r.consumer.listener.ConsumerListener   : q.exchange.dlx.work : {key=value1}
    2023-06-29T18:42:35.607+09:00  INFO 52408 --- [ntContainer#1-1] c.r.consumer.listener.ConsumerListener   : q.exchange.dlx.work : {key=value1}
    2023-06-29T18:43:05.619+09:00  INFO 52408 --- [ntContainer#1-1] c.r.consumer.listener.ConsumerListener   : q.exchange.dlx.work : {key=value1}
    2023-06-29T18:43:35.631+09:00  INFO 52408 --- [ntContainer#1-1] c.r.consumer.listener.ConsumerListener   : q.exchange.dlx.work : {key=value1}
    2023-06-29T18:44:05.647+09:00  INFO 52408 --- [ntContainer#1-1] c.r.consumer.listener.ConsumerListener   : q.exchange.dlx.work : {key=value1}
    2023-06-29T18:44:35.654+09:00  INFO 52408 --- [ntContainer#1-1] c.r.consumer.listener.ConsumerListener   : q.exchange.dlx.work : {key=value1}

    테스트 메세지를 하나 더 발행하여 q.exchange.dlx.work, q.exchange.dlx.wait Queue에 메세지가 있는 상태로 만들어 둡니다.

    Untitled

    현 상태에서 RabbitMQ를 재시작하게 되면 Queue가 손실 되었음을 알 수 있습니다.

    Untitled

    Exchage/Queue 손실 부분은 설정을 통해 방지 하였으나 실질적으로 중요한 메세지의 Delivery mode 설정이 없기 때문입니다. Spring AMQP 기준 basicPublish 메서드로 메세지를 발급 할 때 Delivery mode를 설정해야 합니다. basicPublish 를 사용하는 부분의 아래 코드를 수정해야 합니다.

    @RabbitListener(containerFactory = "simpleRabbitListenerContainerFactory"
                , bindings = @QueueBinding(value = @Queue(value = "q.exchange.work")
                    , exchange = @Exchange("x.exchange.work")
                    , key = "r1")
                , ackMode = "MANUAL")
        public void work(Map<String, Object> value, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    
            .
            .
            .
    
            channel.basicPublish("x.exchange.dlx.work", "r1", null, message.getBody());
    
            .
            .
            .
        }

    세번째 매개변수가 null인걸 알 수 있는데 메서드의 구현체인 ChannelN 클래스에서 확인해보면 세번째 매개변수가 메세지 설정 객체임을 알 수 있습니다.

    /** Public API - {@inheritDoc} */
      @Override
      public void basicPublish(String exchange, String routingKey,
                             BasicProperties props, byte[] body)
          throws IOException
      {
          basicPublish(exchange, routingKey, false, props, body);
      }

    ChannelN 클래스에서 최종적으로 발급 처리하는 메소드 코드를 보면 props가 null인 경우 MessageProperties.MINIMAL_BASIC Static 변수를 사용하는걸 확인 할 수 있습니다.

    @Override
    public void basicPublish(String exchange, String routingKey,
                             boolean mandatory, boolean immediate,
                             BasicProperties props, byte[] body)
        throws IOException
    {
        if (nextPublishSeqNo > 0) {
            unconfirmedSet.add(getNextPublishSeqNo());
            nextPublishSeqNo++;
        }
    
        // BasicProperties 객체인 props가 Null인 경우
        if (props == null) {
            props = MessageProperties.MINIMAL_BASIC;
        }
        AMQCommand command = new AMQCommand(
            new Basic.Publish.Builder()
                .exchange(exchange)
                .routingKey(routingKey)
                .mandatory(mandatory)
                .immediate(immediate)
                .build(), props, body);
        try {
            transmit(command);
        } catch (IOException | AlreadyClosedException e) {
            metricsCollector.basicPublishFailure(this, e);
            throw e;
        }
        metricsCollector.basicPublish(this);
    }

    MessageProperties 클래스에 MINIMAL_BASIC Static 변수를 보면 모든 값을 null로 초기화 하기 때문에 Delivery mode 설정이 없어 디스크가 아닌 메모리에 저장되어 RabbitMQ를 재시작하면 메세지가 손실 되었던 거였습니다.

    public class MessageProperties {
    
        /** Empty basic properties, with no fields set */
        public static final BasicProperties MINIMAL_BASIC =
            new BasicProperties(null, null, null, null,
                                null, null, null, null,
                                null, null, null, null,
                                null, null);
        /** Empty basic properties, with only deliveryMode set to 2 (persistent) */
        public static final BasicProperties MINIMAL_PERSISTENT_BASIC =
            new BasicProperties(null, null, null, 2,
                                null, null, null, null,
                                null, null, null, null,
                                null, null);
    
        /** Content-type "application/octet-stream", deliveryMode 1 (nonpersistent), priority zero */
        public static final BasicProperties BASIC =
            new BasicProperties("application/octet-stream",
                                null,
                                null,
                                1,
                                0, null, null, null,
                                null, null, null, null,
                                null, null);
    
        /** Content-type "application/octet-stream", deliveryMode 2 (persistent), priority zero */
        public static final BasicProperties PERSISTENT_BASIC =
            new BasicProperties("application/octet-stream",
                                null,
                                null,
                                2,
                                0, null, null, null,
                                null, null, null, null,
                                null, null);
    
        /** Content-type "text/plain", deliveryMode 1 (nonpersistent), priority zero */
        public static final BasicProperties TEXT_PLAIN =
            new BasicProperties("text/plain",
                                null,
                                null,
                                1,
                                0, null, null, null,
                                null, null, null, null,
                                null, null);
    
        /** Content-type "text/plain", deliveryMode 2 (persistent), priority zero */
        public static final BasicProperties PERSISTENT_TEXT_PLAIN =
            new BasicProperties("text/plain",
                                null,
                                null,
                                2,
                                0, null, null, null,
                                null, null, null, null,
                                null, null);
    }

    Delivery mode를 2로 설정하기 위해서 활용 할 수 있는 방법은 2가지가 있습니다.

    • BasicProperties 클래스

    아래 코드와 같이 BasicProperties 클래스의 빌더의 사용 입니다.

    new AMQP.BasicProperties.Builder().deliveryMode(2).build();
    • MessageProperties 클래스

    MessageProperties 클래스는 Spring AMQP, RabbitMQ Lib 둘 다 있기 때문에 한 클래스에서 사용할 경우 아래와 같이 패키지명까지 작성해야 할 수도 있습니다.

    com.rabbitmq.client.MessageProperties.MINIMAL_PERSISTENT_BASIC

    예제에선 BasicProperties 클래스를 사용하도록 하겠습니다. BasicProperties 클래스가 적용된 수정 코드 입니다.

    @RabbitListener(containerFactory = "simpleRabbitListenerContainerFactory"
                , bindings = @QueueBinding(value = @Queue(value = "q.exchange.work")
                    , exchange = @Exchange("x.exchange.work")
                    , key = "r1")
                , ackMode = "MANUAL")
        public void work(Map<String, Object> value, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    
            logger.info("q.exchange.work : {}", value);
    
            Message message = MessageBuilder.withBody(new Gson().toJson(value).getBytes(StandardCharsets.UTF_8))
                    .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                    .build();
    
            // 기존 null이던 세번째 매개변수를 BasicProperties 클래스를 이용해서 Delivery Mode(디스크 저장)를 2로 설정.
            channel.basicPublish("x.exchange.dlx.work", "r1", new AMQP.BasicProperties.Builder().deliveryMode(2).build(), message.getBody());
    
            channel.basicAck(tag, false);
        }

    적용 후 다시 테스트 메세지를 발송하고 RabbitMQ를 재시작 하게 되면 아래와 같이 메세지가 유지되어 있음을 확인 할 수 있습니다.

    Untitled

    로그를 확인 했을 때도 메세지가 소실되지 않았음을 확인 할 수 있습니다.

    2023-06-29T19:41:11.421+09:00  INFO 37052 --- [ntContainer#0-4] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@660e89ca: tags=[[]], channel=null, acknowledgeMode=MANUAL local queue size=0
    2023-06-29T19:41:21.330+09:00  INFO 37052 --- [ntContainer#1-5] c.r.consumer.listener.ConsumerListener   : q.exchange.dlx.work : {key=value1}
    2023-06-29T19:41:41.334+09:00  INFO 37052 --- [ntContainer#1-5] c.r.consumer.listener.ConsumerListener   : q.exchange.dlx.work : {key=value2}
    2023-06-29T19:41:51.353+09:00  INFO 37052 --- [ntContainer#1-5] c.r.consumer.listener.ConsumerListener   : q.exchange.dlx.work : {key=value1}
    2023-06-29T19:42:11.358+09:00  INFO 37052 --- [ntContainer#1-5] c.r.consumer.listener.ConsumerListener   : q.exchange.dlx.work : {key=value2}
    2023-06-29T19:42:21.527+09:00  INFO 37052 --- [ntContainer#1-5] c.r.consumer.listener.ConsumerListener   : q.exchange.dlx.work : {key=value1}

    그리고 또다른 방법으론 Message 클래스를 사용하는 방법 입니다.

    // MessageProperties는 RabbitMQ Lib가 아닌 Spring AMQP 클래스.
    MessageProperties messageProperties = new MessageProperties();
    // MessageProperties.DEFAULT_DELIVERY_MODE는 Delivery Mode 2를 뜻함.
    messageProperties.setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE);
    
    Message message = MessageBuilder.withBody(new Gson().toJson(value).getBytes(StandardCharsets.UTF_8))
             // andProperties 메서드에 위 messageProperties 객체를 추가.
            .andProperties(messageProperties)
            .setContentType(MessageProperties.CONTENT_TYPE_JSON)
            .build();
    
    // 세번째 매개변수는 null이여도 상관없음.
    channel.basicPublish("x.exchange.dlx.work", "r1", null, message.getBody());

    부록

    • Spring AMQP의 RabbitTemplate 클래스에서의 Delivery Mode는?

    RabbitTemplate#convertAndSend 메서드를 사용해서 메세지를 발급할 경우 Message 클래스에 MessageProperties#setDeliveryMode 메서드를 통해 Delivery Mode 2로 설정된 객체 활용해도 되지만 기본적으로 Delivery Mode는 2로 설정 됩니다.

    만약 아래와 같이 메세지 전문만 설정 했을 경우

    Message message = MessageBuilder.withBody(value.getBytes(StandardCharsets.UTF_8))
                    .build();

    Message 클래스에 아래의 생성자가 호출이 됩니다.

    public Message(byte[] body) {
        this(body, new MessageProperties());
    }

    호출 생성자의 두 번째 매개변수로 MessageProperties 객체가 생성 되는데 해당 클래스의 기본 Delivery Mode 상태를 보면

    public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
    
    // 메세지에 실제로 적용되는 Deliver mode
    private MessageDeliveryMode deliveryMode = DEFAULT_DELIVERY_MODE;

    PERSISTENT(디스크 모드)로 설정하고 있음을 알 수 있습니다.

    그리고 RabbitTemplate 클래스 내에서 메세지 발급 메서드 순서는 doSend → observeTheSend → sendToRabbit인데 sendToRabbit 메서드 코드를 보면 아래와 같이 Delivery Mode 2로 설정된 메세지를 활용 하고 있음을 알 수 있습니다.

    반응형

    'MQ' 카테고리의 다른 글

    RabbitMQ 리스너 메세지 무손실 배포  (0) 2023.07.03

    댓글

Designed by Tistory.