Search

카프카 핵심 가이드 5장

프로그램 내에서 코드로 카프카 관리하기

5.1 AdminClient 개요

카프카에서는 AdminClient를 활용
토픽 목록 조회, 생성, 삭제, 클러스터 상세 정보 확인, ACL 관리 등 다양한 기능을 제공.
이전까지는 토픽이 있는 걸 확인할 수 없어서 producer.send() 메서드 내에서 UNKNOWN_TOPIC_OR_PARTITION예외가 발생하는 걸 잡아서 처리. 
AdminClient는 이런 문제점을 해결
AdminClient.createTopics메서드는 토픽이 생성될 때까지 기다리거나, 생성 상태를 확인 후 토픽 상태 설정을 가져올 수 있음
하지만 카프카 컨트롤러부터 브로커로의 메타데이터 전파는 비동기적으로 이루어짐
AdminClient가 전달하는 정보는 실시간으로 동기화될 수는 없지만, 언젠가는 동기화됨.

5.2 Admin Client 사용법: 생성, 설정, 닫기

val props = new Properties() props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") val admin = AdminClient.create(props) admin.close(Duration.ofSeconds(30))
Scala
복사
close()를 실행할 때는 아직 진행중인 작업이 있을 수 있다는 것을 유의해야 함. 
close()는 30초동안 모든 작업을 기다리고, 그 이후의 작업에 대해서는 예외를 발생하게 됨
client.dns.lookup
카프카는 호스트명을 기준으로 연결을 검증하고 해석하고 생성.
이 과정 중 DNS를 사용하는 경우와 IP를 활용하는 경우 각각의 설정이 필요.
1.
DNS를 활용하는 경우
all-brokers.hostname.com을 활용하면 쉽게 연결할 수 있지만, SASL인증을 활요할 경우 문제가 생길 수 있습니다.
SASL은 Kafka 프로토콜이 데이터 교환 과정에서 Kafka가 지원하는 Kerberos, PLAIN, SCRAM, OAUTHBEARER 등의 메커니즘을 사용하여 인증/인가를 할 수 있도록 해주며, 인증/인가 교환이 성공했을 때, 후속 데이터 교환을 데이터 보안 계층 위에서 할 수 있도록 해 주는 기술 입니다.
또한 특정 브로커만을 연결할 수 있도록 할 수 있지만, 다음과 같이 브로커 DNS가 일치하지 않을 경우 resolve_cannonical_bootstrap_servers_only설정을 수행하면 됩니다.
broker1.hostname.com broker2.hostname.com broker3.hostname.com
Plain Text
복사
이는 DNS를 펼치게 적용하기에 DNS 별칭에 포함된 모든 브로커 이름을 일일이 부트스트랩 서버 목록에 넣어 준 것과 동일하게 작동합니다.
2.
다수의 IP주소로 연결되는 DNS 이름을 사용하는 경우
하나의 DNS가 다양한 IP로 연결될 수 있기에 use_all_dns_ips를 활용하면 안전하게 클라이언트에서 연결할 수 있습니다.
request.timeout.ms
AdminClient의 응답을 기다리는 최대 값을 지정합니다.

5.3 토픽 관리 기능

val topics = admin.listTopics() topics.names().asScala.foreach(println)
Scala
복사
adminClient는 모든 응답을 Future객체로 감싸어 리턴.
따라서 적절하게 Future객체를 처리하는 것이 필요.
또한 토픽 리스트 출력 뿐만아니라, 파티션 리스트, 토픽 생성, 삭제 등도 제공.
어플리케이션에서의 토픽 삭제는 돌이킬 수 없음.
주의하여 사용하기를 권장.

5.4 설정 관리

ConfigResource객체를 활용하여 설정 관리를 진행할 수 있음.

5.5 컨슈머 그룹 관리

카프카는 컨슈머 그룹마다 이전에 데이터를 읽어서 처리한 것과 완전히 동일한 순서로 데이터를 재처리할 수 있게 해줌.
어플리케이션 내에서 데이터 재처리 기능을 미리 구현해놓았다는 의미.
즉 adminClient를 활용하면 컨슈머 그룹과 이 그룹들이 커밋한 오프셋을 조회하고 수정할 수 있음.
admin.listConsumerGroups().asScala.map(_.groupId()).toSet
Scala
복사
조회 방법은 책의 소스를 참고하도록 하고 그렇다면 컨슈머 그룹의 무엇을 수정할까요?
그룹 삭제
멤버 제외
커밋된 오프셋 삭제 혹은 변경
등의 기능을 제공합니다.
이 중 가장 많이 쓰이는 것은 오프셋 변경 기능이 가장 자주 사용됨.
오프셋 삭제
컨슈머를 맨 처음부터 실행시키는 가장 간단한 방법으로 보일 수 있지만, 이는 컨슈머 설정에 의존적.
컨슈머가 시작됐는데 커밋된 오프셋을 못 찾을 경우, 컨슈머는 토픽의 맨 앞에서 부터 처리를 시작하게 됨 (컨슈머의 리셋)
이러한 오프셋 토픽의 변경은 컨슈머 그룹에 변경 여부는 전달되지 않음
컨슈머 그룹은 컨슈머가 새로운 파티션을 할당 받거나 새로 시작할 때만 오프셋을 토픽에 저장된 값을 읽어올 뿐.
컨슈머 그룹이 돌아가고 있는 상태에서 오프셋을 변경하고자 한다면 UnknownMemeberIdException이 발생

5.7 고급 어드민 작업

1.
토픽에 파티션 추가하기
토픽에 파티션을 추가할 때 어플리케이션 데이터들이 깨질 수 있습니다.
따라서 토픽 용량 한계를 늘리기 위해 파티션을 늘리는 상황은 드뭅니다.
2.
토픽에서 특정 레코드 삭제하기
3.
리더 선출 규칙 변경
이는 선호 리더 선출 기법과 언클린 리더 선출 기법이 존재합니다.
4.
레플리카 재할당
레플리카를 하나의 브로커에서 다른 브로커로 옮기는 것은 재할당을 하는것과 동일합니다.

5.8 카프카 테스트하기

아파치 카프카는 원하는 수만큼의 브로커를 설정해서 초기화할 수 있는 MockAdminClient테스트 클래스를 제공.
실제 어드민 작업을 수행할 필요 없이 어플리케이션이 작동하는지 테스트 가능.
Mock 카프카 객체가 특정 토픽을 가지는지, 토픽이 정상적으로 만들어지는지에 대한 테스트를 진행