Problem
카프카로 서버간의 restAPI 통신을 구현을 하려고 하던 중...
게시판에는 멤버정보 들어간다.
게시판 생성요청 정보에는
게시판 제목
멤버 id
게시판 완료
게시판 제목, 멤버 이름(작성자)
게시판 생성요청시에 작성자id 를 가지고 멤버 서버로 멤버 정보를 요청하는 방식을 해보려고 한다.
카프카를 사용할때
1 . 게시판서버 게시판생성메서드에서 프로듀서로 작성자id 를 특정 토픽에 보내고
2 . 멤버서버에서 컨슈머 리스너를 통해 해당 토픽에 요청정보가 들어오면
3 . 요청을 처리하고 다시 객체를 반환 해줘야 하는데 이것을 다시 프로듀서로 작성하고
4 . 게시판서버에서 리스너로 응답받은 객체를 생성메서드에서 처리를 완료하여 게시판객체를 완성시키려고 했다..
찾아보니 컨슈머리스너는 void 이어야 한다고 하는데..
그렇게 되면 요청과 응답을 받는은 한 메서드에서 처리 될 수 있는걸까 ?
찾아본 방법으로
idea 1. CompletableFuture
자바에서 제공하는 CompletableFuture 를 사용하여 자바내에서 응답을 대기하는 방법을 생각했지만
idea 2. kafka rest proxy
kafka rest proxy 는 구현이 쉬운 반면 kafka의 기능을 온전히 사용하지는 못한다고 한다.
idea 3. ReplyingKafkaTemplate
ReplyingKafkaTemplate 은 기존 KafkaTemplate을 사용한 것으로 KafkaTemplate 의 모든 기능을 사용 가능하다.
solution
Apache Kafka는 스트리밍 데이터와 다양한 생산자와 소비자 간의 디커플링을 위해 구축되었기 때문에 정말 필요한 상황이 아니면 Kafka와 요청-응답 개념을 사용하지 않는 것이 좋지 않다고 한다. 하지만 필요한 상황에서는 Spring kafka 템플릿을 사용하여 비동기식 요청-응답을 구현할 수 있습니다. 간단한 예제를 통해 사용 방법을 익혀보았습니다.
Idea 1. CompletableFuture
자바에서 제공하는 CompletableFuture 를 사용하여 자바내에서 응답을 대기하는 방법을 생각했지만
코드가 생각보다 복잡해 졌다.
Idea 2. ReplyingKafkaTemplate
프로듀서와 컨슈머설정을 해준다.
직렬화 / 역직렬화를 Json , String 을 보통 선택하는데
Json을 선택하면 [Object 매퍼로 String > Json 으로 변환하지 않고 그대로 사용가능하지만]
카프카의 헤더에 패키지명이 같이 담겨 오기 때문에 후 처리를 해주어야 한다. 이것때문에 애먹음 ㅜㅜ
헤더무시 , 패키지명 기재를 해야한다.
그래서 Json형태의 String 값을 가져와 직렬화 역질렬화를 통해 응답 값을 가져오고
그 응답값을 다시 오브젝트 매퍼를 통해 객체로 변환 시켜 줄 것이다.
Post객체를 생성하는 메서드이다. Member정보는 Member서버에서 관리한다.
Post객체에는 MemberId와 MemberName(작성자) 가 필요하다.
물론 요청시에 받아 오는 방법도 있지만 Kafka 비동기 통신이 목적이기 때문에 억지스러운 예제이다.
//KafkaConfig.java
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Value("${devsoo.kafka.restapi.topic}")
private String requestReplyTopic;
@Bean
public ProducerFactory<String, String> producerFactory(){
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
//에러핸들러로 한번 감싼 후 String으로 역직렬화 해준다
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "post-server-consumer-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ReplyingKafkaTemplate<String, String, String> replyKafkaTemplate() {
ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate = new ReplyingKafkaTemplate<>(producerFactory(), replyContainer());
replyingKafkaTemplate.setDefaultReplyTimeout(Duration.ofMillis(5000));
return replyingKafkaTemplate;
}
@Bean //컨슈머리스너가 사용할 컨테이너
public KafkaMessageListenerContainer<String, String> replyContainer() {
//컨슈머를 통해 받아올 토픽을 넣어준다
ContainerProperties containerProperties = new ContainerProperties("replyingTopic");
return new KafkaMessageListenerContainer<>(consumerFactory(), containerProperties);
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(false);
//poll 지속시간 (지속시간이 끝나면 대기로 바뀜)
factory.getContainerProperties().setPollTimeout(1000);
//idl상태 지속시간
factory.getContainerProperties().setIdleEventInterval(10000L);
//동시에 처리되는 메시지의 수(컨슈머 개수)
factory.setConcurrency(3);
factory.setReplyTemplate(kafkaTemplate());
return factory;
}
}
//replyRecord
public Object replyRecord(Object requestData) throws ExecutionException, InterruptedException, JsonProcessingException {
//객체를 Json형태의 String 값으로 변환
String jsonData = objectMapper.writeValueAsString(requestData);
//프로듀서 레코드로 변환
ProducerRecord<String, String> record = new ProducerRecord<String, String>(kafkaRestApiTopic,jsonData);
//리플라이 헤더 추가 밑에서 설명 *1
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, kafkaRestApiTopic.getBytes()));
//replyingKafkaTemplate 요청 정보 보내고 응답받기
RequestReplyFuture<String, String, String> sendAndReceive = replyingKafkaTemplate.sendAndReceive(record);
//응답을 컨슈머레코즈로 만들기
ConsumerRecord<String, String> consumerRecord = sendAndReceive.get();
// 응답 밸류 반환
return consumerRecord.value();
}
*1 카프카의 헤더( 메타데이터 저장 )를 명시함으로서
요청 - 응답이 한 쌍으로 연결 짓게 해준다 ( 여러 요청이 와도 a요청 - a응답 , b요청 - b응답 매핑시켜준다 )
@Bean
public ReplyingKafkaTemplate<String, String, String> replyKafkaTemplate() {
//프로듀서의 셋팅과 컨슈머의 셋팅을 합쳐 하나의 템플릿으로 만들기
ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate = new ReplyingKafkaTemplate<>(producerFactory(), replyContainer());
replyingKafkaTemplate.setDefaultReplyTimeout(Duration.ofMillis(5000));
return replyingKafkaTemplate;
}
@Bean //< 프로듀서 서버 관점 > 컨슈머 셋팅
public KafkaMessageListenerContainer<String, String> replyContainer() {
//응답 받아 올 토픽 지정
ContainerProperties containerProperties = new ContainerProperties("replyingTopic");
return new KafkaMessageListenerContainer<>(consumerFactory(), containerProperties);
}
데이터 흐름 ->
프로듀서레코즈 생성 (프로듀싱 보낼 토픽선택) -> 프로듀서 레코즈 헤더 지정 (셋팅) -> 리플라잉템플릿으로 카프카에 보내기 ->
여기 까지가 A 서버 ( 보내는 서버의 역할이다 )
B서버 받아서 요청을 처리한 후 응답 보내기 > B서버는 코틀린 언어로 준비 했다.
//kafkaConf
@Configuration
class KafkaConfig (
@Value("\${spring.kafka.bootstrap-servers}")
val bootstrapAddress: String
) {
@Bean
fun producerFactory(): ProducerFactory<String, String> {
val props: MutableMap<String, Any> = HashMap()
props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapAddress
props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
return DefaultKafkaProducerFactory(props)
}
@Bean
fun consumerFactory(): ConsumerFactory<String, String> {
val props: MutableMap<String, Any> = HashMap()
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapAddress
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
props[ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS] = StringDeserializer::class.java
props[ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS] = StringDeserializer::class.java
props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = true
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
props[ConsumerConfig.GROUP_ID_CONFIG] = "member-server-consumer-group"
return DefaultKafkaConsumerFactory(props)
}
@Bean
fun kafkaTemplate(): KafkaTemplate<String, String> {
return KafkaTemplate(producerFactory())
}
@Bean //컨슈머리스너가 사용할 팩토리 설정
fun kafkaListenerContainerFactory(): KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> {
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.consumerFactory = consumerFactory()
factory.setReplyTemplate(kafkaTemplate())
return factory
}
//받아 올 토픽 이름
@KafkaListener(topics = ["\${devsoo.kafka.restapi.request.topic}"], groupId = "member-server-consumer-group")
//전달 할 토픽 이름
@SendTo("\${devsoo.kafka.restapi.reply.topic}")
fun getMemberRequestDto(@Payload data: String): String {
//String 으로 받아오기 때문에 객체로 casting 해주어야 한다.
val requestDto: KafkaMemberValidateRequestDto = objectMapper.readValue(data, KafkaMemberValidateRequestDto::class.java)
//요청 처리
val member = memberService.getInstance(requestDto)
//요청 처리 후 반환
return objectMapper.writeValueAsString(member.toKafkaResponseDto())
}
에러 핸들러를 감싼 이유는 아래 글을 참고 하자.
https://soobysu.tistory.com/134
결과
처음엔 하나의 토픽에 요청과 응답을 다 넣으려고 했는데 카프카가 너무 빨라서 인지 대기 시간을 걸어놔도 해당 토픽의 값을 가져오지 못하였다. 계속 하나의 토픽으로만 해결 하려고 했는데 실패 했다.
카프카는 요청과 응답을 분리 해야 한다는......... 교훈을 깨달았다..
하루종일 머리 깨지고 신의 한수로 찾은 답변..
참고 문헌 :
https://dodop-blog.tistory.com/427
https://asuraiv.tistory.com/11
'개-발 > Infra' 카테고리의 다른 글
[ElsticSearch] 엘라스틱서치 개념과 Query (0) | 2024.04.09 |
---|---|
[Redis] 선착순 이벤트 구현 레디스 분산락 (Redisson DistributeLock) (0) | 2024.03.03 |
[Kafka + SSE] 실시간 알림 메세지 구현 (0) | 2024.01.10 |
[Docker + NginX + Jenkins] Jenkins pipline verbose 옵션 CICD번외 편 (0) | 2023.12.13 |
[Docker + NginX + Jenkins] Spring 프로젝트 Blue,Green 무중단 배포 CI CD 2/2 (2) | 2023.12.03 |