Search

[카프카] 카프카 프로듀서 정리

개요

카프카에서의 데이터 시작점은 프로듀서
프로듀서 어플리케이션은 카프카에 필요한 데이터를 선언하고, 브로커의 특정 토픽의 파티션에 전송합니다.
프로듀서는 데이터를 전송할 때, 리더 파티션을 가지고 있는 카프카 브로커와 직접 통신합니다.
프로듀서는 카프카 브로커로 데이터를 전송 할 때, 내부적으로 파티셔너, 배치 생성 단계를 거치게 됩니다.
ProducerRecord: 프로듀서에서 생성하는 레코드, 오프셋은 미포함
send(): 레코드를 전송 요청하는 메서드
Partitioner: 어느 파티션으로 전송할지 지정하는 파티셔너. 기본값으로 Default Partitioner로 설정됨
Accumulator: 배치로 묶어서 전송할 데이터를 모으는 버퍼
ProducerRecord 객체를 만들 때 필요한 필드값들을 설정할 수 있습니다.
토픽과 메시지값만 설정해도 메시지를 보낼 수 있습니다.
타임스탬프를 설정하지 않으면 메시지를 보낸 시간이 기준이 됩니다.
Producer가 send()를 호출하면, 어느 토픽의 어느 파티션으로 메시지를 보낼지 결정합니다.
파티셔너로 구분된 레코드는 메시지를 보내기 위해 어큐뮬레이터에 데이터를 모읍니다.
sender 스레드를 이용해 메시지를 보냅니다.(배치 전송)
프로듀서 API는 'UnifomStickyPartitioner' 와 'RoundRobinPartitioner' 2개의 파티셔너를 지원하는데
2.5.0버전 이후 부터는 UnifomStickyPartitioner가 기본 파티셔너
하지만, 3.6.1버전에서 파티셔너 구현체를 확인해보니 RoundRobin(파티션 순회방식)을 제외하고는 모두 사용하지 않는다고 표시되어 있습니다.
DefaultPartitioner의 글을 읽어보니, 기본 전략은 레코드에 파티션이 지정되어 있으면 그 파티션을 사용하고, 키가 있으면 키의 해시를 이용해 파티션을 고르는 것입니다.
파티션, 키 둘다 없으면 배치가 가득 찰 때까지 같은 파티션을 사용합니다.
(동일한 배치 크기)
사용자가 원하는 파티셔너를 사용하려면 Partitioner 인터페이스를 구현해야 합니다.
카프카 프로듀서는 압축 옵션도 제공합니다. 압축을 하면 CPU, 메모리를 사용하므로 주의해야 합니다.
컨슈머에서도 압축 해제에 리소스가 필요하므로 이 점도 주의해야 합니다.

적절한 파티션 개수 설정

토픽 생성 시 고려해야 하는 파티션 개수 관련 사항
데이터 처리량
메시지 키 사용 여부
브로커와 컨슈머의 영향도
데이터 처리 속도를 향상시키는 방법
1.
컨슈머의 처리량을 늘립니다.
a.
컨슈머 서버의 사양을 향상시킵니다.(스케일 업)
b.
GC를 튜닝합니다.
c.
파티션 개수를 늘리고, 그에 따라 컨슈머를 추가합니다.
2.
컨슈머를 추가하여 병렬 처리량을 늘립니다.
프로듀서가 전송하는 데이터량 < (컨슈머가 처리하는 데이터량 * 파티션 개수)
1.
컨슈머가 처리하는 데이터량은 반드시 실제 환경에서 테스트하여 측정합니다.
2.
프로듀서가 전송하는 데이터량을 일, 시간, 분 단위로 분할하여 예측합니다.
3.
데이터 처리 순서를 고려하여 메시지 키 사용 여부를 결정합니다.
a.
순서가 중요하다면 파티션 개수가 가능한 변하지 않아야 합니다.
b.
순서를 유지하면서 파티션 개수를 변경해야 한다면, 키 매칭을 유지하기 위해 커스텀 파티셔너를 개발해야 합니다.
4.
파티션 개수를 늘릴 때, 브로커 당 파티션 개수를 확인하고 진행합니다.
a.
브로커 당 파티션 개수가 과도하다면, 브로커 개수를 늘리는 것을 고려해야 합니다.

카프카 프로듀서 주요 옵션

acks 옵션

