Search

카프카 핵심 가이드 ch.9 (데이터 파이프라인)

9장 데이터 파이프라인 구축하기

개요

카프카의 데이터 파이프라인 주요 활용 사례:
카프카가 엔드포인트가 되는 경우
예: 카프카 → Amazon S3로 데이터 전송
예: MongoDB → 카프카로 데이터 수집
카프카가 중간 매개체 역할을 하는 경우
예: Twitter → 카프카 → Elasticsearch
카프카의 핵심 장점:
데이터 파이프라인의 단계 사이에서 안정적인 버퍼 역할
데이터 쓰기/읽기 작업의 분리 가능
하나의 데이터 원본을 여러 대상 시스템으로 전송 가능
높은 신뢰성, 보안성, 효율성 제공
데이터 통합 시 고려사항:
단기적인 통합보다는 큰 그림을 고려해야 함
최소 2개 이상의 엔드포인트를 고려한 설계 필요
단기적 접근은 복잡성 증가와 유지보수 어려움 초래 가능
카프카 커넥트:
버전 0.9에서 추가된 기능
데이터 파이프라인 통합을 위한 API 제공
일반 프로듀서/클라이언트와 다른 특성 보유
기업들의 데이터 파이프라인 구축 용이성 제고
발전 배경:
LinkedIn 등 대형 조직들의 사용 경험을 바탕으로 발전
각 조직별 개별 개발의 비효율성 해결을 위해 API 추가
데이터 통합의 보편적 문제 해결을 위한 도구로 발전

9.1 데이터 파이프라인 구축 시 고려사항

9.1.1 적시성 (Timeliness)

데이터 처리 시간 요구사항의 스펙트럼:
밀리초 단위의 실시간 처리
하루 단위의 대량 처리
대부분은 이 둘의 중간 지점에 위치
카프카의 유연한 처리 특성:
쓰기(Write): 실시간 또는 주기적 쓰기 가능
읽기(Read): 실시간 또는 배치 처리 가능
쓰기/읽기 시간 요구사항을 독립적으로 관리 가능

9.1.2 신뢰성 (Reliability)

단일 장애점을 방지하거나, '무조건 한번'은 제공하거나, 데이터 유실이 허용할 수도 있는 다양한 형태의 데이터 파이프라인 구축 가능
데이터 전달 보장 수준:
최소 한 번 전달 보장 (At least once)
기본적으로 카프카가 제공
중복 발생 가능성 존재
정확히 한 번 전달 보장 (Exactly once)
트랜잭션 모델이나 고유 키를 지원하는 외부 저장소와 결합 시 가능
많은 오픈소스 커넥터에서 지원
카프카 커넥트 API의 장점:
외부 시스템과의 통합 지원
오프셋 관리 기능 제공
정확히 한 번 전달 보장을 위한 커넥터 개발 용이

9.1.3 높으면서도 조정 가능한 처리율

매우 높은 처리율 지원 (초당 수백 MB 처리 가능)
프로듀서와 컨슈머의 독립적 확장 가능
처리율 불균형 시 쓰는 쪽과 읽는 쪽 사이에서 버퍼 역할 수행
카프카 커넥트 API의 병렬화 지원
단일/다중 노드 모두에서 효율적 실행
다중 스레드 활용
다양한 압축 코덱 지원으로 자원 사용 최적화
위를 바탕으로 시스템 요구 조건에 따라 하나의 노드에서든 다양한 노드에서든 scale-out 가능

9.1.4 데이터 형식

데이터 파이프라인에서의 데이터 형식 고려사항
데이터 파이프라인의 핵심: 다양한 데이터 형식과 자료형 관리
시스템별로 다른 데이터 형식 지원
예시: XML/관계형 → Avro → JSON/Parquet/CSV 변환
데이터 파이프라인 설계 시 필수 고려사항
카프카의 데이터 처리 특성
형식 독립성
카프카 자체와 커넥트 API는 데이터 형식에 구애받지 않음
프로듀서와 컨슈머는 필요한 데이터 형식을 지원할 수만 있다면 어떤 시리얼라이저도 쓸 수 있다.
플러그형 컨버터로 다양한 형식 지원
스키마 관리 기능
소스/싱크 시스템의 스키마 지원
자동 스키마 업데이트 가능
예: MySQL → Snowflake 파이프라인에서 새로운 열 자동 추가
데이터 호환성 검증
커넥터의 역할
싱크 커넥터가 저장할 외부 시스템 데이터 형식 담당
형식 선택 기능 제공
예: S3 커넥터의 Avro/Parquet 형식 선택 가능

