카프카 와 sse 를 활용하여 입찰 알림을 보내는 프로젝트를 개발해보자.
//클라이언트로 부터 SSE subscription 을 수락한다.
@GetMapping(path = "/pub", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseStatus(HttpStatus.OK)
public SseEmitter subscribe() {
return eventAlarmNotificationService.subscribe(userService.getUserInfo().getId());
}
우선 이렇게 sse 구독하는 코드를 추가한다.
이렇게 하면 클라이언트 입장에서 /pub 를 통해 sse 를 구독할수 있다.
위 코드에서 subscribe 는
public SseEmitter subscribe(Long userId) {
SseEmitter emitter = new SseEmitter(TIMEOUT);
emitterMap.put(userId.toString(), emitter);
emitter.onCompletion(() -> {
this.emitterMap.remove(userId);
});
emitter.onTimeout(() -> {
emitter.complete();
});
//
log.info("SSE 구독 요청 완료: {} (스레드: {})", userId, Thread.currentThread().getName());
return emitter;
}
이렇게 구현했다. 이 메서드는 어떤 유저가 SSE로 실시간 알림 받고 싶다고 서버에 구독 요청을 보냈을 때 실행
유저가 구독했으니까 나중에 알림 보낼 때 찾을 수 있도록 emitterMap에 저장
@Async("notiExecutor")
public void sendAsync(AlarmPayload alarmPayload) { //sse 전송 메서드
String userId = alarmPayload.getReceiveUserId().toString();
SseEmitter emitter = emitterMap.get(userId);
if (emitter != null) {
try {
emitter.send(SseEmitter.event().data(alarmPayload));
log.info("서버로부터 SSE 전송 성공: 사용자 {}, 데이터 {}", userId, alarmPayload);
} catch (IOException e) {
log.error("SSE 전송 실패: 사용자 {}, 오류 메시지 {}", userId, e.getMessage(), e);
emitter.completeWithError(e);
emitterMap.remove(userId);
}
}
else {
log.warn("SSE 전송 시도 실패: 사용자 {}에 대한 Emitter 없음", userId);
}
log.info("비동기 메서드 종료: 사용자 {} (스레드: {})", userId, Thread.currentThread().getName());
}
또, 이렇게 비동기 전송 메서드를 추가하여서 구독한 사용자에게 알림 메시지를 보낼수 있도록 구현하였다.
즉, "누가 입찰했어!" 같은 알림을 구독한 사용자한테 실시간으로 쏴주는 기능이다.
이 과정은 웹소켓을 연동하여 진행되는데, 카프카를 사용해서 더욱 실시간 처리를 원활히 하였다.
현재 이 경매 시스템은 입찰 희망하고 싶은 사람이 입찰 제안하는 창에 접속을 하면 해당 작물아이디를 기반으로 구독을 하게 된다.(stomp 를 이용한 구독) 그후, 입찰 가격을 적고 제안하기 버튼을 누르면 , 카프카 메서드를 통해 alarm 토픽에 요청정보를 보낸다.
***밑에 코드 참조
public void kafkaalarmproduce(MemberProfile memberProfile, ExArticle exArticle, String redirecturl) {
String topicName = "kafka-alarm"; // 알림 전용 토픽
try {
Deal deal = exArticle.getDeal();
if (deal != null) {
log.info("Deal current price: {}", deal.getDealCurPrice());
} else {
log.info("Deal is null");
}
// 세 개의 값을 JSON 객체로 생성
BidNotification messagePayload = BidNotification.builder().
userId(memberProfile.getId())
.articleId(exArticle.getId())
.redirectUrl(redirecturl)
.price(exArticle.getDeal() == null ? exArticle.getTrans().getTrans_sell_price() : exArticle.getDeal().getDealCurPrice())
.build();
String message = objectMapper.writeValueAsString(messagePayload); // JSON으로 직렬화
log.info("kafka messagemessagemessage"+ message);
long startTime = System.currentTimeMillis();
String key = exArticle.getId().toString(); // 게시물 ID를 키로 사용
CompletableFuture<SendResult<String, String>> future = kafkaTemplatetest.send(topicName, key, message);
future
.thenAccept(result -> {
long endTime = System.currentTimeMillis();
log.info("Message sent to topic {} with offset {} in {} ms", topicName, result.getRecordMetadata().offset(), (endTime - startTime));
})
.exceptionally(ex -> {
log.error("Failed to send message to topic {} due to {}", topicName, ex.getMessage());
return null; // Exceptionally는 Void를 반환하므로 null 반환
});
} catch (Exception e) {
log.error("Failed to convert message to JSON due to {}", e.getMessage());
}
}