💡알림기능 사용목적
yata project에서는 앱 사용의 편의성을 위해
카풀 신청 완료시, 또는 내가 쓴 게시글에 신청 요청이 왔을 때 알림 기능을 넣기로 결정하였고,
로그인 상태일 때 실시간으로 알림이 오도록 구현하도록 하였다.
알림 기능을 만들며,
Spring AOP를 통해 기능분리를 하였고,
유지보수성, 확정성을 위해 Spring Annotation을 활용하여 적용하였다.
먼저 같이 Notify와 관련된 controller,dto,entity,repository,service 클래스를 정의해 줄 것이다.
그 후 Custom Annotation에 Advice를 붙여 NotifyAspect를 구현한다.
AOP에 대한 것은 아래 포스팅에서 자세하게 다뤘다.
[프로젝트]Custom Annotation으로 알림기능 Spring AOP 적용
서버의 event를 client로 보내는 기술에는 웹소켓, SSE,polling 등이 있다.
우리는 그중 SSE방식을 이용하여 알림 기능을 구현하였다.
SSE(Server-sent-event)
서버의 event를 client로 실시간, 지속적으로 보내는 기술
이벤트가 [ 서버 → 클라이언트 ] 방향으로만 흐르는 단방향 통신
클라이언트에서 처음 HTTP 연결을 맺고 나면 서버는 클라이언트로 지속적인 데이터 전송이 가능
일반적인 HTTP요청은 [요청 - 응답]의 과정을 거치고 연결을 종료하는 반면 SSE 방식은한번 연결하면 클라이언트로 데이터를 계속 보낼 수 있따.
클라이언트가 주기적으로 HTTP 요청을 보낼 필요가 없이 HTTP 연결을 통해 서버에서 클라이언트로 테이터 전달 가능
단점)
지속적인 연결을 유지해야 하는 특성상 서버 리소스와 클라이언트의 네트워크 연결을 소비하게 되고, 많은 수의 클라이언트가 동시에 연결을 유지하면 서버의 처리 부하가 증가하라 수 있다.
네트워크 연결이 불안정한 경우에는 연결이 종료되고 재연결이 필요하므로 추가적인 네트워크 오버헤드가 발생할 수 있다.
yata project에서는 지속적인 요청을 보내는 polling보다 리소스 낭비가 적고,
양방향 통신인 웹 소켓에 비해 가벼운 SSE방식을 선택하였다.
또한 Spring Farmework 5부터 Webflux를 이용해서도 통신을 할 수 있지만,
우리는 Spring framework 4.2부터 생긴 SSE 통신을 지원하는 SseEmitter클래스를 통해 SSE 알림 서비스를 구현하였다.
📌Notify Entity 정의
@Entity
@Getter
@Setter
public class Notify extends Auditable {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "notification_id")
private Long id;
private String content;
/*@Embedded
private NotificationContent content;*/
//@Embedded
//private RelatedURL url;
private String url;
@Column(nullable = false)
private Boolean isRead;
@Enumerated(EnumType.STRING)
@Column(nullable = false)
private NotificationType notificationType;
@ManyToOne
@JoinColumn(name = "member_id")
@OnDelete(action = OnDeleteAction.CASCADE)
private Member receiver;
@Builder
public Notify(Member receiver, NotificationType notificationType, String content, String url, Boolean isRead) {
this.receiver = receiver;
this.notificationType = notificationType;
this.content = content;
this.url = url;
this.isRead = isRead;
}
public Notify() {
}
public enum NotificationType{
YATA, REVIEW, CHAT
}
}
필드내용
id
content
url
isRead
notifycationType(YATA, REVIEW, CHAT)
receiver
📌Notify Controller
@RestController
@RequestMapping("/api/v1/notify")
public class NotifyController {
private final NotifyService notifyService;
public NotifyController(NotifyService notifyService) {
this.notifyService = notifyService;
}
@GetMapping(value = "/subscribe", produces = "text/event-stream")
public SseEmitter subscribe(@AuthenticationPrincipal User principal,
@RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId) {
return notifyService.subscribe(principal.getUsername(), lastEventId);
}
}
Last-Event-ID => 이전에 받지 못한 이벤트가 존재하는 경우(SSE 연결에 대한 시간 만료 혹은 종료),
받은 마지막 이벤트 ID 값을 넘겨 그 이후의 데이터(받지 못한 데이터)부터 받을 수 있게 할 수 있는 정보를 의미
subscribe()
실제 클라이언트로부터 오는 알림 구독 요청을 받는다.
SSE 통신을 위해서는 produces로 반환할 데이터 타입을 "text/event-stream"으로 해주어야 함
유저 정보와 Last-Event-ID를 헤더로 받음
notifyService.subscribe()를 호출하여 사용자의 구독을 처리하고,
누구로부터 온 알림 구독인지에 대한 부분은 @AuthenticationPrincipal을 활용해 입력
이전에 받지 못한 정보가 있다면, Last-Event-ID라는 헤더와 함께 날아오므로 이에 대한 정보를 받아주도록 한다.
그리고 실제 구독하는 작업을 진행 후 SseEmitter객체를 반환(클라이언트에게 이벤트 스트림을 전송 가능)
📌Repository
Repository는 알림 객체를 저장하고 관리하는 역할인 NotifyRepository와
SSE연결을 관리하는 SseEmitter 객체와 이벤트 캐시를 맵 형태로 저장하고 관리하는 EmitterRepository를 만들어 주었다.
이벤트 캐시
클라이언트가 연결을 잃어도 이벤트 유실을 방지하기 위해 임시로 저장되는 데이터
ex) save() 메서드로 emitters맵에 SseEmitter객체를 저장,
findAllEmitterStartWithByMemberId()메서드로 특정 조건에 맞는 SseEmitter객체들을 조회하여 반환
Emitter Repository
package com.yata.backend.domain.notify.repository;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Map;
public interface EmitterRepository {
SseEmitter save(String emitterId, SseEmitter sseEmitter);
void saveEventCache(String emitterId, Object event);
Map<String, SseEmitter> findAllEmitterStartWithByMemberId(String memberId);
Map<String, Object> findAllEventCacheStartWithByMemberId(String memberId);
void deleteById(String id);
void deleteAllEmitterStartWithId(String memberId);
void deleteAllEventCacheStartWithId(String memberId);
}
Emitter RepositoryImpl
@Repository
public class EmitterRepositoryImpl implements EmitterRepository{
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
private final Map<String, Object> eventCache = new ConcurrentHashMap<>();
@Override
public SseEmitter save(String emitterId, SseEmitter sseEmitter) {
emitters.put(emitterId, sseEmitter);
return sseEmitter;
}
@Override
public void saveEventCache(String eventCacheId, Object event) {
eventCache.put(eventCacheId, event);
}
@Override
public Map<String, SseEmitter> findAllEmitterStartWithByMemberId(String memberId) {
return emitters.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(memberId))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@Override
public Map<String, Object> findAllEventCacheStartWithByMemberId(String memberId) {
return eventCache.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(memberId))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@Override
public void deleteById(String id) {
emitters.remove(id);
}
@Override
public void deleteAllEmitterStartWithId(String memberId) {
emitters.forEach(
(key, emitter) -> {
if (key.startsWith(memberId)) {
emitters.remove(key);
}
}
);
}
@Override
public void deleteAllEventCacheStartWithId(String memberId) {
eventCache.forEach(
(key, emitter) -> {
if (key.startsWith(memberId)) {
eventCache.remove(key);
}
}
);
}
}
emitter 는 클라이언트가 구독을 요청하면 해당 사용자의 식별자를 키로 사용하여 맵에 저장, 이후 알림을 전송할 때 해당 사용자의 SseEmitter를 조회하기 위해 사용
eventCache는 알림을 받을 사용자의 식별자를 키로 저장하고, 해당 사용자에게 전송되지 못한 이밴트를 캐시로 저장!!
저장된 이벤트는 사용자가 구독할 때, 클라이언트로 전송되어 이벤트의 유실을 방지!
emitters와 eventCache가 맵 형태인 이유?
emitters의 경우 key와 value에 각각 emitterId와 SseEmitter객체를 저장하고
eventCache는 key와 value에 각각 eventCacheId와 euentCache객체를 저장한다.
이를 통해 주어진 키를 사용하여 빠르게 데이터를 검색할 수 있고, 중복된 키를 가진 데이터를 저장할 수 없다.
concurrentHashMap은 여러 스레드에서 동시에 안전하게 데이터에 접근할 수 있는 맵 구현체임
알림 시스템에서는 여러 클라이언트가 동시에 구독하고 이벤트를 전송할 수 있으므로, 동시성을 제어하는 것이 중요
따라서 맵 형태를 사용함으로서 데이터 저장, 고유성 보장, 성능 등의 이점을 얻으며 효율적으로 객체를 관리할 수 있다.
그러나 위의 Repository는 JPA를 사용하지 않고, 단순히 메모리 내 맵을 이용하여 데이터를 관리하고 있기 때문에 직접 CRUD를 구현해야 한다.
💡메소드 설명
save() => emitterID와 sseEmitter를 사용하여 SSE이벤트 전송 객체를 저장
saveEventCache() => 이벤트캐시 아이디와 이벤트 객체를 받아 저장.
findAllEmitterStartWithByMemberID() 주어진 memberId로 시작하는 모든 Emitter를 가져옴
findAllEventCacherStartWithByMemberID() ''
deleteById() => 삭제
deleteAllEmitterStartWithId - 해당 회원과 관련된 모든 Emitter를 지움
deleteAllEventCacheStartWithId - 해당 회원과 관련된 모든 이벤트를 지움
Emitter와 이벤트를 찾는 부분에 있어 startsWith을 사용하는 이유
저장할 때 뒤에 구분자로 회원의 ID를 사용하기 때문에 해당 회원과 관련된 Emitter와 이벤트들을 찾아오는 것이다.
Notify Repository
public interface NotifyRepository extends JpaRepository<Notify, Long> {
}
📌Notify Service
@Service
public class NotifyService {
private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
private final EmitterRepository emitterRepository;
private final NotifyRepository notifyRepository;
public NotifyService(EmitterRepository emitterRepository, NotifyRepository notifyRepository) {
this.emitterRepository = emitterRepository;
this.notifyRepository = notifyRepository;
}
public SseEmitter subscribe(String username, String lastEventId) {
String emitterId = makeTimeIncludeId(username);
SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter(DEFAULT_TIMEOUT));
emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));
// 503 에러를 방지하기 위한 더미 이벤트 전송
String eventId = makeTimeIncludeId(username);
sendNotification(emitter, eventId, emitterId, "EventStream Created. [userEmail=" + username + "]");
// 클라이언트가 미수신한 Event 목록이 존재할 경우 전송하여 Event 유실을 예방
if (hasLostData(lastEventId)) {
sendLostData(lastEventId, username, emitterId, emitter);
}
return emitter;
}
private String makeTimeIncludeId(String email) {
return email + "_" + System.currentTimeMillis();
}
private void sendNotification(SseEmitter emitter, String eventId, String emitterId, Object data) {
try {
emitter.send(SseEmitter.event()
.id(eventId)
.name("sse")
.data(data)
);
} catch (IOException exception) {
emitterRepository.deleteById(emitterId);
}
}
private boolean hasLostData(String lastEventId) {
return !lastEventId.isEmpty();
}
private void sendLostData(String lastEventId, String userEmail, String emitterId, SseEmitter emitter) {
Map<String, Object> eventCaches = emitterRepository.findAllEventCacheStartWithByMemberId(String.valueOf(userEmail));
eventCaches.entrySet().stream()
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
.forEach(entry -> sendNotification(emitter, entry.getKey(), emitterId, entry.getValue()));
}
//@Override
public void send(Member receiver, Notify.NotificationType notificationType, String content, String url) {
Notify notification = notifyRepository.save(createNotification(receiver, notificationType, content, url));
String receiverEmail = receiver.getEmail();
String eventId = receiverEmail + "_" + System.currentTimeMillis();
Map<String, SseEmitter> emitters = emitterRepository.findAllEmitterStartWithByMemberId(receiverEmail);
emitters.forEach(
(key, emitter) -> {
emitterRepository.saveEventCache(key, notification);
sendNotification(emitter, eventId, key, NotifyDto.Response.createResponse(notification));
}
);
}
private Notify createNotification(Member receiver, Notify.NotificationType notificationType, String content, String url) {
return Notify.builder()
.receiver(receiver)
.notificationType(notificationType)
.content(content)
.url(url)
.isRead(false)
.build();
}
}
📌subscribe() 메서드 설명
public SseEmitter subscribe(String username, String lastEventId) {
String emitterId = makeTimeIncludeId(username);
SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter(DEFAULT_TIMEOUT));
emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));
// 503 에러를 방지하기 위한 더미 이벤트 전송
String eventId = makeTimeIncludeId(username);
sendNotification(emitter, eventId, emitterId, "EventStream Created. [userEmail=" + username + "]");
// 클라이언트가 미수신한 Event 목록이 존재할 경우 전송하여 Event 유실을 예방
if (hasLostData(lastEventId)) {
sendLostData(lastEventId, username, emitterId, emitter);
}
return emitter;
}
private String makeTimeIncludeId(String email) {
return email + "_" + System.currentTimeMillis();
}
username과 lastEventId를 받아 구독을 설정
emitterId 생성 => makeTimeIncludeId() 메서드를 통해 username을 포함한 SseEmitter를 식별하기 위한 고유 아이디 생성
SseEmitter 객체 생성/저장 => emitterRepository.save(emitterId, mew SseEmitter(DEFAULR_TIMEOUT)); 로 새로운 SseEmitter 객체 생성, emitterId를 키로 사용해 emitterRepository에 저장 => 이렇게 생성된 SseEmitter는 클라이언트에게 이벤트를 전송하는 역할 수행
onComplecation() onTimeout()
SseEmitter가 완료/시간초과 시 SseEmitter 삭제
makeTimeIncludeId()
한 브라우저에서 여러개의 구독을 진행할 때 탭마다 SseEmitter의 구분을 위해 시간을 붙여 구분할 수 있어야 함
Last-Event-ID로 마지막 전송받은 이벤트 ID가 무엇인지 알고, 받지 못한 데이터 정보들에 대해 인지할 수 있어야 함
ex) 2849이라는 ID를 가진 회원의 이벤트 중 가장 마지막으로 발생한 이벤트? => 시간을 기준으로 구분할 수 있게 된다.
더미 이벤트 전송
등록 후 SseEmitter의 유효시간 동안 데이터가 전송되지 않으면 503에러 발생 => 맨 처음 연결 진행 더미데이터를 보내 이를 방지
📌SendNotification()
private void sendNotification(SseEmitter emitter, String eventId, String emitterId, Object data) {
try {
emitter.send(SseEmitter.event()
.id(eventId)
.name("sse")
.data(data)
);
} catch (IOException exception) {
emitterRepository.deleteById(emitterId);
}
}
SseEmitter를 통해 이벤트를 전송하는 메서드
파라미터
이벤트를 전송할 대상인 SseEmitter 객체인 emitter,
eventId,
emitterId(SseEmitter를 식별하기 위한 고유 ID),
Objcet data (전송할 데이터 객체)
SseEmitter.event()호출하여 SseEventBuilder객체 생성, 이벤트에 파라미터로 받은 값들을 넣어준 후
emitter.send()를 사용하여 SseEmitter를 통해 이벤트 전송
만약 이벤트 전송 중 IOException이 발생하면(클라이언트와의 연결이 끊어졌거나 오류가 발생하면)
해당 연결을 종료하고 관련 정보를 삭제
📌HasLostData()
private boolean hasLostData(String lastEventId) {
return !lastEventId.isEmpty();
}
마지막 이벤트 ID를 기반으로 구독자가 받지 못한 데이터가 있는지 확인
⭐lastEventid가 비어있지 않다 => controller의 해더를 통해 lastEventId가 들어왔다 => 손실된 이벤트가 있다 => true
⭐lastEventId가 비어있다 => false
📌SendLostData()
private void sendLostData(String lastEventId, String userEmail, String emitterId, SseEmitter emitter) {
Map<String, Object> eventCaches = emitterRepository.findAllEventCacheStartWithByMemberId(String.valueOf(userEmail));
eventCaches.entrySet().stream()
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
.forEach(entry -> sendNotification(emitter, entry.getKey(), emitterId, entry.getValue()));
}
구독자가 받지 못한 데이터가 있다면, 구독자에게 전송하는 메서드
구독자의 이메일을 기반으로 이벤트 캐시를 가져와 마지막 이벤트 ID와 비교하여 미수신한 데이터를 전송
📌Send() 메서드 설명
//@Override
public void send(Member receiver, Notify.NotificationType notificationType, String content, String url) {
Notify notification = notifyRepository.save(createNotification(receiver, notificationType, content, url));
String receiverEmail = receiver.getEmail();
String eventId = receiverEmail + "_" + System.currentTimeMillis();
Map<String, SseEmitter> emitters = emitterRepository.findAllEmitterStartWithByMemberId(receiverEmail);
emitters.forEach(
(key, emitter) -> {
emitterRepository.saveEventCache(key, notification);
sendNotification(emitter, eventId, key, NotifyDto.Response.createResponse(notification));
}
);
}
지정된 수신자에게 알림을 전송하는 메서드
받아온 정보를 저장한 후 Notify 객체를 생성
findAllEmitterStartWithByMemberId(receiverEmail)을 통해 수신자 이메일로 시작하는 모든 SseEmitter객체(이벤트 스트리밍을 위한 객체)를 가져옴
각 SseEmitter에 대해 이벤트 캐시에 key와 생성한 Notify객체를 저장하고,
SendNotifycation메서드를 호출하여 알림과 관련된 데이터(eventId,key,ResponseNotifyDto)를 emitter로 전송
📌NotifyDto 정의
public class NotifyDto {
@AllArgsConstructor
@Builder
@NoArgsConstructor
@Getter
@Setter
public static class Response {
String id;
String name;
String content;
String type;
String createdAt;
public static Response createResponse(Notify notify) {
return Response.builder()
.content(notify.getContent())
.id(notify.getId().toString())
.name(notify.getReceiver().getName())
.createdAt(notify.getCreatedAt().toString())
.build();
}
}
}
알림 Service코드의 Send()메서드에서 SSE를 클라이언트에게 전송할 때, 이벤트의 데이터로 전송할 DTo
📌NotifyMessage Enum
public enum NotifyMessage {
YATA_NEW_REQUEST("새로운 야타 참가 요청이 있습니다.");
private String message;
NotifyMessage(String message) {
this.message = message;
}
public String getMessage() {
return message;
}
}
📌NotifyAspect 클래스 정의
아래는 NotifyAspect클래스의 전체 코드이다.
Aspect 클래스에는
부가 기능 (Advice) + 적용할 위치 (Pointcut)를 적용한다.
@Aspect
@Slf4j
@Component
@EnableAsync
public class NotifyAspect {
// yataRequestService createRequest method pointcut
private final NotifyService notifyService;
public NotifyAspect(NotifyService notifyService) {
this.notifyService = notifyService;
}
@Pointcut("@annotation(com.yata.backend.domain.notify.annotation.NeedNotify)")
public void annotationPointcut() {
}
@Async
@AfterReturning(pointcut = "annotationPointcut()", returning = "result")
public void checkValue(JoinPoint joinPoint, Object result) throws Throwable {
NotifyInfo notifyProxy = (NotifyInfo) result;
notifyService.send(
notifyProxy.getReceiver(),
notifyProxy.getNotificationType(),
NotifyMessage.YATA_NEW_REQUEST.getMessage(),
"/api/v1/yata/" + (notifyProxy.getGoUrlId())
);
log.info("result = {}", result);
}
}
이제 위 코드에 적용된 포인트컷과 부가기능에 대해 좀 더 상세히 알아보자
📌pointcut
@Pointcut("@annotation(com.yata.backend.domain.notify.annotation.NeedNotify)")
public void annotationPointcut() {
}
먼저 포인트 컷이다.
annotatinoPointCut()메서드를 통해
com.yata.backend.domain.notify.annotation.NeedNotify에 정의한
어노테이션이 적용된 메소드들을 대상으로 AOP를 적용할 수 있게 되었다.
📌NotifyInfo 정의
public interface NotifyInfo {
Member getReceiver();
Long getGoUrlId();
Notify.NotificationType getNotificationType();
}
@Entity
@Getter
@Setter
@Builder
@AllArgsConstructor
@NoArgsConstructor
@ToString(exclude = "yata")
public class YataRequest extends Auditable implements NotifyInfo {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long YataRequestId;
@Column(length = 100, nullable = false)
private String title;
.
.
.
.
@Override
public Member getReceiver() {
return yata.getMember();
}
@Override
public Long getGoUrlId() {
return yata.getYataId();
}
@Override
public Notify.NotificationType getNotificationType() {
return Notify.NotificationType.YATA;
}
}
📌Advice 정의
@Async
@AfterReturning(pointcut = "annotationPointcut()", returning = "result")
public void checkValue(JoinPoint joinPoint, Object result) throws Throwable {
NotifyInfo notifyProxy = (NotifyInfo) result;
notifyService.send(
notifyProxy.getReceiver(),
notifyProxy.getNotificationType(),
NotifyMessage.YATA_NEW_REQUEST.getMessage(),
"/api/v1/yata/" + (notifyProxy.getGoUrlId())
);
log.info("result = {}", result);
}
@Async -> 해당 메소드를 비동기적으로 실행하도록 지정. 이렇게 설정된 메소드는 별도의 스레드에서 실행되며, 호출자는 비동기 메소드의 결과를 기다리지 않고, 다음 작업을 할 수 있게 된다.
@AfterReturning -> 해당 포인트컷이 정상적으로 실행되었을 시 수행
joinPoint와 정상적으로 실행한 후반환된 결과를 result 매개변수로 받아옴
먼저 result를 NotifyInfo타입으로 형변환 -> NotifyInfo타입의 메소드를 호출할 수 있게 됨
이렇게 구현된 API를 기반으로 JS에서 EventSource를 만들어 요청을 진행하면 알림에 대한 구독을 진행할 수 있다.
'Spring' 카테고리의 다른 글
Pagination 조회 시의 Query tuning , 그리고 N+1 (2) | 2023.09.13 |
---|---|
Hibernate spatial이란? / 간단한 쿼리 메서드 (0) | 2023.07.12 |
[프로젝트] Spring AOP로 로그 구현 (0) | 2023.06.29 |
[프로젝트] Custom Annotation으로 알림 기능 Spring AOP 적용 (7) | 2023.06.29 |
[프로젝트]반경 내 위치의 게시글 검색(GIS) (1) | 2023.06.27 |