시나리오
A사용자가 B사용자에게 메세지 / 멘션 / 댓글 을 달면 B사용자에게 알림이 가는 API를 만들려고 한다.
FLOW
웹을 열때 각 사용자는 SSE로 자기 이름으로 구독을 시작한다.
A사용자가 B사용자에게 메세지 보냄 > 카프카에 메세지를 보냈다고 프로듀싱 > 카프카 컨슈머가 토픽을 Listen >
SSE로 구독하고 있는 B사용자의 알림 메서드 보냄 > B사용자 종에 빨간불 ON
서버는 클라이언트에게 요청을 스스로 보낼 수 없다.
그렇기 때문에 중간에 소통을 해줄 기술이 따로 필요하다.
Poling / WebSoket / SSE (Server Sent Events) 등이 있다.
이 세가지 기술을 모두 설명하진 않고 SSE를 채택한 이유는
알림 서비스는 [ 새로운 알림이 있는지 , 실시간성 보장 ] 간단한 일만 처리하면 되기때문에
단방향통신기술인 SSE를 채택 하였다.
그 외에도 아래의 특징이 있다.
SSE는 서버의 데이터를 실시간, 지속적으로 클라이언트에 보내는 기술이다.
클라이언트에서 처음 HTTP 연결을 맺고 나면 서버는 클라이언트로 계속하여 데이터를 전송할 수 있다.
일반적으로 HTTP 요청은 하나의 [요청 - 응답] 과정을 거치고 연결을 종료한다.
하지만 파일 전송과 같이 연결 상태를 유지하고 계속 데이터를 보내는 경우도 있다.
SSE는 이와 같이 한 번 연결 후 서버에서 클라이언트로 데이터를 계속해서 보낼 수 있다.
- 단방향 통신: SSE는 서버에서 클라이언트로의 단방향 통신을 지원합니다. 서버는 클라이언트에게 이벤트를 전송하고, 클라이언트는 이벤트를 수신하여 처리할 수 있습니다.
- 간단한 프로토콜: SSE는 간단한 텍스트 기반 프로토콜을 사용합니다. 클라이언트는 일반적으로 EventSource 객체를 사용하여 서버로부터 이벤트를 수신합니다.
- 자동 재연결: SSE는 네트워크 연결이 끊어지면 자동으로 재연결을 시도합니다. 이는 사용자가 실시간 업데이트를 계속 받을 수 있도록 도와줍니다.
- 이벤트 기반 통신: SSE는 이벤트 기반으로 동작합니다. 서버는 일련의 이벤트를 클라이언트로 보내며, 클라이언트는 이벤트 핸들러를 사용하여 해당 이벤트를 처리합니다.
- 간편한 구현: SSE를 구현하는 것은 비교적 간단합니다. HTTP 헤더를 통해 SSE를 사용하고, 서버와 클라이언트 간의 통신은 기본적인 텍스트 메시지로 이루어집니다.
- 크로스 도메인 허용: 보안 정책으로 인해 많은 웹 통신이 제한되는 환경에서도 SSE는 CORS (Cross-Origin Resource Sharing) 정책을 준수하며 크로스 도메인 통신을 지원합니다.
WebSoket은 이전 글에서 다루니 참고 하시길
https://soobysu.tistory.com/76
서버 - subscribe(구독) 구현
@RestController
@RequestMapping("/api")
@RequiredArgsConstructor
public class NoticeController {
private final NoticeService noticeService;
@GetMapping(value = "/subscribe", produces = "text/event-stream")
public SseEmitter subscribe(@RequestParam String memberName,
@RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId) {
return noticeService.subscribe(memberName, lastEventId);
}
}
연결이 되어있는동안 전달 받지 못한 메세지 ( 네트워크 끊김 , 연결시간 만료 등) 어떻게 처리 될까 ?
lastEventId 헤더는 읽지 않은 이벤트를 알려주는 헤더이다.
이 헤더는 클라이언트가 마지막으로 수신한 데이터의 id 값을 의미한다. 이를 이용하여 유실된 데이터를 다시 보내줄 수 있다.
서버 - 서비스 연결 구현
유저의 name과, Last-Event-ID값이 아래의 subscribe()로 넘어온다.
public SseEmitter subscribe(String memberName, String lastEventId) {
String emitterCreatedTimeByMemberName = makeTimeIncludeMemberName(memberName);
SseEmitter emitter = emitterRepository.save(emitterCreatedTimeByMemberName, new SseEmitter(DEFAULT_TIMEOUT));
emitter.onCompletion(() -> emitterRepository.deleteByEmitterCreatedTimeWithMemberName(emitterCreatedTimeByMemberName));
emitter.onTimeout(() -> emitterRepository.deleteByEmitterCreatedTimeWithMemberName(emitterCreatedTimeByMemberName));
// 503 에러를 방지하기 위한 더미 이벤트 전송
String eventId = makeTimeIncludeMemberName(memberName);
sendNotification(emitter, eventId, emitterCreatedTimeByMemberName, "EventStream Created. [memberName=" + memberName + "]");
// 클라이언트가 미수신한 Event 목록이 존재할 경우 전송하여 Event 유실을 예방
if (hasLostData(lastEventId)) {
sendLostData(lastEventId, memberName, emitterCreatedTimeByMemberName, emitter);
}
return emitter;
}
private String makeTimeIncludeMemberName(String memberName) {
return memberName + "_" + System.currentTimeMillis();
}
private void sendNotification(SseEmitter emitter, String eventCreatedTime, String emitterId, Object data) {
try {
emitter.send(SseEmitter.event()
.id(eventCreatedTime)
.name("sse")
.data(data)
);
} catch (IOException exception) {
emitterRepository.deleteByEmitterCreatedTimeWithMemberName(emitterId);
}
}
makeTimeIncludeMemberName(memberName);
userName_System.currentTimeMillis() 형태로 바꾸어 주는것을 볼 수 있는데 이것은 Last-Event-ID 헤더와 관련이 있다.
클라이언트가 유실된 시점과 비교하여 유실된 데이터만 재전송 할 수 있게 한다.
Last-Event-Id = 3_1631593143664
{3_1631593143664, data1}
{3_1831593143664, data3}
{3_1731593143664, data2}
=> data1 까지 제대로 전송되었고, data2, data3을 다시 보내야한다.
public SseEmitter subscribe(String memberName, String lastEventId) {
클라이언트의 sse연결 요청에 응답하기 위해서는 SseEmitter 객체를 만들어 반환해 주어야한다.
SseEmitter emitter = emitterRepository.save(emitterCreatedTimeByMemberName, new SseEmitter(DEFAULT_TIMEOUT));
SseEmitter 객체를 만들 때 유효 시간을 줄 수 있다. 이때 주는 시간 만큼 sse 연결이 유지되고, 시간이 지나면 자동으로 클라이언트에서 재연결 요청을 보내게 된다.
emitter.onCompletion(() -> emitterRepository.deleteByEmitterCreatedTimeWithMemberName(emitterCreatedTimeByMemberName));
emitter.onTimeout(() -> emitterRepository.deleteByEmitterCreatedTimeWithMemberName(emitterCreatedTimeByMemberName));
userName을 key로, SseEmitter를 value로 저장해둔다. 그리고 SseEmitter의 시간 초과 및 네트워크 오류를 포함한 모든 이유로 비동기 요청이 정상 동작할 수 없다면 저장해둔 SseEmitter를 삭제한다.
// 503 에러를 방지하기 위한 더미 이벤트 전송
String eventId = makeTimeIncludeMemberName(memberName);
sendNotification(emitter, eventId, emitterCreatedTimeByMemberName, "EventStream Created. [memberName=" + memberName + "]");
첫 연결 요청을 할때 SseEmitter가 생성되면 더미 데이터를 보내주어야한다.
sse 연결이 이뤄진 후, 하나의 데이터도 전송되지 않는다면 SseEmitter의 유효 시간이 끝난것으로 간주되어 503응답이 발생한다.
따라서 연결시 바로 더미 데이터를 한 번 보내준다.
// 클라이언트가 미수신한 Event 목록이 존재할 경우 전송하여 Event 유실을 예방
if (hasLostData(lastEventId)) {
sendLostData(lastEventId, memberName, emitterCreatedTimeByMemberName, emitter);
}
Last-Event-ID값이 헤더에 있는 경우, 저장된 데이터 캐시에서 id 값과 Last-Event-ID값을 통해 유실된 데이터들만 다시 보내준다.
서버 - 데이터 전송 구현
서버에서 클라이언트로 테이터를 보내는 부분을 구현해보자.
SSE 단방향 특성상 서버가 클라이언트에 보내기만 한다.
public void send(String receiverName, String content) {
Member receiver = memberService.getMemberByMemberName(receiverName);
Notice notification = noticeRepository.save(createNotification(receiver, content));
String eventCreatedTime = receiverName + "_" + System.currentTimeMillis();
Map<String, SseEmitter> emitters = emitterRepository.findAllEmitterStartWithByMemberId(receiverName);
emitters.forEach(
(keyOfReceiverName, emitter) -> {
emitterRepository.saveEventCache(keyOfReceiverName, notification);
sendNotification(emitter, eventCreatedTime, keyOfReceiverName, new NoticeResponseDto(notification));
}
);
}
클라이언트에 보낼 데이터 객체(Notice) 를 만들고
현재 로그인 한 유저이름(memberName)을 통해 관련된 SseEmitter를 모두 가져온다.
그 후, 데이터 캐시에도 저장해주고, 실제로 데이터 전송도 한다.
여기서 간단하게 만들기 위해 서버메모리에 올리지만 고가용성을 위해 Redis 같은 캐시 메모리를 사용하는걸 권장한다.
@KafkaListener(topics = "${devsoo.kafka.notice.topic}")
public void kafkaSend(ConsumerRecord<String, Object> record) {
String receiverMemberName = record.key();
String messageContent = record.value().toString();
//SSE 로 클라이언트로 보내기
send(receiverMemberName,messageContent);
}
이후 카프카 리스너를 통해 토픽에 알람이 오면 해당 알람 받을 유저(record.key())에게
메세지를 보내는 send 메서드를 호출한다.
PostMan Test
SSE 연결
A사용자가 B사용자에게 메세지를 보내면(카프카 프로듀싱)
아래처럼 컨슈머를 통해 메세지가 들어오고 SSE를 통해 잘 전달 된것을 볼 수 있다.
참고문헌
https://hstory0208.tistory.com/entry/Spring-kafka와-SSEServer-Send-Event를-사용한-실시간-알림-전송-방법
https://develoyummer.tistory.com/112
끗!
'개-발 > Infra' 카테고리의 다른 글
[Redis] 선착순 이벤트 구현 레디스 분산락 (Redisson DistributeLock) (0) | 2024.03.03 |
---|---|
[Kafka] MSA 서버간 비동기 통신 (요청 - 응답 모델) (0) | 2024.02.29 |
[Docker + NginX + Jenkins] Jenkins pipline verbose 옵션 CICD번외 편 (0) | 2023.12.13 |
[Docker + NginX + Jenkins] Spring 프로젝트 Blue,Green 무중단 배포 CI CD 2/2 (2) | 2023.12.03 |
[Docker + NginX + Jenkins] Spring 프로젝트 Blue,Green 무중단 배포 CI CD 1/2 (2) | 2023.11.28 |