카프카 와 sse 를 활용하여 입찰 알림을 보내는 프로젝트를 개발해보자.


    //클라이언트로 부터 SSE subscription 을 수락한다.
    @GetMapping(path = "/pub", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    @ResponseStatus(HttpStatus.OK)
    public SseEmitter subscribe() {
         return eventAlarmNotificationService.subscribe(userService.getUserInfo().getId());
    }

우선 이렇게 sse 구독하는 코드를 추가한다.

이렇게 하면 클라이언트 입장에서 /pub 를 통해 sse 를 구독할수 있다.

위 코드에서 subscribe 는


    public SseEmitter subscribe(Long userId) {
 
        SseEmitter emitter = new SseEmitter(TIMEOUT);
        emitterMap.put(userId.toString(), emitter);
 

        emitter.onCompletion(() -> { 
            this.emitterMap.remove(userId);
        });
        emitter.onTimeout(() -> { 
            emitter.complete();
        });
//               
        log.info("SSE 구독 요청 완료: {} (스레드: {})", userId, Thread.currentThread().getName());
        return emitter;
    }

이렇게 구현했다. 이 메서드는 어떤 유저가 SSE로 실시간 알림 받고 싶다고 서버에 구독 요청을 보냈을 때 실행

유저가 구독했으니까 나중에 알림 보낼 때 찾을 수 있도록 emitterMap에 저장


    @Async("notiExecutor")
    public void sendAsync(AlarmPayload alarmPayload) { //sse 전송 메서드
        String userId = alarmPayload.getReceiveUserId().toString();
        SseEmitter emitter = emitterMap.get(userId);

        if (emitter != null) {
            try {
                emitter.send(SseEmitter.event().data(alarmPayload));
                log.info("서버로부터 SSE 전송 성공: 사용자 {}, 데이터 {}", userId, alarmPayload);
            } catch (IOException e) {
                log.error("SSE 전송 실패: 사용자 {}, 오류 메시지 {}", userId, e.getMessage(), e);
                emitter.completeWithError(e);
                emitterMap.remove(userId);
            }
        }
        else {
            log.warn("SSE 전송 시도 실패: 사용자 {}에 대한 Emitter 없음", userId);
        }

        log.info("비동기 메서드 종료: 사용자 {} (스레드: {})", userId, Thread.currentThread().getName());
    }

또, 이렇게 비동기 전송 메서드를 추가하여서 구독한 사용자에게 알림 메시지를 보낼수 있도록 구현하였다.

즉, "누가 입찰했어!" 같은 알림을 구독한 사용자한테 실시간으로 쏴주는 기능이다.

이 과정은 웹소켓을 연동하여 진행되는데, 카프카를 사용해서 더욱 실시간 처리를 원활히 하였다.

현재 이 경매 시스템은 입찰 희망하고 싶은 사람이 입찰 제안하는 창에 접속을 하면 해당 작물아이디를 기반으로 구독을 하게 된다.(stomp 를 이용한 구독) 그후, 입찰 가격을 적고 제안하기 버튼을 누르면 , 카프카 메서드를 통해 alarm 토픽에 요청정보를 보낸다.

***밑에 코드 참조


    public void kafkaalarmproduce(MemberProfile memberProfile, ExArticle exArticle, String redirecturl) {
        String topicName = "kafka-alarm"; // 알림 전용 토픽

        try {
            Deal deal = exArticle.getDeal();
            if (deal != null) {
                log.info("Deal current price: {}", deal.getDealCurPrice());
            } else {
                log.info("Deal is null");
            }

            // 세 개의 값을 JSON 객체로 생성
            BidNotification messagePayload = BidNotification.builder().
                    userId(memberProfile.getId())
                    .articleId(exArticle.getId())
                    .redirectUrl(redirecturl)
                    .price(exArticle.getDeal() == null ? exArticle.getTrans().getTrans_sell_price() : exArticle.getDeal().getDealCurPrice())
                    .build();

            String message = objectMapper.writeValueAsString(messagePayload); // JSON으로 직렬화
            log.info("kafka messagemessagemessage"+ message);
            long startTime = System.currentTimeMillis();

          
            String key = exArticle.getId().toString();  // 게시물 ID를 키로 사용
            CompletableFuture<SendResult<String, String>> future = kafkaTemplatetest.send(topicName, key, message);

            future
                    .thenAccept(result -> {
                        long endTime = System.currentTimeMillis();
                        log.info("Message sent to topic {} with offset {} in {} ms", topicName, result.getRecordMetadata().offset(), (endTime - startTime));
                    })
                    .exceptionally(ex -> {
                        log.error("Failed to send message to topic {} due to {}", topicName, ex.getMessage());
                        return null; // Exceptionally는 Void를 반환하므로 null 반환
                    });
        } catch (Exception e) {
            log.error("Failed to convert message to JSON due to {}", e.getMessage());
        }
    }