효습

SSE(Server-Sent- Events)로 알림 기능 구현하기 본문

프로젝트

SSE(Server-Sent- Events)로 알림 기능 구현하기

효효효효 2025. 1. 10. 16:58

 

이전에 알림 기능을 구현할 때는 Short Polling 방식으로 구현했었다.

 

Short Polling

클라이언트가 서버로부터 정기적으로 알림 조회 요청을 보내서 데이터를 받아오는 방식이다.

 

 

그런데 웹이나 앱을 사용하다보면 알림 페이지에 있지 않아도 실시간으로 알림이 오는데 이걸 어떻게 구현할까?하고

찾아보니까 크게 2가지가 있었다.

 

1. SSE(Server-Sent-Events)

2. FCM(Firebase-Cloud-Messaging)

 

사실 처음에는 FCM을 하려고 해서 실제로 의존성까지 추가했는데 Karma 프로젝트는 웹으로만 동작하고 모바일 앱으로만 동작하지 않는다! ->  SSE로 바꿨다.

 

 

처음에 알림 기능을 구현할 때 ,  주문 상태가 접수완료 , 주문거절 , 제작대기 , 배송중인 주문 정보만 조회해서 응답했다.

여기에다가 이제 주문의 상태가 변경될 때마다 알림을 추가적으로 보내도록 했다.

 

 

그럼 먼저 SSE에 대해 알아보자

 

SSE(Server-Sent-Events) 

 

클라이언트와 서버가 한번 연결을 맺고 나면 일정 시간동안 변경이 일어난 데이터에 대해 서버에서 전송 받는 방식이다.

SSE는 서버에서 클라이언트로 text message를 보내는 브라우저 기반 웹 애플리케이션 기술이며

HTTP의 persistent connections을 기반으로하는 HTML5 표준 기술이다.

 

 

 

알림 기능 구현하기

 

NotificationController.java

 

 1. 먼저 Client가 서버에 Subscribe를 요청한다.

     /* 클라이언트에서  구독 */
    @GetMapping(value = "/subscribe" , produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter subscribe(@AuthUser User user){
        return noticeService.subscribe(user);
    }

 

여기서 produces = MediaType.TEXT_EVENT_STREAM_VALUE 를 쓰는 이유는 위에서 언급했다시피 SSE는 HTTP 프로토콜을 통해 text 기반 이벤트 스트림을 전송하기 때문이다.

평소에 자주 사용하는 application/json 방식을 사용하면 브라우저의 EventSource API가 SSE로 인식하지 못해 연결에 실패하기 때문에 저렇게 설정해준다.

 

 

 

 

2. 연결이 된 상태에서 서버에서 클라이언트로 알림을 보낸다.

    /* 서버에서 클라이언트로 알림 */
    @PostMapping("/send-notice")
    public void sendData(@AuthUser User user){
        noticeService.notify(user);
    }

사실 이건 필요에 따라 구현해주면 되는 부분이다.

나는 이 api로 기존의 알림 조회를 클라이언트로 보내준다.

 

 

 

 

 

NoticeService

@Service
@Transactional
@RequiredArgsConstructor
@Slf4j
public class NoticeService {

    private final OrdersRepository ordersRepository;
    private final EmitterRepository emitterRepository;

    private static final long DEFAULT_TIMEOUT = 60L*1000*60; // 1시간


    .....
    
    
    /* 클라이언트가 구독을 위해 호출하는 메서드
    *
    * user - 구독하는 클라이언트 (처음에 연결 요청을 보내는 계정)
    * @param SseEmitter - 서버에서 보낸 이벤트 Emitter
    * */
    public SseEmitter subscribe(User user) {
        SseEmitter emitter = createEmitter(user.getUserId());
        sendToClient(user.getUserId(), "EventStream created. [userId =" + user.getUserId() + "]");
        return emitter;
    }

    /* 서버의 이벤트를 클라이언트에게 보내는 메서드
    * 다른 서비스 로직에서 이 메서드를 사용해 데이터를 Object event에 넣고 전송하면 된다
    *
    * user - 메세지를 전송할 사용자
    * @param event - 전송할 이벤트 객체
    * */
    public void notify(User user) {
        sendToClient(user.getUserId(), getAllNotices(user));
    }

    /*
    * 클라이언트에게 데이터를 전송
    *
    * @param userId - 데이터를 받을 사용자의 아이디
    * @param data - 전송할 데이터
    * */
    private void sendToClient(Long userId , Object data){
        SseEmitter emitter = emitterRepository.get(userId);
        String eventId = userId + "_" + System.currentTimeMillis();
        if(emitter != null){
            try{
                emitter.send(SseEmitter.event().id(eventId).name("sse_notice").data(data));
            }catch (IOException exception){
                emitterRepository.deleteById(userId);
                emitter.completeWithError(exception);
            }
        }
    }

    /* 사용자 아이디를 기반으로 이벤트 Emitter를 생성
    *
    * @param userId - 사용자 id
    * @param SseEmitter - 생성된 이벤트 Emitter
    *
    * */
    private SseEmitter createEmitter(Long userId){

        SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);
        emitterRepository.save(userId , emitter);

        //Emitter가 완료될 때 ( 모든 데이터가 성공적으로 전송된 상태) Emitter를 삭제한다.
        emitter.onCompletion(() -> emitterRepository.deleteById(userId));
        //Emitter가 타임아웃되었을 때 (지정된 시간동안 어떠한 이벤트도 전송되지 않았을 때) Emitter를 삭제한다.
        emitter.onTimeout(() -> emitterRepository.deleteById(userId));

        return emitter;
    }



}

 