acks=0
프로듀서는 리더 파티션으로 데이터를 전송하지만, 데이터 저장 여부에 대한 응답을 받지 않습니다.
이 설정은 데이터 전송 속도가 가장 빠르므로, 신뢰성보다 전송 속도가 중요한 경우에 사용합니다.
acks=1
프로듀서는 리더 파티션에만 데이터가 정상적으로 적재되었는지 확인합니다.
데이터가 리더 파티션에만 적재될 때까지 재시도할 수 있습니다.
팔로워 파티션이 데이터를 복제하기 직전에 장애가 발생하면, 데이터 유실이 가능합니다.
acks=all (또는 -1)
프로듀서는 데이터가 리더와 팔로워 파티션(ISR 그룹) 모두에 정상적으로 적재되었는지 확인합니다.
이 설정은 속도는 느리지만, 장애 발생 시에도 데이터 안전성을 보장합니다.
min.insync.replicas: 프로듀서가 데이터 적재를 확인할 최소 ISR 그룹 파티션 개수
이 값은 최소 2로 설정해야 all 설정을 사용하는 의미가 있습니다.
브로커 개수보다 작게 설정해야 합니다.
가장 안정적인 설정 방법
토픽 복제 개수 = 3
min.insync.replicas = 2
acks = all
카프카 옵션은 필수 옵션과 선택옵션이 있는데, 선택옵션이라고 해서 중요하지 않은것이 아니라, Default값을 보고 수정할 수 있어야한다.
필수 옵션
bootstrap.servers: 프로듀서 데이터를 전송할 대상 호스트이름:포트 1개이상 작성
key.serializer: 메시지 키를 직렬화하는 클래스를 지정
value.serializer: 메시지 값을 직렬화하는 클래스 지정
선택 옵션
acks: 프로듀서가 데이터를 정상적으로 브로커들에게 전송했는지 확인하는 옵션입니다. 1은 리더 파티션에 데이터가 저장되면 전송 성공으로 판단합니다. 0은 브로커에 데이터 저장 여부에 관계 없이 성공으로 판단합니다. 'all'은 토픽의 min.insync.replicas 개수에 해당하는 리더 파티션과 팔로워 파티션에 데이터가 저장될 때 성공으로 판단합니다.
buffer.memory: 브로커로 전송할 데이터를 배치로 모으는 데 필요한 버퍼 메모리 양을 설정합니다 (32MB가 기본값입니다).
retries: 브로커로부터 에러를 받은 후 재전송을 시도하는 횟수를 설정합니다 (기본값은 2147483647입니다).
batch.size: 배치로 전송할 레코드의 최대 용량을 지정합니다 (16384).
linger.ms: 배치를 전송하기 전에 기다리는 최소 시간을 설정합니다 (기본값은 0입니다).
partitioner.class: 레코드를 파티션에 전송할 때 적용할 파티셔너 클래스를 지정합니다.
enable.idempotence: 멱등성 프로듀서 동작을 설정합니다.
transactional.id: 레코드 전송 시 레코드를 트랜잭션 단위로 묶을지 여부를 설정합니다.
카프카는 기본적으로 브로커 포트로 9092를 사용하게 되지만, 필자는 39092를 열어서 사용 중입니다.

기본적인 프로듀서 코드

