3장. 카프카 프로듀서
•
카프카의 용도
◦
큐, 메시지 버스, 데이터 저장 플랫폼 등으로 활용
•
애플리케이션 유형
◦
프로듀서 (데이터 쓰기)
◦
컨슈머 (데이터 읽기)
◦
프로듀서와 컨슈머 기능을 모두 수행하는 애플리케이션
•
카프카 클라이언트 API
◦
개발자들이 카프카와 상호작용하는 애플리케이션을 개발할 때 사용
•
프로듀서 사용 방법
◦
KafkaProducer와 ProducerRecord 객체 생성
◦
카프카에 레코드 전송 방법
◦
에러 처리 방법
•
프로듀서 설정
◦
작동을 제어하기 위한 다양한 설정 옵션
•
주요 컴포넌트
◦
파티셔너: 파티션 할당 방식 정의
◦
시리얼라이저: 객체의 직렬화 방식 정의
•
추가 학습 내용
◦
파티셔너와 시리얼라이저 작성 방법
프로듀서 사용 상황
•
사용자 행동 기록
•
성능 매트릭 기록
•
로그 메시지 저장
•
스마트 가전 정보 수집
•
비동기적 통신
•
데이터베이스 저장 전 버퍼링
프로듀서 요구 조건
•
메시지 유실 허용 여부
•
중복 허용 여부
•
지연(latency)과 처리율(throughput) 요구사항
프로듀서 동작 과정
a. ProducerRecord 객체 생성
•
필수: 토픽, 밸류
•
선택: 키, 파티션
b. 직렬화 및 파티션 할당
•
키와 값을 바이트 배열로 직렬화
•
파티셔너를 통해 파티션 결정 (키 기반)
c. 레코드 배치 및 전송
•
같은 토픽과 파티션의 레코드를 배치로 그룹화
•
별도 스레드가 브로커에 전송
d. 브로커 응답 처리
•
성공: RecordMetadata 객체 반환 (토픽, 파티션, 오프셋 정보)
•
실패: 에러 반환
e. 에러 처리
•
실패 시 재전송 시도 가능
카프카 프로듀서 필수 속성값
a. bootstrap.servers
•
카프카 클러스터 연결을 위한 브로커의 host:port 목록
•
2개 이상 지정 권장
b. key.serializer
•
레코드 키값 직렬화를 위한 시리얼라이저 클래스 이름
•
Serializer 인터페이스 구현 클래스 지정
•
카프카 client 패키지에 기본 시리얼라이저 포함
c. value.serializer
•
레코드 밸류값 직렬화를 위한 시리얼라이저 클래스 이름
프로듀서 생성 예시
•
Properties 객체를 사용하여 필수 속성 설정
•
KafkaProducer 객체 생성
메시지 전송 방법
•
a. 파이어 엔 포겟 (Fire and forget)
◦
메시지 전송 후 성공/실패 여부 무시
▪
일종의 non blocking 느낌이라고 이해
◦
자동 재전송 시도, but 일부 에러 상황에서 정보 미수신
◦
ProducerRecord 객체 생성 (토픽, 키, 밸류 지정)
◦
producer.send() 메서드 사용, 반환값 무시
◦
성공/실패 여부 확인 불가능
◦
예외 처리 필요 (SerializationException, TimeoutException, InterruptException)
•
b. 동기적 전송 (Synchronous send)
◦
send() 메서드가 Future 객체 반환
◦
get() 메서드로 작업 완료 및 성공 여부 확인
◦
에러 종류
a) 재시도 가능한 에러 (자동 재전송 설정 가능)
b) 재시도 불가능한 에러 (즉시 예외 발생)
•
c. 비동기적 전송 (Asynchronous send)
◦
콜백 함수와 함께 send() 메서드 사용
▪
Callback 인터페이스 구현 (onCompletion 메서드 정의)
▪
콜백은 메인 스레드에서 실행되므로 빠르게 처리해야 함
▪
콜백 내에서 블로킹 작업 수행은 권장되지 않음
◦
브로커 응답 시 자동으로 콜백 함수 호출
•
추가 정보
◦
프로듀서 객체는 다수의 스레드 동시 사용 가능
◦
VoidSerializer를 사용해 키 없는 메시지 전송 가능
카프카 프로듀서 주요 설정
1.
client.id
•
프로듀서와 애플리케이션을 구분하는 논리적 식별자
•
트러블슈팅에 유용
3.
주요 고려사항
•
acks 설정을 낮추면 프로듀서 지연은 줄일 수 있으나, 종단 지연(end-to-end latency)은 동일
•
카프카는 일관성을 위해 모든 인-싱크 레플리카에 복제된 후에만 컨슈머가 읽을 수 있도록 제한
4.
설정 선택 시 고려사항
•
신뢰성 vs 성능 트레이드오프
•
대량 데이터 처리 시 전송 속도가 중요할 수 있음
(예: 온라인 코딩 테스트 시스템의 마우스 로그)
메시지 전달 시간의 두 구간
•
send()
◦
호출부터 결과 반환까지의 시간
◦
비동기 호출 시 측정 가능
◦
이 동안 호출 스레드는 블록됨
•
send() 반환부터 콜백 호출까지의 시간
◦
실제 메시지 전송 및 응답 대기 시간
주요 설정
1.
max.block.ms
•
프로듀서가 블록되는 최대 시간 설정
•
send() 또는 partitionsFor() 호출 시 적용
•
버퍼가 가득 차거나 메타데이터 미사용 시 블록
•
시간 초과 시 예외 발생
2.
delivery.timeout.ms
•
레코드 전송 준비 완료부터 응답 수신 또는 포기까지의 제한 시간
•
linger.ms + request.timeout.ms보다 커야 함
•
시간 초과 시 마지막 에러와 함께 콜백 호출
•
배치 전 시간 초과 시 타임아웃 예외와 함께 콜백 호출
•
권장 사항
◦
delivery.timeout.ms를 사용자가 허용 가능한 최대 대기 시간으로 설정
◦
retries를 기본값(무제한)으로 유지
3.
request.timeout.ms
•
서버 응답 대기 시간 설정
•
타임아웃 시 재전송 시도 또는 TimeoutException 발생
4.
retries 및 retry.backoff.ms
•
retries: 재전송 시도 횟수
•
retry.backoff.ms: 재전송 사이 대기 시간 (기본 100ms)
•
권장: delivery.timeout.ms를 브로커 복구 시간보다 길게 설정
•
재전송 비활성화: retries = 0
•
retry 관련 값 직접 조정보다는 delivery.timeout.ms 조정 권장
5.
linger.ms
•
배치 전송 전 대기 시간
•
조건: 배치 가득 참 또는 linger.ms 시간 도달
•
기본값: 0 (즉시 전송)
•
값 증가 시 처리율 향상 가능
◦
linger.ms 증가로 처리율 개선 가능 (특히 압축 사용 시 효과적)
6.
buffer.memory
•
메시지 대기 버퍼 크기 설정
•
버퍼 가득 참 시 send() 호출은 max.block.ms 동안 블록
•
공간 미확보 시 예외 발생
•
버퍼 메모리 관리는 애플리케이션의 메시지 생성 속도와 서버 처리 속도 균형에 중요
7.
compression.type
•
기본값: 압축 없음
•
옵션: snappy, gzip, lz4, zstd
•
Snappy: CPU 부하 적음, 성능 좋음
•
Gzip: CPU 사용량 높지만 압축률 좋음
•
압축 사용 시 네트워크 사용량과 저장공간 절약
8.
batch.size
•
배치에 사용될 메모리 양 (바이트 단위)
•
작게 설정 시 오버헤드 발생
9.
max.in.flight.requests.per.connection
•
응답 대기 중 전송 가능한 최대 메시지 수
•
기본값: 5
•
단일 데이터 센터에서는 2일 때 최적 성능
10.
max.request.size
•
쓰기 요청의 크기 제한
•
message.max.byte와 동일하게 설정 권장
11.
receive.buffer.bytes, send.buffer.bytes
•
TCP 송수신 버퍼 크기
•
1: 운영체제 기본값 사용
•
원거리 브로커 통신 시 값 증가 권장
12.
enable.idempotence
•
'정확히 한 번' 의미구조 지원
•
true 설정 시 멱등적 프로듀서 기능 활성화
•
레코드에 순차적 번호 부여로 중복 방지
•
활성화 조건:
◦
max.in.flight.request.per.connection ≤ 5
◦
retries ≥ 1
◦
acks = all
시리얼라이저
•
기본 제공: String, Integer, Byte 등
•
커스텀 시리얼라이저 옵션:
◦
a) 범용 직렬화 라이브러리 사용 (권장)
◦
b) 커스텀 직렬화 로직 작성
•
아래와 같은 이유로 아파치 에이브로 사용 권장
◦
스키마 진화(Schema Evolution)
▪
데이터 구조 변경 시 이전 버전과의 호환성 유지
▪
프로듀서와 컨슈머 간 독립적인 업데이트 가능
◦
압축 효율성
▪
데이터를 효율적으로 압축하여 저장 공간과 네트워크 대역폭 절약
◦
언어 독립성
▪
다양한 프로그래밍 언어에서 사용 가능
▪
서로 다른 시스템 간 데이터 교환 용이
◦
스키마 레지스트리 지원
▪
중앙 집중식 스키마 관리 가능
▪
데이터와 스키마 분리로 효율적인 관리
◦
빠른 직렬화/역직렬화
▪
바이너리 포맷으로 인한 빠른 처리 속도
파티션 접착성(Sticky Partitioning)
•
효율적인 배치 처리로 성능 향상
•
파티셔너 종류:
◦
기본 파티셔너: 키 기반 해시 할당
◦
RoundRobinPartitioner: 랜덤 할당
◦
UniformStickyPartitioner: 전체 파티션에 대해서 균등한 분포를 가지도 록 파티션이 할당됨
•
토픽 생성 시 충분한 파티션 생성 권장
•
커스텀 파티셔너
◦
특별한 파티션 할당 로직 구현 가능
◦
보통의 경우 파티션 결정은 키값의 해시처리에 의한 파티션 결정
헤더
•
메타데이터 저장용
•
메시지 라우팅, 출처 추적에 사용
인터셉터
•
ProducerInterceptor 인터페이스 사용
•
onSend(): 레코드 전송 전 호출
•
onAcknowledgement(): 브로커 응답 수신 시 호출
쿼터와 스로틀링
•
쓰기/읽기 속도 제한 기능
◦
한도(quota)를 설정해 주면 됨.
•
client.id 또는 사용자별 설정 가능
◦
보안 기능과 연계 가능
◦
사용자에 대해 설정된 쿼터는 보안 기능과 클라이언트 인증 기능이 활성화되어있는 클라이 언트만 작동
◦
LAG를 줄이기 위한 커스터마이징으로 이해