1.  Emitter를 생성할 때 , 만료시간은 1시간으로 설정해줬다. 연결을 오래 유지하면 서버 메모리가 소모되므로 조건에 맞게 설정하면 된다.
(나는 기능 구현 자체가 목적이었기 때문에 그랬는데 스프링 부트의 내장 톰캣의 디폴트 값은 30초이다.)

만약 만료시간이 지나면 브라우저에서 자동으로 서버로 재요청을 보낸다.

 

 

 

2.  먼저 클라이언트가 /subscribe로 구독 요청을 보내면 연결되었다는 더미 데이터를 클라이언트에게 보낸다.

더미 데이터를 보내는 이유는 처음 SSE 응답을 할 때, 아무것도 보내지 않으면 재연결 요청을 보내거나 연결 요청 자체에서 오류가 발생하기 때문이다.

이때 createEmitter 메서드가 호출되는데  새로운 SseEmitter를 생성한다. 이때 생성된 SseEmitter은  서버 메모리에 저장된다.

(뒤에 나오는 EmitterRepository에서 확인가능)

 

 

3. 그리고 생성된 emitter를 가지고 클라이언트에게 데이터를 전송하는 것이다.

 

 

 

 

EmitterRepository

@Repository
@RequiredArgsConstructor
public class EmitterRepository {

    /*모든 Emitters를 저장하는 ConcurrentHashMap*/
     private final Map<Long , SseEmitter> emitters = new ConcurrentHashMap<>();


     /* 주어진 아이디와 emitter를 저장
     *
     * @param id - 사용자 아이디
     * @param emitter - 이벤트 Emitter
     *
     * */
     public void save(Long id , SseEmitter emitter){
         emitters.put(id , emitter);
     }

     /* 주어진 아이디의 emiiter를 제거
     *
     * @param id - 사용자 아이디
     * */
    public void deleteById(Long id){
        emitters.remove(id);
    }

    /*
    * 주어진 아이디의 Emitter를 가져옴
    *
    * @param  id - 사용자 id
    * @return SseEmitter - 이벤트 Emitter
    *
    * */
    public SseEmitter get(Long id){
        return emitters.get(id);
    }
}

 

여기서 생성된 emitters를 ConcurrentHashMap으로 저장한 이유는  이는 동시성 문제때문이다. 

여러 클라이언트가 동시에 SSE 연결을 생성할 수 있어 동시 작업을 한다면 문제가 발생할 수 있다.

 

ConcurrentHashMap은 읽기 작업에는 여러 스레드가 동시에 읽을 수 있지만, 쓰기 작업에는 특정 세그먼트에 대한 Lock을 사용하기 때문에 여러 스레드가 동시에 get , remove , put 동작을 하여도 안전하게 동작한다. 

 

 

여기까지 하면 기존의 데이터 중에 조건에 맞는 데이터들이 서버에서 클라이언트로 보내진다.

 

