목표
분산 시스템으로 구성 되어있는 각 서비스에 이벤트를 발행하고
각 서비스의 데이터의 일관성을 보장한다.
구성도
- 주문 서비스
주문을 생성하고 결제가 완료되면
주문 생성 이벤트를 발행한다.
- 재고 서비스
주문 완료된 상품들의 재고를 차감 한다
- 배송 서비스
주문 완료된 상품들을 배송 시킨다
고려사항
주문이 생성되면 반드시 재고를 지우고 배송을 처리 해야 한다.
그런데
- 카프카에 이벤트 발행이 불가능 하다면 ?
- 결제 API에서 실패 한다면 ?
- 주문 생성이 실패 한다면 ?
위 사항들로 인해 주문은 생성되지 않을 수 있다.
또, 모든 사항에 주문을 실패하게 해서 처음부터 주문을 다시 하게 만든다면 그것은 서비스에 대한 신뢰하락으로 이어질 것이다.
Transcation OutBox Pattern 개념
분산시스템의 데이터 정합성을 맞추고자 Transcation OutBox Pattern을 통해 이벤트 발행을 보장한다.
Transcation OutBox Pattern 의 개념은 아래 글을 참고 하자
https://soobysu.tistory.com/163
[MSA] Transaction Outbox 패턴
이벤트 발행을 보장하는 방법Event Driven Architecture 에서의 이벤트 발행을 보장하는 방법은 여러가지 있다.MSA로 설계된 분산시스템 에서는 데이터의 일관성을 유지 하는것은 매우 중요하다.아웃박
soobysu.tistory.com
로컬 트랜잭션을 사용하여 Transcation OutBox Pattern 을 구현하고 이벤트 발행을 보장 하게 만들었다.
그런데 여기서 Transcation OutBox Pattern 의 두가지 방법의 특징은
폴링
장점
- 구현 방법이 쉽다 ( 기술적 제약이 적음 )
단점
- 실시간성이 보장되지 않는다 ( 10초 단위로 불러온다면 최대 10초까지 데이터 지연이 발생 할 수 있다)
- 지속적인 요청으로 db에 부하를 줄 수 있다.
- 실제 데이터가 없더라도 폴링을 계속 수행해야 하므로 비효율 적일 수 있다.
CDC
DB의 로그를 가져와 변경사항을 추적한다
장점
- 실시간 성이 보장된다
- 부하 감소
단점
- 시스템 복잡도 증가
- 유지보수 어려움
CDC 가 효율적이고 좋아 보이긴 하지만 CDC 를 구성하기 위해 검토해야 될 사항들이 몇가지 더 생기고
이것을 유지보수 하려면 그것 또한 리소스의 낭비어서 많은 회사에서 폴링 기법으로 사용 중이라고 한다.
폴링 단점 극복
- 폴링기법의 @Async 메소드를 통해 바로 이벤트를 발행 하여 이벤트 발행이 실패하지 않는 한 실시간 성이 보장되게 만든다.
- 재시도 요청 ( Dead Letter Queue ) 를 만들어 실패한 이벤트만 요청하여 이벤트를 정상적으로 발행 할 수 있게 구현 한다.
코드 구현
Order
@Transactional
fun create(requestDto: CreateOrderRequestDto): OrderResponseDto {
val order = Orders(
requestDto.orderer,
requestDto.shippingAddr,
requestDto.shippingMemo
)
order.addProducts(requestDto.orderItems)
save(order)
//이벤트 발행
val event = eventManager.create(KafkaTopics.ORDER, EventOperation.ORDER_COMPLETED, order)
kafkaProducer.send(event)
//결제 시스템 완료 후 주문 완료 이벤트 발행
return OrderResponseDto(order.orderId)
}
//EventManager
fun create(topics: KafkaTopics, operation: EventOperation, body: Any): Event {
val jsonBody = objectMapper.writeValueAsString(body)
return Event(topics, operation , jsonBody)
}
//kafkaProducer
@Async
fun send(event: Event) {
val record = ProducerRecord(event.targetName.name,event.operation.name,event.body)
try {
kafkaTemplate.send(record)
}catch (e: Exception){
if (event.status == EventStatus.PENDING){
event.failSend()
deadLetterRepository.save(event)
}
}
}
로컬 트랜잭션으로 묶어서 주문 생성과 결제API 호출의 원자성을 보장한다.
트랜잭션 종료와는 상관없이 이벤트를 발행 [send()] 하고 실패시 deadLetterRepository에 이벤트 실패를 저장 한다.
실패한 이벤트는 스캐쥴과 배치를 통해 지속적으로 이벤트를 다시 발행 할 수 있게 만들어 준다.
@Scheduled(fixedRate = 10000)
fun runDeadLetterJob(){
val jobParameter = JobParametersBuilder().addString("date", LocalDateTime.now().toString()).toJobParameters()
jobLauncher.run(batchJob.deadLetterJob(), jobParameter)
}
Stock ( Kotlin code )
fun decrease(requestDto: UpdateRequestDto) {
requestDto.products.map { item ->
val stock = repository.findByProductId(UUID.fromString(item.productId))
stock.quantity = stock.quantity - item.quantity
saveModel(stock)
log.info("stock decrease : $stock")
}
}
// 롤백, 주문 취소에 사용 됌
fun increase(requestDto: UpdateRequestDto) {
requestDto.products.map { item ->
val stock = repository.findByProductId(UUID.fromString(item.productId))
stock.quantity = stock.quantity + item.quantity
saveModel(stock)
log.info("stock increase : $stock")
}
}
재고 차감과 재고 복구 기능을 만들어준다.
@Transactional
@KafkaListener(topics = ["ORDER"], groupId = "stock-group")
fun orderListener(record: ConsumerRecord<String, String>) {
when (record.key()){
ORDER_COMPLETED.name -> {
val requestDto = objectMapper.readValue(record.value(), UpdateRequestDto::class.java)
stockService.decrease(requestDto)
info("Successfully decreased stock: $requestDto")
}
ORDER_CANCELLED.name -> {
val requestDto = objectMapper.readValue(record.value(), UpdateRequestDto::class.java)
stockService.increase(requestDto)
info("Successfully increased stock: $requestDto")
}
}
}
카프카 리스너를 통해 이벤트를 발행한 토픽을 구독하고
val event = eventManager.create(KafkaTopics.ORDER, EventOperation.ORDER_COMPLETED, order)
이벤트 발행시 작성해둔 키 값을 기준으로 밸류들을 DTO와 매핑시켜주고
재고 차감을 진행한다.
Ship ( JAVA code )
public void register(ShippingRequestDto req) {
Ship ship = req.toEntity();
repository.save(ship);
log.info("Ship registered: {}", ship);
}
public void cancel(ShippingCancelRequestDto req) {
repository.findByOrderId(UUID.fromString(req.getOrderId()))
.ifPresent(repository::delete);
log.info("Ship canceled: {}", req.getOrderId());
}
배송 저장과 배송 삭제 기능을 만들어준다.
@KafkaListener(topics = "ORDER", groupId = "ship-group")
public void orderListener(ConsumerRecord<String, String> record) throws JsonProcessingException {
KafkaOperationType operationType = KafkaOperationType.valueOf(record.key());
switch (operationType) {
case ORDER_COMPLETED:
ShippingRequestDto request = objectMapper.readValue(record.value(), ShippingRequestDto.class);
shipService.register(request);
break;
case ORDER_CANCELLED:
ShippingCancelRequestDto req = objectMapper.readValue(record.value(), ShippingCancelRequestDto.class);
shipService.cancel(req);
break;
}
}
재고 서비스와 마찬가지로 키 값을 기준으로 밸류를 DTO와 매핑시키고 배송 저장을 한다.
주의해야 할 점
매핑할 DTO 직렬화 / 역직렬화가 이루어지므로 필드 이름을 신경써야 한다.
실행
{
"orderer": "홍길동",
"shippingAddr": "서울특별시 강남구 테헤란로 123",
"shippingMemo": "문 앞에 놓아주세요",
"orderItems": [
{
"productId": "331daa93-2483-4cdd-911c-3cb678fb4e0b",
"productName": "만년삼",
"quantity": 2
},
{
"productId": "88439662-0785-401b-89c3-c100261045bc",
"productName": "화분",
"quantity": 2
}
]
}
재고 서비스
배송 서비스
모두 데이터가 잘 처리 된것을 볼 수 있다.
'개-발 > Java + Spring + Kotlin' 카테고리의 다른 글
[Java] 정규표현식 regex 패키지 (0) | 2024.07.29 |
---|---|
[Java] 정규표현식 regex (0) | 2024.07.29 |
[MSA] Transaction Outbox 패턴 (0) | 2024.07.10 |
[Spring] JDK Dynamic Proxy 와 CGLIB (0) | 2024.04.24 |
[Spring] Transaction 트랜잭션의 이해 (0) | 2024.04.24 |