Search

카프카 프로듀서 (카프카 핵심 가이드 ch 3)

3장. 카프카 프로듀서

카프카의 용도
큐, 메시지 버스, 데이터 저장 플랫폼 등으로 활용
애플리케이션 유형
프로듀서 (데이터 쓰기)
컨슈머 (데이터 읽기)
프로듀서와 컨슈머 기능을 모두 수행하는 애플리케이션
카프카 클라이언트 API
개발자들이 카프카와 상호작용하는 애플리케이션을 개발할 때 사용
프로듀서 사용 방법
KafkaProducer와 ProducerRecord 객체 생성
카프카에 레코드 전송 방법
에러 처리 방법
프로듀서 설정
작동을 제어하기 위한 다양한 설정 옵션
주요 컴포넌트
파티셔너: 파티션 할당 방식 정의
시리얼라이저: 객체의 직렬화 방식 정의
추가 학습 내용
파티셔너와 시리얼라이저 작성 방법
프로듀서 사용 상황
사용자 행동 기록
성능 매트릭 기록
로그 메시지 저장
스마트 가전 정보 수집
비동기적 통신
데이터베이스 저장 전 버퍼링
프로듀서 요구 조건
메시지 유실 허용 여부
중복 허용 여부
지연(latency)과 처리율(throughput) 요구사항
프로듀서 동작 과정
a. ProducerRecord 객체 생성
필수: 토픽, 밸류
선택: 키, 파티션
b. 직렬화 및 파티션 할당
키와 값을 바이트 배열로 직렬화
파티셔너를 통해 파티션 결정 (키 기반)
c. 레코드 배치 및 전송
같은 토픽과 파티션의 레코드를 배치로 그룹화
별도 스레드가 브로커에 전송
d. 브로커 응답 처리
성공: RecordMetadata 객체 반환 (토픽, 파티션, 오프셋 정보)
실패: 에러 반환
e. 에러 처리
실패 시 재전송 시도 가능
카프카 프로듀서 필수 속성값
a. bootstrap.servers
카프카 클러스터 연결을 위한 브로커의 host:port 목록
2개 이상 지정 권장
b. key.serializer
레코드 키값 직렬화를 위한 시리얼라이저 클래스 이름
Serializer 인터페이스 구현 클래스 지정
카프카 client 패키지에 기본 시리얼라이저 포함
c. value.serializer
레코드 밸류값 직렬화를 위한 시리얼라이저 클래스 이름
프로듀서 생성 예시
Properties 객체를 사용하여 필수 속성 설정
KafkaProducer 객체 생성
메시지 전송 방법
a. 파이어 엔 포겟 (Fire and forget)
메시지 전송 후 성공/실패 여부 무시
일종의 non blocking 느낌이라고 이해
자동 재전송 시도, but 일부 에러 상황에서 정보 미수신
ProducerRecord 객체 생성 (토픽, 키, 밸류 지정)
producer.send() 메서드 사용, 반환값 무시
성공/실패 여부 확인 불가능
예외 처리 필요 (SerializationException, TimeoutException, InterruptException)
b. 동기적 전송 (Synchronous send)
send() 메서드가 Future 객체 반환
get() 메서드로 작업 완료 및 성공 여부 확인
에러 종류 a) 재시도 가능한 에러 (자동 재전송 설정 가능) b) 재시도 불가능한 에러 (즉시 예외 발생)
c. 비동기적 전송 (Asynchronous send)
콜백 함수와 함께 send() 메서드 사용
Callback 인터페이스 구현 (onCompletion 메서드 정의)
콜백은 메인 스레드에서 실행되므로 빠르게 처리해야 함
콜백 내에서 블로킹 작업 수행은 권장되지 않음
브로커 응답 시 자동으로 콜백 함수 호출
추가 정보
프로듀서 객체는 다수의 스레드 동시 사용 가능
VoidSerializer를 사용해 키 없는 메시지 전송 가능
카프카 프로듀서 주요 설정
1.
client.id
프로듀서와 애플리케이션을 구분하는 논리적 식별자
트러블슈팅에 유용
2.
acks (신뢰성과 성능에 영향) a. acks = 0
프로듀서가 브로커의 응답을 기다리지 않음
가장 빠른 처리량, 낮은 신뢰도
b. acks = 1 (기본값)
리더 레플리카가 메시지를 받으면 성공으로 간주
리더 크래시 시 메시지 유실 가능성 있음
c. acks = all
모든 In Sync Replica에 메시지가 전달된 후 성공으로 간주
실제로는 min.insync.replicas 설정값 이상의 레플리카에 복제 후 성공
3.
주요 고려사항
acks 설정을 낮추면 프로듀서 지연은 줄일 수 있으나, 종단 지연(end-to-end latency)은 동일
카프카는 일관성을 위해 모든 인-싱크 레플리카에 복제된 후에만 컨슈머가 읽을 수 있도록 제한
4.
설정 선택 시 고려사항
신뢰성 vs 성능 트레이드오프
대량 데이터 처리 시 전송 속도가 중요할 수 있음 (예: 온라인 코딩 테스트 시스템의 마우스 로그)
메시지 전달 시간의 두 구간
send()
호출부터 결과 반환까지의 시간
비동기 호출 시 측정 가능
이 동안 호출 스레드는 블록됨
send() 반환부터 콜백 호출까지의 시간
실제 메시지 전송 및 응답 대기 시간
주요 설정
1.
max.block.ms
프로듀서가 블록되는 최대 시간 설정
send() 또는 partitionsFor() 호출 시 적용
버퍼가 가득 차거나 메타데이터 미사용 시 블록
시간 초과 시 예외 발생
2.
delivery.timeout.ms
레코드 전송 준비 완료부터 응답 수신 또는 포기까지의 제한 시간
linger.ms + request.timeout.ms보다 커야 함
시간 초과 시 마지막 에러와 함께 콜백 호출
배치 전 시간 초과 시 타임아웃 예외와 함께 콜백 호출
권장 사항
delivery.timeout.ms를 사용자가 허용 가능한 최대 대기 시간으로 설정
retries를 기본값(무제한)으로 유지
3.
request.timeout.ms
서버 응답 대기 시간 설정
타임아웃 시 재전송 시도 또는 TimeoutException 발생
4.
retries 및 retry.backoff.ms
retries: 재전송 시도 횟수
retry.backoff.ms: 재전송 사이 대기 시간 (기본 100ms)
권장: delivery.timeout.ms를 브로커 복구 시간보다 길게 설정
재전송 비활성화: retries = 0
retry 관련 값 직접 조정보다는 delivery.timeout.ms 조정 권장
5.
linger.ms
배치 전송 전 대기 시간
조건: 배치 가득 참 또는 linger.ms 시간 도달
기본값: 0 (즉시 전송)
값 증가 시 처리율 향상 가능
linger.ms 증가로 처리율 개선 가능 (특히 압축 사용 시 효과적)
6.
buffer.memory
메시지 대기 버퍼 크기 설정
버퍼 가득 참 시 send() 호출은 max.block.ms 동안 블록
공간 미확보 시 예외 발생
버퍼 메모리 관리는 애플리케이션의 메시지 생성 속도와 서버 처리 속도 균형에 중요
7.
compression.type
기본값: 압축 없음
옵션: snappy, gzip, lz4, zstd
Snappy: CPU 부하 적음, 성능 좋음
Gzip: CPU 사용량 높지만 압축률 좋음
압축 사용 시 네트워크 사용량과 저장공간 절약
8.
batch.size
배치에 사용될 메모리 양 (바이트 단위)
작게 설정 시 오버헤드 발생
9.
max.in.flight.requests.per.connection
응답 대기 중 전송 가능한 최대 메시지 수
기본값: 5
단일 데이터 센터에서는 2일 때 최적 성능
10.
max.request.size
쓰기 요청의 크기 제한
message.max.byte와 동일하게 설정 권장
11.
receive.buffer.bytes, send.buffer.bytes
TCP 송수신 버퍼 크기
1: 운영체제 기본값 사용
원거리 브로커 통신 시 값 증가 권장
12.
enable.idempotence
'정확히 한 번' 의미구조 지원
true 설정 시 멱등적 프로듀서 기능 활성화
레코드에 순차적 번호 부여로 중복 방지
활성화 조건:
max.in.flight.request.per.connection ≤ 5
retries ≥ 1
acks = all
시리얼라이저
기본 제공: String, Integer, Byte 등
커스텀 시리얼라이저 옵션:
a) 범용 직렬화 라이브러리 사용 (권장)
b) 커스텀 직렬화 로직 작성
아래와 같은 이유로 아파치 에이브로 사용 권장
스키마 진화(Schema Evolution)
데이터 구조 변경 시 이전 버전과의 호환성 유지
프로듀서와 컨슈머 간 독립적인 업데이트 가능
압축 효율성
데이터를 효율적으로 압축하여 저장 공간과 네트워크 대역폭 절약
언어 독립성
다양한 프로그래밍 언어에서 사용 가능
서로 다른 시스템 간 데이터 교환 용이
스키마 레지스트리 지원
중앙 집중식 스키마 관리 가능
데이터와 스키마 분리로 효율적인 관리
빠른 직렬화/역직렬화
바이너리 포맷으로 인한 빠른 처리 속도
파티션 접착성(Sticky Partitioning)
효율적인 배치 처리로 성능 향상
파티셔너 종류:
기본 파티셔너: 키 기반 해시 할당
RoundRobinPartitioner: 랜덤 할당
UniformStickyPartitioner: 전체 파티션에 대해서 균등한 분포를 가지도 록 파티션이 할당됨
토픽 생성 시 충분한 파티션 생성 권장
커스텀 파티셔너
특별한 파티션 할당 로직 구현 가능
보통의 경우 파티션 결정은 키값의 해시처리에 의한 파티션 결정
헤더
메타데이터 저장용
메시지 라우팅, 출처 추적에 사용
인터셉터
ProducerInterceptor 인터페이스 사용
onSend(): 레코드 전송 전 호출
onAcknowledgement(): 브로커 응답 수신 시 호출
쿼터와 스로틀링
쓰기/읽기 속도 제한 기능
한도(quota)를 설정해 주면 됨.
client.id 또는 사용자별 설정 가능
보안 기능과 연계 가능
사용자에 대해 설정된 쿼터는 보안 기능과 클라이언트 인증 기능이 활성화되어있는 클라이 언트만 작동
LAG를 줄이기 위한 커스터마이징으로 이해