문제
Queue에 메세지가 존재 하는 상황에서 배포를 할 경우 리스너에서 Queue 처리가 어디까지 진행되었는지 모르는 상황인데 이때 리스너를 종료하게 되면 Queue가 unacked 상태가 되거나 혹은 다시 대기열로 들어가서 Queue를 재처리 하는 과정에서 원치 않는 프로세스의 중복을 유발 시킬 수도 있다. 그래서 이를 피하긴 위해 Queue에 메세지가 있는지 확인하고 없는 상황에서만 배포를 해야 하는데 그럴려면 Queue에 메세지를 상시 확인 해야 하며 배포 일자를 원하는 시기에 맞춰서 할 수 없다는 문제가 있다.
목적
Queue에 메세지가 존재하거나 리스너에서 Queue를 처리 하고 있는 상태에서도 Queue 메세지를 상시 확인하지 않고 배포 일자를 원하는 시기에 맞춰 할 수 있게 리스너 배포를 구성.
흐름
기본
리스너 배포의 기본 구성은
- RabbitMQ를 1대 이상 구축
- 라우팅을 통해 메세지를 어느 RabbitMQ 서버로 전달할지 결정
리스너 배포
신규로 배포할 리스너를 종료하지 않은 상태에서 다른 RabbitMQ와 연결하는 리스너를 배포.
라우팅
라우팅을 통해 메세지를 배포한 리스너와 연결되어 있는 RabbitMQ로 전달.
이전 리스너 종료
이전 리스너와 연결되어 있는 RabbitMQ에 Queue 처리가 모두 정상적으로 완료되면 이전 리스너는 종료 및 삭제.
이슈사항
- 라우팅
Nginx를 사용하여 라우팅을 하려 했으나 RabbitMQ의 메세지 전달 방식이 Push가 아닌 Pull 방식이라서 리스너와 RabbitMQ가 한번 연결되면 연결을 끊지 않고 계속 유지하는 부분 때문에 Nginx에서 라우팅 설정 후 Nginx reload를 하더라도 라우팅한 RabbitMQ로 접속하지 않고 기존 RabbitMQ 서버와의 연결이 유지 되는 문제.
이 문제를 해결 하기 위해서 사용한 방법은 Spring의 AbstractRoutingConnectionFactory
추상 클래스를 구현해서 메세지를 어느 RabbitMQ로 전달할지를 구현하는 방법으로 해결.
환경
Spring과 RabbitMQ 사이에 발행과 소비를 위해 Spring Amqp를 활용.
- Publisher(메세지 발행)
- spring boot
- spring amqp
- spring data jpa
- spring web
- H2
spring data jpa, spring web, h2는 라우팅 분기 테스트를 위해 추가한 라이브러리임으로 필수는 아니다. h2는 in-memory 모드.
- Consumer(메세지 소비)
- spring boot
- spring amqp
설정
Publisher(메세지 발행)
- BlueGreenRabbitMQConfig
라우팅은 외부 어플리케이션이나 서비스를 사용하지 않고 Spring Amqp의 AbstractRoutingConnectionFactory
추상 클래스를 사용하며 Consumer에는 영향이 없고 Publisher 영역에만 설정.
AbstractRoutingConnectionFactory
추상 클래스를 사용하기 위한 RabbitMQ 연결 설정은 아래 코드를 참고.
@Configuration
public class BlueGreenRabbitMQConfig {
/**
* <pre>
* Blue RabbitMQ 서버 연결 클래스.
* </pre>
*/
@Configuration
public class BlueRabbitMQConfig {
private final int port;
private final String userName;
private final String password;
private final String virtualHost;
private final String host;
public BlueRabbitMQConfig(@Value("${mq.rabbitmq.blue.port}") int port
, @Value("${mq.rabbitmq.blue.username}") String userName
, @Value("${mq.rabbitmq.blue.password}") String password
, @Value("${mq.rabbitmq.blue.virtual-host}") String virtualHost
, @Value("${mq.rabbitmq.blue.host}") String host) {
this.port = port;
this.userName = userName;
this.password = password;
this.virtualHost = virtualHost;
this.host = host;
}
@Bean
public SimpleRabbitListenerContainerFactory blueSimpleRabbitListenerContainerFactory() throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
simpleRabbitListenerContainerFactory.setConnectionFactory(this.blueConnectionFactory());
simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
simpleRabbitListenerContainerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
return simpleRabbitListenerContainerFactory;
}
@Primary
@Bean
public ConnectionFactory blueConnectionFactory() throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();
connectionFactory.setPort(this.port);
connectionFactory.setUsername(this.userName);
connectionFactory.setPassword(this.password);
connectionFactory.setVirtualHost(this.virtualHost);
connectionFactory.setUri("amqp://" + this.host);
connectionFactory.setAutomaticRecoveryEnabled(false);
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory);
cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
return cachingConnectionFactory;
}
@Bean
public AmqpAdmin blueAmqpAdmin() throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
return new RabbitAdmin(blueConnectionFactory());
}
@Bean
public RabbitTemplate blueRabbitTemplate(ConnectionFactory blueConnectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(blueConnectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter(new ObjectMapper()));
return rabbitTemplate;
}
}
/**
* <pre>
* Green RabbitMQ 서버 연결 클래스.
* </pre>
*/
@Configuration
public class GreenRabbitMQConfig {
private final int port;
private final String userName;
private final String password;
private final String virtualHost;
private final String host;
public GreenRabbitMQConfig(@Value("${mq.rabbitmq.green.port}") int port
, @Value("${mq.rabbitmq.green.username}") String userName
, @Value("${mq.rabbitmq.green.password}") String password
, @Value("${mq.rabbitmq.green.virtual-host}") String virtualHost
, @Value("${mq.rabbitmq.green.host}") String host) {
this.port = port;
this.userName = userName;
this.password = password;
this.virtualHost = virtualHost;
this.host = host;
}
@Bean
public SimpleRabbitListenerContainerFactory greenSimpleRabbitListenerContainerFactory() throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
simpleRabbitListenerContainerFactory.setConnectionFactory(this.greenConnectionFactory());
simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
simpleRabbitListenerContainerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
return simpleRabbitListenerContainerFactory;
}
@Bean
public ConnectionFactory greenConnectionFactory() throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();
connectionFactory.setPort(this.port);
connectionFactory.setUsername(this.userName);
connectionFactory.setPassword(this.password);
connectionFactory.setVirtualHost(this.virtualHost);
connectionFactory.setUri("amqp://" + this.host);
connectionFactory.setAutomaticRecoveryEnabled(false);
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory);
cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
return cachingConnectionFactory;
}
@Bean
public AmqpAdmin greenAmqpAdmin() throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
return new RabbitAdmin(greenConnectionFactory());
}
@Bean
public RabbitTemplate greenRabbitTemplate(ConnectionFactory blueConnectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(blueConnectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter(new ObjectMapper()));
return rabbitTemplate;
}
}
}
여기서 주의할 점은 blueConnectionFactory
메서드가 @Primary
로 선언 되어 있는 부분이다. 이유는 Spring Amqp를 사용하게 되면 RabbitAnnotationDrivenConfiguration
클래스가 AutoConfiguration 되는데 해당 클래스을 보면 ConnectionFactory
구현체를 요구 하는 2개의 메서드가 있기 때문이다. 라우팅을 위해 2개의 RabbitMQ 설정이 Bean에 등록되면서 아래 2개 메서드가 어떤 구현체를 사용해야 할지 몰라 required a single bean, but 2 were found 에러가 나기에 @Primary
어노테이션을 선언.
@Bean(name = "rabbitListenerContainerFactory")
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple",
matchIfMissing = true)
SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
.
.
.
@Bean(name = "rabbitListenerContainerFactory")
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "direct")
DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory(
DirectRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
- RoutingConnectionConfig
메세지 발급에 라우팅 역할을 하는 AbstractRoutingConnectionFactory
추상 클래스 구현체를 가지고 있는 RabbitTemplate
객체를 Bean에 등록하는 설정 클래스이다.
@Configuration
public class RoutingConnectionConfig {
private final MqConnectionService mqConnectionH2ServiceImpl;
private final ConnectionFactory blueConnectionFactory;
private final ConnectionFactory greenConnectionFactory;
public RoutingConnectionConfig(MqConnectionService mqConnectionH2ServiceImpl
, @Qualifier("blueConnectionFactory") ConnectionFactory blueConnectionFactory
, @Qualifier("greenConnectionFactory") ConnectionFactory greenConnectionFactory) {
this.mqConnectionH2ServiceImpl = mqConnectionH2ServiceImpl;
this.blueConnectionFactory = blueConnectionFactory;
this.greenConnectionFactory = greenConnectionFactory;
}
/**
* <pre>
* 라우팅이 가능한 RabbitTemplate 객체를 반환.
* </pre>
*/
@Bean
public RabbitTemplate rabbitTemplate() {
// 라우팅에 사용될 RabbitMQ 접속 객체 설정
Map<Object, ConnectionFactory> connectionFactoryMap = new HashMap<>();
connectionFactoryMap.put(MqRouting.BLUE, this.blueConnectionFactory);
connectionFactoryMap.put(MqRouting.GREEN, this.greenConnectionFactory);
// 라우팅 설정이 되어 있는 ConnectionFactory 설정
RoutingConnectionFactory routingConnectionFactory = new RoutingConnectionFactory(this.mqConnectionH2ServiceImpl);
routingConnectionFactory.setTargetConnectionFactories(connectionFactoryMap);
routingConnectionFactory.setDefaultTargetConnectionFactory(this.blueConnectionFactory);
RabbitTemplate rabbitTemplate = new RabbitTemplate(routingConnectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter(new ObjectMapper()));
return rabbitTemplate;
}
/**
* <pre>
* 라우팅 시 어떤 RabbitMQ를 사용할지 분기 처리하는 클래스.
* </pre>
*/
protected class RoutingConnectionFactory extends AbstractRoutingConnectionFactory {
private final MqConnectionService mqConnectionH2ServiceImpl;
public RoutingConnectionFactory(MqConnectionService mqConnectionH2ServiceImpl) {
this.mqConnectionH2ServiceImpl = mqConnectionH2ServiceImpl;
}
/**
* <pre>
* 반환 하는 값에 따라 사용될 ConnectionFactory가 결정된다.
*
* 해당 메서드는 setTargetConnectionFactories 메서드와 연관 관계가 있다.
* setTargetConnectionFactories 메서드에 설명하자면 해당 메서드는 Map<Object, ConnectionFactory> 타입인 객체를
* 매개변수로 받는데 determineCurrentLookupKey 메서드가 반환하는 값이
* setTargetConnectionFactories 메서드가 전달 받은 매개변수 객체의 Key 중 하나여야 한다.
* </pre>
*/
@Override
protected Object determineCurrentLookupKey() {
return this.mqConnectionH2ServiceImpl.getConnectionName();
}
}
public enum MqRouting {
BLUE("blue")
, GREEN("green");
String value;
MqRouting(String value) {
this.value = value;
}
public String getValue() {
return value;
}
public static MqRouting eq(String routingServerName) {
MqRouting[] mqRoutings = MqRouting.values();
for(MqRouting mqRouting : mqRoutings) {
if(routingServerName.equals(mqRouting.getValue())) {
return mqRouting;
}
}
throw new IllegalStateException("라우팅 접속 정보가 올바르지 않습니다.");
}
}
}
위 코드에서 MqConnectionService
구현체는 라우팅 분기 값 테스트를 위해 임의로 만든 구현체이다. 해당 구현체의 getConnectionName
메서드를 호출하면 H2에서 라우팅 분기 값을 가져와 MqRouting
Enum 값을 반환. 반환된 값에 따라 어느 RabbitMQ를 사용할지 결정 된다. 해당 코드에 대한 부분은 게시글에서 생략.
Consumer(메세지 소비)
consumer 설정은 아래 코드를 참고.
consumer는 별다른 설정 없이 RabbitMQ와 연결만 함.
@Configuration
public class RabbitMQConfig {
private final int port;
private final String userName;
private final String password;
private final String virtualHost;
private final String host;
public RabbitMQConfig(@Value("${mq.rabbitmq.port}") int port
, @Value("${mq.rabbitmq.username}") String userName
, @Value("${mq.rabbitmq.password}") String password
, @Value("${mq.rabbitmq.virtual-host}") String virtualHost
, @Value("${mq.rabbitmq.host}") String host) {
this.port = port;
this.userName = userName;
this.password = password;
this.virtualHost = virtualHost;
this.host = host;
}
@Bean
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory() throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
simpleRabbitListenerContainerFactory.setConnectionFactory(this.connectionFactory());
simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
simpleRabbitListenerContainerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
return simpleRabbitListenerContainerFactory;
}
@Bean
public ConnectionFactory connectionFactory() throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();
connectionFactory.setPort(this.port);
connectionFactory.setUsername(this.userName);
connectionFactory.setPassword(this.password);
connectionFactory.setVirtualHost(this.virtualHost);
connectionFactory.setUri("amqp://" + this.host);
connectionFactory.setAutomaticRecoveryEnabled(false);
return new CachingConnectionFactory(connectionFactory);
}
@Bean
public AmqpAdmin amqpAdmin() throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter(new ObjectMapper()));
return rabbitTemplate;
}
}
테스트
테스트를 위해 RabbitMQ 2개를 docker-compose를 사용하고 각 RabbitMQ를 RabbitMQ Blue, RabbitMQ Green로 칭함.
RabbitMQ Blue의 포트는 5674이고 RabbitMQ Green의 포트는 5675.
- RabbitMQ Docker-compose
version: '3'
services:
rabbitmq_blue:
image: rabbitmq:3.8.34-management
ports:
- 5674:5672
- 15673:15672
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=password
rabbitmq_green:
image: rabbitmq:3.8.34-management
ports:
- 5675:5672
- 15674:15672
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=password
생성 하는 명령어는 아래와 같다.
docker-compose -f <RabbitMQ Docker-compose YAML 파일> up -d
Publisher(메세지 발행)
메세지 발행을 위한 Publisher의 테스트 코드는 아래와 같다.
지속적인 발행을 위해 @Scheduled
를 사용하여 3개의 메서드가 병렬로 1초에 1개씩 발행.
@Component
public class PublisherTest {
private static int COUNT = 0;
private final RabbitTemplate rabbitTemplate;
public PublisherTest(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@Scheduled(fixedDelay = 1000)
public void scheduleFixedDelayTask() {
send();
}
@Scheduled(fixedDelay = 1000)
public void scheduleFixedDelayTask1() {
send();
}
@Scheduled(fixedDelay = 1000)
public void scheduleFixedDelayTask2() {
send();
}
private void send() {
String connectionServer = "blue";
if(rabbitTemplate.getConnectionFactory().getPort() != 5674) {
connectionServer = "green";
}
String value = String.format("{\\"value\\" : \\"%s - %d\\"}", connectionServer, COUNT++);
Message message = MessageBuilder.withBody(value.getBytes(StandardCharsets.UTF_8))
.build();
rabbitTemplate.convertAndSend("test_exchange"
, "key"
, message);
}
}
Consumer(메세지 소비)
RabbitMQ의 Queue 소비를 위한 테스트 코드는 아래 코드를 참고.
sleep 메서드를 사용해서 1초의 대기를 준 이유는 메세지 처리 중 배포 했을 때 unacked 상태에도 손실 없이 정상 처리가 되는지 확인하기 위함.
@Component
public class ConsumerListener {
private final static Logger logger = LoggerFactory.getLogger(ConsumerListener.class);
@RabbitListener(containerFactory = "simpleRabbitListenerContainerFactory"
, bindings = @QueueBinding(value = @Queue(value = "test_queue")
, exchange = @Exchange("test_exchange"), key = "key")
, ackMode = "MANUAL")
public void testListener(Map<String, Object> value, Channel channel
, @Header(AmqpHeaders.DELIVERY_TAG) long tag, Message message) throws IOException, InterruptedException {
Thread.sleep(1000);
logger.info("{}", value);
channel.basicAck(tag, false);
}
}
결과
테스트는 위 흐름 목차 토대로 진행.
Publisher는 IDE에서 실행 하지만 Consumer인 경우 배포를 가장 했을 때를 위해 IDE로 실행하지 않고 bootJar로 빌드한 jar 파일을 수동으로 실행.
기본
Publisher를 IDE에서 실행하면 RabbitMQ Blue로 연결해서 테스트 메세지를 지속적으로 발행.
2023-06-22 13:36:50.360 INFO 27748 --- [ scheduling-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: localhost:5674
2023-06-22 13:36:50.425 INFO 27748 --- [ scheduling-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: blueConnectionFactory#1e228734:0/SimpleConnection@44237dbe [delegate=amqp://admin@127.0.0.1:5674/, localPort= 2995]
RabbitMQ Blue에 테스트 메세지가 지속적으로 쌓이고 있음을 확인 할 수 있다.
RabbitMQ Blue Queue 소비를 위한 테스트 리스터 실행.
실행 할 때 테스트 리스너가 RabbitMQ Blue로 연결할 수 있게 -Dmq.rabbitmq.port
매개변수 값을 RabbitMQ Blue 포트로 값 설정.
java -jar -Dmq.rabbitmq.port=5674 rabbitmq-consumer.jar
실행 되면 테스트 리스너가 RabbitMQ Blue Queue에 있는 테스트 메시지를 소비.
. ____ _ __ _ _
/\\\\ / ___'_ __ _ _(_)_ __ __ _ \\ \\ \\ \\
( ( )\\___ | '_ | '_| | '_ \\/ _` | \\ \\ \\ \\
\\\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.2.7.RELEASE)
2023-06-22 13:46:06.972 INFO 2314 --- [ main] c.rabbitmq.consumer.ConsumerApplication : Starting ConsumerApplication on DESKTOP-KP5552O with PID 2314 (/mnt/d/Down/rabbitmq-test/rabbitmq-consumer/build/libs/rabbitmq-consumer.jar started by root in /mnt/d/Down/rabbitmq-test/rabbitmq-consumer/build/libs)
2023-06-22 13:46:06.981 INFO 2314 --- [ main] c.rabbitmq.consumer.ConsumerApplication : No active profile set, falling back to default profiles: default
2023-06-22 13:46:12.185 INFO 2314 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: localhost:5674
2023-06-22 13:46:12.686 INFO 2314 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: connectionFactory#524d6d96:0/SimpleConnection@16150369 [delegate=amqp://admin@127.0.0.1:5674/, localPort= 49448]
2023-06-22 13:46:12.976 INFO 2314 --- [ main] c.rabbitmq.consumer.ConsumerApplication : Started ConsumerApplication in 8.274 seconds (JVM running for 10.452)
2023-06-22 13:46:14.153 INFO 2314 --- [ntContainer#0-1] c.r.consumer.listener.ConsumerListener : {value=blue - 57}
2023-06-22 13:46:15.165 INFO 2314 --- [ntContainer#0-1] c.r.consumer.listener.ConsumerListener : {value=blue - 58}
RabbitMQ Blue 콘솔 페이지에서 Queue 상태를 확인하면 아래와 같이 Unacked 상태가 존재하는데 이는 테스트 리스너에서 임의로 sleep 메서드를 이용해 대기시간을 지정했기 때문이다.
지정된 대기시간이 지나면 ack 처리를 하기 때문에 메세지 소비가 정상적으로 완료 됨.
리스너 배포
새로운 테스트 리스너를 배포. 배포 테스트 리스너의 RabbitMQ를 RabbitMQ Green으로 연결.
java -jar -Dmq.rabbitmq.port=5675 rabbitmq-consumer.jar
위와 같이 실행 하게 되면 아래와 같이 Green 테스트 리스너가 실행되고 현재 RabbitMQ Green으로 메세지를 라우팅하지 않았기 때문에 소비는 발생하지 않음.
. ____ _ __ _ _
/\\\\ / ___'_ __ _ _(_)_ __ __ _ \\ \\ \\ \\
( ( )\\___ | '_ | '_| | '_ \\/ _` | \\ \\ \\ \\
\\\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.2.7.RELEASE)
2023-06-22 10:13:23.341 INFO 2151 --- [ main] c.rabbitmq.consumer.ConsumerApplication : Starting ConsumerApplication on DESKTOP-KP5552O with PID 2151 (/mnt/d/Down/rabbitmq-test/rabbitmq-consumer/build/libs/rabbitmq-consumer.jar started by root in /mnt/d/Down/rabbitmq-test/rabbitmq-consumer/build/libs)
2023-06-22 10:13:23.358 INFO 2151 --- [ main] c.rabbitmq.consumer.ConsumerApplication : No active profile set, falling back to default profiles: default
2023-06-22 10:13:29.398 INFO 2151 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: localhost:5675
2023-06-22 10:13:29.859 INFO 2151 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: connectionFactory#40cb8df7:0/SimpleConnection@29d80d2b [delegate=amqp://admin@127.0.0.1:5675/, localPort= 60656]
2023-06-22 10:13:30.086 INFO 2151 --- [ main] c.rabbitmq.consumer.ConsumerApplication : Started ConsumerApplication in 9.331 seconds (JVM running for 11.049)
라우팅
인제 테스트 메세지 라우팅을 위해 RabbitMQ Blue로 전달되는 메세지를 RabbitMQ Green으로 이동.
테스트에선 분기 값을 H2로 활용 했기 때문에 아래와 같이 값을 green으로 업데이트.
Publisher의 IDE 로그를 보면 아래와 같이 RabbitMQ Green에 연결.
2023-06-22 13:36:50.360 INFO 27748 --- [ scheduling-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: localhost:5674
2023-06-22 13:36:50.425 INFO 27748 --- [ scheduling-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: blueConnectionFactory#1e228734:0/SimpleConnection@44237dbe [delegate=amqp://admin@127.0.0.1:5674/, localPort= 2995]
2023-06-22 13:55:16.388 INFO 27748 --- [ scheduling-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: localhost:5675
2023-06-22 13:55:16.413 INFO 27748 --- [ scheduling-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: greenConnectionFactory#b47e564a:0/SimpleConnection@8ad58342 [delegate=amqp://admin@127.0.0.1:5675/, localPort= 3187]
이후 부턴 RabbitMQ Blue엔 Queue가 쌓이지 않고 Queue에 있는 메세지만 소비.
2023-06-22 14:02:45.323 INFO 2314 --- [ntContainer#0-1] c.r.consumer.listener.ConsumerListener : {value=blue - 957}
2023-06-22 14:02:46.324 INFO 2314 --- [ntContainer#0-1] c.r.consumer.listener.ConsumerListener : {value=blue - 958}
2023-06-22 14:02:47.326 INFO 2314 --- [ntContainer#0-1] c.r.consumer.listener.ConsumerListener : {value=blue - 959}
2023-06-22 14:02:48.327 INFO 2314 --- [ntContainer#0-1] c.r.consumer.listener.ConsumerListener : {value=blue - 960}
2023-06-22 14:02:49.327 INFO 2314 --- [ntContainer#0-1] c.r.consumer.listener.ConsumerListener : {value=blue - 961}
2023-06-22 14:02:50.328 INFO 2314 --- [ntContainer#0-1] c.r.consumer.listener.ConsumerListener : {value=blue - 962}
2023-06-22 14:02:51.329 INFO 2314 --- [ntContainer#0-1] c.r.consumer.listener.ConsumerListener : {value=blue - 963}
2023-06-22 14:02:52.330 INFO 2314 --- [ntContainer#0-1] c.r.consumer.listener.ConsumerListener : {value=blue - 964}
2023-06-22 14:02:53.330 INFO 2314 --- [ntContainer#0-1] c.r.consumer.listener.ConsumerListener : {value=blue - 965}
라우팅을 했기에 RabbitMQ Green에 Queue가 쌓이면서 테스트 메세지를 소비 시작.
. ____ _ __ _ _
/\\\\ / ___'_ __ _ _(_)_ __ __ _ \\ \\ \\ \\
( ( )\\___ | '_ | '_| | '_ \\/ _` | \\ \\ \\ \\
\\\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.2.7.RELEASE)
2023-06-22 13:46:06.972 INFO 2314 --- [ main] c.rabbitmq.consumer.ConsumerApplication : Starting ConsumerApplication on DESKTOP-KP5552O with PID 2314 (/mnt/d/Down/rabbitmq-test/rabbitmq-consumer/build/libs/rabbitmq-consumer.jar started by root in /mnt/d/Down/rabbitmq-test/rabbitmq-consumer/build/libs)
2023-06-22 13:46:06.981 INFO 2314 --- [ main] c.rabbitmq.consumer.ConsumerApplication : No active profile set, falling back to default profiles: default
2023-06-22 13:46:12.185 INFO 2314 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: localhost:5674
2023-06-22 13:46:12.686 INFO 2314 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: connectionFactory#524d6d96:0/SimpleConnection@16150369 [delegate=amqp://admin@127.0.0.1:5674/, localPort= 49448]
2023-06-22 13:46:12.976 INFO 2314 --- [ main] c.rabbitmq.consumer.ConsumerApplication : Started ConsumerApplication in 8.274 seconds (JVM running for 10.452)
2023-06-22 13:46:14.153 INFO 2314 --- [ntContainer#0-1] c.r.consumer.listener.ConsumerListener : {value=blue - 57}
2023-06-22 13:46:15.165 INFO 2314 --- [ntContainer#0-1] c.r.consumer.listener.ConsumerListener : {value=blue - 58}
이전 리스너 종료
RabbitMQ Blue Queue가 모두 처리 되었으면 RabbitMQ Blue에 연결되어 있는 리스너를 종료.
2023-06-22 15:01:10.771 INFO 2314 --- [extShutdownHook] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2023-06-22 15:01:11.317 INFO 2314 --- [extShutdownHook] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.
2023-06-22 15:01:11.328 INFO 2314 --- [extShutdownHook] o.s.a.r.l.SimpleMessageListenerContainer : Shutdown ignored - container is not active already
부록
- 기존 RabbitMQ(여기선 RabbitMQ Blue)를 종료 시키면?
라우팅이 끝난 후엔 기존 RabbitMQ를 종료해도 큰 문제는 없다.
단, 앞에서 말했듯이 RabbitMQ는 연결을 계속 유지 하기 때문에 종료하면 Publisher에 아래와 같은 에러 로그가 1회 출력된다.
Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - broker forced connection closure with reason 'shutdown', class-id=0, method-id=0)
라우팅을 했기에 기존 RabbitMQ로 메세지를 전달할 일은 없지만 다시 배포 할 때에는 꼭 기존 RabbitMQ를 실행 시키고 배포 해야 한다. 만약 그러지 않는 상태에서 라우팅을 하게 되면 아래와 같은 로그가 계속 출력된다.
org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
정리
배포 흐름을 간략하게 정리하자면 기본으로
- RabbitMQ는 1개 이상 필요.
- Spring의
AbstractRoutingConnectionFactory
추상 클래스를 이용해서 발급할 메세지 라우팅.
필요하며 배포 순서는
- 미사용 중인 RabbitMQ로 연결 설정하여 리스너 배포.
- 메세지 라우팅 진행.
- 기존 RabbitMQ의 Queue가 모두 완료 처리 되면 기존 리스너 종료.
순서로 진행 된다. 만약 여기서 기존 RabbitMQ까지 종료 한다면
- 배포 리스너에서 사용할 RabbitMQ 실행.
- 신규 생성된 RabbitMQ로 연결 설정하여 리스너 배포.
- 메세지 라우팅 진행.
- 기존 RabbitMQ의 Queue가 모두 완료 처리 되면 기존 RabbitMQ와 리스너 종료.
순서로 진행하면 된다.
만약 RabbitMQ를 종료 해야 한다면 유의해야 할게 있는데 바로 DLX(Dead Letter Exchanges)이다. Spring Amqp를 이용한 메세지 소비는 @RabbitListener
, @QueueBinding
, @Exchange
와 같은 어노테이션을 사용하는데 어노테이션에서 작성한 익스체인지와 큐는 자동으로 RabbitMQ에 생성지만 DLX를 위해 RabbitMQ에서 생성한 익스체인지와 큐는 자동으로 생성하지 않는다. 즉 재실행을 했을 경우 DLX의 익스체이지와 큐가 없을수도 있기 때문에 종료하기 전에 설정을 export 하거나 Spring에서 AmqpAdmin
를 이용한 DLX 생성 코드를 작성해야 한다.
끝으로 샘플은 GitHub - sungwookkim/rabbitmq_cd에서 확인 가능.