9.1.5 데이터 변환 방식

ETL(Extract-Transform-Load) 방식
특징
데이터 파이프라인 내에서 변환 작업 수행
시간과 공간 절약 가능
장점
저장 전 데이터 변환으로 효율적
연산과 저장 부담을 파이프라인으로 옮긴다는 점 → 단점도 됨
단점
파이프라인에서 변환이 일어나 파이프라인 하단에서의 데이터 처리 제한 발생
요구사항 변경 시 전체 파이프라인 재구성 필요 → 이거 심각
기존 데이터 재처리 필요할 수 있음
ELT(Extract-Load-Transform) 방식
특징
원본 데이터와 최대한 유사하게 유지 → 최소한의 변환만 수행 (주로 자료형 변환)
대상 시스템이 가공되지 않은 raw data를 받아서 모든 필요한 처리 수행
장점
raw data 제공을 통한 사용자에게 최대한의 유연성 제공
모든 데이터 처리가 한 시스템에서 수행되어 관리 용이
단점
변환 작업이 대상 시스템의 CPU와 자원을 잡아먹는다는 점
카프카의 데이터 변환 지원
단일 메시지 변환(SMT) 기능
원본 → 카프카, 카프카 → 대상 시스템로의 단일 레코드 변환 지원
메시지 라우팅, 필터링, 자료형 변환, 필드 삭제 등
복잡한 변환 작업은 카프카 스트림 활용
조인, 집계 등 고급 변환 작업 수행 가능

9.1.6 보안

누가 카프카에 접근할 수 있는지
메시지 암호화
파이프라인 수정 권한
개인 식별 정보에 대한 법 규제
등 다양한 보안 문제가 있을 수 있다.
카프카의 경우, 아래와 같은 기능 지원
데이터 암호화
SASL인증 인가
허가받지 않은 접근 내용을 추적할 수 있는 감사 로그

9.1.7 장애 처리

카프카는 장애가 발생했을 때 이전 시점으로 돌아가서 에러를 복구 가능.
또한 카프카에 저장된 이벤트가 유실되었을 경우 이벤트 재생도 가능.
장기간에 걸쳐 카프카 로그 설정이 가능하기에

9.1.8 결합과 민첩성

데이터 파이프라인의 중요점 중 하나는 데이터 원본과 대상을 분리할 수 있어야 한다는 것
결합의 예제
1.
임기응변 파이프라인
애플리케이션을 연결해야 할 때마다 커스텀 파이프라인을 구축
플룸을 활용해 로그를 HDFS에 밀어 넣기
인포매티카를 활용해 XML형태로 데이터를 오라클에 밀어 넣기
특정 엔드포인트에 데이터 파이프라인이 결합되어 있어 유지보수가 쉽지 않음
2.
메타데이터 유실
데이터의 스키마 메타데이터를 보존하지 않고 스키마 진화 역시 지원하지 않는다면
소스 쪽에서의 데이터 생성 SW와 싱크 쪽에서의 데이터 SW를 모두 강하게 결합하게 된다.
따라서 스키마 레지스트리와 같이 두 스키마 데이터를 모두 해석하는 방식을 지정
스키마 진화를 지원해야 한다.
3.
과도한 처리
파이프라인에서 너무 많은 처리 → 하단의 데이터 파이프라인의 필드가 필연적으로 적어지게 됨
이 때 하단 어플리케이션 요구 조건 변경 시
파이프라인을 덩달아 수정해야하기에 과도한 데이터 처리는 지원하지 않는 것이 좋다
어플리케이션이 알아서 처리 결정

9.2 카프카 커넥트 vs 프로듀서/컨슈머

프로듀서/컨슈머
카프카 클라이언트를 활용해 Consume하거나 Produce하는 것을 의미
어플리케이션 코드를 직접 커스텀하여 사용할 수 있다는 장점
카프카 커넥트
카프카를 직접 코드나 API를 작성하지 않고, 외부 데이터 저장소에 연결시켜야할 때 사용. 
카프카 커넥트를 활용하게 되면
외부 데이터 저장소의 데이터를 카프카로 가져올 수도, 저장된 데이터를 외부 저장소로 내보낼 수도 있다.
이러한 카프카 커넥터를 활용하고자 한다면 각 저장소에 맞는 커넥터가 필요
요즘은 많은 커넥터가 나와 있기에 실제로 해야할 일은 설정 파일을 작성하는 것 뿐
커넥터가 없다면?
카프카 클라이언트 or 커넥트 API를 활용하여 어플리케이션을 직접 작성할 수 있지만, 해당 작업은 쉽지 않다.
그나마 커넥트 API 쪽이 설정 관리, 오프셋 저장, 병렬 처리, 에러 처리, 서로 다른 데이터 형식 지원 및 REST API를 통한 표준화된 관리 기능을 제공하기에 추천
따라서 오픈소스 혹은 공식 문서로 잘 만들어진 커넥터를 활용하자

