위 강의를 참고로 작성
카프카 컨슈머 소개
데이터를 토픽의 파티션에서 가져오기 위해 운영하는 컨슈머에는 크게 두 가지 방법이 있습니다.
•
여러 개의 컨슈머로 이루어진 컨슈머 그룹을 운영합니다.
•
토픽의 특정 파티션만을 구독하는 컨슈머를 운영합니다.
컨슈머 내부 구조
•
fetcher는 레코드를 미리 가져온다.
•
ConsumerRecords는 오프셋을 포함한다.
컨슈머 그룹
•
컨슈머 그룹은 다른 컨슈머 그룹과 격리되는 특징을 가지고 있음
◦
따라서 프로듀서가 보낸 데이터를 각기 다른 역할을 하는 컨슈머 그룹끼리 영향을 받지않게 처리할 수 있다는 장점이 있음.
•
한 컨슈머가 2개의 파티션을 소비하는 것은 가능하지만
•
2개의 컨슈머가 1개의 파티션을 소비하는 것은 불가능하다.
컨슈머 그룹을 사용하여 운영하는 방법
•
각 컨슈머 그룹을 격리된 환경에서 안전하게 운영할 수 있게 지원한다.
•
컨슈머 그룹에 속한 컨슈머들은 토픽의 여러 파티션에 할당되어 데이터를 가져올 수 있다.
•
컨슈머 그룹에 속한 컨슈머가 토픽을 구독하여 데이터를 가져올 때, 한 파티션은 하나의 컨슈머에만 할당될 수 있다.
•
하나의 컨슈머는 여러 파티션에 할당될 수 있다.
•
이런 특성으로 인해, 컨슈머 그룹의 컨슈머 수는 소비하려는 토픽의 파티션 수와 같거나 작아야 한다.
컨슈머 그룹의 컨슈머가 파티션 개수보다 많을 경우
예를들어 컨슈머가 파티션 개수보다 많을 경우, 파티션은 컨슈머에 1대1로 할당되고 남는 컨슈머는 데이터를 처리하지 못하고 불필요한 스레드로 남게 된다.
컨슈머 그룹을 활용하는 이유
리밸런싱
•
문제가 발생한 컨슈머에 할당된 파티션의 소유권은 문제가 없는 컨슈머로 이동합니다.
•
이런 과정을 리밸런싱이라고 합니다.
•
리밸런싱은 주로 두 가지 상황에서 발생합니다:
1. 컨슈머가 컨슈머 그룹에 추가되는 상황
2. 컨슈머가 컨슈머 그룹에서 제외되는 상황
•
이슈가 발생한 컨슈머를 제외함으로써 지속적으로 데이터 처리 가능 → 가용성 증가
•
리밸런싱이 발생하면 문제가 있는 컨슈머의 파티션은 더 이상 데이터 처리를 못하므로, 처리 지연이 발생할 수 있습니다.
◦
이를 해결하기 위해, 문제가 있는 컨슈머를 그룹에서 제거하면 모든 파티션에서 데이터 처리가 계속될 수 있습니다.
•
리밸런싱은 데이터 처리 중 언제든지 발생할 수 있으므로, 리밸런싱에 대응하는 코드를 준비해둬야 합니다.
•
물론, 리밸런싱이 자주 발생하면 성능 문제가 발생합니다.
◦
파티션의 소유권을 다른 컨슈머에게 재할당하는 과정에서 컨슈머 그룹이 토픽의 데이터를 읽지 않기 때문입니다.
•
그룹 조정자는 리밸런싱을 시작하는 역할을 하며, 컨슈머 그룹의 컨슈머가 추가되거나 삭제될 때를 감지합니다.
◦
카프카 브로커 중 하나가 그룹 조정자 역할을 수행합니다.
커밋
•
컨슈머는 카프카 브로커로부터 데이터를 어디까지 가져갔는지 커밋을 통해 기록한다.
◦
특정 토픽의 파티션을 어떤 컨슈머 그룹이 몇 번째 가져갔는지 카프카 브로커 내부에서 사용되는 내부 토픽에 기록된다.
◦
컨슈머 동작 이슈가 발생하여 _consumer_offsets 토픽에 어느 레코드까지 읽어갔는지 오프셋 커밋이 기록되지 못했다면 데이터 처리의 중복이 발생할 수 있다.
◦
그러므로 데이터 처리의 중복이 발생하지 않게 하기 위해서는 컨슈머 어플리케이션이 오프셋 커밋을 정상적으로 처리했는지 검증할 것
어사이너(Assignor)
•
일반적으로 토픽과 컨슈머를 1:1로 대응하므로, 크게 어사이너를 신경 쓸 필요는 없다.
컨슈머 주요 옵션 소개
•
필수옵션
옵션 명 | 설명 |
bootstrap.servers | 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트 이름: 포트를 1개 이상 작성한다.
2개 이상 브로커 정보를 입력하여 일부 브로커에 이슈가 발생하더라도 접속하는 데에 이슈가 없도록 설정가능한다. |
key.deserializer | 레코드의 메시지 키를 역직렬화하는 클래스를 지정한다 |
value.deserializer | 레코드의 메시지 값을 역직렬화하는 클래스를 지정한다. |
•
선택옵션
옵션 명 | 설명 |
group.id | 컨슈머 그룹 id를 지정한다. subscribe() 메소드로 토픽을 구독하여 사용할 때는 이 옵션을 필수로 넣어야 한다. 기본값은 null 이다. |
auto.offset.reset | 컨슈머 그룹이 특정 파티션을 읽을 때 저장된 컨슈머 오프셋이 없는 경우 어느 오프셋부터 읽을지 선택하는 옵션이다. 이미 컨슈머 오프셋이 있다면 이 옵션 값은 무시된다.이 옵션은 latest,earliest, none중 설정가능하다.latest : 가장 최근에 넣은(높은) 오프셋부터 읽기 시작earilest : 가장 처음에 넣은(낮은) 오프셋부터 읽기 시작none : 컨슈머 그룹이 커밋한 기록이 있는지 찾아보고 커밋 기록이 없으면 오류 반환, 존재한다면 기존 커밋 기록 이후 오프셋부터 읽기시작한다 |
enable.auto.commit | 자동 커밋으로 할지 수동 커밋으로 할지 선택한다. 기본값은 true다 |
auto.commit.interval.ms | 자동 커밋(enable.auto.commit=true)일 경우 오프셋 커밋 간격을 지정한다. 기본값은 5000(5초)이다. |
max.poll.records | Poll() 메소드를 통해 반환되는 레코드 개수를 지정한다 기본값은 500이다. |
session.timeout.ms | 컨슈머가 브로커와 연결이 끊기는 최대 시간이다. 이 시간 내에 하트 비트를 전송하지 않으면 브로커는 컨슈머에 이슈가 발생했다고 가정하고 리밸런싱을 시작한다. 보통 하트비트 시간 간격의 3배로 설정한다. |
heartbeat.interval.ms | 하트 비트를 전송하는 시간 간격이다. 기본값은 3000(3초)이다. |
max.poll.interval.ms | Poll() 메소드를 호출하는 간격의 최대 시간을 지정한다. poll() 메소드를 호출한 이후에 데이터를 처리하는데에 시간이 너무 많이 걸리는 경우 비정상으로 판단하고 리밸런싱을 시작 |
isolation.level | 트랜잭션 프로듀서가 레코드를 트랜잭션 단위로 보낼 경우 사용한다. read_committed, read_uncommited로 설정할 수 있다.read_commited : 커밋이 완료된 레코드만 읽는다.read_uncommited : 커밋 여부와 관계없이 파티션에 있는 모든 레코드를 읽는다. |
기억할 것
•
커밋 관련해서는 자동 커밋이 기본값이다.
•
하트비트 값인 3초가 지나고 하트비트를 기다리는데, 그렇게 10초가 지났다면 리밸런싱 작업이 이뤄난다.
auto.offset.reset
•
컨슈머에 있는 선택 옵션 중 하나이다.
•
새로 만든 컨슈머 그룹을 운영하려고 할 때만 사용된다.
◦
이미 운영중이라면 기존에 오프셋을 보고 동작한다.
수동 커밋 컨슈머 어플리케이션
•
오프셋 커밋은 컨슈머 어플리케이션에서 명시적이나 비명시적으로 수행할 수 있습니다.
◦
기본적으로는 Poll() 메소드가 수행될 때 일정 간격마다 오프셋을 커밋하도록 enable.auto.commit=true로 설정되어 있습니다.
◦
이런 방식으로 일정 간격마다 자동으로 커밋되는 것을 비명시적 오프셋 커밋이라고 합니다.
◦
이 설정은 auto.commit.interval.ms의 값과 함께 사용되며, poll 메소드가 auto.commit.interval.ms의 값보다 오래 걸렸을 때, 그 시점까지 읽은 레코드의 오프셋을 커밋합니다.
◦
poll() 메소드를 호출할 때 커밋을 수행하므로, 별도로 커밋 관련 코드를 작성할 필요가 없습니다.
◦
그러나 비명시적 오프셋 커밋 은 컨슈머가 종료될 경우 처리 중인 데이터가 중복되거나 유실될 가능성이 있어 취약한 구조를 가지고 있습니다.
◦
그래서 데이터 중복이나 유실이 허용되지 않는 서비스에서는 자동 커밋을 허용해서는 안됩니다.
•
명시적인 오프셋 커밋을 하려면 poll() 메소드 호출 이후에 반환받은 데이터가 처리 완료된 후에 commitSync() 메소드를 호출하면 됩니다.
◦
commitSync() 메소드는 poll() 메소드를 통해 반환된 레코드의 가장 마지막 오프셋을 기준으로 커밋을 수행합니다.
◦
이 메소드는 브로커에 커밋 요청을 하고, 커밋이 정상적으로 처리되었는지 확인하기까지 대기하며, 이는 컨슈머의 처리량에 영향을 미칩니다.
▪
이는 Round trip time 때문입니다.
•
이 문제를 해결하기 위해 commitAsync() 메소드를 사용하여 커밋 요청을 보내고, 응답이 오기 전까지 데이터 처리를 계속할 수 있습니다.
◦
하지만 비동기 커밋은 커밋 요청이 실패했을 경우 현재 처리 중인 데이터의 순서를 보장하지 않으며, 데이터가 중복 처리될 가능성이 있습니다.
아래 소스코드는 명시적 커밋을 하는 컨슈머 코드입니다.
@Slf4j
public class CommitSyncConsumer {
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
private final static String GROUP_ID = "test-group";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
// 프로듀서에서 직렬화하여 전송한 데이터를 역직렬화한다.
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
/* 명시적 오프셋 커밋 */
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
// 컨슈머에게 토픽을 할당하기 위해 subscribe() 를 사용한다.
// 이 메서드는 Collection 타입의 String 타입을 받는데, 1개 이상의 토픽 이름을 받을 수 있다.
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
// poll() 메서드를 호출하여 데이터를 가져와서 처리한다.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
// for loop 를 통해 poll() 메서드가 반환한 ConsumerRecord 데이터들을 순차적으로 처리한다.
for (ConsumerRecord<String, String> record : records) {
log.info("{}", record);
}
consumer.commitSync(); // 파라미터가 없을 경우 poll()로 반환된 마지막 레코드의 오픗세 기준으로 커밋
}
}
}
JavaScript
복사
1) 컨슈머 그룹 설정
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
Java
복사
컨슈머 그룹을 통해 사용자의 목적을 구분하고, 같은 역할을 하는 컨슈머를 효율적으로 관리할 수 있습니다.
컨슈머 그룹은 컨슈머 오프셋을 관리하는 기준이므로, subscribe() 메서드를 사용해 토픽을 구독할 때는 반드시 컨슈머 그룹을 선언해야 합니다.
이는 컨슈머가 중단되거나 재시작되더라도, 컨슈머 그룹의 컨슈머 오프셋을 기준으로 이후 데이터를 처리하기 때문입니다.
컨슈머 그룹을 선언하지 않으면, 어떤 그룹에도 속하지 않는 컨슈머로 동작하게 됩니다.
2) 데이터 역직렬화
// 프로듀서에서 직렬화하여 전송한 데이터를 역직렬화한다.
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
Java
복사
3) 명시적 오프셋 커밋 설정
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
Java
복사
poll() 메서드 호출 이후에 반환받은 데이터의 처리가 완료되고 commitSync() 메서드를 호출하면 된다.
commitSync() 메서드는 poll() 메서드를 통해 반환된 레코드의 가장 마지막 오프셋을 기준으로 커밋을 수행한다.
4) subscribe()
consumer.subscribe(Arrays.asList(TOPIC_NAME));
Java
복사
컨슈머에게 토픽을 할당하기 위해 subscribe() 를 사용한다.
이 메서드는 Collection 타입의 String 타입을 받는데, 1개 이상의 토픽 이름을 받을 수 있다.
5) poll()
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
...
}
Java
복사
poll() 메서드를 사용하여 데이터를 가져와 처리합니다. 이를 지속적으로 처리하기 위해서는 반복적으로 호출해야 합니다.
지속적으로 호출하기 위한 가장 간단한 방법은 while(true)를 통해 무한루프를 생성하는 것입니다.
이 무한루프 내에서는 poll 메서드를 이용해 데이터를 가져와, 사용자가 원하는 처리를 수행합니다.
Duration 타입을 인자로 받는데, 이 인자 값은 브로커로부터 데이터를 가져올 때, 컨슈머 버퍼에서 데이터를 대기하는 타임아웃 간격입니다.
6) poll() 메서드가 반환한 ConsumerRecord 데이터를 순차적으로 처리
for (ConsumerRecord<String, String> record : records) {
log.info("{}", record);
}
Java
복사
for loop 를 통해 poll() 메서드가 반환한 ConsumerRecord 데이터들을 순차적으로 처리한다.
7) commitSync()
consumer.commitSync(); // 파라미터가 없을 경우 poll()로 반환된 마지막 레코드의 오픗세 기준으로 커밋
Java
복사
commitSync()는 poll() 메서드로 받은 가장 최근 레코드의 오프셋을 기준으로 커밋합니다.
동기 오프셋 커밋을 사용하려면, poll() 메서드로 받은 모든 레코드를 처리한 후 commitSync() 메서드를 호출해야 합니다.
동기 커밋의 경우, 브로커에 커밋 요청을 한 후 커밋이 완료될 때까지 대기합니다.
commitSync()에 파라미터가 없을 경우, poll()로 반환된 가장 최근 레코드의 오프셋을 기준으로 커밋합니다. 만약 개별 레코드 단위로 오프셋을 커밋하려면, Map<TopicPartition, OffsetAndMetaData> 인스턴스를 파라미터로 넣어야 합니다.
브로커로부터 컨슈머 오프셋 커밋이 완료되었음을 받기까지 컨슈머는 데이터를 다 처리하지 않고 기다리기 때문에 자동 커밋이나 비동기 오프셋 커밋보다 동일 시간당 데이터 처리량이 적다는 특징이 있다.
비동기 오프셋, 커밋
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
logger.info("{}","start");
while(true){
ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(1));
for(ConsumerRecord<String,String> record : records){
logger.info("{}",record.toString());
}
consumer.commitAsync();
}
JavaScript
복사
•
비동기 오프셋도 마찬가지로 poll() 메소드로 리턴된 가장 마지막 레코드를 기준으로 오프셋을 커밋한다. 다만, 동기 오프셋 커밋과 다른점은 커밋이 완료될때 까지 응답을 기다리지 않는 다는 것
•
따라서 처리량이 더많다.
물론 callback 함수를 따로 구현해줘야한다.
consumer.commitAsync(new OffsetCommitcallback() {
public void onComplete(Map<TopicPartiton, OffsetAndMetadata> offsets, Exception e){
/// 코드 서술
}
})
Java
복사
리밸런스 리스너를 가진 컨슈머
•
컨슈머 그룹에서 컨슈머가 추가되거나 제거될 경우, 리밸런스라는 파티션 재할당 과정이 발생합니다.
•
poll() 메소드를 통해 반환된 데이터를 모두 처리하기 전에 리밸런스가 발생할 경우, 데이터가 중복 처리될 수 있습니다.
•
리밸런스가 발생하면 처리한 데이터를 기준으로 커밋해야 중복 처리를 방지할 수 있습니다.
◦
이를 위해 카프카 라이브러리는 ConsumerRebalanceListener 인터페이스를 제공합니다.
◦
ConsumerRebalanceListener 인터페이스를 구현한 클래스는 onPartitionAssigned() 메소드와 onPartitionRevoked() 메소드로 구성됩니다.
◦
onPartitionAssigned : 리밸런스가 끝난 후 파티션이 완전히 할당되면 호출되는 메소드입니다.
◦
onPartitionRevoked : 리밸런스 시작 직전에 호출되는 메소드입니다.
따라서, 마지막으로 처리한 레코드를 기준으로 커밋하려면 리밸런스가 시작하기 직전에 커밋해야 합니다. 이는 onPartitionRevoked() 메소드에 커밋을 구현함으로써 처리할 수 있습니다.
토픽의 개수가 많아질수록 리밸런싱 시간이 길어집니다.
→ 그래서 리밸런싱이 자주 발생하지 않도록 관리하는 것이 중요합니다.
→ 따라서, 리밸런싱 과정에서 발생하는 문제들을 잘 해결할 수 있어야 합니다.
파티션 할당 컨슈머
컨슈머를 운영할 때 subscribe() 메소드를 이용해 구독형태로 사용할 수 있지만, 명시적으로 할당하여 직접 파티션을 컨슈머에 운영할 수도 있습니다.
•
어떤 토픽, 파티션을 컨슈머에 할당할지 명시적으로 선언하려면 assign() 메소드를 사용합니다.
•
assign() 메소드는 여러 개의 TopicPartition 인스턴스를 포함한 자바 컬렉션 타입을 파라미터로 받습니다.
•
TopicPartition클래스는 카프카 라이브러리 내/외부에서 토픽, 파티션 정보를 담는 객체로 사용됩니다.
private final static int TOPIC_NUMBER = 0;
public static void main(String ...args){
//...
consumer = new KafkaConsumer<String, String>(configs);
// consumer.subscribe(Collections.singletonList(TOPIC_NAME),new RebalanceListener());
// 컨슈머가 특정 파티션을 바라보도록
consumer.assign(Collections.singleton(new TopicPartition(TOPIC_NAME, PARTITION_NUMBER)));
//...
}
JavaScript
복사
컨슈머에 할당된 파티션 확인 방법
•
컨슈머에 할당된 토픽과 파티션 정보는 assignment() 메소드로 확인할 수 있습니다.
•
assignment()메소드는 Set<TopicPartition> 인스턴스를 반환합니다.
◦
TopicPartiton 클래스는 토픽이름과 파티션 번호가 포함된 객체입니다.
// 컨슈머가 특정 파티션을 바라보도록
consumer.assign(Collections.singleton(new TopicPartition(TOPIC_NAME, PARTITION_NUMBER)));
Set<TopicPartition> assignedTopicPartition = consumer.assignment();
logger.info("topicInfo = {}", assignedTopicPartition);
JavaScript
복사
컨슈머 애플리케이션의 안전한 종료
•
컨슈머 애플리케이션은 안전하게 종료되어야 합니다.
◦
정상적으로 종료되지 않은 컨슈머는 세션 타임아웃 이 발생할 때까지 컨슈머 그룹에 남습니다.
◦
이로 인해 실제로 종료되었지만 더 이상 작동하지 않는 컨슈머가 존재하게 되어, 파티션의 데이터는 소모되지 않고 컨슈머 랙이 증가합니다.
•
컨슈머 랙이 발생하면 데이터 처리 지연이 생깁니다.
•
컨슈머를 안전하게 종료하기 위해 KafkaConsumer 클래스는 wakeup() 메소드를 제공합니다.
◦
wakeup() 메소드 실행 후 poll() 메소드가 호출되면 WakeupException 예외가 발생합니다.
◦
WakeupException 예외를 받은 후에는 데이터 처리를 위해 사용한 자원들을 해제하면 됩니다.
•
마지막으로 close() 메소드를 호출하여 카프카 클러스터에 컨슈머가 안전하게 종료되었음을 명시적으로 알리면 종료가 완료되었다고 볼 수 있습니다.
try{
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
logger.info("{}", record.toString());
}
consumer.commitSync();
}
}catch (WakeupException we){
logger.warn("Wakeup consumer");
} finally{
consumer.close();
}
JavaScript
복사
•
그럼 wakeup() 메소드는 어디서 호출하면 될까요?
◦
자바 어플리케이션의 경우 코드 내부에 셧다운 훅(shutdown hook)을 구현하여 안전한 종료를 명시적으로 구현할 수 있습니다.
◦
셧다운 훅은 사용자 또는 운영체제로부터 종료 요청을 받으면 실행하는 스레드를 말합니다.
public class Consumer {
private final static Logger logger = LoggerFactory.getLogger(Consumer.class);
//...
public static void main(String ...args){
//...
Runtime.getRuntime().addShutdownHook(new ShutdownThread()); # 이 부분
//...
}
//...
static class ShutdownThread extends Thread{ ## 이 부분
@Override
public void run() {
logger.info("Shutdown hook");
consumer.wakeup();
}
}
Java
복사
•
사용자는 안전한 종료를 위해 위 코드로 실행된 어플리케이션 kill -TERM {프로세스 번호}를 호출하여 셧다운 훅을 발생시킬 수 있습니다.
여기에서는 kill에 대한 자세한 옵션을 확인할 수 있습니다.
멀티스레드 컨슈머 애플리케이션
컨슈머는 기본적으로 1 thread ⇒ 1 consumer 규칙을 따릅니다.
멀티 스레드 컨슈머 어플리케이션은 2가지 방향성이 존재
•
하나의 컨슈머 그룹, 1개의 프로세스에 여러 컨슈머 스레드
◦
하나의 프로세스로 관리하기 때문에, 배포에 대한 용이성 존재
•
하나의 컨슈머 그룹, 분리된 프로세스에 각 1개의 컨슈머 스레드
◦
데브원영님이 주로 사용하는 방식은 각 1개의 프로세스당 1개의 컨슈머 스레드를 사용하여 하나의 컨슈머 그룹으로 묶는 것. 이렇게 되면, 다른 프로세스에는 장애전파가 되지 않아 격리되어 데이터를 안정적으로 처리할 수 있다는 장점이 존재합니다.
◦
주로 컨테이너 오케스트레이션이 가능할 때 사용 가능
컨슈머 랙
컨슈머 랙 : 토픽의 최신 오프셋(LOG-END-OFFSET)과 컨슈머 오프셋(CURRENT-OFFSET) 간의 차이다.
프로듀서는 계속해서 새로운 데이터를 파티션에 저장하고 컨슈머는 자신이 처리할 수 있는 만큼 데이터를 가져간다. 컨슈머 랙은 컨슈머가 정상 동작하는지 여부를 확인할 수 있기 때문에 컨슈머 애플리케이션을 운영한다면 필수적으로 모니터링해야한다. 컨슈머 랙을 모니터링하는 것은 카프카를 통한 데이터 파이프라인을 운영하는 데에 핵심적인 역할을 한다. 컨슈머 랙을 모니터링함으로써 컨슈머의 장애를 확인할 수 있고 파티션의 개수를 정하는 데에 참고할 수 있기 때문이다.
컨슈머 랙은 컨슈머 그룹과 토픽, 파티션별로 생성된다.
예시
1개의 토픽에 3개의 파티션이 있고,
1개의 컨슈머 그룹이 토픽을 구독하여 데이터를 가져가면 컨슈머 랙은 총 3개가 된다.
프로듀서와 컨슈머의 데이터 처리량
컨슈머랙은 기본적으로 프로듀서와 컨슈머의 데이터 처리량에 대한 차이로 인하여 발생
컨슈머 랙 증가/감소
1번 | 프로듀서가 보내는 데이터 양 > 컨슈머의 데이터 처리량 | 컨슈머 랙 증가 |
2번 | 프로듀서가 보내는 데이터 양 < 컨슈머의 데이터 처리량 | 컨슈머 랙 감소 (최솟값 0 : 지연이 없음) |
프로듀서의 데이터 양이 일정함에도 컨슈머의 장애로 인해 컨슈머 랙이 증가할 수도 있다.
컨슈머는 파티션 개수만큼 늘려서 병렬처리하면 파티션마다 컨슈머가 할당되어 데이터를 처리한다.
* 예시
2개의 파티션으로 구성된 토픽에 2개의 컨슈머가 각각 할당되어 데이터를 처리한다고 가정하자.
프로듀서가 보내는 데이터는 동일한데 파티션 1번의 컨슈머 랙이 늘어나는 상황이 발생한다면
1번 파티션에 할당된 컨슈머에 이슈가 발생했음을 유추할 수 있다.
컨슈머 랙 모니터링
컨슈머 랙을 모니터링하는 것은 카프카 운영의 핵심적인 역활을 수행합니다
컨슈머 랙을 모니터링함으로써, 프로듀서가 과부화 혹은 프로듀싱이 많을 때에 지연을 줄이기 위해, 파티션, 컨슈머 개수를 늘려서 병렬 처리량을 늘리는 방법을 사용할 수 있습니다.
랙 모니터링 - 처리량 이슈
프로듀서의 데이터양이 늘어날 경우, 컨슈머 랙이 늘어날 수 있습니다.
이 경우 파티션의 개수와 컨슈머의 개수를 늘려, 병렬 처리량을 늘려 컨슈머 랙을 줄일 수 있습니다. 컨슈머의 개수를 2개로 늘림으로써, 컨슈머의 데이터 처리량을 2배로 늘릴 수 있습니다.
랙 모니터링 - 파티션 이슈
프로듀서의 데이터 양이 일정함에도 불구, 컨슈머의 장애로 인하여 컨슈머 랙이 증가할 수도 있습니다. 컨슈머는 파티션 개수만큼 늘려서 병렬처리하며, 파티션마다 컨슈머가 할당되어 데이터를 처리합니다
프로듀서 데이터 양은 동일한데 만약 컨슈머 2개 중 1개의 파티션의 컨슈머 랙이 늘어난다면, 아래처럼 1번 파티션에 할당된 컨슈머에 이슈가 있음을 알 수 있습니다
컨슈머 랙을 모니터링하는 방법
모니터링 방법
1) 카프카 명령어를 사용
아래 방법은 일회성에 지나지 않습니다.
컨슈머 그룹 이름 : my-group
kafka-consumer-groups.sh --bootstrap-server localhost:8082 --group my-group --describe
Kotlin
복사
2) 컨슈머 어플리케이션에서 metrics() 메서드를 사용
package com.example.consumer._consumer_lag;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Map;
import java.util.Properties;
@Slf4j
public class LagMetricsConsumer {
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
private final static String GROUP_ID = "test-group";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
/* metrics() 메서드를 사용하여 컨슈머 랙 조회 */
for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
if ("record-lag-max".equals(entry.getKey().name())
|| "record-lags".equals(entry.getKey().name())
|| "record-lag-avg".equals(entry.getKey().name())) {
Metric metric = entry.getValue();
log.info(entry.getKey().name(), metric.metricValue());
}
}
}
}
Kotlin
복사
•
컨슈머가 죽으면 확인을 할 수 없다.
3) 외부 모니터링 툴을 사용
데이터독(Datadog), 컨플루언트 컨트롤 센터(Confluent Control Center)와 같은 카프카 클러스터 종합 모니터링 툴을 사용할 수 있다.
컨슈머 랙 모니터링만을 위한 툴로 오픈소스로 공개되어있는 버로우(Burrow)도 있다.
모든 토픽, 모든 컨슈머 그룹 정보를 수집해서 보여줌으로 엄청난 편의성을 제공한다.
카프카 버러우
컨슈머 랙 이슈 판별
컨슈머 랙 평가
정상 케이스
컨슈머 처리량 이슈
컨슈머 이슈
컨슈머 랙 모니터링 아키텍처