프로그램 내에서 코드로 카프카 관리하기
5.1 AdminClient 개요
카프카에서는 AdminClient를 활용
•
토픽 목록 조회, 생성, 삭제, 클러스터 상세 정보 확인, ACL 관리 등 다양한 기능을 제공.
•
이전까지는 토픽이 있는 걸 확인할 수 없어서 producer.send() 메서드 내에서 UNKNOWN_TOPIC_OR_PARTITION예외가 발생하는 걸 잡아서 처리.
•
AdminClient는 이런 문제점을 해결
•
AdminClient.createTopics메서드는 토픽이 생성될 때까지 기다리거나, 생성 상태를 확인 후 토픽 상태 설정을 가져올 수 있음
•
하지만 카프카 컨트롤러부터 브로커로의 메타데이터 전파는 비동기적으로 이루어짐
◦
AdminClient가 전달하는 정보는 실시간으로 동기화될 수는 없지만, 언젠가는 동기화됨.
5.2 Admin Client 사용법: 생성, 설정, 닫기
val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
val admin = AdminClient.create(props)
admin.close(Duration.ofSeconds(30))
Scala
복사
close()를 실행할 때는 아직 진행중인 작업이 있을 수 있다는 것을 유의해야 함.
close()는 30초동안 모든 작업을 기다리고, 그 이후의 작업에 대해서는 예외를 발생하게 됨
client.dns.lookup
카프카는 호스트명을 기준으로 연결을 검증하고 해석하고 생성.
이 과정 중 DNS를 사용하는 경우와 IP를 활용하는 경우 각각의 설정이 필요.
1.
DNS를 활용하는 경우
all-brokers.hostname.com을 활용하면 쉽게 연결할 수 있지만, SASL인증을 활요할 경우 문제가 생길 수 있습니다.
SASL은 Kafka 프로토콜이 데이터 교환 과정에서 Kafka가 지원하는 Kerberos, PLAIN, SCRAM, OAUTHBEARER 등의 메커니즘을 사용하여 인증/인가를 할 수 있도록 해주며,
인증/인가 교환이 성공했을 때, 후속 데이터 교환을 데이터 보안 계층 위에서 할 수 있도록 해 주는 기술 입니다.
또한 특정 브로커만을 연결할 수 있도록 할 수 있지만, 다음과 같이 브로커 DNS가 일치하지 않을 경우 resolve_cannonical_bootstrap_servers_only설정을 수행하면 됩니다.
broker1.hostname.com
broker2.hostname.com
broker3.hostname.com
Plain Text
복사
이는 DNS를 펼치게 적용하기에 DNS 별칭에 포함된 모든 브로커 이름을 일일이 부트스트랩 서버 목록에 넣어 준 것과 동일하게 작동합니다.
2.
다수의 IP주소로 연결되는 DNS 이름을 사용하는 경우
하나의 DNS가 다양한 IP로 연결될 수 있기에 use_all_dns_ips를 활용하면 안전하게 클라이언트에서 연결할 수 있습니다.
request.timeout.ms
•
AdminClient의 응답을 기다리는 최대 값을 지정합니다.
5.3 토픽 관리 기능
val topics = admin.listTopics()
topics.names().asScala.foreach(println)
Scala
복사
adminClient는 모든 응답을 Future객체로 감싸어 리턴.
•
따라서 적절하게 Future객체를 처리하는 것이 필요.
•
또한 토픽 리스트 출력 뿐만아니라, 파티션 리스트, 토픽 생성, 삭제 등도 제공.
•
어플리케이션에서의 토픽 삭제는 돌이킬 수 없음.
•
주의하여 사용하기를 권장.
5.4 설정 관리
ConfigResource객체를 활용하여 설정 관리를 진행할 수 있음.
5.5 컨슈머 그룹 관리
카프카는 컨슈머 그룹마다 이전에 데이터를 읽어서 처리한 것과 완전히 동일한 순서로 데이터를 재처리할 수 있게 해줌.
•
어플리케이션 내에서 데이터 재처리 기능을 미리 구현해놓았다는 의미.
•
즉 adminClient를 활용하면 컨슈머 그룹과 이 그룹들이 커밋한 오프셋을 조회하고 수정할 수 있음.
admin.listConsumerGroups().asScala.map(_.groupId()).toSet
Scala
복사
•
조회 방법은 책의 소스를 참고하도록 하고 그렇다면 컨슈머 그룹의 무엇을 수정할까요?
◦
그룹 삭제
◦
멤버 제외
◦
커밋된 오프셋 삭제 혹은 변경
•
등의 기능을 제공합니다.
•
이 중 가장 많이 쓰이는 것은 오프셋 변경 기능이 가장 자주 사용됨.
오프셋 삭제
•
컨슈머를 맨 처음부터 실행시키는 가장 간단한 방법으로 보일 수 있지만, 이는 컨슈머 설정에 의존적.
•
컨슈머가 시작됐는데 커밋된 오프셋을 못 찾을 경우, 컨슈머는 토픽의 맨 앞에서 부터 처리를 시작하게 됨
(컨슈머의 리셋)
•
이러한 오프셋 토픽의 변경은 컨슈머 그룹에 변경 여부는 전달되지 않음
•
컨슈머 그룹은 컨슈머가 새로운 파티션을 할당 받거나 새로 시작할 때만 오프셋을 토픽에 저장된 값을 읽어올 뿐.
•
컨슈머 그룹이 돌아가고 있는 상태에서 오프셋을 변경하고자 한다면
UnknownMemeberIdException이 발생
5.7 고급 어드민 작업
1.
토픽에 파티션 추가하기
•
토픽에 파티션을 추가할 때 어플리케이션 데이터들이 깨질 수 있습니다.
•
따라서 토픽 용량 한계를 늘리기 위해 파티션을 늘리는 상황은 드뭅니다.
2.
토픽에서 특정 레코드 삭제하기
3.
리더 선출 규칙 변경
•
이는 선호 리더 선출 기법과 언클린 리더 선출 기법이 존재합니다.
4.
레플리카 재할당
•
레플리카를 하나의 브로커에서 다른 브로커로 옮기는 것은 재할당을 하는것과 동일합니다.
5.8 카프카 테스트하기
•
아파치 카프카는 원하는 수만큼의 브로커를 설정해서 초기화할 수 있는 MockAdminClient테스트 클래스를 제공.
•
실제 어드민 작업을 수행할 필요 없이 어플리케이션이 작동하는지 테스트 가능.
•
Mock 카프카 객체가 특정 토픽을 가지는지, 토픽이 정상적으로 만들어지는지에 대한 테스트를 진행