9.3 카프카 커넥트

카프카 커넥트는 다른 데이터 저장소와 카프카간의 확장성과 신뢰성을 가지면서 데이터를 주고받을 수 있는 수단을 제공
커넥트는 커넥터 플러그인을 개발하고 실행하기 위한 API와 런타임을 제공한다.
커넥터 플러그인은 카프카 커넥트가 실행하는 라이브러리로, 데이터를 이동하는 것을 담당
카프카 커넥트는 워커 프로세스들의 클러스터 형태로 실행
사용자는 워커에 커넥터 플러그인을 설치한 뒤 REST API를 사용해서 커넥터별 설정을 잡아 주거나 관리해줌
커넥터는 대용량의 데이터 이동을 병렬화해서 처리하고 워커의 유휴 자원을 더 효율적으로 활용하기 위해 태스크 task를 추가로 실행
소스 커넥터 태스크: 원본 시스템으로부터 데이터를 읽어 와서 커넥트 자료 객체의 형태로 워커 프로세스에 전달
싱크 커넥트 태스크: 워커로부터 커넥트 자료 객체를 받아서 대상 시스템에 쓰는 작업을 담당
카프카 커넥트는 데이터를 카프카에 쓸 때, 사용 형식으로 바꿀 수 있도록 컨버터를 사용
Json, Avro, Protobuf
외부 저장소에 쓰여지는 데이터와 카프카 내 저장 데이터를 다양한 형태 스키마로 컨버팅하여 지원한다고만 이해

9.3.1 카프카 커넥트 실행

설치 특징
아파치 카프카에 포함되어 배포되므로 별도로 설치할 필요는 없음
프로덕션 환경에서는 브로커와 별도의 서버로 분리 운영 권장
대용량 데이터 처리 시
다수의 커넥터 사용 시
실행 방법
실행 명령어
bin/connect-distributed.sh config/connect-distributed.properties
Shell
복사
브로커와 유사한 실행 방식
핵심 설정 항목
bootstrap.servers
연동할 카프카 브로커 목록
최소 3개 이상 브로커 지정 권장
group.id
커넥트 클러스터 구성 단위
동일 그룹 ID의 워커들이 하나의 클러스터 구성
커넥터와 태스크의 자유로운 실행 위치 지정
plugin.path
플러그인 디렉토리 지정
지원 플러그인 종류
커넥터
컨버터
트랜스포메이션
비밀 제공자
권장 구조
일반 커넥터
커넥터별 서브디렉토리 생성
/opt/connectors/jdbc와 /opt/connectors/elastic를 만들고 이 안에 커넥터 jar 파일과 모든 의존성들을 저장
/opt/connectors/ ├── mysql-connector/ │ ├── mysql-connector.jar │ ├── jackson-databind-2.12.0.jar │ └── slf4j-api-1.7.25.jar └── elasticsearch-connector/ ├── elasticsearch-connector.jar ├── jackson-databind-2.11.0.jar └── elasticsearch-client.jar
YAML
복사
의존성 파일 함께 저장
복수 디렉토리 지정 가능
설정 예시
plugin.path=/opt/connectors,/home/gwenshap/connectors
Plain Text
복사
uberJar는 직접 저장 가능 (서브디렉토리 같은 것 만들 필요 없이 plugin.path 아래 바로 저장)
주의사항
의존성 직접 저장 불가
예시
클래스패스 직접 추가 비권장
의존성 충돌 위험
plugin.path 사용 권장
예시
key.convertervalue.converter
데이터 형식 지정
기본값: JSONConverter
Converter 설정 방식
# 기본 설정 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # 스키마 포함 여부 설정 key.converter.schemas.enable=true|false value.converter.schemas.enable=true|false # Avro Converter -> 스키마 레지스트리 설정 key.converter.schema.registry.url=http://schema-registry:8081 value.converter.schema.registry.url=http://schema-registry:8081
Plain Text
복사
지원 형식
JSON (기본)
Avro
Protobuf
JsonSchema
각 컨버터별 필요한 의존성 확인
데이터 형식과 스키마 호환성 고려
카프카 커넥트 REST API와 실행 모드
커넥터를 설정하거나 모니터링할 때는 카프카 커넥트의 REST API를 사용
REST API 기본 설정
rest.host.name=localhost rest.port=8083
Plain Text
복사
버전 확인
$ curl http://localhost:8083/ { "version": "3.0.0-SNAPSHOT", "commit": "fae0784ce32a448a", "kafka_cluster_id": "pfkYIGZQSXm8RylvACQHdg" }
Plain Text
복사
사용 가능한 커넥터 목록 조회
$ curl http://localhost:8083/connector-plugins
Plain Text
복사
커넥트 실행 모드
분산 모드 (Distributed Mode)
기본 실행 방식
REST API로 관리
실행 명령
bin/connect-distributed.sh config/connect-distributed.properties
Shell
복사
독립 실행 모드 (Standalone Mode)
단일 워커에서 모든 커넥터/태스크 실행
명령줄로 설정 파일 전달
실행 명령
bin/connect-standalone.sh config/connect-standalone.properties
Shell
복사
사용 케이스
특정 장비 실행 필요성 고려
예: 파일 시스템 모니터링 → 특정 서버의 로그 파일 감시

