$ docker run -d -p 9092:9092 apache/kafka:4.1.0 # 잘 실행됐는 지 확인하기 $ docker ps
... dependencies { implementation 'org.springframework.boot:spring-boot-starter-data-jpa' implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.springframework.kafka:spring-kafka' developmentOnly 'org.springframework.boot:spring-boot-devtools' runtimeOnly 'com.mysql:mysql-connector-j' testImplementation 'org.springframework.boot:spring-boot-starter-test' testRuntimeOnly 'org.junit.platform:junit-platform-launcher' } ...

server: port: 8081 spring: datasource: url: jdbc:mysql://localhost:3307/board-db username: root password: password driver-class-name: com.mysql.cj.jdbc.Driver jpa: hibernate: ddl-auto: update show-sql: true kafka: # Kafka 서버 주소 bootstrap-servers: localhost:9092 # 게시글 서비스에서는 메시지를 produce 하기만 함 producer: # 메시지의 key 직렬화 방식 : 자바 객체를 문자열(String)로 변환해서 Kafka에 전송 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 메시지의 value 직렬화 방식 : 자바 객체를 문자열(String)로 변환해서 Kafka에 전송 value-serializer: org.apache.kafka.common.serialization.StringSerializer client: user-service: url: http://localhost:8080 point-service: url: http://localhost:8082
public class BoardCreatedEvent { private Long userId; public BoardCreatedEvent(Long userId) { this.userId = userId; } public Long getUserId() { return userId; } }
@Service public class BoardService { private final BoardRepository boardRepository; private final UserClient userClient; private final PointClient pointClient; private final KafkaTemplate<String, String> kafkaTemplate; public BoardService( BoardRepository boardRepository, UserClient userClient, PointClient pointClient, KafkaTemplate<String, String> kafkaTemplate ) { this.boardRepository = boardRepository; this.userClient = userClient; this.pointClient = pointClient; this.kafkaTemplate = kafkaTemplate; } ... public void create(CreateBoardRequestDto createBoardRequestDto) { // 게시글 저장을 성공했는 지 판단하는 플래그 boolean isBoardCreated = false; Long savedBoardId = null; // 포인트 차감을 성공했는 지 판단하는 플래그 boolean isPointDeducted = false; try { // 게시글 작성 전 100 포인트 차감 pointClient.deductPoints(createBoardRequestDto.getUserId(), 100); isPointDeducted = true; // 포인트 차감 성공 플래그 System.out.println("포인트 차감 성공"); // 게시글 작성 Board board = new Board( createBoardRequestDto.getTitle(), createBoardRequestDto.getContent(), createBoardRequestDto.getUserId() ); Board savedBoard = this.boardRepository.save(board); savedBoardId = savedBoard.getBoardId(); isBoardCreated = true; // 게시글 저장 성공 플래그 System.out.println("게시글 저장 성공"); // 게시글 작성 시 작성자에게 활동 점수 10점 부여 userClient.addActivityScore(createBoardRequestDto.getUserId(), 10); System.out.println("활동 점수 적립 성공"); // '게시글 작성 완료' 이벤트 발행 BoardCreatedEvent boardCreatedEvent = new BoardCreatedEvent(createBoardRequestDto.getUserId()); this.kafkaTemplate.send("board.created", toJsonString(boardCreatedEvent)); System.out.println("게시글 작성 완료 이벤트 발행"); } catch (Exception e) { if (isBoardCreated) { // 게시글 작성 보상 트랜잭션 => 게시글 삭제 this.boardRepository.deleteById(savedBoardId); System.out.println("[보상 트랜잭션] 게시글 삭제"); } if (isPointDeducted) { // 포인트 차감 보상 트랜잭션 => 포인트 적립 pointClient.addPoints(createBoardRequestDto.getUserId(), 100); System.out.println("[보상 트랜잭션] 포인트 적립"); } // 실패 응답으로 처리하기 위해 예외 던지기 throw e; } } // 객체를 Json 형태의 String으로 만들어주는 메서드 // (클래스로 분리하면 더 좋지만 편의를 위해 메서드로만 분리) private String toJsonString(Object object) { ObjectMapper objectMapper = new ObjectMapper(); try { String message = objectMapper.writeValueAsString(object); return message; } catch (JsonProcessingException e) { throw new RuntimeException("Json 직렬화 실패"); } } }