내가 구현하고 싶은 건 만약 데이터 변경이 일어나면 서버에서 이와 관련된 데이터를 클라이언트로 보내주는 기능이었다.

 

그래서 EventListener를 만들었다.

 

 

 

먼저 주문 상태 변경되었다는 이벤트를 담을 이벤트 객체를 생성했다.

 

 

OrderStateChangedEvent

public record OrderStateChangedEvent (User user, Orders orders){}

 

 

그럼 다음 이벤트로 인한 상태 변경 메서드를 만들어줬다.

 

 

OrdersService

    /* 주문상태 변경 이벤트 리스너 */
    private void updateOrderState(Long orderId, OrderState newStatus) {
        Orders order = orderRepository.findById(orderId)
                .orElseThrow(() -> new CustomException(ErrorCode.ORDER_NOT_FOUND));
        order.changeOrderState(newStatus);
        orderRepository.save(order);

        // 상태 변경 이벤트 발행
        eventPublisher.publishEvent(new OrderStateChangedEvent(order.getBuyerUser(), order));
    }

 

이 메서드를 모든 주문 상태 변경이 일어날 때마다 실행되도록 하였다.

 

 

 

 

마지막으로 주문 상태 변경 이벤트를 처리하는 클래스를 만들어준다.

 

NotificationEventListener

@Component
@RequiredArgsConstructor
public class NotificationEventListener {

    private final EmitterRepository emitterRepository;
    private final NoticeService noticeService;

    @EventListener
    public void handleOrderStatusChanged(OrderStateChangedEvent event) {
        Long userId = event.user().getUserId();
        String eventId = userId + "_" + System.currentTimeMillis();

        // 알림 데이터 생성 - 주문 상태 변경된 주문에 대한 정보
        NoticeResponseDto notice = noticeService.createNotice(event.orders() , event.user());

        // 클라이언트에게 알림 전송
        SseEmitter emitter = emitterRepository.get(userId);
        if (emitter != null) {
            try {
                emitter.send(SseEmitter.event()
                        .id(eventId)
                        .name("order-status-changed")
                        .data(notice));
            } catch (IOException e) {
                emitter.completeWithError(e);
                emitterRepository.deleteById(userId);
            }
        }
    }
}

 

위에 updateOrderState 메서드를 보면 상태 변경 이벤트를 발행하는 걸 알 수 있다.

이때 handleOrderStateChanged가 자동으로 호출된다.

 

 

호출되면 클라이언트의 SseEmitter를 가져와서 알림 데이터를 생성하고 이를 전송한다.

 

 

 

 

 

로컬에서는 잘 동작한다.

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

참고 블로그

 

https://dkswnkk.tistory.com/702

 

SSE로 알림 기능 구현하기 with Spring

서론 인터넷은 웹 브라우저와 웹 서버 간의 데이터 통신을 위해서 HTTP 표준 위에 구축되어 있습니다. 대부분의 경우 웹 브라우저인 클라이언트가 HTTP 요청을 서버에 보내고, 서버는 적절한 응답

dkswnkk.tistory.com

https://tecoble.techcourse.co.kr/post/2022-10-11-server-sent-events/

 

Spring에서 Server-Sent-Events 구현하기

tecoble.techcourse.co.kr

https://devlog-wjdrbs96.tistory.com/269

 

[Java] ConcurrentHashMap 이란 무엇일까?

들어가기 전에 HashTable, HashMap, ConcurrnetHashMap은 많이 유사한 특징들을 가지고 있습니다. 하지만 세부적으로 보면 조금씩 꽤나 차이가 있는데요. 간단하게 어떤 차이가 있는지 알아보면서 시작하

devlog-wjdrbs96.tistory.com

https://server-technology.tistory.com/6

 

[Spring] consumes, produces

서버는 클라이언트의 각각 다른 요청에 대하여 다른 방식으로 처리할 수 있으며, 클라이언트는 서버에서 반환해주는 데이터를 제한하여 받을 수 있다. 그것을 가능하게 해주는 것이 바로 consumes

server-technology.tistory.com

 

 

 

 

 

'프로젝트' 카테고리의 다른 글

redis 배포하기  (0) 2025.01.02
QueryDSL 사용하기  (1) 2024.10.05
Java에서 equals() 와 == 의 차이  (0) 2024.08.16
서버 배포 과정 이해하기  (1) 2024.06.19