Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PR] 알림 기능 개발 (SSE Emitter) #38

Merged
merged 1 commit into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package org.omoknoone.onionhotsayyo.notification.command.aggregate;

import java.time.LocalDateTime;

import org.hibernate.annotations.CreationTimestamp;
import org.omoknoone.onionhotsayyo.member.aggregate.Member;
import org.springframework.stereotype.Service;

import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.FetchType;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.JoinColumn;
import jakarta.persistence.ManyToOne;
import jakarta.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

@Entity
@Table(name = "notification")
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Notification {

@Id
@Column
@GeneratedValue(strategy = GenerationType.IDENTITY)
private int notificationId;

@Column
private String message;

@Column
@CreationTimestamp
private LocalDateTime eventTime;

@JoinColumn(name = "member_id")
ms1011 marked this conversation as resolved.
Show resolved Hide resolved
// @ManyToOne(fetch = FetchType.LAZY)
private String memberId;

@Column
private String url;

@Column
private boolean isChecked;

@Column
private String type;



}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.omoknoone.onionhotsayyo.notification.command.controller;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.omoknoone.onionhotsayyo.notification.command.service.NotificationService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.core.annotation.AuthenticationPrincipal;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RestController
@RequestMapping("/notifications")
public class NotificationController {
private final NotificationService notificationService;

@Autowired
public NotificationController(NotificationService notificationService) {
this.notificationService = notificationService;
}

@GetMapping(value = "/subscribe", produces = "text/event-stream")
public SseEmitter subscribe(@RequestParam String memberName,
@RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId) {
return notificationService.subscribe(memberName, lastEventId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.omoknoone.onionhotsayyo.notification.command.dto;

import java.time.LocalDateTime;

import org.hibernate.annotations.CreationTimestamp;
import org.omoknoone.onionhotsayyo.member.aggregate.Member;
import org.omoknoone.onionhotsayyo.notification.command.aggregate.Notification;

import jakarta.persistence.Column;
import jakarta.persistence.FetchType;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.JoinColumn;
import jakarta.persistence.ManyToOne;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class NotificationDTO {

private int notificationId;
private String message;
private LocalDateTime eventTime;
private String memberId;
private String url;
private boolean isChecked;
private String type;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.omoknoone.onionhotsayyo.notification.command.repository;

import java.util.Map;

import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

public interface EmitterRepository {

// Emitter를 저장한다
SseEmitter save(String emitterId, SseEmitter sseEmitter);

// 이벤트를 저장한다
void saveEventCache(String emitterId, Object event);

// 해당 회원과 관련된 모든 Emitter를 찾는다. (브라우저당 여러 개 연결이 가능하기에 여러 Emitter가 존재할 수 있다.)
Map<String, SseEmitter> findAllEmitterStartWithByMemberId(String memberId);

// 해당 회원과 관련된 모든 이벤트를 찾는다
Map<String, Object> findAllEventCacheStartWithByMemberId(String memberId);

// Emitter를 지운다
void deleteById(String id);

// 해당 회원과 관련된 모든 Emitter를 지운다
void deleteAllEmitterStartWithId(String memberId);

// 해당 회원과 관련된 모든 이벤트를 지운다
void deleteAllEventCacheStartWithId(String memberId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package org.omoknoone.onionhotsayyo.notification.command.repository;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.springframework.stereotype.Repository;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Repository
public class EmitterRepositoryImpl implements EmitterRepository{

private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
private final Map<String, Object> eventCache = new ConcurrentHashMap<>();


public EmitterRepositoryImpl() {
}


@Override
public SseEmitter save(String emitterId, SseEmitter sseEmitter) {
emitters.put(emitterId, sseEmitter);
return sseEmitter;
}

@Override
public void saveEventCache(String eventCacheId, Object event) {
eventCache.put(eventCacheId, event);
}

@Override
public Map<String, SseEmitter> findAllEmitterStartWithByMemberId(String memberId) {
System.out.println("memberId = " + memberId);
System.out.println("emitters = " + emitters);
Map<String, SseEmitter> tempMap = emitters.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(memberId))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
System.out.println("tempMap = " + tempMap);
for (String s : tempMap.keySet()) {
System.out.println(s + " | tempMap = " + tempMap.get(s));
}
return emitters.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(memberId))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

@Override
public Map<String, Object> findAllEventCacheStartWithByMemberId(String memberId) {
return eventCache.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(memberId))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

@Override
public void deleteById(String id) {
emitters.remove(id);
}

@Override
public void deleteAllEmitterStartWithId(String memberId) {
emitters.forEach(
(key, emitter) -> {
if (key.startsWith(memberId)) {
emitters.remove(key);
}
}
);
}

@Override
public void deleteAllEventCacheStartWithId(String memberId) {
eventCache.forEach(
(key, emitter) -> {
if (key.startsWith(memberId)) {
eventCache.remove(key);
}
}
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.omoknoone.onionhotsayyo.notification.command.repository;

import org.omoknoone.onionhotsayyo.notification.command.aggregate.Notification;
import org.springframework.data.jpa.repository.JpaRepository;

public interface NotificationRepository extends JpaRepository<Notification,Integer> {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package org.omoknoone.onionhotsayyo.notification.command.service;

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Map;

import org.modelmapper.ModelMapper;
import org.omoknoone.onionhotsayyo.follow.command.aggregate.Follow;
import org.omoknoone.onionhotsayyo.member.aggregate.Member;
import org.omoknoone.onionhotsayyo.member.service.MemberService;
import org.omoknoone.onionhotsayyo.notification.command.aggregate.Notification;
import org.omoknoone.onionhotsayyo.notification.command.dto.NotificationDTO;
import org.omoknoone.onionhotsayyo.notification.command.repository.EmitterRepository;
import org.omoknoone.onionhotsayyo.notification.command.repository.NotificationRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Service
public class NotificationService {
ms1011 marked this conversation as resolved.
Show resolved Hide resolved

private final MemberService memberService;
private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
private final NotificationRepository notificationRepository;
private final EmitterRepository emitterRepository;
private final ModelMapper modelMapper;

public NotificationService(MemberService memberService, NotificationRepository notificationRepository,
EmitterRepository emitterRepository, ModelMapper modelMapper) {
this.memberService = memberService;
this.notificationRepository = notificationRepository;
this.emitterRepository = emitterRepository;
this.modelMapper = modelMapper;
}

// 로그인 한 유저 emitter 생성
public SseEmitter subscribe(String userId, String lastEventId) {

// Last-Event-Id 값에서 데이터가 유실된 시점을 파악하기 위하여 시간까지 표시되게 함
String id = userId + "_" + System.currentTimeMillis();

// emitter 저장
SseEmitter emitter = emitterRepository.save(id, new SseEmitter(DEFAULT_TIMEOUT));

// 연결 종료 처리
emitter.onCompletion(() -> emitterRepository.deleteById(id));
emitter.onTimeout(() -> emitterRepository.deleteById(id));

// 503 에러를 방지하기 위해 접속되었을 때 더미 이벤트 전송
sendToClient(emitter, id, "EventStream Created. [userId=" + userId + "]");

// 클라이언트가 미수신한 Event 목록이 존재할 경우 전송하여 Event 유실을 예방
if (!lastEventId.isEmpty()) {
Map<String, SseEmitter> events = emitterRepository.findAllEmitterStartWithByMemberId(userId);
events.entrySet().stream()
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
.forEach(entry -> sendToClient(emitter, entry.getKey(), entry.getValue()));
}

return emitter;
}

private void sendToClient(SseEmitter emitter, String id, Object data) {
System.out.println("[sendToClient] emitter = " + emitter);
System.out.println("id = " + id);
System.out.println("data = " + data);
try {
emitter.send(SseEmitter.event()
.id(id)
.name("sse")
.data(data));
} catch (IOException exception) {
emitterRepository.deleteById(id);
throw new RuntimeException("emitter 연결 오류");
}
}

public void send(String receiverName, String content) {
System.out.println("[send] receiverName = " + receiverName);
Member receiver = memberService.findByMemberId(receiverName);
System.out.println("receiver = " + receiver);
Notification notification = notificationRepository.save(createNotification(receiver, content));
System.out.println("notification = " + notification);

// String eventCreatedTime = receiverName + "_" + System.currentTimeMillis();

// 로그인 한 유저의 emitter 모두 가져오기
Map<String, SseEmitter> emitters = emitterRepository.findAllEmitterStartWithByMemberId(receiverName);
System.out.println("[send] emitters = " + emitters);
emitters.forEach(
(keyOfReceiverName, emitter) -> {
System.out.println("keyOfReceiverName = " + keyOfReceiverName);
System.out.println("emitter = " + emitter);
emitterRepository.saveEventCache(keyOfReceiverName, notification);

sendToClient(emitter, keyOfReceiverName, modelMapper.map(notification, NotificationDTO.class));
}
);
}

private Notification createNotification(Member receiver, String content) {
// return Notification.builder()
// .receiver(receiver)
// .content(content)
// .review(review)
// .url("/reviews/" + review.getId())
// .isRead(false)
// .build();
Notification notification = new Notification();
notification.setMemberId(receiver.getMemberId());
notification.setChecked(false);
notification.setMessage(content);
// notification.setUrl();
notification.setEventTime(LocalDateTime.now());

return notification;
}


// 알림을 '읽음' 상태로 만듦
public void checkNotification (Notification notification) {
notification.setChecked(true);
notificationRepository.save(notification);
}




}