JDBC 소스 커넥터 설정 예시 (MySQL Source Connector)

1.
REST API를 통한 커넥터 생성
MySQL의 login 테이블 데이터를 카프카로 가져오는 파이프라인을 구성
curl -X POST -H "Content-Type: application/json" \ http://localhost:8083/connectors -d '{ "name": "mysql-login-connector", "config": { "connector.class": "JdbcSourceConnector", "connection.url": "jdbc:mysql://127.0.0.1:3306/test?user=root", "mode": "timestamp", "table.whitelist": "login", "validate.non.null": "false", "timestamp.column.name": "login_time", "topic.prefix": "mysql." } }'
Shell
복사
기본 설정
name: 커넥터 이름 (mysql-login-connector)
connector.class: JDBC 소스 커넥터 클래스
연결 설정
connection.url: MySQL 연결 정보
table.whitelist: 대상 테이블 (login)
동작 설정
mode: 타임스탬프 기반 동작
timestamp.column.name: 기준 시간 컬럼
validate.non.null: null 값 검증 해제
topic.prefix: 토픽 접두사 (mysql.)

9.3.4 개별 메세지 변환 (SMT)

SMT 개요
정의: 상태 없는(stateless) 개별 메시지 변환
특징: 코드 작성 없이 커넥트 config 설정으로 수행 가능
용도: ETL 파이프라인의 변환 단계
기본 제공 SMT 종류
데이터 변환
Cast: 필드 데이터 타입 변경
Flatten: 중첩 구조 평탄화
TimestampConverter: 시간 형식 변환
필드 관리
MaskField: 민감 정보 null 처리
InsertField: 새 필드 추가
ReplaceField: 필드 삭제/이름 변경
라우팅
Filter: 조건부 메시지 필터링
RegexRouter: 토픽 이름 변경
TimestampRouter: 타임스탬프 기반 토픽 변경
헤더 조작
HeaderFrom: 필드를 헤더로 이동/복사
InsertHeader: 정적 문자열 헤더 추가
고오급 변환
상태 기반 변환
카프카 스트림즈 프레임워크 사용
조인, 집계 등 복잡한 연산 수행
추가 리소스
변환 기능 중에는 아파치 카프카 프로젝트 외부의 기여자들이 만든 것들도 많으니 참고
깃허브
컨플루언트 허브
학습 자료
커스텀 변환 개발 가이드
활용 예시
# MySQL 커넥터에 헤더 추가 예시 transforms=InsertHeader transforms.InsertHeader.type=org.apache.kafka.connect.transforms.InsertHeader transforms.InsertHeader.header.name=source transforms.InsertHeader.header.value=mysql-connector
Plain Text
복사
MySQL 테이블에 새 레코드를 추가하면, "mysql.login" 토픽에 헤더가 추가된 새 메시지가 생성

에러 처리와 DLQ

error.tolerance 설정
모든 싱크 커넥터에서 사용 가능
에러 처리 옵션
오염된 메시지 무시
데드 레터 큐로 전송
데드 레터 큐 (Dead Letter Queue)
정의: 처리 실패한 메시지를 저장하는 특별한 토픽
용도
실패한 메시지 보관
문제 분석
재처리 가능성 유지
설정 예시
# 에러 처리 설정 errors.tolerance=all errors.deadletterqueue.topic.name=my-dlq errors.deadletterqueue.context.headers.enable=true
Plain Text
복사

