Search

[카프카] 멱등성 프로듀서, 트랜잭션 프로듀서와 컨슈머

멱등성 프로듀서, 트랜잭션 프로듀서와 컨슈머

멱등성

동일한 연산을 여러 번 수행해도 동일한 결과를 나타내는 것을 의미합니다.
멱등성 프로듀서는 동일한 데이터를 여러 번 전송하더라도 카프카 클러스터에 단 한 번만 저장되는 것을 의미합니다.
기본 프로듀서는 적어도 한 번 전달(at least once delivery)을 지원하는 동작 방식을 가지고 있습니다. 이는 프로듀서가 클러스터에 데이터를 전송하여 저장할 때 적어도 한 번 이상 데이터를 적재할 수 있으며, 데이터가 유실되지 않는다는 것을 의미합니다. 그러나 두 번 이상 적재되어 중복이 발생할 가능성이 있습니다.
at least once : 적어도 한 번 이상 전달
at most once : 최대 한 번 전달
exactly once : 정확히 한 번 전달

멱등성 프로듀서

멱등성 프로듀서는 기본 프로듀서와는 달리 데이터를 브로커로 전달할 때 프로듀서 고유 ID(PID)와 시퀀스 번호(sequence number)를 함께 전달합니다.
따라서 브로커는 프로듀서의 PID와 시퀀스 번호를 확인하여 동일한 메시지의 적재 요청이 오더라도 단 한 번만 데이터를 적재합니다.
enable.idempotence 옵션
이 옵션은 정확히 한 번 전달을 지원합니다. 기본값은 false이며, true로 설정하면 멱등성 프로듀서로 동작하게 됩니다.
카프카 3.0.0부터는 enable.idempotence 옵션값의 기본값은 true(acks=all)로 변경되었음.
멱등성 프로듀서로 동작하지 않은 경우
프로듀서와 브로커 사이의 네트워크 장애로 인해 데이터가 한 번 이상 전달될 수 있습니다.
멱등성 프로듀서의 경우
네트워크 장애가 발생하더라도 브로커에 동일한 메시지는 적재하지 않는다.
멱등성 프로듀서가 전송하는 데이터에 PID와 시퀀스 넘버가 있는데, 브로커는 PID와 시퀀스 넘버(s id)로 중복을 체크한다.

동일한 세션이 가지는 의미

멱등성 프로듀서는 동일한 세션에서 한 번의 전달만을 보장합니다.
동일한 세션은 PID(프로세스 ID)의 생명주기를 의미합니다.
만약 멱등성 프로듀서로 작동하는 애플리케이션에 문제가 발생하여 종료되고 다시 시작되면 PID가 변경됩니다. 이 경우, 동일한 데이터를 전송하더라도, 변경된 PID로 인해 브로커는 다른 프로듀서 애플리케이션이 다른 데이터를 전송한 것으로 판단하여 중복이 발생할 수 있습니다.
멱등성 프로듀서는 장애가 발생하지 않는 경우에 한하여 한 번만 적재를 보장합니다.

멱등성 프로듀서로 설정할 경우의 옵션

enable.idempotence = true 설정에 따른 강제 설정 옵션
retries : Integer.MAX_VALUE
acks : all
이 설정은 프로듀서가 브로커에 데이터를 적어도 한 번 이상 전송하더라도 브로커가 데이터를 단 한 번만 적재하도록 보장하기 위한 것입니다.
정확히 말하자면, 멱등성 프로듀서가 브로커에 데이터를 정확히 한 번 적재하기 위해 실제로 한 번만 전송하는 것이 아닙니다. 상황에 따라 프로듀서는 여러 번 전송할 수 있지만, 브로커는 여러 번 전송된 데이터를 확인하고 중복된 데이터는 적재하지 않습니다. 하지만 이 과정에서 브로커 자체에 부하가 걸리게 됩니다.
kafka 3.0부터는 enable.idempotence = true를 설정해도, 브로커 부하가 보다 더 최적화되어 조금 더 안전하게 exactly once를 합니다.

