728x90
Check 사항
앞서 작성했던 docker-compose.yml
KAFKA_ADVERTISED_LISTENERS( 외부 접속을 담당 ) 에 EXTERNAL IP를 넣어주어야 한다.
KAFKA_LISTENERS 는 카프카 내부에서 연결해주는 옵션이니 모든 외부접속을 열어준다.
로컬에 띄었다면 저부분을 localhost로 설정.
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_BROKER_ID: 1
KAFKA_LISTENERS: INTERNAL://:19092,EXTERNAL://:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://공인IP:9092
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3
KAFKA_LOG_DIRS: /kafka-logs
Dependency 추가하기
builde.gradle
// kafka 관련 설정
implementation 'org.springframework.kafka:spring-kafka'
// logback-kafka-appender 관련 설정
implementation "com.github.danielwegener:logback-kafka-appender:0.1.0"
logback-spring.xml
logback-spring.xml 파일을 만들고,
src/main/resources 경로에 넣어준다 ( main 밑에 resources폴더가 없으면 만들어서 넣어준다. )
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property resource="application.properties" />
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<!-- <Pattern>%d{yyyy-MM-dd'T'HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</Pattern>-->
<Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</Pattern>
<!-- <Pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</Pattern>-->
</layout>
</appender>
<!-- Layout 1 : kafkaAppender -->
<appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<encoder class="com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</Pattern>
<!-- <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>-->
</layout>
</encoder>
<topic>토픽명</topic>
<producerConfig>bootstrap.servers=공인IP:9092</producerConfig>
<neverBlock>true</neverBlock>
</appender>
<!-- Async 한 KafkaAppender 추가 -->
<appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="kafkaAppender"/>
</appender>
<!-- logger -->
<logger name="org.apache.kafka" level="ERROR"/>
<logger name="com.minsub.java.logger.kafka" level="DEBUG">
<!-- <appender-ref ref="logstashKafkaAppender" />-->
</logger>
<root level="DEBUG">
<appender-ref ref="STDOUT"/>
</root>
<root level="INFO">
<appender-ref ref="ASYNC"/>
</root>
</configuration>
<topic>토픽이름</topic>
<producerConfig>bootstrap.servers=공인IP:9092</producerConfig>
이 부분을 수정해준다. - logback-spring.xml의 민감한 정보들은 환경변수로 처리해준다.
환경변수로 바꾸기 >>> https://soobysu.tistory.com/112
- appender로 KafkaAppender 지정
- encoding 방식으로 PatternLayoutEncoder 사용
- KafkaAppender의 구성
topic 태그 : 발행할 토픽에 대한 값이 들어있어야 함
producerConfig 태그 : 카프카 프로듀서의 properties를 설정을 위한 'key=value'값이 들어감 - 카프카 브로커의 bootstrap-server은 필수적인 요소라 꼭 추가해야 함
- 나머지 설정들은 CONFLUENT Producer Configurations을 참고
- KafkaAppender의 appender-ref : produce 과정이 실패했을 때 출력할 appender를 지정
- kafkaAppender를 가진 kafkaLogger 만들기
additivity="false" : kafkaLogger에서 출력하는 로그 이벤트는 상위 로거로 전파되지 않도록 설정 - AsyncAppender 생성 : Kafka의 메타데이터의 교환이 일어날 때, 블록킹 방식을 피할 수 있음
메타 데이터 교환시 로그 이벤트는 브로커로 전달되지 못함
logging context는 최대 브로커에서 지정한 'metadata.max.age.ms' 시간만큼은 블록킹 상태로 머물게 되기 때문
Kafka 준비
#카프카 컨테이너 접속
docker exec -it 카프카컨테이너명 /bin/bash
#topic 생성
kafka-topics.sh --create --bootstrap-server 공인아이피:9092 --topic 토픽이름
#topic 구독 후 대기
kafka-console-consumer.sh --bootstrap-server 공인ip:9092 --topic 토픽이름 --from-beginning
Log 생성
@RestController
@RequestMapping("/test")
public class TestController {
private static final Logger kafkaLogger = LoggerFactory.getLogger("kafkaLogger");
@PostMapping("/hello")
public void hello(@RequestParam(value = "name") String name) {
kafkaLogger.info("name:{}", name);
}
}
아래 URL로 http Request보내면 info 레벨로 설정한 로그가 찍히면서 로그가 남는다.
localhost:8080/test/hello?name=deingvelop
728x90
'개-발 > Infra' 카테고리의 다른 글
[Elastic] 유저 추가 (0) | 2023.06.17 |
---|---|
[Kafka] kafka 용어 간단정리 (0) | 2023.06.16 |
[Kafka] CMAK 브로커 클러스터 구축하기 (1) | 2023.06.14 |
[NCP] kafka 터미널 데이터 전송 (0) | 2023.06.12 |
[NCP] Ncp kafka VM 생성 (0) | 2023.06.12 |