9.3.5 카프카 커넥트: 좀 더 자세히 알아보기

과정 개요
1.
하나 이상의 워커가 카프카 커넥트 클러스터에서 실행된다.
2.
워커는 하나 이상의 커넥터 플러그인을 갖고 있다.
각 plugin은 connector와 task를 갖고 있다.
3.
워커는 topic과 task간의 데이터를 이동시킨다.
4.
워커는 connector와 task를 시작시킨다.
관심사의 분리
데이터 이동: 커넥터/태스크 담당
운영/관리: 워커 담당
1.
주요 구성요소
카프카 커넥트를 사용하려면 워커 클러스터를 실행시킨 뒤 커넥터를 생성하거나 삭제해주어야 한다
구성 요소
1.
커넥터와 태스크
커넥터
Task를 관리하여 데이터 스트리밍을 조정하는 높은 수준의 추상화
커넥터에서 몇 개의 태스크가 실행되어야 하는지 결정
데이터 복사 작업을 각 태스크에 어떻게 분할해 줄지 결정
워커로부터 태스크 설정을 얻어와서 태스크에 전달
태스크: 데이터를 실제로 카프카에 넣거나 가져오는 작업을 담당
소스 태스크 : 외부 시스템을 폴링해서 워커가 카프카 브로커로 보낼 레코드 리스트를 리턴
싱크 태스크 : 워커를 통해 카프카 레코드를 받아서 외부 시스템에 쓰는 작업을 담당
2.
워커
Connector 및 Task을 실행하는 실행 프로세스 (컨테이너 프로세스)
커넥터/태스크 실행 관리
커넥터와 설정 정의하는 HTTP 요청 처리
설정 관리
커넥터 설정을 내부 카프카 토픽에 저장하고, 커넥터와 태스크를 실행시키고, 여기에 적절한 설정값을 전달해주는 역할
워커 프로세스들의 고가용성/장애 복구 지원
예시. 소스와 싱크 커넥터의 오프셋을 내부 카프카 토픽에 자동으로 커밋하는 작업과 태스크 에서 에러가 발생할 경우, 재시도하는 작업 역시 담당
3.
컨버터 및 데이터 모델
데이터 API
구성요소
데이터 객체
스키마 정보
예시 (JDBC 소스 커넥터)
ConnectSchema 객체 생성
Struct 객체에 데이터 저장
컨버터
Connect와 데이터를 보내거나 받는 시스템 간의 데이터 변환을 위한 코드
과정
소스 커넥터:
원본 시스템 → Schema, Value 순서쌍 생성
데이터 API 객체 → 워커 → 컨버터 변환 → 카프카
싱크 커넥터:
카프카 → 컨버터 변환 → 데이터 API 레코드
Schema, Value 파싱 → 대상 시스템
다양한 데이터 형식 지원 (JSON, Avro, Protobuf 등)
예. MySQL 행을 커넥터가 카프카에 쓰는 JSON 레코드 형태로 변환하는 작업
4.
오프셋 관리
기본 개념
목적: 데이터(이벤트) 처리 현황 추적
기능: REST API 통한 배포/설정 관리
특징: 워커 프로세스가 제공하는 편의 기능
소스 커넥터 오프셋
구성
논리적 파티션
소스 시스템 오프셋
예시
파일 소스: 파일(파티션), 줄/문자 위치(오프셋)
JDBC 소스: 테이블(파티션), ID/타임스탬프(오프셋)
오프셋 처리 과정
소스 커넥터
레코드 리턴 (파티션/오프셋 포함) → 워커가 카프카로 전송 → 성공 응답 후 오프셋 저장
싱크 커넥터
카프카 레코드에 이미 토픽, 파티션, 오프셋 식별자 포함
카프카 레코드 읽기 → put() 메서드로 대상 시스템 저장 → 성공 시 오프셋 커밋
저장소 관리
설정 가능 토픽 (오프셋이 저장될 토픽 이름)
offset.storage.topic: 오프셋 저장
config.storage.topic: 커넥터 설정
status.storage.topic: 커넥터 상태

reference

(참고) WAP 패턴에서의 카프카 유스 케이스
숨고의 커넥트 전환기 (config 관리 팁)
카프카 커넥트 커스텀 만들기