From db7060ab826f8f9b64011c3d36b912b7fa4eba82 Mon Sep 17 00:00:00 2001 From: 1000hyehyang Date: Wed, 11 Feb 2026 20:43:54 +0900 Subject: [PATCH 1/5] =?UTF-8?q?refactor:=20=EC=95=8C=EB=A6=BC=20=EC=8B=9C?= =?UTF-8?q?=EC=8A=A4=ED=85=9C=20Outbox=20=ED=8C=A8=ED=84=B4=20+=20RabbitMQ?= =?UTF-8?q?=EB=A1=9C=20=EC=A0=84=ED=99=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../NotificationDeliveryClaimService.java | 85 +++++ .../NotificationDeliveryProcessor.java | 106 ------ .../service/NotificationOutboxService.java | 130 +++++++ .../service/NotificationService.java | 39 ++- .../domain/entity/NotificationDelivery.java | 20 +- .../domain/entity/NotificationOutbox.java | 76 +++++ .../domain/entity/enums/DeliveryStatus.java | 7 +- .../domain/entity/enums/OutboxStatus.java | 20 ++ .../NotificationDeliveryRepository.java | 34 +- .../NotificationOutboxRepository.java | 80 +++++ .../NotificationDeliveryConsumer.java | 127 +++++++ .../NotificationDeliveryMessage.java | 48 +++ .../NotificationOutboxPublisher.java | 79 +++++ .../NotificationRecoveryScheduler.java | 145 ++++++++ .../messaging/RabbitMqConfig.java | 130 +++++++ .../sender/NotificationChannelSender.java | 2 +- .../worker/NotificationDeliveryWorker.java | 83 ----- ...ificationEventListenerIntegrationTest.java | 316 ------------------ .../NotificationChannelResolverTest.java | 61 ---- ...otificationMessageTemplateServiceTest.java | 77 ----- 20 files changed, 991 insertions(+), 674 deletions(-) create mode 100644 src/main/java/com/example/RealMatch/notification/application/service/NotificationDeliveryClaimService.java delete mode 100644 src/main/java/com/example/RealMatch/notification/application/service/NotificationDeliveryProcessor.java create mode 100644 src/main/java/com/example/RealMatch/notification/application/service/NotificationOutboxService.java create mode 100644 src/main/java/com/example/RealMatch/notification/domain/entity/NotificationOutbox.java create mode 100644 src/main/java/com/example/RealMatch/notification/domain/entity/enums/OutboxStatus.java create mode 100644 src/main/java/com/example/RealMatch/notification/domain/repository/NotificationOutboxRepository.java create mode 100644 src/main/java/com/example/RealMatch/notification/infrastructure/messaging/NotificationDeliveryConsumer.java create mode 100644 src/main/java/com/example/RealMatch/notification/infrastructure/messaging/NotificationDeliveryMessage.java create mode 100644 src/main/java/com/example/RealMatch/notification/infrastructure/messaging/NotificationOutboxPublisher.java create mode 100644 src/main/java/com/example/RealMatch/notification/infrastructure/messaging/NotificationRecoveryScheduler.java create mode 100644 src/main/java/com/example/RealMatch/notification/infrastructure/messaging/RabbitMqConfig.java delete mode 100644 src/main/java/com/example/RealMatch/notification/infrastructure/worker/NotificationDeliveryWorker.java delete mode 100644 src/test/java/com/example/RealMatch/notification/application/event/NotificationEventListenerIntegrationTest.java delete mode 100644 src/test/java/com/example/RealMatch/notification/application/service/NotificationChannelResolverTest.java delete mode 100644 src/test/java/com/example/RealMatch/notification/application/service/NotificationMessageTemplateServiceTest.java diff --git a/src/main/java/com/example/RealMatch/notification/application/service/NotificationDeliveryClaimService.java b/src/main/java/com/example/RealMatch/notification/application/service/NotificationDeliveryClaimService.java new file mode 100644 index 00000000..6c283681 --- /dev/null +++ b/src/main/java/com/example/RealMatch/notification/application/service/NotificationDeliveryClaimService.java @@ -0,0 +1,85 @@ +package com.example.RealMatch.notification.application.service; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import com.example.RealMatch.notification.domain.entity.NotificationDelivery; +import com.example.RealMatch.notification.domain.entity.enums.DeliveryStatus; +import com.example.RealMatch.notification.domain.repository.NotificationDeliveryRepository; + +import lombok.RequiredArgsConstructor; + +/** + * Delivery 상태 전이 서비스. Consumer에서 호출. + * 각 메서드가 독립 TX → 외부 API(FCM/SMTP)가 TX 밖에서 실행됨을 보장. + * 흐름: claimDelivery() → (외부 발송) → markSent() / recordFailure() + */ +@Service +@RequiredArgsConstructor +public class NotificationDeliveryClaimService { + + private static final Logger LOG = LoggerFactory.getLogger(NotificationDeliveryClaimService.class); + + private static final List CLAIMABLE_STATUSES = + List.of(DeliveryStatus.PENDING, DeliveryStatus.RETRY); + + private final NotificationDeliveryRepository deliveryRepository; + + /** PENDING/RETRY → IN_PROGRESS 조건절 UPDATE. 1 row면 성공. */ + @Transactional + public boolean claimDelivery(UUID deliveryId) { + int updated = deliveryRepository.claimDelivery( + deliveryId, + DeliveryStatus.IN_PROGRESS, + LocalDateTime.now(), + CLAIMABLE_STATUSES); + + if (updated == 0) { + LOG.debug("[DeliveryClaim] Claim failed (already processed). deliveryId={}", deliveryId); + } + return updated > 0; + } + + /** IN_PROGRESS → SENT. */ + @Transactional + public void markSent(UUID deliveryId, String providerMessageId) { + NotificationDelivery delivery = deliveryRepository.findById(deliveryId).orElse(null); + if (delivery == null) { + LOG.warn("[DeliveryClaim] Delivery not found for markSent. deliveryId={}", deliveryId); + return; + } + delivery.markAsSent(providerMessageId); + LOG.debug("[DeliveryClaim] Marked SENT. deliveryId={}, providerId={}", deliveryId, providerMessageId); + } + + /** 일시적 실패. attemptCount 기준 RETRY(backoff) 또는 FAILED. */ + @Transactional + public void recordFailure(UUID deliveryId, String reason) { + NotificationDelivery delivery = deliveryRepository.findById(deliveryId).orElse(null); + if (delivery == null) { + LOG.warn("[DeliveryClaim] Delivery not found for recordFailure. deliveryId={}", deliveryId); + return; + } + delivery.recordFailure(reason); + LOG.warn("[DeliveryClaim] Recorded failure. deliveryId={}, attempt={}, newStatus={}", + deliveryId, delivery.getAttemptCount(), delivery.getStatus()); + } + + /** 영구 실패(잘못된 토큰, 미존재 이메일 등). 재시도 불가. */ + @Transactional + public void markPermanentlyFailed(UUID deliveryId, String reason) { + NotificationDelivery delivery = deliveryRepository.findById(deliveryId).orElse(null); + if (delivery == null) { + LOG.warn("[DeliveryClaim] Delivery not found for permanent failure. deliveryId={}", deliveryId); + return; + } + delivery.markAsPermanentlyFailed(reason); + LOG.warn("[DeliveryClaim] Marked PERMANENTLY FAILED. deliveryId={}, reason={}", deliveryId, reason); + } +} diff --git a/src/main/java/com/example/RealMatch/notification/application/service/NotificationDeliveryProcessor.java b/src/main/java/com/example/RealMatch/notification/application/service/NotificationDeliveryProcessor.java deleted file mode 100644 index 8c3e3d87..00000000 --- a/src/main/java/com/example/RealMatch/notification/application/service/NotificationDeliveryProcessor.java +++ /dev/null @@ -1,106 +0,0 @@ -package com.example.RealMatch.notification.application.service; - -import java.util.EnumMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Propagation; -import org.springframework.transaction.annotation.Transactional; - -import com.example.RealMatch.notification.domain.entity.Notification; -import com.example.RealMatch.notification.domain.entity.NotificationDelivery; -import com.example.RealMatch.notification.domain.entity.enums.DeliveryStatus; -import com.example.RealMatch.notification.domain.repository.NotificationDeliveryRepository; -import com.example.RealMatch.notification.domain.repository.NotificationRepository; -import com.example.RealMatch.notification.infrastructure.sender.NotificationChannelSender; -import com.example.RealMatch.notification.infrastructure.sender.PermanentSendFailureException; -import com.example.RealMatch.user.domain.entity.enums.NotificationChannel; - -/** - * 개별 NotificationDelivery 건의 발송을 처리하는 프로세서. - * 각 delivery를 독립 트랜잭션(REQUIRES_NEW)으로 처리하여 장애를 격리한다. - */ -@Service -public class NotificationDeliveryProcessor { - - private static final Logger LOG = LoggerFactory.getLogger(NotificationDeliveryProcessor.class); - - private final NotificationDeliveryRepository deliveryRepository; - private final NotificationRepository notificationRepository; - private final Map senderMap; - - public NotificationDeliveryProcessor( - NotificationDeliveryRepository deliveryRepository, - NotificationRepository notificationRepository, - List senders) { - this.deliveryRepository = deliveryRepository; - this.notificationRepository = notificationRepository; - - this.senderMap = new EnumMap<>(NotificationChannel.class); - for (NotificationChannelSender sender : senders) { - this.senderMap.put(sender.getChannel(), sender); - } - LOG.info("[DeliveryProcessor] Initialized with senders: {}", this.senderMap.keySet()); - } - - /** - * 단건 배달을 처리한다. - * REQUIRES_NEW 트랜잭션으로 실행되어 실패가 다른 건에 영향을 주지 않는다. - */ - @Transactional(propagation = Propagation.REQUIRES_NEW) - public void processDelivery(UUID deliveryId) { - // 1) 최신 상태로 다시 조회 (stale read 방지) - NotificationDelivery delivery = deliveryRepository.findById(deliveryId).orElse(null); - if (delivery == null) { - LOG.warn("[DeliveryProcessor] Delivery not found. deliveryId={}", deliveryId); - return; - } - if (delivery.getStatus() != DeliveryStatus.PENDING) { - LOG.debug("[DeliveryProcessor] Delivery already processed. deliveryId={}, status={}", - deliveryId, delivery.getStatus()); - return; - } - - // 2) IN_PROGRESS로 전환 (워커 점유) - delivery.markAsInProgress(); - deliveryRepository.saveAndFlush(delivery); - - // 3) 알림 원장 조회 - Notification notification = notificationRepository.findById(delivery.getNotificationId()).orElse(null); - if (notification == null) { - delivery.markAsPermanentlyFailed("Notification not found: " + delivery.getNotificationId()); - return; - } - - // 4) 채널별 발송기 조회 - NotificationChannelSender sender = senderMap.get(delivery.getChannel()); - if (sender == null || !sender.isAvailable()) { - delivery.recordFailure("No available sender for channel: " + delivery.getChannel()); - LOG.warn("[DeliveryProcessor] No sender for channel={}. deliveryId={}", - delivery.getChannel(), deliveryId); - return; - } - - // 5) 발송 시도 - try { - String providerMessageId = sender.send(notification); - delivery.markAsSent(providerMessageId); - LOG.debug("[DeliveryProcessor] Delivery sent. deliveryId={}, channel={}, providerId={}", - deliveryId, delivery.getChannel(), providerMessageId); - } catch (PermanentSendFailureException e) { - // 재시도 불가능 → 영구 실패 - delivery.markAsPermanentlyFailed(e.getMessage()); - LOG.warn("[DeliveryProcessor] Permanent failure. deliveryId={}, channel={}, reason={}", - deliveryId, delivery.getChannel(), e.getMessage()); - } catch (Exception e) { - // 일시적 실패 → 재시도 스케줄링 - delivery.recordFailure(e.getMessage()); - LOG.warn("[DeliveryProcessor] Transient failure. deliveryId={}, channel={}, attempt={}, reason={}", - deliveryId, delivery.getChannel(), delivery.getAttemptCount(), e.getMessage()); - } - } -} diff --git a/src/main/java/com/example/RealMatch/notification/application/service/NotificationOutboxService.java b/src/main/java/com/example/RealMatch/notification/application/service/NotificationOutboxService.java new file mode 100644 index 00000000..5d745e69 --- /dev/null +++ b/src/main/java/com/example/RealMatch/notification/application/service/NotificationOutboxService.java @@ -0,0 +1,130 @@ +package com.example.RealMatch.notification.application.service; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.domain.PageRequest; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import com.example.RealMatch.notification.domain.entity.NotificationOutbox; +import com.example.RealMatch.notification.domain.entity.enums.OutboxStatus; +import com.example.RealMatch.notification.domain.repository.NotificationOutboxRepository; +import com.example.RealMatch.user.domain.entity.enums.NotificationChannel; + +import lombok.RequiredArgsConstructor; + +/** + * Outbox DB 연산의 트랜잭션 경계를 제공한다. + * 각 메서드가 독립 TX로 실행 → MQ 발행이 TX 밖에서 호출됨을 보장. + */ +@Service +@RequiredArgsConstructor +public class NotificationOutboxService { + + private static final Logger LOG = LoggerFactory.getLogger(NotificationOutboxService.class); + + private final NotificationOutboxRepository outboxRepository; + + @Transactional(readOnly = true) + public List findPendingOutbox(int limit) { + return outboxRepository.findByStatusOrderByCreatedAtAsc( + OutboxStatus.PENDING, + PageRequest.of(0, limit)); + } + + /** PENDING → SENDING claim. */ + @Transactional + public boolean claimOutbox(UUID outboxId) { + int updated = outboxRepository.claimOutbox( + outboxId, OutboxStatus.SENDING, OutboxStatus.PENDING); + return updated > 0; + } + + /** SENDING → SENT. */ + @Transactional + public void markOutboxSent(UUID outboxId) { + int updated = outboxRepository.markAsSent( + outboxId, OutboxStatus.SENT, OutboxStatus.SENDING); + if (updated == 0) { + LOG.warn("[Outbox] Failed to mark SENT. outboxId={}", outboxId); + } + } + + /** 발행 실패 시 retryCount < MAX → PENDING, else FAILED. */ + @Transactional + public void markOutboxPublishFailed(UUID outboxId, String error) { + NotificationOutbox outbox = outboxRepository.findById(outboxId).orElse(null); + if (outbox == null) { + LOG.warn("[Outbox] Not found for failure recording. outboxId={}", outboxId); + return; + } + + OutboxStatus newStatus; + if (outbox.getRetryCount() + 1 >= NotificationOutbox.MAX_PUBLISH_RETRY) { + newStatus = OutboxStatus.FAILED; + LOG.error("[Outbox] Max publish retries exceeded. outboxId={}, deliveryId={}", + outboxId, outbox.getDeliveryId()); + } else { + newStatus = OutboxStatus.PENDING; + LOG.warn("[Outbox] Publish failed, will retry. outboxId={}, retryCount={}", + outboxId, outbox.getRetryCount() + 1); + } + + int updated = outboxRepository.markPublishFailed( + outboxId, + newStatus, + truncate(error, 500), + List.of(OutboxStatus.SENDING, OutboxStatus.PENDING)); + if (updated == 0) { + LOG.warn("[Outbox] markPublishFailed affected 0 rows (concurrent update). outboxId={}", + outboxId); + } + } + + /** Retry용 Outbox 생성. 이미 PENDING/SENDING 있으면 skip(중복 방지). */ + @Transactional + public void createRetryOutboxIfAbsent(UUID deliveryId, UUID notificationId, + NotificationChannel channel) { + boolean alreadyPending = outboxRepository.existsByDeliveryIdAndStatusIn( + deliveryId, + List.of(OutboxStatus.PENDING, OutboxStatus.SENDING)); + + if (alreadyPending) { + LOG.debug("[Outbox] Skip — pending outbox already exists. deliveryId={}", deliveryId); + return; + } + + NotificationOutbox outbox = NotificationOutbox.builder() + .deliveryId(deliveryId) + .notificationId(notificationId) + .channel(channel) + .build(); + outboxRepository.save(outbox); + LOG.debug("[Outbox] Created retry outbox. deliveryId={}, channel={}", deliveryId, channel); + } + + /** SENT/FAILED 중 retentionDays일 지난 레코드 삭제. */ + @Transactional + public int cleanupCompletedOutbox(int retentionDays) { + LocalDateTime before = LocalDateTime.now().minusDays(retentionDays); + int deleted = outboxRepository.deleteCompletedOutboxBefore( + List.of(OutboxStatus.SENT, OutboxStatus.FAILED), + before); + if (deleted > 0) { + LOG.info("[Outbox] Cleaned up {} completed entries older than {} days.", + deleted, retentionDays); + } + return deleted; + } + + private static String truncate(String value, int maxLength) { + if (value == null) { + return null; + } + return value.length() <= maxLength ? value : value.substring(0, maxLength); + } +} diff --git a/src/main/java/com/example/RealMatch/notification/application/service/NotificationService.java b/src/main/java/com/example/RealMatch/notification/application/service/NotificationService.java index 4df72300..417c29e7 100644 --- a/src/main/java/com/example/RealMatch/notification/application/service/NotificationService.java +++ b/src/main/java/com/example/RealMatch/notification/application/service/NotificationService.java @@ -15,15 +15,22 @@ import com.example.RealMatch.notification.application.dto.CreateNotificationCommand; import com.example.RealMatch.notification.domain.entity.Notification; import com.example.RealMatch.notification.domain.entity.NotificationDelivery; +import com.example.RealMatch.notification.domain.entity.NotificationOutbox; import com.example.RealMatch.notification.domain.entity.enums.DeliveryStatus; import com.example.RealMatch.notification.domain.entity.enums.NotificationKind; import com.example.RealMatch.notification.domain.repository.NotificationDeliveryRepository; +import com.example.RealMatch.notification.domain.repository.NotificationOutboxRepository; import com.example.RealMatch.notification.domain.repository.NotificationRepository; import com.example.RealMatch.notification.exception.NotificationErrorCode; import com.example.RealMatch.user.domain.entity.enums.NotificationChannel; import lombok.RequiredArgsConstructor; +/** + * 알림 생성 서비스 (Outbox 패턴). + * Notification + Delivery(PENDING) + Outbox(PENDING)를 한 트랜잭션에 원자적 저장. + * MQ 발행은 이 서비스에서 금지 — OutboxPublisher가 별도로 처리. + */ @Service @RequiredArgsConstructor @Transactional @@ -33,6 +40,7 @@ public class NotificationService { private final NotificationRepository notificationRepository; private final NotificationDeliveryRepository notificationDeliveryRepository; + private final NotificationOutboxRepository notificationOutboxRepository; private final NotificationChannelResolver channelResolver; @Transactional(propagation = Propagation.REQUIRES_NEW) @@ -50,39 +58,54 @@ public Notification create(CreateNotificationCommand command) { Notification savedNotification = notificationRepository.save(notification); - // NotificationKind에 맞는 채널만 PENDING 레코드 생성 - createPendingDeliveries(savedNotification.getId(), command.getKind(), command.getEventId(), command.getUserId()); + createPendingDeliveriesWithOutbox( + savedNotification.getId(), + command.getKind(), + command.getEventId(), + command.getUserId()); return savedNotification; } - private void createPendingDeliveries(UUID notificationId, NotificationKind kind, String eventId, Long receiverId) { + private void createPendingDeliveriesWithOutbox(UUID notificationId, NotificationKind kind, + String eventId, Long receiverId) { Set channels = channelResolver.resolveChannels(kind); for (NotificationChannel channel : channels) { String idempotencyKey = generateIdempotencyKey(eventId, kind, receiverId, channel); try { + // Delivery(PENDING) 생성 NotificationDelivery delivery = NotificationDelivery.builder() .notificationId(notificationId) .channel(channel) .status(DeliveryStatus.PENDING) .idempotencyKey(idempotencyKey) .build(); - notificationDeliveryRepository.save(delivery); - LOG.debug("[Notification] Created delivery. idempotencyKey={}, channel={}", idempotencyKey, channel); + NotificationDelivery savedDelivery = notificationDeliveryRepository.save(delivery); + + // Outbox(PENDING) 생성 — 같은 TX + NotificationOutbox outbox = NotificationOutbox.builder() + .deliveryId(savedDelivery.getId()) + .notificationId(notificationId) + .channel(channel) + .build(); + notificationOutboxRepository.save(outbox); + + LOG.debug("[Notification] Created delivery + outbox. idempotencyKey={}, channel={}", + idempotencyKey, channel); } catch (DataIntegrityViolationException e) { - // unique 제약 충돌 = 이미 생성된 것 (멱등성 보장) if (e.getCause() instanceof ConstraintViolationException) { LOG.debug("[Notification] Delivery already exists (idempotent). idempotencyKey={}, channel={}", idempotencyKey, channel); } else { - throw e; // 다른 데이터 무결성 오류는 재발생 + throw e; } } } } - private String generateIdempotencyKey(String eventId, NotificationKind kind, Long receiverId, NotificationChannel channel) { + private String generateIdempotencyKey(String eventId, NotificationKind kind, + Long receiverId, NotificationChannel channel) { return String.format("%s:%s:%d:%s", eventId, kind, receiverId, channel); } diff --git a/src/main/java/com/example/RealMatch/notification/domain/entity/NotificationDelivery.java b/src/main/java/com/example/RealMatch/notification/domain/entity/NotificationDelivery.java index 3f68605c..90075018 100644 --- a/src/main/java/com/example/RealMatch/notification/domain/entity/NotificationDelivery.java +++ b/src/main/java/com/example/RealMatch/notification/domain/entity/NotificationDelivery.java @@ -81,7 +81,7 @@ protected NotificationDelivery(UUID notificationId, NotificationChannel channel, this.attemptCount = 0; } - /** 발송 성공 */ + /** 발송 성공. IN_PROGRESS → SENT. */ public void markAsSent(String providerMessageId) { this.status = DeliveryStatus.SENT; this.sentAt = LocalDateTime.now(); @@ -89,16 +89,10 @@ public void markAsSent(String providerMessageId) { this.nextRetryAt = null; } - /** 발송 진행 중 (워커가 점유) */ - public void markAsInProgress() { - this.status = DeliveryStatus.IN_PROGRESS; - this.attemptedAt = LocalDateTime.now(); - } - /** * 발송 실패 기록 + 재시도 스케줄링. - * attemptCount < MAX_RETRY_COUNT → PENDING + nextRetryAt 설정 - * attemptCount ≥ MAX_RETRY_COUNT → FAILED (영구 보관) + * attemptCount < MAX_RETRY_COUNT → RETRY + nextRetryAt 설정 (backoff) + * attemptCount ≥ MAX_RETRY_COUNT → FAILED (영구 보관, DLQ 대상) */ public void recordFailure(String failReason) { this.failReason = truncate(failReason, 500); @@ -108,23 +102,19 @@ public void recordFailure(String failReason) { this.status = DeliveryStatus.FAILED; this.nextRetryAt = null; } else { - this.status = DeliveryStatus.PENDING; + this.status = DeliveryStatus.RETRY; int backoffIndex = Math.min(this.attemptCount - 1, BACKOFF_MINUTES.length - 1); this.nextRetryAt = LocalDateTime.now().plusMinutes(BACKOFF_MINUTES[backoffIndex]); } } - /** 영구 실패 처리 */ + /** 영구 실패 처리. IN_PROGRESS → FAILED. */ public void markAsPermanentlyFailed(String failReason) { this.status = DeliveryStatus.FAILED; this.failReason = truncate(failReason, 500); this.nextRetryAt = null; } - public boolean isRetryable() { - return this.attemptCount < MAX_RETRY_COUNT; - } - private static String truncate(String value, int maxLength) { if (value == null) { return null; diff --git a/src/main/java/com/example/RealMatch/notification/domain/entity/NotificationOutbox.java b/src/main/java/com/example/RealMatch/notification/domain/entity/NotificationOutbox.java new file mode 100644 index 00000000..4c715988 --- /dev/null +++ b/src/main/java/com/example/RealMatch/notification/domain/entity/NotificationOutbox.java @@ -0,0 +1,76 @@ +package com.example.RealMatch.notification.domain.entity; + +import java.util.UUID; + +import com.example.RealMatch.global.common.BaseEntity; +import com.example.RealMatch.notification.domain.entity.enums.OutboxStatus; +import com.example.RealMatch.user.domain.entity.enums.NotificationChannel; + +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.EnumType; +import jakarta.persistence.Enumerated; +import jakarta.persistence.GeneratedValue; +import jakarta.persistence.GenerationType; +import jakarta.persistence.Id; +import jakarta.persistence.Index; +import jakarta.persistence.Table; +import lombok.AccessLevel; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; + +/** + * Outbox 패턴의 핵심 엔티티. + * + *

Notification + NotificationDelivery와 같은 트랜잭션에서 저장되며, + * 트랜잭션 커밋 이후 별도 스케줄러(OutboxPublisher)가 이 레코드를 조회하여 + * RabbitMQ로 발행한다. + * + *

이 구조에 의해 "DB 저장은 성공했지만 MQ 발행이 누락"되는 시나리오를 방지한다. + */ +@Entity +@Table(name = "notification_outbox", indexes = { + @Index(name = "idx_outbox_status_created", columnList = "status, created_at") +}) +@Getter +@NoArgsConstructor(access = AccessLevel.PROTECTED) +public class NotificationOutbox extends BaseEntity { + + public static final int MAX_PUBLISH_RETRY = 10; + + @Id + @GeneratedValue(strategy = GenerationType.UUID) + @Column(columnDefinition = "BINARY(16)") + private UUID id; + + @Column(name = "delivery_id", nullable = false, columnDefinition = "BINARY(16)") + private UUID deliveryId; + + @Column(name = "notification_id", nullable = false, columnDefinition = "BINARY(16)") + private UUID notificationId; + + @Enumerated(EnumType.STRING) + @Column(name = "channel", nullable = false, length = 30) + private NotificationChannel channel; + + @Enumerated(EnumType.STRING) + @Column(name = "status", nullable = false, length = 20) + private OutboxStatus status; + + @Column(name = "retry_count", nullable = false) + private int retryCount; + + @Column(name = "last_error", length = 500) + private String lastError; + + @Builder + protected NotificationOutbox(UUID deliveryId, UUID notificationId, + NotificationChannel channel) { + this.deliveryId = deliveryId; + this.notificationId = notificationId; + this.channel = channel; + this.status = OutboxStatus.PENDING; + this.retryCount = 0; + } +} diff --git a/src/main/java/com/example/RealMatch/notification/domain/entity/enums/DeliveryStatus.java b/src/main/java/com/example/RealMatch/notification/domain/entity/enums/DeliveryStatus.java index cf6054a4..6a2984e5 100644 --- a/src/main/java/com/example/RealMatch/notification/domain/entity/enums/DeliveryStatus.java +++ b/src/main/java/com/example/RealMatch/notification/domain/entity/enums/DeliveryStatus.java @@ -1,8 +1,9 @@ package com.example.RealMatch.notification.domain.entity.enums; public enum DeliveryStatus { - PENDING, // 발송 대기 - IN_PROGRESS, // 발송 진행 중 (Phase 3에서 워커가 작업 점유 시 사용) + PENDING, // 발송 대기 (최초 생성) + IN_PROGRESS, // 발송 진행 중 (Consumer가 claim 점유) SENT, // 발송 성공 - FAILED // 발송 실패 + RETRY, // 재시도 대기 (일시적 실패 후 backoff 대기) + FAILED // 발송 최종 실패 (재시도 불가) } diff --git a/src/main/java/com/example/RealMatch/notification/domain/entity/enums/OutboxStatus.java b/src/main/java/com/example/RealMatch/notification/domain/entity/enums/OutboxStatus.java new file mode 100644 index 00000000..5c46aff8 --- /dev/null +++ b/src/main/java/com/example/RealMatch/notification/domain/entity/enums/OutboxStatus.java @@ -0,0 +1,20 @@ +package com.example.RealMatch.notification.domain.entity.enums; + +/** + * NotificationOutbox의 MQ 발행 상태 + * + *

상태 전이: + *

+ * PENDING ──claim──▶ SENDING ──발행 성공──▶ SENT
+ *    ▲                  │
+ *    └──발행 실패────────┘
+ *                       │
+ *                       └──retryCount ≥ MAX──▶ FAILED
+ * 
+ */ +public enum OutboxStatus { + PENDING, // 발행 대기 + SENDING, // 발행 진행 중 (claim 완료) + SENT, // MQ 발행 성공 + FAILED // MQ 발행 최종 실패 (운영자 개입 필요) +} diff --git a/src/main/java/com/example/RealMatch/notification/domain/repository/NotificationDeliveryRepository.java b/src/main/java/com/example/RealMatch/notification/domain/repository/NotificationDeliveryRepository.java index f9745817..e634f17b 100644 --- a/src/main/java/com/example/RealMatch/notification/domain/repository/NotificationDeliveryRepository.java +++ b/src/main/java/com/example/RealMatch/notification/domain/repository/NotificationDeliveryRepository.java @@ -1,8 +1,8 @@ package com.example.RealMatch.notification.domain.repository; import java.time.LocalDateTime; +import java.util.Collection; import java.util.List; -import java.util.Optional; import java.util.UUID; import org.springframework.data.domain.Pageable; @@ -13,12 +13,21 @@ import com.example.RealMatch.notification.domain.entity.NotificationDelivery; import com.example.RealMatch.notification.domain.entity.enums.DeliveryStatus; -import com.example.RealMatch.user.domain.entity.enums.NotificationChannel; public interface NotificationDeliveryRepository extends JpaRepository { - Optional findByNotificationIdAndChannel(UUID notificationId, NotificationChannel channel); + /** 조건절 UPDATE로 claim. 1이면 성공, 0이면 이미 처리 중/완료. */ + @Modifying(clearAutomatically = true, flushAutomatically = true) + @Query("UPDATE NotificationDelivery nd " + + "SET nd.status = :targetStatus, nd.attemptedAt = :now " + + "WHERE nd.id = :id AND nd.status IN :sourceStatuses") + int claimDelivery( + @Param("id") UUID id, + @Param("targetStatus") DeliveryStatus targetStatus, + @Param("now") LocalDateTime now, + @Param("sourceStatuses") Collection sourceStatuses); + /** RETRY + backoff 만료된 delivery 조회. RecoveryScheduler용. */ @Query("SELECT nd FROM NotificationDelivery nd " + "WHERE nd.status = :status " + "AND (nd.nextRetryAt IS NULL OR nd.nextRetryAt <= :now) " @@ -28,11 +37,28 @@ List findRetryableDeliveries( @Param("now") LocalDateTime now, Pageable pageable); + /** stuck IN_PROGRESS → RETRY 복구. Consumer crash 대비. */ @Modifying(clearAutomatically = true) - @Query("UPDATE NotificationDelivery nd SET nd.status = :newStatus, nd.nextRetryAt = null " + @Query("UPDATE NotificationDelivery nd " + + "SET nd.status = :newStatus, nd.nextRetryAt = null " + "WHERE nd.status = :stuckStatus AND nd.attemptedAt <= :stuckBefore") int recoverStuckDeliveries( @Param("stuckStatus") DeliveryStatus stuckStatus, @Param("newStatus") DeliveryStatus newStatus, @Param("stuckBefore") LocalDateTime stuckBefore); + + /** 활성 Outbox 없는 고아 PENDING delivery 조회. Outbox FAILED 후 복구용. */ + @Query("SELECT nd FROM NotificationDelivery nd " + + "WHERE nd.status = :status " + + "AND nd.createdAt <= :orphanBefore " + + "AND NOT EXISTS (" + + " SELECT 1 FROM NotificationOutbox o " + + " WHERE o.deliveryId = nd.id " + + " AND o.status IN ('PENDING', 'SENDING')" + + ") " + + "ORDER BY nd.createdAt ASC") + List findOrphanedPendingDeliveries( + @Param("status") DeliveryStatus status, + @Param("orphanBefore") LocalDateTime orphanBefore, + Pageable pageable); } diff --git a/src/main/java/com/example/RealMatch/notification/domain/repository/NotificationOutboxRepository.java b/src/main/java/com/example/RealMatch/notification/domain/repository/NotificationOutboxRepository.java new file mode 100644 index 00000000..8faaf72f --- /dev/null +++ b/src/main/java/com/example/RealMatch/notification/domain/repository/NotificationOutboxRepository.java @@ -0,0 +1,80 @@ +package com.example.RealMatch.notification.domain.repository; + +import java.time.LocalDateTime; +import java.util.Collection; +import java.util.List; +import java.util.UUID; + +import org.springframework.data.domain.Pageable; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Modifying; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; + +import com.example.RealMatch.notification.domain.entity.NotificationOutbox; +import com.example.RealMatch.notification.domain.entity.enums.OutboxStatus; + +public interface NotificationOutboxRepository extends JpaRepository { + + /** PENDING Outbox 조회 (생성일 ASC). */ + @Query("SELECT o FROM NotificationOutbox o " + + "WHERE o.status = :status " + + "ORDER BY o.createdAt ASC") + List findByStatusOrderByCreatedAtAsc( + @Param("status") OutboxStatus status, + Pageable pageable); + + /** PENDING → SENDING claim. 1이면 성공. */ + @Modifying(clearAutomatically = true, flushAutomatically = true) + @Query("UPDATE NotificationOutbox o " + + "SET o.status = :targetStatus " + + "WHERE o.id = :id AND o.status = :sourceStatus") + int claimOutbox( + @Param("id") UUID id, + @Param("targetStatus") OutboxStatus targetStatus, + @Param("sourceStatus") OutboxStatus sourceStatus); + + /** SENDING → SENT. */ + @Modifying(clearAutomatically = true) + @Query("UPDATE NotificationOutbox o " + + "SET o.status = :targetStatus " + + "WHERE o.id = :id AND o.status = :sourceStatus") + int markAsSent( + @Param("id") UUID id, + @Param("targetStatus") OutboxStatus targetStatus, + @Param("sourceStatus") OutboxStatus sourceStatus); + + /** 발행 실패: retryCount++ 및 상태 전이. */ + @Modifying(clearAutomatically = true) + @Query("UPDATE NotificationOutbox o " + + "SET o.status = :newStatus, " + + "o.retryCount = o.retryCount + 1, " + + "o.lastError = :lastError " + + "WHERE o.id = :id AND o.status IN :sourceStatuses") + int markPublishFailed( + @Param("id") UUID id, + @Param("newStatus") OutboxStatus newStatus, + @Param("lastError") String lastError, + @Param("sourceStatuses") Collection sourceStatuses); + + /** 미처리 Outbox(PENDING/SENDING) 존재 여부. 중복 생성 방지용. */ + boolean existsByDeliveryIdAndStatusIn(UUID deliveryId, Collection statuses); + + /** stuck SENDING → PENDING 복구. */ + @Modifying(clearAutomatically = true) + @Query("UPDATE NotificationOutbox o " + + "SET o.status = :newStatus " + + "WHERE o.status = :stuckStatus AND o.updatedAt <= :stuckBefore") + int recoverStuckOutbox( + @Param("stuckStatus") OutboxStatus stuckStatus, + @Param("newStatus") OutboxStatus newStatus, + @Param("stuckBefore") LocalDateTime stuckBefore); + + /** 완료(SENT/FAILED)된 오래된 Outbox 삭제. */ + @Modifying(clearAutomatically = true) + @Query("DELETE FROM NotificationOutbox o " + + "WHERE o.status IN :statuses AND o.updatedAt <= :before") + int deleteCompletedOutboxBefore( + @Param("statuses") Collection statuses, + @Param("before") LocalDateTime before); +} diff --git a/src/main/java/com/example/RealMatch/notification/infrastructure/messaging/NotificationDeliveryConsumer.java b/src/main/java/com/example/RealMatch/notification/infrastructure/messaging/NotificationDeliveryConsumer.java new file mode 100644 index 00000000..6ad25871 --- /dev/null +++ b/src/main/java/com/example/RealMatch/notification/infrastructure/messaging/NotificationDeliveryConsumer.java @@ -0,0 +1,127 @@ +package com.example.RealMatch.notification.infrastructure.messaging; + +import java.io.IOException; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.support.AmqpHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Component; + +import com.example.RealMatch.notification.application.service.NotificationDeliveryClaimService; +import com.example.RealMatch.notification.domain.entity.Notification; +import com.example.RealMatch.notification.domain.repository.NotificationRepository; +import com.example.RealMatch.notification.infrastructure.sender.NotificationChannelSender; +import com.example.RealMatch.notification.infrastructure.sender.PermanentSendFailureException; +import com.example.RealMatch.user.domain.entity.enums.NotificationChannel; +import com.rabbitmq.client.Channel; + +/** + * MQ Consumer. claim → 발송(FCM/Email) → 결과 기록 순서를 엄격히 따른다. + * - claim 실패(0 row) → ACK 후 종료 (중복 발송 방지) + * - 예외 → NACK(requeue=false) → DLQ + * - @Transactional 없음 — 외부 API가 TX 안에서 실행되는 것을 방지 + */ +@Component +public class NotificationDeliveryConsumer { + + private static final Logger LOG = LoggerFactory.getLogger(NotificationDeliveryConsumer.class); + + private final NotificationDeliveryClaimService claimService; + private final NotificationRepository notificationRepository; + private final Map senderMap; + + public NotificationDeliveryConsumer( + NotificationDeliveryClaimService claimService, + NotificationRepository notificationRepository, + List senders) { + this.claimService = claimService; + this.notificationRepository = notificationRepository; + this.senderMap = new EnumMap<>(NotificationChannel.class); + for (NotificationChannelSender sender : senders) { + this.senderMap.put(sender.getChannel(), sender); + } + LOG.info("[DeliveryConsumer] Initialized with senders: {}", this.senderMap.keySet()); + } + + @RabbitListener( + queues = RabbitMqConfig.NOTIFICATION_QUEUE, + containerFactory = "rabbitListenerContainerFactory" + ) + public void handleDelivery(NotificationDeliveryMessage message, + Channel channel, + @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { + LOG.debug("[DeliveryConsumer] Received message. {}", message); + + try { + processMessage(message); + channel.basicAck(deliveryTag, false); // 정상 처리 완료 → ACK + } catch (Exception e) { + // 예상치 못한 예외 → NACK(requeue=false) → DLQ + LOG.error("[DeliveryConsumer] Unexpected error. Sending to DLQ. message={}, error={}", + message, e.getMessage(), e); + safeNack(channel, deliveryTag); + } + } + + private void processMessage(NotificationDeliveryMessage message) { + UUID deliveryId = UUID.fromString(message.getDeliveryId()); + UUID notificationId = UUID.fromString(message.getNotificationId()); + NotificationChannel notificationChannel = NotificationChannel.valueOf(message.getChannel()); + + // Step 1: 조건절 UPDATE로 claim (PENDING/RETRY → IN_PROGRESS) + boolean claimed = claimService.claimDelivery(deliveryId); + if (!claimed) { + LOG.debug("[DeliveryConsumer] Already processed. deliveryId={}", deliveryId); + return; + } + + // Step 2: 알림 원장 + 발송기 조회 + Notification notification = notificationRepository.findById(notificationId).orElse(null); + if (notification == null) { + claimService.markPermanentlyFailed(deliveryId, + "Notification not found: " + notificationId); + return; + } + + NotificationChannelSender sender = senderMap.get(notificationChannel); + if (sender == null || !sender.isAvailable()) { + claimService.recordFailure(deliveryId, + "No available sender for channel: " + notificationChannel); + LOG.warn("[DeliveryConsumer] No sender for channel={}. deliveryId={}", + notificationChannel, deliveryId); + return; + } + + // Step 3: 외부 API 발송 (TX 밖) → 결과 기록 + try { + String providerMessageId = sender.send(notification); + claimService.markSent(deliveryId, providerMessageId); + LOG.info("[DeliveryConsumer] Sent. deliveryId={}, channel={}, providerId={}", + deliveryId, notificationChannel, providerMessageId); + + } catch (PermanentSendFailureException e) { + claimService.markPermanentlyFailed(deliveryId, e.getMessage()); + LOG.warn("[DeliveryConsumer] Permanent failure. deliveryId={}, channel={}, reason={}", + deliveryId, notificationChannel, e.getMessage()); + + } catch (Exception e) { + claimService.recordFailure(deliveryId, e.getMessage()); + LOG.warn("[DeliveryConsumer] Transient failure. deliveryId={}, channel={}, reason={}", + deliveryId, notificationChannel, e.getMessage()); + } + } + + private void safeNack(Channel channel, long deliveryTag) { + try { + channel.basicNack(deliveryTag, false, false); + } catch (IOException e) { + LOG.error("[DeliveryConsumer] Failed to NACK. deliveryTag={}", deliveryTag, e); + } + } +} diff --git a/src/main/java/com/example/RealMatch/notification/infrastructure/messaging/NotificationDeliveryMessage.java b/src/main/java/com/example/RealMatch/notification/infrastructure/messaging/NotificationDeliveryMessage.java new file mode 100644 index 00000000..0c7944b6 --- /dev/null +++ b/src/main/java/com/example/RealMatch/notification/infrastructure/messaging/NotificationDeliveryMessage.java @@ -0,0 +1,48 @@ +package com.example.RealMatch.notification.infrastructure.messaging; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * RabbitMQ를 통해 전달되는 알림 배달 메시지 + * + *

OutboxPublisher가 발행하고 DeliveryConsumer가 소비한다. + * JSON 직렬화/역직렬화를 위해 불변 객체로 설계한다. + */ +public class NotificationDeliveryMessage { + + private final String deliveryId; + private final String notificationId; + private final String channel; + + @JsonCreator + public NotificationDeliveryMessage( + @JsonProperty("deliveryId") String deliveryId, + @JsonProperty("notificationId") String notificationId, + @JsonProperty("channel") String channel) { + this.deliveryId = deliveryId; + this.notificationId = notificationId; + this.channel = channel; + } + + public String getDeliveryId() { + return deliveryId; + } + + public String getNotificationId() { + return notificationId; + } + + public String getChannel() { + return channel; + } + + @Override + public String toString() { + return "NotificationDeliveryMessage{" + + "deliveryId='" + deliveryId + '\'' + + ", notificationId='" + notificationId + '\'' + + ", channel='" + channel + '\'' + + '}'; + } +} diff --git a/src/main/java/com/example/RealMatch/notification/infrastructure/messaging/NotificationOutboxPublisher.java b/src/main/java/com/example/RealMatch/notification/infrastructure/messaging/NotificationOutboxPublisher.java new file mode 100644 index 00000000..b862bb3b --- /dev/null +++ b/src/main/java/com/example/RealMatch/notification/infrastructure/messaging/NotificationOutboxPublisher.java @@ -0,0 +1,79 @@ +package com.example.RealMatch.notification.infrastructure.messaging; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import com.example.RealMatch.notification.application.service.NotificationOutboxService; +import com.example.RealMatch.notification.domain.entity.NotificationOutbox; + +import lombok.RequiredArgsConstructor; + +/** + * Outbox Publisher. 5초마다 PENDING Outbox를 claim → MQ 발행. + * - @Transactional 없음 — MQ 발행이 TX 밖에서 실행됨을 구조적으로 보장 + * - DB 연산은 NotificationOutboxService를 통해 메서드별 독립 TX + */ +@Component +@RequiredArgsConstructor +public class NotificationOutboxPublisher { + + private static final Logger LOG = LoggerFactory.getLogger(NotificationOutboxPublisher.class); + private static final int BATCH_SIZE = 50; + + private final NotificationOutboxService outboxService; + private final RabbitTemplate rabbitTemplate; + + @Scheduled(fixedDelay = 5_000, initialDelay = 5_000) + public void publishPendingOutbox() { + List pendingList = outboxService.findPendingOutbox(BATCH_SIZE); + + if (pendingList.isEmpty()) { + return; + } + + LOG.info("[OutboxPublisher] Found {} pending outbox entries.", pendingList.size()); + + for (NotificationOutbox outbox : pendingList) { + publishWithClaimGuard(outbox); + } + } + + private void publishWithClaimGuard(NotificationOutbox outbox) { + // PENDING → SENDING claim (조건절 UPDATE) + boolean claimed = outboxService.claimOutbox(outbox.getId()); + if (!claimed) { + LOG.debug("[OutboxPublisher] Claim failed (already processing). outboxId={}", outbox.getId()); + return; + } + + try { + NotificationDeliveryMessage message = new NotificationDeliveryMessage( + outbox.getDeliveryId().toString(), + outbox.getNotificationId().toString(), + outbox.getChannel().name()); + + // MQ 발행 — TX 밖에서 실행 + rabbitTemplate.convertAndSend( + RabbitMqConfig.NOTIFICATION_EXCHANGE, + RabbitMqConfig.NOTIFICATION_ROUTING_KEY, + message); + + // 성공 → SENDING → SENT + outboxService.markOutboxSent(outbox.getId()); + + LOG.debug("[OutboxPublisher] Published. outboxId={}, deliveryId={}", + outbox.getId(), outbox.getDeliveryId()); + + } catch (Exception e) { + // 실패 → retryCount++ / PENDING 또는 FAILED + outboxService.markOutboxPublishFailed(outbox.getId(), e.getMessage()); + LOG.error("[OutboxPublisher] Publish failed. outboxId={}, deliveryId={}, error={}", + outbox.getId(), outbox.getDeliveryId(), e.getMessage()); + } + } +} diff --git a/src/main/java/com/example/RealMatch/notification/infrastructure/messaging/NotificationRecoveryScheduler.java b/src/main/java/com/example/RealMatch/notification/infrastructure/messaging/NotificationRecoveryScheduler.java new file mode 100644 index 00000000..edbee28f --- /dev/null +++ b/src/main/java/com/example/RealMatch/notification/infrastructure/messaging/NotificationRecoveryScheduler.java @@ -0,0 +1,145 @@ +package com.example.RealMatch.notification.infrastructure.messaging; + +import java.time.LocalDateTime; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.domain.PageRequest; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import com.example.RealMatch.notification.application.service.NotificationOutboxService; +import com.example.RealMatch.notification.domain.entity.NotificationDelivery; +import com.example.RealMatch.notification.domain.entity.enums.DeliveryStatus; +import com.example.RealMatch.notification.domain.entity.enums.OutboxStatus; +import com.example.RealMatch.notification.domain.repository.NotificationDeliveryRepository; +import com.example.RealMatch.notification.domain.repository.NotificationOutboxRepository; + +import lombok.RequiredArgsConstructor; + +/** + * 장애 복구 스케줄러. + * + *

복구 대상: + * 1. Stuck IN_PROGRESS Delivery → RETRY (10분 임계) + * 2. Due RETRY Delivery → 새 Outbox 생성 (중복 방지) + * 3. Stuck SENDING Outbox → PENDING (5분 임계) + * 4. Orphaned PENDING Delivery → 새 Outbox 생성 (활성 Outbox 없는 고아) + * 5. Completed Outbox Cleanup (7일 보관) + */ +@Component +@RequiredArgsConstructor +public class NotificationRecoveryScheduler { + + private static final Logger LOG = LoggerFactory.getLogger(NotificationRecoveryScheduler.class); + + private static final int DELIVERY_STUCK_THRESHOLD_MINUTES = 10; + private static final int OUTBOX_STUCK_THRESHOLD_MINUTES = 5; + private static final int ORPHAN_THRESHOLD_MINUTES = 30; + private static final int RETRY_BATCH_SIZE = 50; + private static final int OUTBOX_RETENTION_DAYS = 7; + + private final NotificationDeliveryRepository deliveryRepository; + private final NotificationOutboxRepository outboxRepository; + private final NotificationOutboxService outboxService; + + /** Consumer crash 복구: 10분 이상 IN_PROGRESS → RETRY */ + @Transactional + @Scheduled(fixedDelay = 300_000, initialDelay = 60_000) + public void recoverStuckDeliveries() { + LocalDateTime stuckBefore = LocalDateTime.now() + .minusMinutes(DELIVERY_STUCK_THRESHOLD_MINUTES); + + int recovered = deliveryRepository.recoverStuckDeliveries( + DeliveryStatus.IN_PROGRESS, + DeliveryStatus.RETRY, + stuckBefore); + + if (recovered > 0) { + LOG.warn("[Recovery] Recovered {} stuck IN_PROGRESS deliveries → RETRY.", recovered); + } + } + + /** backoff 만료된 RETRY delivery → 새 Outbox 생성하여 MQ 재발행 */ + @Scheduled(fixedDelay = 60_000, initialDelay = 30_000) + public void reprocessRetryDeliveries() { + List retryList = deliveryRepository.findRetryableDeliveries( + DeliveryStatus.RETRY, + LocalDateTime.now(), + PageRequest.of(0, RETRY_BATCH_SIZE)); + + if (retryList.isEmpty()) { + return; + } + + LOG.info("[Recovery] Found {} RETRY deliveries due for reprocessing.", retryList.size()); + + for (NotificationDelivery delivery : retryList) { + try { + outboxService.createRetryOutboxIfAbsent( + delivery.getId(), + delivery.getNotificationId(), + delivery.getChannel()); + } catch (Exception e) { + LOG.error("[Recovery] Failed to create retry outbox. deliveryId={}", + delivery.getId(), e); + } + } + } + + /** Publisher crash 복구: 5분 이상 SENDING → PENDING */ + @Transactional + @Scheduled(fixedDelay = 300_000, initialDelay = 90_000) + public void recoverStuckOutbox() { + LocalDateTime stuckBefore = LocalDateTime.now() + .minusMinutes(OUTBOX_STUCK_THRESHOLD_MINUTES); + + int recovered = outboxRepository.recoverStuckOutbox( + OutboxStatus.SENDING, + OutboxStatus.PENDING, + stuckBefore); + + if (recovered > 0) { + LOG.warn("[Recovery] Recovered {} stuck SENDING outbox → PENDING.", recovered); + } + } + + /** Outbox FAILED 후 PENDING으로 남은 고아 delivery 복구 (30분 threshold) */ + @Scheduled(fixedDelay = 600_000, initialDelay = 180_000) + public void recoverOrphanedPendingDeliveries() { + LocalDateTime orphanBefore = LocalDateTime.now() + .minusMinutes(ORPHAN_THRESHOLD_MINUTES); + + List orphanList = deliveryRepository.findOrphanedPendingDeliveries( + DeliveryStatus.PENDING, + orphanBefore, + PageRequest.of(0, RETRY_BATCH_SIZE)); + + if (orphanList.isEmpty()) { + return; + } + + LOG.warn("[Recovery] Found {} orphaned PENDING deliveries (no active outbox).", + orphanList.size()); + + for (NotificationDelivery delivery : orphanList) { + try { + outboxService.createRetryOutboxIfAbsent( + delivery.getId(), + delivery.getNotificationId(), + delivery.getChannel()); + } catch (Exception e) { + LOG.error("[Recovery] Failed to create outbox for orphaned delivery. deliveryId={}", + delivery.getId(), e); + } + } + } + + /** 7일 지난 SENT/FAILED Outbox 삭제 — 테이블 무한 성장 방지 */ + @Scheduled(fixedDelay = 3_600_000, initialDelay = 120_000) + public void cleanupCompletedOutbox() { + outboxService.cleanupCompletedOutbox(OUTBOX_RETENTION_DAYS); + } +} diff --git a/src/main/java/com/example/RealMatch/notification/infrastructure/messaging/RabbitMqConfig.java b/src/main/java/com/example/RealMatch/notification/infrastructure/messaging/RabbitMqConfig.java new file mode 100644 index 00000000..005932bd --- /dev/null +++ b/src/main/java/com/example/RealMatch/notification/infrastructure/messaging/RabbitMqConfig.java @@ -0,0 +1,130 @@ +package com.example.RealMatch.notification.infrastructure.messaging; + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * RabbitMQ 인프라 설정 + * + *

토폴로지: + *

+ * notification.exchange ──routing──▶ notification.delivery.queue
+ *                                        │ (x-dead-letter-exchange)
+ *                                        ▼
+ * notification.dlx ──────routing──▶ notification.delivery.dlq
+ * 
+ * + *

설계 원칙: + *

    + *
  • Manual ACK: Consumer가 처리 완료/실패를 명시적으로 결정
  • + *
  • DLQ: 처리 불가 메시지를 Dead Letter Queue로 격리
  • + *
  • JSON 직렬화: 메시지 디버깅과 모니터링 용이
  • + *
  • prefetch=10: Consumer가 한 번에 가져오는 메시지 수 제한
  • + *
+ */ +@Configuration +public class RabbitMqConfig { + + public static final String NOTIFICATION_EXCHANGE = "notification.exchange"; + public static final String NOTIFICATION_QUEUE = "notification.delivery.queue"; + public static final String NOTIFICATION_ROUTING_KEY = "notification.delivery"; + + public static final String NOTIFICATION_DLX = "notification.dlx"; + public static final String NOTIFICATION_DLQ = "notification.delivery.dlq"; + public static final String NOTIFICATION_DLQ_ROUTING_KEY = "notification.delivery.dead"; + + // ==================== Exchange ==================== + + @Bean + DirectExchange notificationExchange() { + return new DirectExchange(NOTIFICATION_EXCHANGE, true, false); + } + + @Bean + DirectExchange notificationDlx() { + return new DirectExchange(NOTIFICATION_DLX, true, false); + } + + // ==================== Queue ==================== + + /** + * 메인 큐: DLX 설정으로 처리 불가 메시지 자동 이동. + */ + @Bean + Queue notificationQueue() { + return QueueBuilder.durable(NOTIFICATION_QUEUE) + .withArgument("x-dead-letter-exchange", NOTIFICATION_DLX) + .withArgument("x-dead-letter-routing-key", NOTIFICATION_DLQ_ROUTING_KEY) + .build(); + } + + /** + * Dead Letter Queue: 최종 실패 메시지 보관. 운영팀 모니터링 대상. + */ + @Bean + Queue notificationDlq() { + return QueueBuilder.durable(NOTIFICATION_DLQ).build(); + } + + // ==================== Binding ==================== + + @Bean + Binding notificationBinding(Queue notificationQueue, DirectExchange notificationExchange) { + return BindingBuilder.bind(notificationQueue) + .to(notificationExchange) + .with(NOTIFICATION_ROUTING_KEY); + } + + @Bean + Binding notificationDlqBinding(Queue notificationDlq, DirectExchange notificationDlx) { + return BindingBuilder.bind(notificationDlq) + .to(notificationDlx) + .with(NOTIFICATION_DLQ_ROUTING_KEY); + } + + // ==================== Converter & Template ==================== + + @Bean + MessageConverter jackson2JsonMessageConverter() { + return new Jackson2JsonMessageConverter(); + } + + @Bean + RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, + MessageConverter jackson2JsonMessageConverter) { + RabbitTemplate template = new RabbitTemplate(connectionFactory); + template.setMessageConverter(jackson2JsonMessageConverter); + template.setChannelTransacted(false); + return template; + } + + // ==================== Listener Container ==================== + + /** + * Manual ACK 모드 컨테이너 팩토리. + * Consumer가 basicAck/basicNack를 직접 호출하여 + * 메시지 유실이나 무한 redelivery를 방지한다. + */ + @Bean + SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( + ConnectionFactory connectionFactory, + MessageConverter jackson2JsonMessageConverter) { + SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); + factory.setConnectionFactory(connectionFactory); + factory.setMessageConverter(jackson2JsonMessageConverter); + factory.setAcknowledgeMode(org.springframework.amqp.core.AcknowledgeMode.MANUAL); + factory.setPrefetchCount(10); + factory.setDefaultRequeueRejected(false); + return factory; + } +} diff --git a/src/main/java/com/example/RealMatch/notification/infrastructure/sender/NotificationChannelSender.java b/src/main/java/com/example/RealMatch/notification/infrastructure/sender/NotificationChannelSender.java index 201ef80c..54ada58d 100644 --- a/src/main/java/com/example/RealMatch/notification/infrastructure/sender/NotificationChannelSender.java +++ b/src/main/java/com/example/RealMatch/notification/infrastructure/sender/NotificationChannelSender.java @@ -6,7 +6,7 @@ /** * 채널별 알림 발송 전략 인터페이스. * 각 구현체(FCM, Email)는 이 인터페이스를 구현하고, - * NotificationDeliveryProcessor에서 채널에 맞는 구현체를 선택하여 발송한다. + * NotificationDeliveryConsumer에서 채널에 맞는 구현체를 선택하여 발송한다. */ public interface NotificationChannelSender { diff --git a/src/main/java/com/example/RealMatch/notification/infrastructure/worker/NotificationDeliveryWorker.java b/src/main/java/com/example/RealMatch/notification/infrastructure/worker/NotificationDeliveryWorker.java deleted file mode 100644 index 4e86c2f0..00000000 --- a/src/main/java/com/example/RealMatch/notification/infrastructure/worker/NotificationDeliveryWorker.java +++ /dev/null @@ -1,83 +0,0 @@ -package com.example.RealMatch.notification.infrastructure.worker; - -import java.time.LocalDateTime; -import java.util.List; -import java.util.UUID; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.data.domain.PageRequest; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; - -import com.example.RealMatch.notification.application.service.NotificationDeliveryProcessor; -import com.example.RealMatch.notification.domain.entity.NotificationDelivery; -import com.example.RealMatch.notification.domain.entity.enums.DeliveryStatus; -import com.example.RealMatch.notification.domain.repository.NotificationDeliveryRepository; - -import lombok.RequiredArgsConstructor; - -/** - * PENDING 상태의 NotificationDelivery를 주기적으로 가져와 발송하는 스케줄러 워커. - * - *

설계 원칙: - *

    - *
  • 외부 I/O(FCM, Email)는 이 워커에서만 수행한다 (리스너에서 직접 호출 금지)
  • - *
  • 각 delivery는 독립 트랜잭션으로 처리하여 장애를 격리한다
  • - *
  • 재시도는 exponential backoff(1m, 5m, 30m, 2h, 12h) 기반
  • - *
  • 워커 비정상 종료 시 IN_PROGRESS 상태 건은 별도 복구 로직으로 처리한다
  • - *
- */ -@Component -@RequiredArgsConstructor -public class NotificationDeliveryWorker { - - private static final Logger LOG = LoggerFactory.getLogger(NotificationDeliveryWorker.class); - - private static final int BATCH_SIZE = 50; - - private static final int STUCK_THRESHOLD_MINUTES = 10; - - private final NotificationDeliveryRepository deliveryRepository; - private final NotificationDeliveryProcessor deliveryProcessor; - - @Scheduled(fixedDelay = 30_000, initialDelay = 10_000) - public void processPendingDeliveries() { - List pendingBatch = deliveryRepository.findRetryableDeliveries( - DeliveryStatus.PENDING, - LocalDateTime.now(), - PageRequest.of(0, BATCH_SIZE)); - - if (pendingBatch.isEmpty()) { - return; - } - - LOG.info("[DeliveryWorker] Processing {} pending deliveries.", pendingBatch.size()); - - for (NotificationDelivery delivery : pendingBatch) { - processWithFaultIsolation(delivery.getId()); - } - } - - @Transactional - @Scheduled(fixedDelay = 300_000, initialDelay = 60_000) - public void recoverStuckDeliveries() { - LocalDateTime stuckBefore = LocalDateTime.now().minusMinutes(STUCK_THRESHOLD_MINUTES); - int recovered = deliveryRepository.recoverStuckDeliveries( - DeliveryStatus.IN_PROGRESS, DeliveryStatus.PENDING, stuckBefore); - if (recovered > 0) { - LOG.warn("[DeliveryWorker] Recovered {} stuck deliveries.", recovered); - } - } - - private void processWithFaultIsolation(UUID deliveryId) { - try { - deliveryProcessor.processDelivery(deliveryId); - } catch (Exception e) { - // 절대 상위로 전파하지 않는다 - LOG.error("[DeliveryWorker] Unexpected error processing delivery. deliveryId={}", - deliveryId, e); - } - } -} diff --git a/src/test/java/com/example/RealMatch/notification/application/event/NotificationEventListenerIntegrationTest.java b/src/test/java/com/example/RealMatch/notification/application/event/NotificationEventListenerIntegrationTest.java deleted file mode 100644 index 207ef77b..00000000 --- a/src/test/java/com/example/RealMatch/notification/application/event/NotificationEventListenerIntegrationTest.java +++ /dev/null @@ -1,316 +0,0 @@ -package com.example.RealMatch.notification.application.event; - -import static org.assertj.core.api.Assertions.assertThat; - -import java.time.LocalDate; -import java.util.List; - -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.ApplicationEventPublisher; -import org.springframework.test.context.ActiveProfiles; -import org.springframework.transaction.PlatformTransactionManager; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.support.DefaultTransactionDefinition; - -import com.example.RealMatch.brand.domain.entity.Brand; -import com.example.RealMatch.brand.domain.repository.BrandRepository; -import com.example.RealMatch.business.application.event.CampaignApplySentEvent; -import com.example.RealMatch.business.application.event.CampaignProposalSentEvent; -import com.example.RealMatch.business.application.event.CampaignProposalStatusChangedEvent; -import com.example.RealMatch.business.domain.entity.CampaignProposal; -import com.example.RealMatch.business.domain.enums.ProposalDirection; -import com.example.RealMatch.business.domain.enums.ProposalStatus; -import com.example.RealMatch.business.domain.repository.CampaignProposalRepository; -import com.example.RealMatch.notification.domain.entity.Notification; -import com.example.RealMatch.notification.domain.entity.NotificationDelivery; -import com.example.RealMatch.notification.domain.entity.enums.DeliveryStatus; -import com.example.RealMatch.notification.domain.entity.enums.NotificationKind; -import com.example.RealMatch.notification.domain.repository.NotificationDeliveryRepository; -import com.example.RealMatch.notification.domain.repository.NotificationRepository; -import com.example.RealMatch.user.domain.entity.User; -import com.example.RealMatch.user.domain.entity.enums.Role; -import com.example.RealMatch.user.domain.repository.UserRepository; - -/** - * NotificationEventListener 통합 테스트 - * 실제 이벤트 발행 → 알림 생성 → Delivery 생성까지 전체 플로우 검증 - * - *

주의: @TransactionalEventListener(AFTER_COMMIT)을 테스트하기 위해 - * @Transactional을 제거하고 명시적으로 트랜잭션을 커밋해야 합니다. - */ -@SpringBootTest -@ActiveProfiles("test") -@DisplayName("NotificationEventListener 통합 테스트") -class NotificationEventListenerIntegrationTest { - - @Autowired - private ApplicationEventPublisher eventPublisher; - - @Autowired - private NotificationRepository notificationRepository; - - @Autowired - private NotificationDeliveryRepository notificationDeliveryRepository; - - @Autowired - private UserRepository userRepository; - - @Autowired - private BrandRepository brandRepository; - - @Autowired - private CampaignProposalRepository campaignProposalRepository; - - @Autowired - private PlatformTransactionManager transactionManager; - - @Test - @DisplayName("CampaignProposalSentEvent → PROPOSAL_RECEIVED 알림 생성 + PUSH Delivery 생성") - void handleCampaignProposalSent_shouldCreateNotificationAndDelivery() { - // given - User creator = createTestUserInTransaction("크리에이터", Role.CREATOR); - User brandUser = createTestUserInTransaction("브랜드유저", Role.BRAND); - createTestBrandInTransaction("라운드랩", brandUser); - - Long proposalId = 1L; - Long campaignId = 10L; - - CampaignProposalSentEvent event = new CampaignProposalSentEvent( - proposalId, - brandUser.getId(), - creator.getId(), - campaignId, - "테스트 캠페인", - "캠페인 요약", - ProposalStatus.REVIEWING, - ProposalDirection.BRAND_TO_CREATOR, - false - ); - - // when - 트랜잭션 내에서 이벤트 발행 후 커밋 (AFTER_COMMIT 리스너 실행) - executeInTransaction(() -> { - eventPublisher.publishEvent(event); - }); - - // then - 알림 생성 확인 - List notifications = notificationRepository.findByUserId(creator.getId(), null).getContent(); - assertThat(notifications).hasSize(1); - - Notification notification = notifications.get(0); - assertThat(notification.getKind()).isEqualTo(NotificationKind.PROPOSAL_RECEIVED); - assertThat(notification.getUserId()).isEqualTo(creator.getId()); - assertThat(notification.getTitle()).contains("새 캠페인 제안"); - assertThat(notification.getBody()).contains("라운드랩"); - assertThat(notification.getProposalId()).isEqualTo(proposalId); - assertThat(notification.getCampaignId()).isEqualTo(campaignId); - - // then - Delivery 생성 확인 (PUSH만) - NotificationDelivery delivery = notificationDeliveryRepository - .findByNotificationIdAndChannel(notification.getId(), com.example.RealMatch.user.domain.entity.enums.NotificationChannel.PUSH) - .orElseThrow(() -> new AssertionError("Delivery not found")); - assertThat(delivery.getStatus()).isEqualTo(DeliveryStatus.PENDING); - assertThat(delivery.getChannel()).isEqualTo(com.example.RealMatch.user.domain.entity.enums.NotificationChannel.PUSH); - assertThat(delivery.getAttemptCount()).isZero(); - } - - @Test - @DisplayName("CampaignProposalStatusChangedEvent(MATCHED) → CAMPAIGN_MATCHED + PROPOSAL_SENT 알림 생성") - void handleCampaignProposalStatusChanged_shouldCreateTwoNotifications_whenMatched() { - // given - User creator = createTestUserInTransaction("크리에이터", Role.CREATOR); - User brandUser = createTestUserInTransaction("브랜드유저", Role.BRAND); - Brand brand = createTestBrandInTransaction("라운드랩", brandUser); - - // CampaignProposal 엔티티 생성 (senderUserId 조회용) - CampaignProposal proposal = createTestProposalInTransaction(creator, brand, brandUser.getId(), creator.getId()); - - CampaignProposalStatusChangedEvent event = new CampaignProposalStatusChangedEvent( - proposal.getId(), - null, // campaignId - brandUser.getId(), - creator.getId(), - ProposalStatus.MATCHED, - creator.getId(), // actorUserId - ProposalDirection.BRAND_TO_CREATOR - ); - - // when - 트랜잭션 내에서 이벤트 발행 후 커밋 (AFTER_COMMIT 리스너 실행) - executeInTransaction(() -> { - eventPublisher.publishEvent(event); - }); - - // then - 크리에이터에게 CAMPAIGN_MATCHED 알림 - List creatorNotifications = notificationRepository.findByUserId(creator.getId(), null).getContent(); - assertThat(creatorNotifications).anyMatch(n -> - n.getKind() == NotificationKind.CAMPAIGN_MATCHED && - n.getUserId().equals(creator.getId()) - ); - - // then - 제안 보낸 사람(brandUser)에게 PROPOSAL_SENT 알림 - List brandNotifications = notificationRepository.findByUserId(brandUser.getId(), null).getContent(); - assertThat(brandNotifications).anyMatch(n -> - n.getKind() == NotificationKind.PROPOSAL_SENT && - n.getUserId().equals(brandUser.getId()) - ); - - // then - CAMPAIGN_MATCHED는 PUSH + EMAIL Delivery 생성 - Notification matchedNotification = creatorNotifications.stream() - .filter(n -> n.getKind() == NotificationKind.CAMPAIGN_MATCHED) - .findFirst() - .orElseThrow(); - - List matchedDeliveries = notificationDeliveryRepository.findAll().stream() - .filter(d -> d.getNotificationId().equals(matchedNotification.getId())) - .toList(); - assertThat(matchedDeliveries).hasSize(2); // PUSH + EMAIL - } - - @Test - @DisplayName("CampaignApplySentEvent → CAMPAIGN_APPLIED 알림 생성") - void handleCampaignApplySent_shouldCreateNotification() { - // given - User creator = createTestUserInTransaction("크리에이터", Role.CREATOR); - User brandUser = createTestUserInTransaction("브랜드유저", Role.BRAND); - - Long applyId = 3L; - Long campaignId = 30L; - - CampaignApplySentEvent event = new CampaignApplySentEvent( - applyId, - campaignId, - creator.getId(), - brandUser.getId(), - "테스트 캠페인", - "캠페인 설명", - "지원 사유" - ); - - // when - 트랜잭션 내에서 이벤트 발행 후 커밋 (AFTER_COMMIT 리스너 실행) - executeInTransaction(() -> { - eventPublisher.publishEvent(event); - }); - - // then - List notifications = notificationRepository.findByUserId(brandUser.getId(), null).getContent(); - assertThat(notifications).hasSize(1); - - Notification notification = notifications.get(0); - assertThat(notification.getKind()).isEqualTo(NotificationKind.CAMPAIGN_APPLIED); - assertThat(notification.getUserId()).isEqualTo(brandUser.getId()); - assertThat(notification.getBody()).contains("지원했어요"); - } - - @Test - @DisplayName("멱등성: 동일 이벤트 재발행 시 중복 알림 생성 안됨") - void handleEvent_shouldNotCreateDuplicate_whenSameEventPublishedTwice() { - // given - User creator = createTestUserInTransaction("크리에이터", Role.CREATOR); - User brandUser = createTestUserInTransaction("브랜드유저", Role.BRAND); - createTestBrandInTransaction("라운드랩", brandUser); - - CampaignProposalSentEvent event = new CampaignProposalSentEvent( - 100L, - brandUser.getId(), - creator.getId(), - 1000L, - "테스트", - "요약", - ProposalStatus.REVIEWING, - ProposalDirection.BRAND_TO_CREATOR, - false - ); - - // when - 동일 이벤트 2번 발행 (각각 트랜잭션 커밋) - executeInTransaction(() -> { - eventPublisher.publishEvent(event); - }); - int firstCount = notificationRepository.findByUserId(creator.getId(), null).getContent().size(); - - executeInTransaction(() -> { - eventPublisher.publishEvent(event); - }); - int secondCount = notificationRepository.findByUserId(creator.getId(), null).getContent().size(); - - // then - 알림은 1개만 생성 (멱등성 보장) - assertThat(firstCount).isEqualTo(1); - assertThat(secondCount).isEqualTo(1); - } - - // ==================== 헬퍼 메서드 ==================== - - /** - * 트랜잭션 내에서 작업을 실행하고 커밋합니다. - * @TransactionalEventListener(AFTER_COMMIT) 리스너가 실행되도록 보장합니다. - */ - private void executeInTransaction(Runnable task) { - TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition()); - try { - task.run(); - transactionManager.commit(status); - } catch (Exception e) { - transactionManager.rollback(status); - throw e; - } - } - - /** - * 트랜잭션 내에서 User를 생성하고 커밋합니다. - */ - private User createTestUserInTransaction(String nickname, Role role) { - User[] result = new User[1]; - executeInTransaction(() -> { - User user = User.builder() - .name("테스트유저") - .nickname(nickname) - .email(nickname + "@example.com") // 고유한 이메일 - .role(role) - .build(); - result[0] = userRepository.save(user); - }); - return result[0]; - } - - /** - * 트랜잭션 내에서 Brand를 생성하고 커밋합니다. - */ - private Brand createTestBrandInTransaction(String brandName, User user) { - Brand[] result = new Brand[1]; - executeInTransaction(() -> { - Brand brand = Brand.builder() - .brandName(brandName) - .industryType(com.example.RealMatch.brand.domain.entity.enums.IndustryType.BEAUTY) - .user(user) - .createdBy(user.getId()) - .build(); - result[0] = brandRepository.save(brand); - }); - return result[0]; - } - - /** - * 트랜잭션 내에서 CampaignProposal을 생성하고 커밋합니다. - */ - private CampaignProposal createTestProposalInTransaction(User creator, Brand brand, Long senderUserId, Long receiverUserId) { - CampaignProposal[] result = new CampaignProposal[1]; - executeInTransaction(() -> { - CampaignProposal proposal = CampaignProposal.builder() - .creator(creator) - .brand(brand) - .whoProposed(Role.BRAND) - .senderUserId(senderUserId) - .receiverUserId(receiverUserId) - .title("테스트 제안") - .campaignDescription("테스트 설명") - .rewardAmount(100000) - .productId(1L) - .startDate(LocalDate.now().plusDays(1)) - .endDate(LocalDate.now().plusDays(30)) - .build(); - result[0] = campaignProposalRepository.save(proposal); - }); - return result[0]; - } -} diff --git a/src/test/java/com/example/RealMatch/notification/application/service/NotificationChannelResolverTest.java b/src/test/java/com/example/RealMatch/notification/application/service/NotificationChannelResolverTest.java deleted file mode 100644 index 6fd07a73..00000000 --- a/src/test/java/com/example/RealMatch/notification/application/service/NotificationChannelResolverTest.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.example.RealMatch.notification.application.service; - -import static org.assertj.core.api.Assertions.assertThat; - -import java.util.Set; - -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; - -import com.example.RealMatch.notification.domain.entity.enums.NotificationKind; -import com.example.RealMatch.user.domain.entity.enums.NotificationChannel; - -/** - * NotificationChannelResolver 단위 테스트 - * Spring 없이 순수 로직만 테스트 - */ -@DisplayName("NotificationChannelResolver 테스트") -class NotificationChannelResolverTest { - - private final NotificationChannelResolver resolver = new NotificationChannelResolver(); - - @Test - @DisplayName("PROPOSAL_RECEIVED는 PUSH와 EMAIL을 반환") - void resolveChannels_shouldReturnPushAndEmail_whenProposalReceived() { - // when - Set channels = resolver.resolveChannels(NotificationKind.PROPOSAL_RECEIVED); - - // then - assertThat(channels).containsExactlyInAnyOrder(NotificationChannel.PUSH, NotificationChannel.EMAIL); - } - - @Test - @DisplayName("CAMPAIGN_MATCHED는 PUSH와 EMAIL 반환") - void resolveChannels_shouldReturnPushAndEmail_whenCampaignMatched() { - // when - Set channels = resolver.resolveChannels(NotificationKind.CAMPAIGN_MATCHED); - - // then - assertThat(channels).containsExactlyInAnyOrder(NotificationChannel.PUSH, NotificationChannel.EMAIL); - } - - @Test - @DisplayName("AUTO_CONFIRMED는 EMAIL만 반환") - void resolveChannels_shouldReturnEmailOnly_whenAutoConfirmed() { - // when - Set channels = resolver.resolveChannels(NotificationKind.AUTO_CONFIRMED); - - // then - assertThat(channels).containsExactly(NotificationChannel.EMAIL); - } - - @Test - @DisplayName("모든 NotificationKind에 대해 채널이 정의되어 있음") - void resolveChannels_shouldReturnChannels_whenAnyKind() { - // when & then - for (NotificationKind kind : NotificationKind.values()) { - Set channels = resolver.resolveChannels(kind); - assertThat(channels).isNotEmpty(); - } - } -} diff --git a/src/test/java/com/example/RealMatch/notification/application/service/NotificationMessageTemplateServiceTest.java b/src/test/java/com/example/RealMatch/notification/application/service/NotificationMessageTemplateServiceTest.java deleted file mode 100644 index a16bbb52..00000000 --- a/src/test/java/com/example/RealMatch/notification/application/service/NotificationMessageTemplateServiceTest.java +++ /dev/null @@ -1,77 +0,0 @@ -package com.example.RealMatch.notification.application.service; - -import static org.assertj.core.api.Assertions.assertThat; - -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; - -import com.example.RealMatch.notification.application.service.NotificationMessageTemplateService.MessageTemplate; - -/** - * NotificationMessageTemplateService 단위 테스트 - */ -@DisplayName("NotificationMessageTemplateService 테스트") -class NotificationMessageTemplateServiceTest { - - private final NotificationMessageTemplateService service = new NotificationMessageTemplateService(); - - @Test - @DisplayName("PROPOSAL_RECEIVED 메시지 생성 - 브랜드명 포함") - void createProposalReceivedMessage_shouldContainBrandName() { - // when - MessageTemplate template = service.createProposalReceivedMessage("라운드랩"); - - // then - assertThat(template.title()).isEqualTo("새 캠페인 제안"); - assertThat(template.body()).contains("라운드랩"); - assertThat(template.body()).contains("새로운 캠페인 제안을 보냈어요"); - } - - @Test - @DisplayName("PROPOSAL_SENT 메시지 생성 - 수락 시") - void createProposalSentMessage_shouldContainAccepted_whenAccepted() { - // when - MessageTemplate template = service.createProposalSentMessage("라운드랩", true); - - // then - assertThat(template.title()).isEqualTo("제안 결과"); - assertThat(template.body()).contains("라운드랩"); - assertThat(template.body()).contains("수락되었어요"); - } - - @Test - @DisplayName("PROPOSAL_SENT 메시지 생성 - 거절 시") - void createProposalSentMessage_shouldContainRejected_whenRejected() { - // when - MessageTemplate template = service.createProposalSentMessage("라운드랩", false); - - // then - assertThat(template.title()).isEqualTo("제안 결과"); - assertThat(template.body()).contains("라운드랩"); - assertThat(template.body()).contains("거절되었어요"); - } - - @Test - @DisplayName("CAMPAIGN_MATCHED 메시지 생성") - void createCampaignMatchedMessage_shouldContainBrandName() { - // when - MessageTemplate template = service.createCampaignMatchedMessage("라운드랩"); - - // then - assertThat(template.title()).isEqualTo("캠페인 매칭"); - assertThat(template.body()).contains("라운드랩"); - assertThat(template.body()).contains("매칭되었어요"); - } - - @Test - @DisplayName("CAMPAIGN_APPLIED 메시지 생성 - 크리에이터명 포함") - void createCampaignAppliedMessage_shouldContainCreatorName() { - // when - MessageTemplate template = service.createCampaignAppliedMessage("크리에이터A"); - - // then - assertThat(template.title()).isEqualTo("캠페인 지원"); - assertThat(template.body()).contains("크리에이터A"); - assertThat(template.body()).contains("지원했어요"); - } -} From 9fa6f6b74c5b173f3582791463a77ede9034d1d5 Mon Sep 17 00:00:00 2001 From: 1000hyehyang Date: Wed, 11 Feb 2026 21:25:09 +0900 Subject: [PATCH 2/5] =?UTF-8?q?refactor(#348):=20Consumer=20=EC=98=88?= =?UTF-8?q?=EC=99=B8=20=EB=B6=84=EA=B8=B0=C2=B7Outbox=20=EC=9B=90=EC=9E=90?= =?UTF-8?q?=EC=A0=81=20=EC=8B=A4=ED=8C=A8=20=EC=B2=98=EB=A6=AC=C2=B7DLQ=20?= =?UTF-8?q?=EC=A0=95=EC=B1=85=20=EC=A0=95=EB=A6=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/NotificationOutboxService.java | 41 +++++----- .../NotificationOutboxRepository.java | 12 +-- .../NotificationDeliveryConsumer.java | 78 ++++++++++++++++++- 3 files changed, 102 insertions(+), 29 deletions(-) diff --git a/src/main/java/com/example/RealMatch/notification/application/service/NotificationOutboxService.java b/src/main/java/com/example/RealMatch/notification/application/service/NotificationOutboxService.java index 5d745e69..6d59e75f 100644 --- a/src/main/java/com/example/RealMatch/notification/application/service/NotificationOutboxService.java +++ b/src/main/java/com/example/RealMatch/notification/application/service/NotificationOutboxService.java @@ -10,8 +10,11 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import com.example.RealMatch.notification.domain.entity.NotificationDelivery; import com.example.RealMatch.notification.domain.entity.NotificationOutbox; +import com.example.RealMatch.notification.domain.entity.enums.DeliveryStatus; import com.example.RealMatch.notification.domain.entity.enums.OutboxStatus; +import com.example.RealMatch.notification.domain.repository.NotificationDeliveryRepository; import com.example.RealMatch.notification.domain.repository.NotificationOutboxRepository; import com.example.RealMatch.user.domain.entity.enums.NotificationChannel; @@ -27,7 +30,11 @@ public class NotificationOutboxService { private static final Logger LOG = LoggerFactory.getLogger(NotificationOutboxService.class); + private static final List RETRYABLE_DELIVERY_STATUSES = + List.of(DeliveryStatus.PENDING, DeliveryStatus.RETRY); + private final NotificationOutboxRepository outboxRepository; + private final NotificationDeliveryRepository deliveryRepository; @Transactional(readOnly = true) public List findPendingOutbox(int limit) { @@ -54,41 +61,35 @@ public void markOutboxSent(UUID outboxId) { } } - /** 발행 실패 시 retryCount < MAX → PENDING, else FAILED. */ + /** 발행 실패: retryCount 증가 + status(PENDING/FAILED)를 DB에서 원자적으로 결정. */ @Transactional public void markOutboxPublishFailed(UUID outboxId, String error) { - NotificationOutbox outbox = outboxRepository.findById(outboxId).orElse(null); - if (outbox == null) { - LOG.warn("[Outbox] Not found for failure recording. outboxId={}", outboxId); - return; - } - - OutboxStatus newStatus; - if (outbox.getRetryCount() + 1 >= NotificationOutbox.MAX_PUBLISH_RETRY) { - newStatus = OutboxStatus.FAILED; - LOG.error("[Outbox] Max publish retries exceeded. outboxId={}, deliveryId={}", - outboxId, outbox.getDeliveryId()); - } else { - newStatus = OutboxStatus.PENDING; - LOG.warn("[Outbox] Publish failed, will retry. outboxId={}, retryCount={}", - outboxId, outbox.getRetryCount() + 1); - } - int updated = outboxRepository.markPublishFailed( outboxId, - newStatus, truncate(error, 500), + NotificationOutbox.MAX_PUBLISH_RETRY, + OutboxStatus.FAILED, + OutboxStatus.PENDING, List.of(OutboxStatus.SENDING, OutboxStatus.PENDING)); if (updated == 0) { LOG.warn("[Outbox] markPublishFailed affected 0 rows (concurrent update). outboxId={}", outboxId); + } else { + LOG.info("[Outbox] Publish failed recorded. outboxId={}, retryCount incremented.", outboxId); } } - /** Retry용 Outbox 생성. 이미 PENDING/SENDING 있으면 skip(중복 방지). */ + /** Retry용 Outbox 생성. delivery가 PENDING/RETRY일 때만 생성. 이미 PENDING/SENDING Outbox 있으면 skip. */ @Transactional public void createRetryOutboxIfAbsent(UUID deliveryId, UUID notificationId, NotificationChannel channel) { + NotificationDelivery delivery = deliveryRepository.findById(deliveryId).orElse(null); + if (delivery == null || !RETRYABLE_DELIVERY_STATUSES.contains(delivery.getStatus())) { + LOG.debug("[Outbox] Skip — delivery not found or not retryable. deliveryId={}, status={}", + deliveryId, delivery != null ? delivery.getStatus() : null); + return; + } + boolean alreadyPending = outboxRepository.existsByDeliveryIdAndStatusIn( deliveryId, List.of(OutboxStatus.PENDING, OutboxStatus.SENDING)); diff --git a/src/main/java/com/example/RealMatch/notification/domain/repository/NotificationOutboxRepository.java b/src/main/java/com/example/RealMatch/notification/domain/repository/NotificationOutboxRepository.java index 8faaf72f..1196e993 100644 --- a/src/main/java/com/example/RealMatch/notification/domain/repository/NotificationOutboxRepository.java +++ b/src/main/java/com/example/RealMatch/notification/domain/repository/NotificationOutboxRepository.java @@ -44,17 +44,19 @@ int markAsSent( @Param("targetStatus") OutboxStatus targetStatus, @Param("sourceStatus") OutboxStatus sourceStatus); - /** 발행 실패: retryCount++ 및 상태 전이. */ + /** 발행 실패: retryCount 증가 + status를 DB에서 원자적으로 결정 (레이스 방지). enum은 파라미터로 전달해 provider-agnostic. */ @Modifying(clearAutomatically = true) @Query("UPDATE NotificationOutbox o " - + "SET o.status = :newStatus, " - + "o.retryCount = o.retryCount + 1, " - + "o.lastError = :lastError " + + "SET o.retryCount = o.retryCount + 1, " + + "o.lastError = :lastError, " + + "o.status = CASE WHEN (o.retryCount + 1) >= :maxRetry THEN :statusFailed ELSE :statusPending END " + "WHERE o.id = :id AND o.status IN :sourceStatuses") int markPublishFailed( @Param("id") UUID id, - @Param("newStatus") OutboxStatus newStatus, @Param("lastError") String lastError, + @Param("maxRetry") int maxRetry, + @Param("statusFailed") OutboxStatus statusFailed, + @Param("statusPending") OutboxStatus statusPending, @Param("sourceStatuses") Collection sourceStatuses); /** 미처리 Outbox(PENDING/SENDING) 존재 여부. 중복 생성 방지용. */ diff --git a/src/main/java/com/example/RealMatch/notification/infrastructure/messaging/NotificationDeliveryConsumer.java b/src/main/java/com/example/RealMatch/notification/infrastructure/messaging/NotificationDeliveryConsumer.java index 6ad25871..729969f8 100644 --- a/src/main/java/com/example/RealMatch/notification/infrastructure/messaging/NotificationDeliveryConsumer.java +++ b/src/main/java/com/example/RealMatch/notification/infrastructure/messaging/NotificationDeliveryConsumer.java @@ -1,6 +1,9 @@ package com.example.RealMatch.notification.infrastructure.messaging; import java.io.IOException; +import java.net.ConnectException; +import java.net.SocketTimeoutException; +import java.net.UnknownHostException; import java.util.EnumMap; import java.util.List; import java.util.Map; @@ -10,10 +13,13 @@ import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; +import org.springframework.dao.DataAccessException; import org.springframework.messaging.handler.annotation.Header; + import org.springframework.stereotype.Component; import com.example.RealMatch.notification.application.service.NotificationDeliveryClaimService; +import com.example.RealMatch.notification.application.service.NotificationOutboxService; import com.example.RealMatch.notification.domain.entity.Notification; import com.example.RealMatch.notification.domain.repository.NotificationRepository; import com.example.RealMatch.notification.infrastructure.sender.NotificationChannelSender; @@ -24,7 +30,8 @@ /** * MQ Consumer. claim → 발송(FCM/Email) → 결과 기록 순서를 엄격히 따른다. * - claim 실패(0 row) → ACK 후 종료 (중복 발송 방지) - * - 예외 → NACK(requeue=false) → DLQ + * - 일시 오류(DB 등) → ACK + delivery RETRY + Outbox 재생성 → Recovery가 재발행 + * - 영구 오류(역직렬화, 메시지 형식 오류 등) → NACK(requeue=false) → DLQ * - @Transactional 없음 — 외부 API가 TX 안에서 실행되는 것을 방지 */ @Component @@ -33,14 +40,17 @@ public class NotificationDeliveryConsumer { private static final Logger LOG = LoggerFactory.getLogger(NotificationDeliveryConsumer.class); private final NotificationDeliveryClaimService claimService; + private final NotificationOutboxService outboxService; private final NotificationRepository notificationRepository; private final Map senderMap; public NotificationDeliveryConsumer( NotificationDeliveryClaimService claimService, + NotificationOutboxService outboxService, NotificationRepository notificationRepository, List senders) { this.claimService = claimService; + this.outboxService = outboxService; this.notificationRepository = notificationRepository; this.senderMap = new EnumMap<>(NotificationChannel.class); for (NotificationChannelSender sender : senders) { @@ -62,11 +72,71 @@ public void handleDelivery(NotificationDeliveryMessage message, processMessage(message); channel.basicAck(deliveryTag, false); // 정상 처리 완료 → ACK } catch (Exception e) { - // 예상치 못한 예외 → NACK(requeue=false) → DLQ - LOG.error("[DeliveryConsumer] Unexpected error. Sending to DLQ. message={}, error={}", - message, e.getMessage(), e); + if (isTransient(e)) { + handleTransientError(message, channel, deliveryTag, e); + } else { + LOG.error("[DeliveryConsumer] Permanent error. Sending to DLQ. message={}, error={}", + message, e.getMessage(), e); + safeNack(channel, deliveryTag); + } + } + } + + /** + * 일시 오류: ACK하여 메시지 제거 + delivery RETRY 기록 + 재발행용 Outbox 생성. + * RecoveryScheduler가 RETRY delivery에 대해 Outbox를 만들고 Publisher가 다시 MQ로 보낸다. + */ + private void handleTransientError(NotificationDeliveryMessage message, Channel channel, + long deliveryTag, Exception e) { + UUID deliveryId = null; + UUID notificationId = null; + NotificationChannel channelEnum = null; + try { + deliveryId = UUID.fromString(message.getDeliveryId()); + notificationId = UUID.fromString(message.getNotificationId()); + channelEnum = NotificationChannel.valueOf(message.getChannel()); + } catch (Exception parseEx) { + // 파싱 실패 = poison(영구 오류). transient가 아님. + LOG.warn("[DeliveryConsumer] Poison message (cannot parse). Sending to DLQ. message={}", + message, parseEx); safeNack(channel, deliveryTag); + return; + } + + try { + claimService.recordFailure(deliveryId, e.getMessage()); + outboxService.createRetryOutboxIfAbsent(deliveryId, notificationId, channelEnum); + channel.basicAck(deliveryTag, false); + LOG.warn("[DeliveryConsumer] Transient error. Scheduled retry. deliveryId={}, error={}", + deliveryId, e.getMessage()); + } catch (Exception retryEx) { + // DB가 흔들리는 중이면 재시도 기록도 실패함. DLQ로 보내지 않고 ACK → stuck 상태로 두면 + // recoverStuckDeliveries가 10분 후 IN_PROGRESS → RETRY로 복구하여 재발행 경로로 보냄. + LOG.error("[DeliveryConsumer] Transient but failed to schedule retry. ACK to avoid DLQ pollution. " + + "deliveryId={}, originalError={}, scheduleError={}", + deliveryId, e.getMessage(), retryEx.getMessage(), retryEx); + try { + channel.basicAck(deliveryTag, false); + } catch (IOException ackEx) { + LOG.error("[DeliveryConsumer] Failed to ACK after schedule retry failure. deliveryTag={}", + deliveryTag, ackEx); + } + } + } + + /** DB/네트워크 등 일시 오류면 true. ShutdownSignalException은 제외(채널/커넥션 문제는 컨테이너가 재연결). */ + private static boolean isTransient(Throwable e) { + Throwable t = e; + while (t != null) { + if (t instanceof DataAccessException) { + return true; + } + if (t instanceof SocketTimeoutException || t instanceof ConnectException || t instanceof UnknownHostException) { + return true; + } + t = t.getCause(); } + return false; } private void processMessage(NotificationDeliveryMessage message) { From af866702b4906ff54ce1fb3cc556906f4766941c Mon Sep 17 00:00:00 2001 From: 1000hyehyang Date: Wed, 11 Feb 2026 21:30:39 +0900 Subject: [PATCH 3/5] =?UTF-8?q?fix(#348):=20check=20style=20=EC=88=98?= =?UTF-8?q?=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../infrastructure/messaging/NotificationDeliveryConsumer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/com/example/RealMatch/notification/infrastructure/messaging/NotificationDeliveryConsumer.java b/src/main/java/com/example/RealMatch/notification/infrastructure/messaging/NotificationDeliveryConsumer.java index 729969f8..ec3b2424 100644 --- a/src/main/java/com/example/RealMatch/notification/infrastructure/messaging/NotificationDeliveryConsumer.java +++ b/src/main/java/com/example/RealMatch/notification/infrastructure/messaging/NotificationDeliveryConsumer.java @@ -15,7 +15,6 @@ import org.springframework.amqp.support.AmqpHeaders; import org.springframework.dao.DataAccessException; import org.springframework.messaging.handler.annotation.Header; - import org.springframework.stereotype.Component; import com.example.RealMatch.notification.application.service.NotificationDeliveryClaimService; From 932b86a36ed630bbbe0732c9f4bc0121bdcda50e Mon Sep 17 00:00:00 2001 From: 1000hyehyang Date: Wed, 11 Feb 2026 21:36:40 +0900 Subject: [PATCH 4/5] =?UTF-8?q?fix(#348):=20rabbit=20mq=20gralde=20?= =?UTF-8?q?=EC=84=A4=EC=A0=95=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/sql/migrate_notification_tables.sql | 89 +++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 scripts/sql/migrate_notification_tables.sql diff --git a/scripts/sql/migrate_notification_tables.sql b/scripts/sql/migrate_notification_tables.sql new file mode 100644 index 00000000..15cf8bde --- /dev/null +++ b/scripts/sql/migrate_notification_tables.sql @@ -0,0 +1,89 @@ +-- ============================================ +-- 알림 관련 테이블 마이그레이션 (전체) +-- ============================================ +-- 실행 순서: notification → notification_delivery → notification_outbox, fcm_token +-- MySQL 8.0+ / MariaDB 10.2+ 권장 (DATETIME(6), utf8mb4) +-- 사용법: mysql -u 사용자 -p DB명 < migrate_notification_tables.sql +-- ============================================ + +-- --------------------------------------------------------------------------- +-- 1. Notification (알림 원장) +-- --------------------------------------------------------------------------- +CREATE TABLE IF NOT EXISTS notification ( + id BINARY(16) NOT NULL PRIMARY KEY, + user_id BIGINT NOT NULL COMMENT '알림 수신자 사용자 ID', + kind VARCHAR(30) NOT NULL COMMENT '알림 종류 (PROPOSAL_RECEIVED, CAMPAIGN_MATCHED 등)', + title VARCHAR(255) NOT NULL COMMENT '알림 제목', + body VARCHAR(1000) NOT NULL COMMENT '알림 내용', + reference_type VARCHAR(30) COMMENT '참조 타입 (CAMPAIGN_PROPOSAL, CAMPAIGN_APPLY, CAMPAIGN)', + reference_id VARCHAR(36) COMMENT '참조 ID', + campaign_id BIGINT COMMENT '관련 캠페인 ID', + proposal_id BIGINT COMMENT '관련 제안 ID', + is_read BOOLEAN NOT NULL DEFAULT FALSE COMMENT '읽음 여부', + created_at DATETIME(6) NOT NULL COMMENT '생성 시간', + updated_at DATETIME(6) COMMENT '수정 시간', + deleted_at DATETIME(6) COMMENT '삭제 시간 (소프트 삭제)', + is_deleted TINYINT(1) NOT NULL DEFAULT 0 COMMENT '삭제 여부 (소프트 삭제)', + INDEX idx_notification_user_read_created (user_id, is_read, created_at) COMMENT '미읽음 조회 최적화', + INDEX idx_notification_user_created (user_id, created_at) COMMENT '날짜별 조회 최적화', + INDEX idx_notification_user_kind (user_id, kind) COMMENT '필터 조회 최적화' +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci +COMMENT='알림 원장 테이블'; + +-- --------------------------------------------------------------------------- +-- 2. NotificationDelivery (채널별 발송 추적, 재시도) +-- --------------------------------------------------------------------------- +CREATE TABLE IF NOT EXISTS notification_delivery ( + id BINARY(16) NOT NULL PRIMARY KEY, + notification_id BINARY(16) NOT NULL, + channel VARCHAR(30) NOT NULL COMMENT 'PUSH, EMAIL, SMS', + status VARCHAR(20) NOT NULL COMMENT 'PENDING, IN_PROGRESS, RETRY, SENT, FAILED', + fail_reason VARCHAR(500), + attempted_at DATETIME(6), + next_retry_at DATETIME(6) COMMENT '다음 재시도 예정 시간 (exponential backoff: 1m, 5m, 30m, 2h, 12h)', + sent_at DATETIME(6), + provider_message_id VARCHAR(255), + idempotency_key VARCHAR(100) UNIQUE COMMENT 'eventId:kind:receiverId:channel 조합, 멱등성 보장', + attempt_count INT NOT NULL DEFAULT 0 COMMENT '발송 시도 횟수 (최대 5회, 재시도 정책용)', + created_at DATETIME(6) NOT NULL, + updated_at DATETIME(6), + INDEX idx_delivery_notification (notification_id, channel), + INDEX idx_delivery_status_retry (status, next_retry_at, created_at) COMMENT '워커 배치 조회 최적화' +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci +COMMENT='푸시/이메일 발송 추적 (Outbox + RabbitMQ 발송용)'; + +-- --------------------------------------------------------------------------- +-- 3. NotificationOutbox (Outbox 패턴, MQ 발행 대기) +-- --------------------------------------------------------------------------- +CREATE TABLE IF NOT EXISTS notification_outbox ( + id BINARY(16) NOT NULL PRIMARY KEY, + delivery_id BINARY(16) NOT NULL COMMENT 'FK → notification_delivery.id', + notification_id BINARY(16) NOT NULL COMMENT 'FK → notification.id', + channel VARCHAR(30) NOT NULL COMMENT 'PUSH, EMAIL', + status VARCHAR(20) NOT NULL COMMENT 'PENDING, SENDING, SENT, FAILED', + retry_count INT NOT NULL DEFAULT 0 COMMENT 'MQ 발행 재시도 횟수 (최대 10회)', + last_error VARCHAR(500) COMMENT '마지막 발행 실패 사유', + created_at DATETIME(6) NOT NULL, + updated_at DATETIME(6), + INDEX idx_outbox_status_created (status, created_at) COMMENT 'PENDING Outbox 조회 (OutboxPublisher)' +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci +COMMENT='Outbox: Notification+Delivery와 동일 TX 저장, Publisher가 MQ 발행'; + +-- --------------------------------------------------------------------------- +-- 4. FCM Token (웹 푸시 디바이스 토큰) +-- --------------------------------------------------------------------------- +CREATE TABLE IF NOT EXISTS fcm_token ( + id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, + user_id BIGINT NOT NULL COMMENT '토큰 소유자 사용자 ID', + token VARCHAR(500) NOT NULL COMMENT 'FCM 디바이스 토큰 (웹 푸시용)', + device_info VARCHAR(255) COMMENT '디바이스 정보 (예: Chrome/120.0.0.0 Windows 10)', + created_at DATETIME(6) NOT NULL COMMENT '토큰 등록 시간', + updated_at DATETIME(6) COMMENT '토큰 업데이트 시간', + INDEX idx_fcm_token_user (user_id), + UNIQUE KEY uk_fcm_token_value (token) COMMENT '토큰 중복 방지 (멱등성 보장)' +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci +COMMENT='FCM 디바이스 토큰 저장소 (사용자당 여러 토큰 지원)'; + +-- ============================================ +-- 완료 +-- ============================================ From d451f58666fca5881b1c182e900240a9abce7cfb Mon Sep 17 00:00:00 2001 From: 1000hyehyang Date: Wed, 11 Feb 2026 21:43:02 +0900 Subject: [PATCH 5/5] =?UTF-8?q?fix(#348):=20build.gradle=20=EC=88=98?= =?UTF-8?q?=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/pr-check.yml | 2 +- build.gradle | 19 +++++++++++-------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/.github/workflows/pr-check.yml b/.github/workflows/pr-check.yml index c8061b54..c65efb1e 100644 --- a/.github/workflows/pr-check.yml +++ b/.github/workflows/pr-check.yml @@ -129,7 +129,7 @@ jobs: continue-on-error: true - name: Build with Gradle - run: ./gradlew build -x test + run: ./gradlew build -x test --refresh-dependencies - name: Run tests run: ./gradlew test diff --git a/build.gradle b/build.gradle index 5d4d6ecf..1afe07b4 100644 --- a/build.gradle +++ b/build.gradle @@ -27,16 +27,18 @@ repositories { } dependencies { - implementation 'org.springframework.boot:spring-boot-starter-mail' + // Spring Boot starters implementation 'org.springframework.boot:spring-boot-starter' - implementation 'org.springframework.boot:spring-boot-starter-security' + implementation 'org.springframework.boot:spring-boot-starter-actuator' + implementation 'org.springframework.boot:spring-boot-starter-amqp' implementation 'org.springframework.boot:spring-boot-starter-data-jdbc' implementation 'org.springframework.boot:spring-boot-starter-data-jpa' + implementation 'org.springframework.boot:spring-boot-starter-mail' + implementation 'org.springframework.boot:spring-boot-starter-oauth2-client' + implementation 'org.springframework.boot:spring-boot-starter-security' + implementation 'org.springframework.boot:spring-boot-starter-validation' implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.springframework.boot:spring-boot-starter-websocket' - implementation 'org.springframework.boot:spring-boot-starter-validation' - implementation 'org.springframework.boot:spring-boot-starter-oauth2-client' - implementation 'org.springframework.boot:spring-boot-starter-actuator' compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' @@ -85,11 +87,12 @@ dependencies { implementation 'org.springframework.retry:spring-retry' implementation 'org.springframework:spring-aspects' - // Spring Mail - implementation 'org.springframework.boot:spring-boot-starter-mail' - // Firebase Admin SDK (FCM 푸시 알림) implementation 'com.google.firebase:firebase-admin:9.2.0' + + // RabbitMQ + implementation 'com.rabbitmq:amqp-client' + testImplementation 'org.springframework.amqp:spring-rabbit-test' } tasks.named('test') {