public class UserSignedUpEvent { private Long userId; private String email; private String name; // 역직렬화(String 형태의 카프카 메시지 -> Java 객체)시 필요함 public UserSignedUpEvent() { } public UserSignedUpEvent(Long userId, String email, String name) { this.userId = userId; this.email = email; this.name = name; } public static UserSignedUpEvent fromJson(String json) { try { ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.readValue(json, UserSignedUpEvent.class); } catch (JsonProcessingException e) { throw new RuntimeException("JSON 파싱 실패"); } } public Long getUserId() { return userId; } public String getEmail() { return email; } public String getName() { return name; } }
@Service public class UserSignedUpEventConsumer { @KafkaListener( topics = "user.signed-up", groupId = "email-service", concurrency = "3" ) @RetryableTopic( attempts = "5", backoff = @Backoff(delay = 1000, multiplier = 2), dltTopicSuffix = ".dlt" ) public void consume(String message) throws InterruptedException { UserSignedUpEvent userSignedUpEvent = UserSignedUpEvent.fromJson(message); // 실제 이메일 발송 로직은 생략 String receiverEmail = userSignedUpEvent.getEmail(); String subject = userSignedUpEvent.getName() + "님, 회원 가입을 축하드립니다!"; Thread.sleep(3000); // 이메일 발송에 3초 정도 시간이 걸리는 걸 가정 System.out.println("이메일 발송 완료"); } }
@Entity @Table(name = "email_logs") public class EmailLog { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; private Long receiverUserId; private String receiverEmail; private String subject; public EmailLog() { } public EmailLog(Long receiverUserId, String receiverEmail, String subject) { this.receiverUserId = receiverUserId; this.receiverEmail = receiverEmail; this.subject = subject; } // getter 메서드 }
public interface EmailLogRepository extends JpaRepository<EmailLog, Long> { }
@Service public class UserSignedUpEventConsumer { private EmailLogRepository emailLogRepository; public UserSignedUpEventConsumer(EmailLogRepository emailLogRepository) { this.emailLogRepository = emailLogRepository; } @KafkaListener( topics = "user.signed-up", groupId = "email-service", concurrency = "3" ) @RetryableTopic( attempts = "5", backoff = @Backoff(delay = 1000, multiplier = 2), dltTopicSuffix = ".dlt" ) public void consume(String message) throws InterruptedException { UserSignedUpEvent userSignedUpEvent = UserSignedUpEvent.fromJson(message); String receiverEmail = userSignedUpEvent.getEmail(); String subject = userSignedUpEvent.getName() + "님, 회원 가입을 축하드립니다!"; Thread.sleep(3000); System.out.println("이메일 발송 완료"); EmailLog emailLog = new EmailLog( userSignedUpEvent.getUserId(), receiverEmail, subject ); emailLogRepository.save(emailLog); } }
@Service public class UserSignedUpEventDltConsumer { @KafkaListener( topics = "user.signed-up.dlt", groupId = "email-service" ) public void consume(String message) { // 실제 로직은 생략 System.out.println("로그 시스템에 전송 : " + message); System.out.println("Slack에 알림 발송"); } }
