

email-send-consumer라고 지어주자. emailsendconsumer라고 지어주자. Spring Boot DevTools, Spring Web, Spring for Apache Kafka를 선택해라. 이 프로젝트에서는application.properties를 지우고application.yml을 생성했다.
server: port: 0 # 사용 가능한 랜덤 포트를 찾아서 서버를 실행 (Producer 서버와의 포트 충돌을 방지) spring: kafka: # Kafka 서버 주소 (EC2에 카프카를 설치했기 때문에 EC2 주소를 입력해야 한다.) bootstrap-servers: 15.164.96.71:9092 consumer: # 메시지의 key 역직렬화 방식 : Kafka에서 받아온 메시지를 String으로 변환 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 메시지의 value 역직렬화 방식 : Kafka에서 받아온 메시지를 String으로 변환 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 컨슈머 그룹이 미리 안 만들어져있는 경우에, 컨슈머 그룹을 직접 생성해서 메시지를 처음부터 읽음. # 만약 컨슈머 그룹이 이미 만들어져있다면, 해당 컨슈머 그룹이 읽었던 메시지부터 읽음. # 이 옵션을 주지 않으면 컨슈머 그룹을 직접 생성해서 메시지를 읽을 때, # 기존에 쌓여있던 메시지를 읽지 않고 컨슈머 그룹이 생성된 이후에 들어온 메시지부터 읽어버린다. # 그럼 컨슈머 그룹이 생성되기 전에 쌓여있던 메시지들이 처리되지 않고 누락돼버린다. auto-offset-reset: earliest
public class EmailSendMessage { private String from; // 발신자 이메일 private String to; // 수신자 이메일 private String subject; // 이메일 제목 private String body; // 이메일 본문 // 역직렬화(String 형태의 카프카 메시지 -> Java 객체)시 필요함 public EmailSendMessage() { } public EmailSendMessage(String from, String to, String subject, String body) { this.from = from; this.to = to; this.subject = subject; this.body = body; } // Json 값을 EmailSendMessage로 역직렬화하는 메서드 public static EmailSendMessage fromJson(String json) { try { ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.readValue(json, EmailSendMessage.class); } catch (JsonProcessingException e) { throw new RuntimeException("JSON 파싱 실패"); } } public String getFrom() { return from; } public String getTo() { return to; } public String getSubject() { return subject; } public String getBody() { return body; } }
@Service public class EmailSendConsumer { @KafkaListener( topics = "email.send", groupId = "email-send-group" // 컨슈머 그룹 이름 ) public void consume(String message) { System.out.println("Kafka로부터 받아온 메시지: " + message); EmailSendMessage emailSendMessage = EmailSendMessage.fromJson(message); // ... 실제 이메일 발송 로직은 생략 ... System.out.println("이메일 발송 완료"); } }

email.send 토픽에 쌓여있던 메시지를 가져와 처리한 걸 확인할 수 있다.

email.send 토픽에 쌓인 메시지를 실시간으로 처리하는 걸 확인할 수 있다. # 컨슈머 그룹 세부 정보 조회하기 $ bin/kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --group email-send-group \ --describe

CURRENT-OFFSET(다음에 읽을 메시지의 오프셋 번호)이 2이므로 0~1의 오프셋 메시지는 이미 읽었다는 뜻이다. 이 출력값을 보니 컨슈머 그룹이 정상적으로 잘 작동했다는 걸 확인할 수 있다. 
email.send 토픽에 메시지를 쌓아뒀다. 그러고 Consumer 서버가 email.send 토픽에 쌓여있는 메시지를 하나씩 읽어들이면서 처리했다.