멱등성 프로듀서 사용 시 발생 가능한 오류

OutofOrderSequenceException
멱등성 프로듀서의 시퀀스 번호는 0부터 시작하여 +1씩 증가합니다. 브로커에서는 멱등성 프로듀서가 전송한 데이터의 PID와 시퀀스 번호를 확인합니다. 시퀀스 번호가 일정하지 않은 경우, 에러가 발생할 수 있습니다.
예) 시퀀스 번호 0 다음에는 1이 와야 하는데, 2가 오면 에러가 발생합니다.
하지만, 이 에러를 피하기 위해 멱등성 프로듀서를 운영하게 된다면, 오히려 아키텍처 단에서 더 브로커에 부하를 주는 운영이 될 수 있습니다. 예를 들자면, 컨슈머 단에서 idempotence하게 운영을 한다면, 프로듀서는 부하를 많이 덜 수 있게 됩니다.

트랜잭션 프로듀서

데이터를 여러 파티션에 저장할 경우, 모든 데이터에 대해 동일한 원자성을 유지하기 위해 이를 사용합니다. 원자성을 유지한다는 것은 여러 데이터를 하나의 트랜잭션으로 묶어 전체 데이터를 처리하거나 전혀 처리하지 않는 것을 의미합니다.
트랜잭션 프로듀서사용자가 보낸 데이터를 레코드로 파티션에 저장할 뿐만 아니라, 트랜잭션의 시작과 끝을 표현하기 위해 트랜잭션 레코드를 한개 더 보내게 됩니다.

트랜잭션 컨슈머의 동작

트랜잭션 컨슈머는 파티션에 저장된 트랜잭션 레코드를 확인하여 트랜잭션이 완료되었는지 판단하고, 완료된 경우에만 데이터를 가져갑니다. 커밋되지 않은 데이터는 브로커에서 가져가지 않습니다. 트랜잭션 프로듀서가 커밋하면, 컨슈머는 커밋 레코드를 기반으로 트랜잭션 레코드들을 처리합니다.
트랜잭션 프로듀서
사용자가 보낸 데이터를 파티션에 레코드로 저장하고, 트랜잭션의 시작과 종료를 나타내기 위해 추가적인 트랜잭션 레코드를 생성합니다.
트랜잭션 레코드
실질적인 데이터를 포함하지 않고, 트랜잭션이 종료된 상태를 표시하는 정보만을 가집니다.
레코드의 속성은 그대로 유지하며, 파티션에 저장되어 오프셋을 하나 차지합니다.
트랜잭션 컨슈머
커밋이 완료된 데이터만 파티션에서 가져갑니다.
데이터는 있지만 트랜잭션 레코드가 없는 경우, 트랜잭션이 아직 완료되지 않았다고 판단하여 데이터를 가져가지 않습니다.

코드

컨슈머는 원칙적으로 프로듀서가 보내는 데이터를 파티션에 쌓이는 대로 가져와 처리합니다.
그러나 트랜잭션으로 묶인 데이터를 브로커에서 가져올 때는 설정을 통해 다르게 동작하게 할 수 있습니다.
프로듀서와 컨슈머는 트랜잭션으로 처리가 완료된 데이터만 읽고 쓰도록 설정됩니다.
enable.idempotence를 true로 설정하고 transactional.id를 임의의 문자열 값으로 정의합니다.
프로듀서 별로 고유 id 값을 사용
init, begin, commit 순으로 수행
여러 개의 레코드를 하나의 원자 단위로 묶어서 보냅니다.
컨슈머의 isolation.level을 read_committed로 설정합니다.
기본은 read uncommitted입니다
read_committed 로 하게 되면, 커밋이 완료된 레코드들만 읽어 처리하게 됩니다.

참고

위 그림과 글은 아래 글을 많이 참고하였습니다.