9장 데이터 파이프라인 구축하기
개요
•
카프카의 데이터 파이프라인 주요 활용 사례:
◦
카프카가 엔드포인트가 되는 경우
▪
예: 카프카 → Amazon S3로 데이터 전송
▪
예: MongoDB → 카프카로 데이터 수집
◦
카프카가 중간 매개체 역할을 하는 경우
▪
예: Twitter → 카프카 → Elasticsearch
•
카프카의 핵심 장점:
◦
데이터 파이프라인의 단계 사이에서 안정적인 버퍼 역할
◦
데이터 쓰기/읽기 작업의 분리 가능
◦
하나의 데이터 원본을 여러 대상 시스템으로 전송 가능
◦
높은 신뢰성, 보안성, 효율성 제공
•
데이터 통합 시 고려사항:
◦
단기적인 통합보다는 큰 그림을 고려해야 함
◦
최소 2개 이상의 엔드포인트를 고려한 설계 필요
◦
단기적 접근은 복잡성 증가와 유지보수 어려움 초래 가능
•
카프카 커넥트:
◦
버전 0.9에서 추가된 기능
◦
데이터 파이프라인 통합을 위한 API 제공
◦
일반 프로듀서/클라이언트와 다른 특성 보유
◦
기업들의 데이터 파이프라인 구축 용이성 제고
•
발전 배경:
◦
LinkedIn 등 대형 조직들의 사용 경험을 바탕으로 발전
◦
각 조직별 개별 개발의 비효율성 해결을 위해 API 추가
◦
데이터 통합의 보편적 문제 해결을 위한 도구로 발전
9.1 데이터 파이프라인 구축 시 고려사항
9.1.1 적시성 (Timeliness)
•
데이터 처리 시간 요구사항의 스펙트럼:
◦
밀리초 단위의 실시간 처리
◦
하루 단위의 대량 처리
◦
대부분은 이 둘의 중간 지점에 위치
•
카프카의 유연한 처리 특성:
◦
쓰기(Write): 실시간 또는 주기적 쓰기 가능
◦
읽기(Read): 실시간 또는 배치 처리 가능
◦
쓰기/읽기 시간 요구사항을 독립적으로 관리 가능
9.1.2 신뢰성 (Reliability)
•
단일 장애점을 방지하거나, '무조건 한번'은 제공하거나, 데이터 유실이 허용할 수도 있는 다양한 형태의 데이터 파이프라인 구축 가능
•
데이터 전달 보장 수준:
◦
최소 한 번 전달 보장 (At least once)
▪
기본적으로 카프카가 제공
▪
중복 발생 가능성 존재
◦
정확히 한 번 전달 보장 (Exactly once)
▪
트랜잭션 모델이나 고유 키를 지원하는 외부 저장소와 결합 시 가능
▪
많은 오픈소스 커넥터에서 지원
•
카프카 커넥트 API의 장점:
◦
외부 시스템과의 통합 지원
◦
오프셋 관리 기능 제공
◦
정확히 한 번 전달 보장을 위한 커넥터 개발 용이
9.1.3 높으면서도 조정 가능한 처리율
•
매우 높은 처리율 지원 (초당 수백 MB 처리 가능)
•
프로듀서와 컨슈머의 독립적 확장 가능
•
처리율 불균형 시 쓰는 쪽과 읽는 쪽 사이에서 버퍼 역할 수행
•
카프카 커넥트 API의 병렬화 지원
◦
단일/다중 노드 모두에서 효율적 실행
◦
다중 스레드 활용
•
다양한 압축 코덱 지원으로 자원 사용 최적화
•
위를 바탕으로 시스템 요구 조건에 따라 하나의 노드에서든 다양한 노드에서든 scale-out 가능
9.1.4 데이터 형식
데이터 파이프라인에서의 데이터 형식 고려사항
•
데이터 파이프라인의 핵심: 다양한 데이터 형식과 자료형 관리
•
시스템별로 다른 데이터 형식 지원
•
예시: XML/관계형 → Avro → JSON/Parquet/CSV 변환
•
데이터 파이프라인 설계 시 필수 고려사항
•
카프카의 데이터 처리 특성
◦
형식 독립성
▪
카프카 자체와 커넥트 API는 데이터 형식에 구애받지 않음
▪
프로듀서와 컨슈머는 필요한 데이터 형식을 지원할 수만 있다면 어떤 시리얼라이저도 쓸 수 있다.
▪
플러그형 컨버터로 다양한 형식 지원
◦
스키마 관리 기능
▪
소스/싱크 시스템의 스키마 지원
▪
자동 스키마 업데이트 가능
•
예: MySQL → Snowflake 파이프라인에서 새로운 열 자동 추가
▪
데이터 호환성 검증
•
커넥터의 역할
◦
싱크 커넥터가 저장할 외부 시스템 데이터 형식 담당
◦
형식 선택 기능 제공
▪
예: S3 커넥터의 Avro/Parquet 형식 선택 가능
9.1.5 데이터 변환 방식
•
ETL(Extract-Transform-Load) 방식
◦
특징
▪
데이터 파이프라인 내에서 변환 작업 수행
▪
시간과 공간 절약 가능
◦
장점
▪
저장 전 데이터 변환으로 효율적
▪
연산과 저장 부담을 파이프라인으로 옮긴다는 점 → 단점도 됨
◦
단점
▪
파이프라인에서 변환이 일어나 파이프라인 하단에서의 데이터 처리 제한 발생
▪
요구사항 변경 시 전체 파이프라인 재구성 필요 → 이거 심각
▪
기존 데이터 재처리 필요할 수 있음
•
ELT(Extract-Load-Transform) 방식
◦
특징
▪
원본 데이터와 최대한 유사하게 유지 → 최소한의 변환만 수행 (주로 자료형 변환)
▪
대상 시스템이 가공되지 않은 raw data를 받아서 모든 필요한 처리 수행
◦
장점
▪
raw data 제공을 통한 사용자에게 최대한의 유연성 제공
▪
모든 데이터 처리가 한 시스템에서 수행되어 관리 용이
◦
단점
▪
변환 작업이 대상 시스템의 CPU와 자원을 잡아먹는다는 점
•
카프카의 데이터 변환 지원
◦
단일 메시지 변환(SMT) 기능
▪
원본 → 카프카, 카프카 → 대상 시스템로의 단일 레코드 변환 지원
▪
메시지 라우팅, 필터링, 자료형 변환, 필드 삭제 등
◦
복잡한 변환 작업은 카프카 스트림 활용
▪
조인, 집계 등 고급 변환 작업 수행 가능
9.1.6 보안
•
누가 카프카에 접근할 수 있는지
•
메시지 암호화
•
파이프라인 수정 권한
•
개인 식별 정보에 대한 법 규제
등 다양한 보안 문제가 있을 수 있다.
카프카의 경우, 아래와 같은 기능 지원
•
데이터 암호화
•
SASL인증 인가
•
허가받지 않은 접근 내용을 추적할 수 있는 감사 로그
9.1.7 장애 처리
•
카프카는 장애가 발생했을 때 이전 시점으로 돌아가서 에러를 복구 가능.
•
또한 카프카에 저장된 이벤트가 유실되었을 경우 이벤트 재생도 가능.
◦
장기간에 걸쳐 카프카 로그 설정이 가능하기에
9.1.8 결합과 민첩성
데이터 파이프라인의 중요점 중 하나는 데이터 원본과 대상을 분리할 수 있어야 한다는 것
결합의 예제
1.
임기응변 파이프라인
•
애플리케이션을 연결해야 할 때마다 커스텀 파이프라인을 구축
◦
플룸을 활용해 로그를 HDFS에 밀어 넣기
◦
인포매티카를 활용해 XML형태로 데이터를 오라클에 밀어 넣기
•
특정 엔드포인트에 데이터 파이프라인이 결합되어 있어 유지보수가 쉽지 않음
2.
메타데이터 유실
•
데이터의 스키마 메타데이터를 보존하지 않고 스키마 진화 역시 지원하지 않는다면
•
소스 쪽에서의 데이터 생성 SW와 싱크 쪽에서의 데이터 SW를 모두 강하게 결합하게 된다.
•
따라서 스키마 레지스트리와 같이 두 스키마 데이터를 모두 해석하는 방식을 지정
◦
스키마 진화를 지원해야 한다.
3.
과도한 처리
•
파이프라인에서 너무 많은 처리 → 하단의 데이터 파이프라인의 필드가 필연적으로 적어지게 됨
•
이 때 하단 어플리케이션 요구 조건 변경 시
◦
파이프라인을 덩달아 수정해야하기에 과도한 데이터 처리는 지원하지 않는 것이 좋다
◦
어플리케이션이 알아서 처리 결정
9.2 카프카 커넥트 vs 프로듀서/컨슈머
•
프로듀서/컨슈머
◦
카프카 클라이언트를 활용해 Consume하거나 Produce하는 것을 의미
◦
어플리케이션 코드를 직접 커스텀하여 사용할 수 있다는 장점
•
카프카 커넥트
◦
카프카를 직접 코드나 API를 작성하지 않고, 외부 데이터 저장소에 연결시켜야할 때 사용.
◦
카프카 커넥트를 활용하게 되면
▪
외부 데이터 저장소의 데이터를 카프카로 가져올 수도, 저장된 데이터를 외부 저장소로 내보낼 수도 있다.
◦
이러한 카프카 커넥터를 활용하고자 한다면 각 저장소에 맞는 커넥터가 필요
◦
요즘은 많은 커넥터가 나와 있기에 실제로 해야할 일은 설정 파일을 작성하는 것 뿐
◦
커넥터가 없다면?
▪
카프카 클라이언트 or 커넥트 API를 활용하여 어플리케이션을 직접 작성할 수 있지만, 해당 작업은 쉽지 않다.
▪
그나마 커넥트 API 쪽이 설정 관리, 오프셋 저장, 병렬 처리, 에러 처리, 서로 다른 데이터 형식 지원 및 REST API를 통한 표준화된 관리 기능을 제공하기에 추천
◦
따라서 오픈소스 혹은 공식 문서로 잘 만들어진 커넥터를 활용하자
9.3 카프카 커넥트
카프카 커넥트는 다른 데이터 저장소와 카프카간의 확장성과 신뢰성을 가지면서 데이터를 주고받을 수 있는 수단을 제공
•
커넥트는 커넥터 플러그인을 개발하고 실행하기 위한 API와 런타임을 제공한다.
•
커넥터 플러그인은 카프카 커넥트가 실행하는 라이브러리로, 데이터를 이동하는 것을 담당
◦
카프카 커넥트는 워커 프로세스들의 클러스터 형태로 실행
◦
사용자는 워커에 커넥터 플러그인을 설치한 뒤 REST API를 사용해서 커넥터별 설정을 잡아 주거나 관리해줌
◦
커넥터는 대용량의 데이터 이동을 병렬화해서 처리하고 워커의 유휴 자원을 더 효율적으로 활용하기 위해 태스크 task를 추가로 실행
▪
소스 커넥터 태스크: 원본 시스템으로부터 데이터를 읽어 와서 커넥트 자료 객체의 형태로 워커 프로세스에 전달
▪
싱크 커넥트 태스크: 워커로부터 커넥트 자료 객체를 받아서 대상 시스템에 쓰는 작업을 담당
◦
카프카 커넥트는 데이터를 카프카에 쓸 때, 사용 형식으로 바꿀 수 있도록 컨버터를 사용
▪
Json, Avro, Protobuf
•
외부 저장소에 쓰여지는 데이터와 카프카 내 저장 데이터를 다양한 형태 스키마로 컨버팅하여 지원한다고만 이해
9.3.1 카프카 커넥트 실행
•
설치 특징
◦
아파치 카프카에 포함되어 배포되므로 별도로 설치할 필요는 없음
◦
프로덕션 환경에서는 브로커와 별도의 서버로 분리 운영 권장
▪
대용량 데이터 처리 시
▪
다수의 커넥터 사용 시
•
실행 방법
◦
실행 명령어
bin/connect-distributed.sh config/connect-distributed.properties
Shell
복사
◦
브로커와 유사한 실행 방식
•
핵심 설정 항목
◦
bootstrap.servers
▪
연동할 카프카 브로커 목록
▪
최소 3개 이상 브로커 지정 권장
◦
group.id
▪
커넥트 클러스터 구성 단위
▪
동일 그룹 ID의 워커들이 하나의 클러스터 구성
▪
커넥터와 태스크의 자유로운 실행 위치 지정
◦
plugin.path
▪
플러그인 디렉토리 지정
▪
지원 플러그인 종류
•
커넥터
•
컨버터
•
트랜스포메이션
•
비밀 제공자
▪
권장 구조
•
일반 커넥터
◦
커넥터별 서브디렉토리 생성
▪
/opt/connectors/jdbc와 /opt/connectors/elastic를 만들고 이 안에 커넥터 jar 파일과 모든 의존성들을 저장
/opt/connectors/
├── mysql-connector/
│ ├── mysql-connector.jar
│ ├── jackson-databind-2.12.0.jar
│ └── slf4j-api-1.7.25.jar
└── elasticsearch-connector/
├── elasticsearch-connector.jar
├── jackson-databind-2.11.0.jar
└── elasticsearch-client.jar
YAML
복사
▪
의존성 파일 함께 저장
◦
복수 디렉토리 지정 가능
▪
설정 예시
plugin.path=/opt/connectors,/home/gwenshap/connectors
Plain Text
복사
•
uberJar는 직접 저장 가능 (서브디렉토리 같은 것 만들 필요 없이 plugin.path 아래 바로 저장)
▪
주의사항
•
의존성 직접 저장 불가
예시
•
클래스패스 직접 추가 비권장
◦
의존성 충돌 위험
◦
plugin.path 사용 권장
예시
◦
key.converter와 value.converter
▪
데이터 형식 지정
▪
기본값: JSONConverter
Converter 설정 방식
# 기본 설정
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 스키마 포함 여부 설정
key.converter.schemas.enable=true|false
value.converter.schemas.enable=true|false
# Avro Converter -> 스키마 레지스트리 설정
key.converter.schema.registry.url=http://schema-registry:8081
value.converter.schema.registry.url=http://schema-registry:8081
Plain Text
복사
▪
지원 형식
•
JSON (기본)
•
Avro
•
Protobuf
•
JsonSchema
▪
각 컨버터별 필요한 의존성 확인
▪
데이터 형식과 스키마 호환성 고려
•
카프카 커넥트 REST API와 실행 모드
◦
커넥터를 설정하거나 모니터링할 때는 카프카 커넥트의 REST API를 사용
▪
REST API 기본 설정
rest.host.name=localhost
rest.port=8083
Plain Text
복사
▪
버전 확인
$ curl http://localhost:8083/
{
"version": "3.0.0-SNAPSHOT",
"commit": "fae0784ce32a448a",
"kafka_cluster_id": "pfkYIGZQSXm8RylvACQHdg"
}
Plain Text
복사
▪
사용 가능한 커넥터 목록 조회
$ curl http://localhost:8083/connector-plugins
Plain Text
복사
커넥트 실행 모드
•
분산 모드 (Distributed Mode)
◦
기본 실행 방식
◦
REST API로 관리
◦
실행 명령
bin/connect-distributed.sh config/connect-distributed.properties
Shell
복사
•
독립 실행 모드 (Standalone Mode)
◦
단일 워커에서 모든 커넥터/태스크 실행
◦
명령줄로 설정 파일 전달
◦
실행 명령
bin/connect-standalone.sh config/connect-standalone.properties
Shell
복사
◦
사용 케이스
▪
특정 장비 실행 필요성 고려
▪
예: 파일 시스템 모니터링 → 특정 서버의 로그 파일 감시
JDBC 소스 커넥터 설정 예시 (MySQL Source Connector)
1.
REST API를 통한 커넥터 생성
MySQL의 login 테이블 데이터를 카프카로 가져오는 파이프라인을 구성
curl -X POST -H "Content-Type: application/json" \
http://localhost:8083/connectors -d '{
"name": "mysql-login-connector",
"config": {
"connector.class": "JdbcSourceConnector",
"connection.url": "jdbc:mysql://127.0.0.1:3306/test?user=root",
"mode": "timestamp",
"table.whitelist": "login",
"validate.non.null": "false",
"timestamp.column.name": "login_time",
"topic.prefix": "mysql."
}
}'
Shell
복사
•
기본 설정
◦
name: 커넥터 이름 (mysql-login-connector)
◦
connector.class: JDBC 소스 커넥터 클래스
•
연결 설정
◦
connection.url: MySQL 연결 정보
◦
table.whitelist: 대상 테이블 (login)
•
동작 설정
◦
mode: 타임스탬프 기반 동작
◦
timestamp.column.name: 기준 시간 컬럼
◦
validate.non.null: null 값 검증 해제
◦
topic.prefix: 토픽 접두사 (mysql.)
9.3.4 개별 메세지 변환 (SMT)
•
SMT 개요
◦
정의: 상태 없는(stateless) 개별 메시지 변환
◦
특징: 코드 작성 없이 커넥트 config 설정으로 수행 가능
◦
용도: ETL 파이프라인의 변환 단계
•
기본 제공 SMT 종류
◦
데이터 변환
▪
Cast: 필드 데이터 타입 변경
▪
Flatten: 중첩 구조 평탄화
▪
TimestampConverter: 시간 형식 변환
◦
필드 관리
▪
MaskField: 민감 정보 null 처리
▪
InsertField: 새 필드 추가
▪
ReplaceField: 필드 삭제/이름 변경
◦
라우팅
▪
Filter: 조건부 메시지 필터링
▪
RegexRouter: 토픽 이름 변경
▪
TimestampRouter: 타임스탬프 기반 토픽 변경
◦
헤더 조작
▪
HeaderFrom: 필드를 헤더로 이동/복사
▪
InsertHeader: 정적 문자열 헤더 추가
•
고오급 변환
◦
상태 기반 변환
▪
카프카 스트림즈 프레임워크 사용
▪
조인, 집계 등 복잡한 연산 수행
•
추가 리소스
◦
변환 기능 중에는 아파치 카프카 프로젝트 외부의 기여자들이 만든 것들도 많으니 참고
▪
깃허브
▪
컨플루언트 허브
◦
•
활용 예시
# MySQL 커넥터에 헤더 추가 예시
transforms=InsertHeader
transforms.InsertHeader.type=org.apache.kafka.connect.transforms.InsertHeader
transforms.InsertHeader.header.name=source
transforms.InsertHeader.header.value=mysql-connector
Plain Text
복사
MySQL 테이블에 새 레코드를 추가하면, "mysql.login" 토픽에 헤더가 추가된 새 메시지가 생성
에러 처리와 DLQ
•
error.tolerance 설정
◦
모든 싱크 커넥터에서 사용 가능
◦
에러 처리 옵션
▪
오염된 메시지 무시
▪
데드 레터 큐로 전송
•
데드 레터 큐 (Dead Letter Queue)
◦
정의: 처리 실패한 메시지를 저장하는 특별한 토픽
◦
용도
▪
실패한 메시지 보관
▪
문제 분석
▪
재처리 가능성 유지
•
설정 예시
# 에러 처리 설정
errors.tolerance=all
errors.deadletterqueue.topic.name=my-dlq
errors.deadletterqueue.context.headers.enable=true
Plain Text
복사
9.3.5 카프카 커넥트: 좀 더 자세히 알아보기
과정 개요
1.
하나 이상의 워커가 카프카 커넥트 클러스터에서 실행된다.
2.
워커는 하나 이상의 커넥터 플러그인을 갖고 있다.
•
각 plugin은 connector와 task를 갖고 있다.
3.
워커는 topic과 task간의 데이터를 이동시킨다.
4.
워커는 connector와 task를 시작시킨다.
관심사의 분리
•
데이터 이동: 커넥터/태스크 담당
•
운영/관리: 워커 담당
1.
주요 구성요소
•
카프카 커넥트를 사용하려면 워커 클러스터를 실행시킨 뒤 커넥터를 생성하거나 삭제해주어야 한다
•
구성 요소
1.
커넥터와 태스크
•
커넥터
◦
Task를 관리하여 데이터 스트리밍을 조정하는 높은 수준의 추상화
▪
커넥터에서 몇 개의 태스크가 실행되어야 하는지 결정
▪
데이터 복사 작업을 각 태스크에 어떻게 분할해 줄지 결정
▪
워커로부터 태스크 설정을 얻어와서 태스크에 전달
•
태스크: 데이터를 실제로 카프카에 넣거나 가져오는 작업을 담당
◦
소스 태스크 : 외부 시스템을 폴링해서 워커가 카프카 브로커로 보낼 레코드 리스트를 리턴
◦
싱크 태스크 : 워커를 통해 카프카 레코드를 받아서 외부 시스템에 쓰는 작업을 담당
2.
워커
•
Connector 및 Task을 실행하는 실행 프로세스 (컨테이너 프로세스)
◦
커넥터/태스크 실행 관리
◦
커넥터와 설정 정의하는 HTTP 요청 처리
◦
설정 관리
▪
커넥터 설정을 내부 카프카 토픽에 저장하고, 커넥터와 태스크를 실행시키고, 여기에 적절한 설정값을 전달해주는 역할
◦
워커 프로세스들의 고가용성/장애 복구 지원
▪
예시. 소스와 싱크 커넥터의 오프셋을 내부 카프카 토픽에 자동으로 커밋하는 작업과 태스크 에서 에러가 발생할 경우, 재시도하는 작업 역시 담당
3.
컨버터 및 데이터 모델
•
데이터 API
◦
구성요소
▪
데이터 객체
▪
스키마 정보
◦
예시 (JDBC 소스 커넥터)
▪
ConnectSchema 객체 생성
▪
Struct 객체에 데이터 저장
◦
컨버터
▪
Connect와 데이터를 보내거나 받는 시스템 간의 데이터 변환을 위한 코드
•
과정
◦
소스 커넥터:
▪
원본 시스템 → Schema, Value 순서쌍 생성
▪
데이터 API 객체 → 워커 → 컨버터 변환 → 카프카
◦
싱크 커넥터:
▪
카프카 → 컨버터 변환 → 데이터 API 레코드
▪
Schema, Value 파싱 → 대상 시스템
•
다양한 데이터 형식 지원 (JSON, Avro, Protobuf 등)
•
예. MySQL 행을 커넥터가 카프카에 쓰는 JSON 레코드 형태로 변환하는 작업
4.
오프셋 관리
•
기본 개념
◦
목적: 데이터(이벤트) 처리 현황 추적
◦
기능: REST API 통한 배포/설정 관리
◦
특징: 워커 프로세스가 제공하는 편의 기능
•
소스 커넥터 오프셋
◦
구성
▪
논리적 파티션
▪
소스 시스템 오프셋
◦
예시
▪
파일 소스: 파일(파티션), 줄/문자 위치(오프셋)
▪
JDBC 소스: 테이블(파티션), ID/타임스탬프(오프셋)
•
오프셋 처리 과정
◦
소스 커넥터
▪
레코드 리턴 (파티션/오프셋 포함) → 워커가 카프카로 전송 → 성공 응답 후 오프셋 저장
◦
싱크 커넥터
▪
카프카 레코드에 이미 토픽, 파티션, 오프셋 식별자 포함
▪
카프카 레코드 읽기 → put() 메서드로 대상 시스템 저장 → 성공 시 오프셋 커밋
•
저장소 관리
◦
설정 가능 토픽 (오프셋이 저장될 토픽 이름)
▪
offset.storage.topic: 오프셋 저장
▪
config.storage.topic: 커넥터 설정
▪
status.storage.topic: 커넥터 상태
reference
(참고) WAP 패턴에서의 카프카 유스 케이스
숨고의 커넥트 전환기 (config 관리 팁)
카프카 커넥트 커스텀 만들기