일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 | 29 |
30 | 31 |
- softeer
- 소프티어
- 자바
- MESSAGEBROKER
- bitmask
- java
- 인가인증
- 코드트리
- dockercompose
- On-Premise
- 카카오엔터프라이즈
- CODETREE
- bfs
- 알고리즘
- BFS
- 함수 종속성
- objectstorage
- 동전 퍼즐
- db
- sonarqube
- DP
- 카카오클라우드
- s3
- jsonwebtoken
- 백엔드 개발
- es_java_home
- 구름
- 완전탐색
- 정렬
- DFS
- Today
- Total
wooing
Spring boot에서 RabbitMQ를 사용하는 방법 (STOMP, AMQP) 본문
Docker를 사용한 RabbitMQ 구축
아래의 명령어로 RabbitMQ를 설치한다.
docker run -d --name rabbitmq \\
-p 5672:5672 \\
-p 15672:15672 \\
-p 61613:61613 \\
rabbitmq:management
방화벽 설정 (필요시)
sudo ufw allow 5672/tcp
sudo ufw allow 15672/tcp
sudo ufw allow 61613/tcp
STOMP 플러그인 설치
docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_stomp
사용자 권한 부여
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
http://localhost:15672/#/ 해당 링크에 접속하면 RabbitMQ 웹 콘솔에 접근할 수 있다.
아이디와 비밀번호
- username : guest
- password : guest
Spring boot에서 RabbitMQ 사용 방법
Spring Boot에서 RabbitMQ를 사용하는 방법은 크게 STOMP 프로토콜과 AMQP 프로토콜을 이용하는 두 가지 방식이 있다. 이 두 방식은 메시지를 처리하는 방식과 활용 목적에서 차이가 있다.
구분 | STOMP | AMQP |
사용 예제 | 실시간 채팅, 알림 서비스 | 마이크로서비스 간 메시지 큐, 비동기 이벤트 처리 |
STOMP
Spring Boot는 기본적으로 인메모리 메시지 브로커 기능을 제공하여, 웹소켓(WebSocket)을 통해 클라이언트와 직접 메시지를 주고받을 수 있다. 이 경우, Spring Boot 자체가 메시지 큐 역할을 수행할 수 있지만, 몇 가지 문제점이 발생한다.
인메모리 메시지 브로커의 문제점
- 확장성(Scalability) 문제: 서버를 여러 개(Scale-Out)로 확장하면 각 인스턴스 간 메시지가 공유되지 않음.
- 메시지 손실 가능성: 서버가 재시작되거나 장애가 발생하면 메시지가 사라질 수 있음.
이를 해결하기 위해 RabbitMQ 같은 외부 메시지 브로커를 활용하는것이다. RabbitMQ는 STOMP 메시지를 처리할 수 있는 브로커 역할을 수행하며, 이를 통해 여러 서버 간 메시지를 공유하고 확장성을 확보할 수 있다. RabbitMQ외에도 Kafka 같은 다른 외부 메시지 브로커를 사용할 수도 있다.
AMQP
RabbitMQ는 AMQP(Advanced Message Queuing Protocol)를 기반으로 동작하는 메시지 브로커다. 즉, Spring Boot 애플리케이션이 AMQP 프로토콜을 통해 RabbitMQ와 통신한다는 것은, 애플리케이션이 직접 RabbitMQ의 메시지를 생성하고 소비한다는 의미다.
0. 환경변수, 의존성 주입
application.yml
spring:
rabbitmq:
host: 172.16.211.108
port: 5672
username: admin
password: admin
build.gradle
implementation 'org.springframework.boot:spring-boot-starter-websocket' //Websocket
implementation 'org.springframework.boot:spring-boot-starter-amqp' //RabbitMQ
implementation 'io.projectreactor.netty:reactor-netty'
1-1. Spring boot Message Broker (STOMP)
1. WebSocketConfig 작성
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Value("${spring.rabbitmq.host}")
private String RELAY_HOST;
@Value("${spring.rabbitmq.stomp-port}")
private Integer RELAY_PORT;
@Value("${spring.rabbitmq.username}")
private String RELAY_USERNAME;
@Value("${spring.rabbitmq.password}")
private String RELAY_PASSWORD;
@Autowired
private ArticleEditInterceptor articleEdditInterceptor;
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws/test").setAllowedOrigins("*");
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// 외부의 RabbitMQ 브로커를 사용하기 위한 설정
// TOPIC만 사용하도록 허용
registry.enableStompBrokerRelay("/exchange")
.setRelayHost(RELAY_HOST)
.setRelayPort(RELAY_PORT)
.setClientLogin(RELAY_USERNAME)
.setClientPasscode(RELAY_PASSWORD);
// 클라이언트가 메시지를 보낼 때 사용할 prefix 설정 (/app -> @MessageMapping에 자동으로 매핑됨)
registry.setApplicationDestinationPrefixes("/app");
}
// 인바운드 채널에 인터셉터 추가
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(articleEdditInterceptor);
}
}
2. 메세지 발행
2-1. RabbitTemplate 사용
RabbitTemplate은 Spring에서 RabbitMQ와 직접 통신할 수 있는 API를 제공하는 클래스로, 서비스에서 직접 메세지를 발행하기 위해 사용한다. Exchange, Routing Key등을 직접 지정할 수 있어 더욱 커스텀하게 사용 가능하다.
@Configuration
public class RabbitMQConfig {
@Value("${spring.rabbitmq.host}")
private String HOSTNAME;
@Value("${spring.rabbitmq.amqp-port}")
private Integer PORT;
@Value("${spring.rabbitmq.username}")
private String USERNAME;
@Value("${spring.rabbitmq.password}")
private String PASSWORD;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(HOSTNAME, PORT);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(){
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
}
RabbitMQ Service 작성
@Service
public class RabbitMQServiceImpl implements RabbitMQService{
@Value("${POD_NAME:default-pod}") //쿠버네티스 Pod 이름, 기본값은 default-pod. Queue 식별에 사용
private String podName;
@Autowired
private RabbitAdmin rabbitAdmin;
@Autowired
private ConnectionFactory connectionFactory;
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public String createFanoutExchange(String topicName) {
FanoutExchange exchange = ExchangeBuilder.fanoutExchange(topicName)
.autoDelete()
.durable(true)
.build();
rabbitAdmin.declareExchange(exchange);
return "Topic created " + topicName;
}
@Override
public String deleteTopic(String topicName) {
rabbitAdmin.deleteExchange(topicName);
return "Topic deleted " + topicName;
}
@Override
public void createExchangeAndQueue(String topicName, String queueName) {
Exchange exchange = ExchangeBuilder.fanoutExchange(topicName)
.autoDelete()
.durable(true)
.build();
Queue queue = new Queue(queueName, false);
Binding binding = BindingBuilder.bind(queue).to(exchange).with("").noargs();
rabbitAdmin.declareExchange(exchange);
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareBinding(binding);
}
@Override
public void sendMessage(String topicName, String routing, Object message) {
rabbitTemplate.convertAndSend(topicName, routing, message);
}
}
메세지 전송 예제
다른 프로젝트에서 사용한 예시이다. /updateHashtags/{postID}를 통해 웹소켓 메시지가 오면 서비스 로직을 수행한 후 결과를 rabbitMQService의 sendMessage메소드를 통해 RabbitMQ에 메세지를 발행한다.
@MessageMapping("/updateHashtags/{postID}")
public WSResponse<HashtagUpdateResponseDTO> updateHashtag(WSRequest<HashtagUpdateRequestDTO> requestDTO, @DestinationVariable Long postID) {
HashtagUpdateResponseDTO responseDTO = articleComposeService.updateHashtags(requestDTO.getUserID(), postID, requestDTO.getDto());
WSResponse<HashtagUpdateResponseDTO> response = WSResponse.onSuccess("/updateHashtags/"+postID, requestDTO.getUuid(), responseDTO);
rabbitMQService.sendMessage(postID.toString(), "", response);
return response;
}
2-2. @SendTo 어노테이션 사용
아래와 같이 return되는 메시지를 /topic/public으로 자동으로 전송된다.
@SendTo("/topic/public") // 메시지를 처리한 후 "response.queue"로 응답을 보냄
public String receiveAndForward(String message) {
System.out.println("Received: " + message);
return "Processed: " + message;
}
3. 클라이언트 파트
[JS코드 일부분]
const stompClient = new StompJs.Client({
brokerURL: 'ws://yourserver.com/ws' // WebSocket STOMP 서버 주소
});
stompClient.onConnect = (frame) => {
console.log('Connected: ' + frame);
// 메시지 구독 (RabbitMQ에서 특정 Exchange의 메시지를 받음)
stompClient.subscribe('/exchange/fanout', (message) => {
console.log("Received: " + message.body);
});
};
stompClient.onWebSocketError = (error) => {
console.error('WebSocket Error:', error);
};
stompClient.onStompError = (frame) => {
console.error('STOMP Error:', frame.headers['message']);
console.error('Details:', frame.body);
};
// WebSocket 연결
function connect() {
stompClient.activate();
}
// WebSocket 연결 해제
function disconnect() {
stompClient.deactivate();
console.log("Disconnected");
}
1-2. Spring boot 어플리케이션이 직접 RabbitMQ와 통신 (AMQP)
1. RabbitMQ Config작성
@Configuration
public class RabbitMQConfig {
@Value("${spring.rabbitmq.host}")
private String HOSTNAME;
@Value("${spring.rabbitmq.amqp-port}")
private Integer PORT;
@Value("${spring.rabbitmq.username}")
private String USERNAME;
@Value("${spring.rabbitmq.password}")
private String PASSWORD;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(HOSTNAME, PORT);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(){
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
// Exchange (Direct 방식)
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct.exchange");
}
// Queue 생성
@Bean
public Queue queue() {
return new Queue("sample.queue", true);
}
// Exchange와 Queue를 바인딩 (Routing Key 사용)
@Bean
public Binding binding(Queue queue, DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with("sample.routingKey");
}
}
2. Consumer 구현
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class MessageConsumer {
@RabbitListener(queues = "sample.queue")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
}
3. Producer 구현
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class RabbitMQTest {
@Autowired
private MessageProducer messageProducer;
@Test
public void testSendMessage() {
messageProducer.sendMessage("Hello, RabbitMQ!");
}
}
'Server > Spring' 카테고리의 다른 글
Spring boot에서 카카오클라우드 Object Storage사용하기 (0) | 2024.05.20 |
---|