상위 항목들은 설정에 관한 목록입니다.
KafkaProducer 클래스에 메시지를 전송하기 위해, ProducerRecord 클래스를 사용하여 토픽과 메시지 내용을 담습니다.
ProducerRecord 생성자 파라미터에 메시지 키값을 넣지 않으면, null로 전달됩니다.
Producer에서 send()는 즉각 전송하는 것이 아니라, 프로듀서 내부에 저장한 후에 배치 형태로 브로커에 전송합니다. 이를 배치 전송 이라고 합니다.
flush()메서드를 호출하면 프로듀서 내부 버퍼의 레코드 배치가 브로커로 전송됩니다.
close()메서드를 호출하여 producer 인스턴스의 리소스를 종료합니다.
package com.example; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Properties; public class SimpleProducer { private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class); private final static String TOPIC_NAME = "test"; private final static String BOOTSTRAP_SERVERS = "${host_ip}:39092"; public static void main(String[] args) { Properties configs = new Properties(); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(configs); String messageValue = "testMessage"; ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue); producer.send(record); logger.info("{}", record); producer.flush(); producer.close(); } }
JavaScript
복사
메시지 키를 가진 데이터를 전송하는 프로듀서
메시지 키가 포함된 레코드를 전송하고 싶으면 ProducerRecord생성시 파라미터를 추가하면된다.
토픽이름, 메시지 키 , 메시지 값 순이다.
package com.example; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class ProducerWithKeyValue { private final static String TOPIC_NAME = "test"; private final static String BOOTSTRAP_SERVERS = "{ip}:39092"; public static void main(String[] args) { Properties configs = new Properties(); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(configs); ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "Pangyo", "Pangyo"); producer.send(record); ProducerRecord<String, String> record2 = new ProducerRecord<>(TOPIC_NAME, "Busan", "Busan"); producer.send(record2); producer.flush(); producer.close(); } }
JavaScript
복사
레코드에 파티션 번호를 지정해서 전송할 수 있습니다
int partitionNo = 0; ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, partitionNo, "Pangyo", "Pangyo"); producer.send(record);
Java
복사
커스텀 파티셔너를 가지는 프로듀서
KafkaProducer 객체에 따로 Custom Partitioner 클래스를 넣어주지 않는다면, Kafka 2.4.0 버전부터는 디폴트 값으로 UniformStickyPartitioner가 들어가게 된다. 이전의 RoundRobinPartitioner의 단점을 보완한 파티셔너로, 데이터가 배치로 모두 묶일 때까지 기다렸다가 배치로 묶인 데이터를 모두 동일한 파티션에 전송하는 방식이다.
Custom Partitioner 클래스를 작성하고 싶다면 카프카 클라이언트에서 제공하는 Partitioner 인터페이스를 구현하여 커스텀 클래스를 구현해야 한다. 그 이후 구현된 클래스를 Kafka producer 객체의 인자에 넣어주는 방식으로 Custom Partitioner 구현이 가능하다.
파티션 임의 구현
package com.example; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.utils.Utils; import java.util.List; import java.util.Map; public class CustomPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { if (keyBytes == null) { throw new InvalidRecordException("Need message key"); } if (((String)key).equals("Pangyo")) return 0; List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } @Override public void configure(Map<String, ?> configs) {} @Override public void close() {} }
Java
복사
특정 키를 가지면 0번 파티션으로 갈 수 있도록 구성
그 외에는 파티션을 %를 통해 구성
configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);
Java
복사
프로퍼티 설정에 해당 파티셔너를 등록해주면 등록이 됩니다.
브로커 정상 전송여부를 확인하는 프로듀서
send 메서드는 Future 객체를 반환
ProducerRecord가 카프카 브로커에 정상적으로 적재되었는지에 대한 비동기 결과가 포함됨
producer.send(record).get()을 통해 동기적으로 데이터 전송 결과를 가져올 수 있습니다.
RecordMetadata metadata = (RecordMetadata)kafkaProducer.send(record).get()
Java
복사
결과 → test-0@9
"test"는 토픽을 가리킵니다.
"0"은 파티션을 가리킵니다.
"9"는 오프셋 번호를 가리킵니다.
"test" 토픽의 0번 파티션에 9번 오프셋이 부여되었다는 것을 알 수 있습니다.
하지만, 데이터를 동기적으로 받으면 전송 과정에서 지연(응답 대기)이 발생합니다.
지연을 원하지 않으면, 비동기적으로 결과를 확인하면 됩니다.
결과를 확인하려면 CallBack 인터페이스를 구현하고, 이를 사용해야 합니다.
public class ProducerCallBack implements Callback { private final static Logger logger = LoggerFactory.getLogger(ProducerCallBack.class); @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (Objects.nonNull(exception)) { logger.error("Kafka Send Error = {}", exception.getMessage(), exception); } else { logger.info("Kafka MetaData = {}", metadata); } } }
Java
복사
전송 요청시 콜백 클래스를 넣으면 비동기로 처리할 수 있습니다.
-1번 오프셋에 적재가 되었다고 나옵니다
비동기로 보냈기 때문에, 오프셋을 받아오지 못합니다.
비동기적으로 전송 결과를 받아오기 위해서는 Callback 인터페이스를 구현하는 콜백프로듀서 클래스를 작성해야 한다.
kafkaProducer.send(record, new ProducerCallBack());
Java
복사
동기 vs. 비동기
동기적으로 매번 전송 결과를 받아오게 되면, 브로커의 전송 결과에 대한 응답을 매번 기다려야 하므로 속도가 느려질 수 있습니다.
그러나 비동기적 방식이 항상 좋은 것은 아닙니다. 데이터의 순서가 중요한 경우에는 사용해서는 안 됩니다. 비동기적으로 결과를 기다리는 동안, 다음 차례의 데이터 전송이 성공하고 이전에 전송한 데이터의 결과가 실패할 경우, 재전송으로 인해 데이터 순서가 바뀔 수 있기 때문입니다.

프로듀서 어플리케이션 안전한 종료

producer.close와 flush를 통해 어큐뮤레이터에 저장된 모든 데이터를 카프카 클러스터로 전송합니다.
producer.flush(); producer.close();
Plain Text
복사