KafkaTransactionManager ( 카프카 트랜잭션 처리 )
스프링 부트에서 트랜잭션을 사용하기 위해서는 단순히 spring.kafka.producer.transaction-id-prefix값을 설정하기만 하면 된다. (application.yml 혹은 KafkaProducerConfig에 설정)
스프링 부트에서는 자동적으로 KafkaTransactionManager bean을 구성하고 listener 컨테이너를 연결한다.
@Bean
public ProducerFactory<String, JsonSerializable> producerFactory() {
...
DefaultKafkaProducerFactory<String, JsonSerializable> factory = new DefaultKafkaProducerFactory<>(props);
factory.setTransactionIdPrefix("tx-"); // 추가
return factory;
}
@Test
public void register() throws InterruptedException {
userService.register();
Thread.sleep(1000);
}
Getting transaction for [com.example.user.service.UserService.register]
Sending: ProducerRecord(topic=event-user-topic, partition=null, ...
Sent: ProducerRecord(topic=event-user-topic, partition=null, ...
Sent ok: ProducerRecord(topic=event-user-topic, partition=null, ...
userSubscriber: {
"name" : "홍길동"
}
Completing transaction for [com.example.user.service.UserService.register]
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@38022758] commitTransaction()
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@38022758] close(PT5S)
실행결과를 보면 이전에는 메시지 전송 후에 트랜잭션을 종료했지만 여기서는 Completing transaction 이 후
Kafka commitTransaction()이 나오는 것을 보니 DB 트랜잭션이 끝나고 kafka 트랜잭션을 커밋을 하는 것을 알 수 있다.
그런데 한 가지 이상한 점은 producer 입장에서는 트랜잭션을 종료한 이후에 이벤트를 커밋했는데
subscriber에서는 트랜잭션이 커밋된 이후 데이터를 읽는 것이 아닌 커밋 전의 데이터를 읽는 문제가 있다.
즉, 트랜잭션 커밋이 완료되면 subscriber가 받을 것으로 기대했지만, 그렇지 않은 것이다.
실패 케이스
@Test
public void registerWithFail() throws InterruptedException {
Assertions.assertThrows(IllegalArgumentException.class, () -> {
userService.register(null);
Thread.sleep(1000);
});
}
Getting transaction for [com.example.user.service.UserService.register]
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@8dedec8] beginTransaction()
Sent: ProducerRecord(topic=event-user-topic, partition=null,
Sent ok: ProducerRecord(topic=event-user-topic, partition=null,
received userEvent: {
"name" : null
}
Completing transaction for [com.example.user.service.UserService.register] after exception: java.lang.IllegalArgumentException
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@8dedec8] abortTransaction()
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@8dedec8] close(PT5S)
Completing transaction for .............. after exception: java.lang.IllegalArgumentException
위 로그를 보면 Producer는 트랜잭션이 실패해도 데이터를 보내고 있다.
한가지 다른점은
DB트랜잭션이 실패 하면 kafka Producer는 abortTransaction() 을 보낸다.
프로듀서는 성공 실패와 관계없이 메세지를 항상 발행한다.
Kafka Consumer 트랜잭션
Consumer config를 보면 기본값으로 read_uncommitted로 설정이 되어 있다.
read_uncommited는 아직 커밋되지 않은 레코드도 읽을 수 있다 - 트랜잭션이 실패(abort)한 레코드도 읽는다.
커밋된 레코드만 읽으려면 read_uncommitted를 read_committed로 변경을 하면 된다.
spring:
kafka:
consumer:
isolation-level: read_committed
동일한 코드로 실패 실행결과
Getting transaction for [com.example.user.service.UserService.register]
Created new Producer: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@226e95e9]
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@226e95e9] beginTransaction()
Sending: ProducerRecord(topic=event-user-topic, partition=null,
Sent: ProducerRecord(topic=event-user-topic, partition=null
Sent ok: ProducerRecord(topic=event-user-topic, partition=null,
Completing transaction for [com.example.user.service.UserService.register] after exception: java.lang.IllegalArgumentException
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@226e95e9] abortTransaction()
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@226e95e9] close(PT5S)
프로듀서에서 보내고 있지만
Sent ok: ProducerRecord(topic=event-user-topic, partition=null,
이제 트랜잭션이 종료된 이후에 subscriber가 메시지를 수신하는 것을 알 수 있다. 이제 모든 subscriber는 트랜잭션이 종료된 메시지를 수신한다.
성공 실행결과
Getting transaction for [com.example.user.service.UserService.register]
Created new Producer: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@6ed7c178]
.KafkaProducer@6ed7c178] beginTransaction()
Sending: ProducerRecord(topic=event-user-topic, partition=null,
Sent: ProducerRecord(topic=event-user-topic, partition=null,
Sent ok: ProducerRecord(topic=event-user-topic, partition=null,
Completing transaction for [com.example.user.service.UserService.register]
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@6ed7c178] commitTransaction()
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@6ed7c178] close(PT5S)
received userEvent: {
"name" : "홍길동"
}
위와 같은 설정을 하면 카프카의
3-1 TransactionalEventListener 활용방식
@Transactional
public void register() {
User user = new User(UUID.randomUUID().toString(), 1);
entityManager.persist(user);
applicationEventPublisher.publishEvent(new UserEvent(EventType.Created, user.getId()));
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void send(UserEvent userEvent) {
kafkaTemplate.send("TEST1", userEvent);
}
방식을 사용하면 트랜잭션이 완료된 후
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
Transaction의 Commit 이후에만 이벤트가 발행되며, 예외가 발생한 경우에는 아예 이벤트가 발행되지 않게된다.
TransactionalEventListener를 활용하게 된다면 Kafka Transacation 적용시 Consumer가 알아두어야할 사항들에 대해서 고려하지
않아도 된다는 장점이 존재한다.
'개-발 > Infra' 카테고리의 다른 글
[NAS] 시놀로지 Nas 도메인 설정 (0) | 2024.08.20 |
---|---|
[Docker] 이미 생성된 컨테이너의 포트 바꾸기 (0) | 2024.08.20 |
[AWS] aws ElastiCache cache 서버Redis 구성하기 (0) | 2024.06.27 |
[AWS S3] 특정 확장자 업로드 방지 (0) | 2024.05.31 |
[EasticSearch ] BulkApi 사용법 ( action_request_validation_exception ) (0) | 2024.05.28 |