Kafka Consumer 내부 동작 — Fetcher, GroupCoordinator, 그리고 KIP-848 차세대 리밸런스 프로토콜
카프카 컨슈머가
poll()한 줄로 데이터를 끌어오는 동안 클라이언트와 브로커가 실제로 어떤 합의를 거치는지 정리합니다. 프로듀서 편(RecordAccumulator/Sender)과 짝을 이루는 컨슈머 측 내부 구조를apache/kafkatrunk 소스 기준으로 풀어내고, Kafka 4.0에서 GA, 4.1에서 기본값이 된 KIP-848 새 그룹 프로토콜이 기존 JoinGroup/SyncGroup 모델을 어떻게 바꿔 놓았는지까지 같은 그림 위에 얹습니다. 컨슈머가 잠깐 멈추는 동안 코디네이터에서 무엇이 일어나는지를 들여다보고 싶은 분을 위한 글입니다.
컨슈머가 하는 일
KafkaConsumer가 외부에서 보면 하는 일은 단순합니다.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> r : records) handle(r);
}
하지만 이 한 줄 뒤에는 세 개의 독립적인 책임이 동시에 굴러갑니다.
- Group membership — 같은
group.id를 가진 다른 컨슈머와 협상해서 파티션 일부를 배정받습니다. 협상 결과가 바뀌면 partition을 양보(revoke)·인수(assign)합니다. - Fetch — 배정된 각 파티션의 leader 브로커에서 메시지를 끌어옵니다. 가능하면 한 번의 RPC로 여러 파티션을 묶어 받고, 같은 파티션 집합이면 다음 RPC에서는 변경분만 보냅니다(KIP-227).
- Offset commit — 어디까지 처리했는지를
__consumer_offsets토픽에 기록합니다.
이 세 책임이 poll() 한 호출 안에서 어떻게 엮이는지가 컨슈머 내부의 핵심이고, KIP-848은 (1)을 통째로 다시 그렸습니다.
poll() 한 번이 하는 일
trunk의 AsyncKafkaConsumer.poll()는 대략 다음 순서로 동작합니다. 새 AsyncKafkaConsumer는 KIP-848과 함께 도입된 구현으로, 백그라운드 네트워크 스레드(ConsumerNetworkThread)에 I/O를 위임하고 애플리케이션 스레드는 큐에 이벤트를 던지는 방식으로 분리되어 있습니다. 기존 ClassicKafkaConsumer는 한 스레드에서 모든 일을 처리했는데, 새 구현은 user thread와 background thread 사이에서 ApplicationEventHandler/BackgroundEventHandler 두 큐가 양방향으로 메시지를 옮깁니다.
flowchart LR
UT["user thread<br/>poll()"] -- enqueue --> AEH[ApplicationEventHandler queue]
AEH --> CNT[ConsumerNetworkThread]
CNT --> RM["RequestManagers<br/>(Heartbeat / Fetch / Commit /<br/>Membership / OffsetFetch)"]
RM -- send/receive --> Broker
Broker -- response --> RM
RM --> BEH[BackgroundEventHandler queue]
BEH --> UT
poll() 본문은 다음 4단계로 정리할 수 있습니다.
- wakeup/closed 검사 — 다른 스레드가
wakeup()을 호출했는지, 컨슈머가 닫혔는지 먼저 본다. - updateAssignmentMetadataIfNeeded() — 메타데이터 갱신, 그룹 코디네이터 폴링, 그리고 모든 assigned partition이 fetch position을 가지도록 보정한다(없으면
auto.offset.reset적용). - pollForFetches() —
FetchBuffer에서 이미 도착한 데이터가 있으면 즉시 디코드해서 반환. 없으면fetcher.sendFetches()로 새 fetch 요청을 큐에 넣고client.poll(timeout)으로 기다린다. - prefetch — 반환할 records를 가지고 있으면 다음 fetch 요청을 한 번 더 보내 파이프라이닝한다. 그래야 다음
poll()진입 시 데이터가 이미 도착해 있다.
여기서 (2)가 그룹 프로토콜과 만나는 지점, (3)이 데이터 평면과 만나는 지점입니다.
코드로 보면 AsyncKafkaConsumer.poll()의 골격은 다음과 같습니다(trunk, 가독성을 위해 축약).
public ConsumerRecords<K, V> poll(Duration timeout) {
acquireAndEnsureOpen();
try {
wakeupTrigger.maybeTriggerWakeup();
Timer timer = time.timer(timeout);
do {
// 1) 메타데이터 + 그룹 상태 + fetch position
if (!updateAssignmentMetadataIfNeeded(timer)) {
return ConsumerRecords.empty();
}
// 2) 이미 도착한 데이터가 있으면 즉시 반환
final Fetch<K, V> fetch = pollForFetches(timer);
if (!fetch.isEmpty()) {
// 3) prefetch: 다음 poll에서 쓸 데이터 미리 요청
if (sendFetches() > 0 || cachedSubscriptionHasAllFetchPositions) {
client.transmitSends();
}
return new ConsumerRecords<>(fetch.records(), fetch.nextOffsets());
}
} while (timer.notExpired());
return ConsumerRecords.empty();
} finally {
release();
}
}
(1)의 updateAssignmentMetadataIfNeeded는 background thread에 "그룹 상태 폴링해라" 이벤트를 enqueue하고 응답을 기다립니다. 이 안에서 KIP-848 heartbeat가 보내지고, 응답으로 받은 assignment delta가 user thread의 콜백(ConsumerRebalanceListener.onPartitionsRevoked/Assigned)을 호출합니다. (2)의 pollForFetches는 background thread가 미리 채워 둔 FetchBuffer에서 records를 꺼내고, 비어 있으면 새 fetch 요청을 쏘고 응답을 timeout 한도 안에서 기다립니다.
Fetcher와 FetchSessionHandler — 데이터 평면
Fetcher는 broker 단위로 묶인 fetch 요청을 만들고, 응답을 FetchBuffer에 쌓고, user thread의 collectFetch()에서 디코드해 반환하는 역할을 합니다. 한 브로커가 leader로 들고 있는 여러 파티션을 한 번의 FetchRequest로 묶어 보내는데, 매번 모든 파티션 메타데이터(topic, partition, offset, max bytes)를 보내면 파티션이 많을수록 요청 자체가 무거워집니다.
KIP-227이 이 문제를 풀려고 도입한 게 incremental fetch session입니다. FetchSessionHandler가 broker별로 다음 상태를 들고 있습니다.
sessionId— 32-bit 식별자. 처음에는INVALID_SESSION_ID(0)이고, 첫 FULL 요청 응답에서 broker가 새 sessionId를 발급합니다.epoch— 같은 sessionId 안에서 단조 증가. broker가 partition 상태를 잃은 적이 없음을 양쪽이 합의한 카운터입니다.sessionPartitions— broker가 이미 알고 있다고 양쪽이 동의한 partition 집합.
요청은 두 종류로 갈립니다.
| 종류 | 언제 | 본문 |
|---|---|---|
| FULL | sessionId=0이거나 직전 응답이 INVALID | 모든 partition 메타데이터 |
| INCREMENTAL | 정상 세션 진행 중 | added / removed / altered만 |
INCREMENTAL이면 같은 partition 묶음을 계속 받는 동안에는 빈 요청이 갈 수도 있습니다. broker는 sessionId + epoch로 누구의 어떤 시점 상태인지 식별하고, 자기가 가진 partition 메타데이터를 그대로 재사용합니다. 파티션 수가 수천 개로 늘어나도 fetch 요청 크기는 거의 변하지 않습니다.
응답이 오면 FetchSessionHandler.handleResponse()가 nextMetadata를 갱신해 다음 요청의 sessionId/epoch를 결정합니다. broker가 세션을 잃었다고 알려오면 INVALID_SESSION_ID로 떨어지고 다음 요청은 다시 FULL이 됩니다.
데이터 양 조절에 쓰이는 4개 설정은 다음처럼 자리를 잡습니다.
| 설정 | 기본 | 누가 본다 |
|---|---|---|
fetch.min.bytes |
1 | broker: 응답이 이만큼 모일 때까지 보관 |
fetch.max.wait.ms |
500 | broker: fetch.min.bytes가 안 차도 이만큼 대기 후 응답 |
fetch.max.bytes |
52428800(50MiB) | broker: 한 응답의 상한 |
max.partition.fetch.bytes |
1048576(1MiB) | broker: 파티션 한 개의 상한 |
max.partition.fetch.bytes는 hard cap이 아니라 진척(progress) 보장용입니다. 단일 메시지가 이 값보다 크면 broker는 그 메시지가 어떤 partition의 첫 번째 메시지일 때만은 cap을 무시하고 돌려줍니다. 그렇지 않으면 컨슈머가 영원히 막힐 수 있기 때문입니다.
Incremental fetch session 한 사이클
처음에 partition orders-0,1,2를 같은 leader broker에서 받기 시작하는 컨슈머가 있다고 합시다.
[1] FULL fetch → request: sessionId=0, partitions={orders-0,1,2 + offsets}
response: sessionId=12345, epoch=1, records=[...]
[2] INCR fetch → request: sessionId=12345, epoch=2, added={}, removed={}, altered={}
response: sessionId=12345, epoch=2, records=[...]
[3] INCR fetch → 같은 형태로 빈 변경분만 반복
[4] partition 추가됨
INCR fetch → added={orders-3}, removed={}, altered={...offset changes}
[5] broker가 세션 evict (memory pressure)
INCR fetch → response: INVALID_SESSION_ID
[6] 다음 사이클은 FULL로 다시 시작
fetch-session-evictions JMX 메트릭이 0이 아니면 broker가 세션을 잃는 일이 발생한다는 신호이고, 그러면 다음 fetch가 FULL로 떨어져 일시적으로 대역폭이 늘어납니다. max.incremental.fetch.session.cache.slots(broker, 기본 1000)가 부족하면 빈번해질 수 있습니다.
그룹 프로토콜 옛길 — Eager · Cooperative
KIP-848을 이해하려면 옛 길을 먼저 봐야 합니다. KIP-848 이전에는 그룹 합의가 4-RPC 사이클이었습니다.
flowchart TB
A[FindCoordinator] --> B[JoinGroup]
B --> C{is leader?}
C -- yes --> D["compute assignment<br/>(client-side)"]
C -- no --> E[wait]
D --> F[SyncGroup with assignment]
E --> F
F --> G[Heartbeat loop]
G -- group change --> B
핵심 부분 — assignment 계산은 클라이언트가 한다. 첫 JoinGroup이 끝나면 코디네이터가 그룹 멤버 중 한 명을 leader로 선택하고, leader가 모든 멤버의 subscription을 가지고 PartitionAssignor를 돌려 결과를 SyncGroup으로 다시 코디네이터에 넘기면, 코디네이터가 각 멤버에게 본인 몫만 전달합니다.
이 모델의 두 가지 거친 부분이 있습니다.
1. Eager rebalance — 전원이 멈춘다. 어떤 멤버가 join 하거나 떠나면 전체 그룹이 partition을 모두 revoke하고, leader가 새 assignment를 계산하고, 다시 받아갑니다. partition 100개를 9개 컨슈머가 나눠 갖던 그룹에 1개 컨슈머가 추가되면 — 100개 partition 전부가 잠깐 누구도 처리하지 않는 상태가 됩니다. 컨슈머 1개를 추가했을 뿐인데 정지 시간은 그룹 전체 크기에 비례합니다.
2. Cooperative rebalance(KIP-429)로 두 번에 나눠 잡았다.
2.4부터 도입된 CooperativeStickyAssignor는 한 번의 JoinGroup → SyncGroup으로 끝내지 않고, "이 partition만 revoke해라" → 멤버가 revoke 후 다시 join → leader가 새 assignment 계산 → SyncGroup, 이렇게 두 번을 돌립니다. 정지하는 partition은 실제로 옮겨가는 일부만 — 진척이지만 본질적으로 client-driven · global-barrier 모델은 그대로입니다.
flowchart LR
subgraph eager[Eager]
E1[member join] --> E2["all members revoke ALL"]
E2 --> E3[leader assigns]
E3 --> E4[all members assigned]
end
subgraph coop[Cooperative]
C1[member join] --> C2["leader assigns,<br/>revoke list issued"]
C2 --> C3["affected members<br/>revoke subset, rejoin"]
C3 --> C4["leader assigns again"]
end
여전히 leader 컨슈머가 다운되면 그룹이 멈추고, JoinGroup에 모든 멤버가 모일 때까지 코디네이터가 기다리는 global synchronization barrier가 남아 있었습니다. 그래서 4.0의 KIP-848은 이 모델을 통째로 갈아엎습니다.
KIP-848 — 서버 주도의 비동기 협상
KIP-848은 3가지를 동시에 바꿉니다.
- Assignment 계산을 broker로 옮겼다. 새 protocol에서 leader 컨슈머라는 개념이 없어지고, 코디네이터가 직접
PartitionAssignor구현을 들고 있습니다.range,uniform두 가지를 기본 제공하고group.coordinator.rebalance.protocolsbroker 설정으로 켭니다. 클라이언트는group.remote.assignor로 어느 것을 쓸지 요청만 합니다. - JoinGroup/SyncGroup이 ConsumerGroupHeartbeat 하나로 합쳐졌다. 컨슈머는 처음부터 끝까지 같은 RPC만 보냅니다 — 처음에는 join 의도를 담아 보내고, 평소에는 빈 heartbeat를 보내고, 떠날 때는 epoch -1을 담아 보냅니다.
- Reconciliation이 점진적이다. 그룹 단위로 한 번에 멈추지 않습니다. 코디네이터는 target assignment를 계산해 두고, 각 멤버에게 heartbeat 응답으로 "당신은 이 partition을 revoke하고 그다음 이걸 assign받으세요"라고 차례로 말합니다. 영향을 받지 않는 멤버는 계속 처리합니다.
flowchart TB
M["Consumer (member)"] -- heartbeat req --> GC[Group Coordinator]
GC -- heartbeat resp<br/>with assignment delta --> M
GC -- read/write --> CO["__consumer_offsets<br/>(ConsumerGroupRecord,<br/>TargetAssignment,<br/>CurrentMemberAssignment)"]
GC -- pluggable --> SA["server-side<br/>PartitionAssignor"]
3개의 epoch
KIP-848의 모든 동작은 3개의 단조증가 카운터 위에서 굴러갑니다.
- group epoch — 그룹의 입력이 바뀔 때마다 +1. 멤버 join/leave, subscription 변경, topic의 partition 수 변경 같은 사건이 트리거. 코디네이터가 들고 있는 그룹 차원의 시계.
- assignment epoch — assignor가 새 assignment를 계산할 때마다 +1. 정상 흐름에서는 group epoch와 같이 올라가지만, assignor가 즉시 계산을 못 끝낼 때 일시적으로 분리될 수 있습니다.
- member epoch — 각 멤버가 받아들인 assignment의 버전. 멤버는 자기 epoch와 코디네이터가 알려주는 target epoch가 같아지면 STABLE.
flowchart LR
GE[group epoch] -- triggers --> AE[assignment epoch]
AE -- target for each --> ME1["member 1 epoch"]
AE -- target for each --> ME2["member 2 epoch"]
AE -- target for each --> MEN["member N epoch"]
heartbeat 한 번이 끝나면 어떤 멤버는 새 epoch에, 어떤 멤버는 직전 epoch에 머물 수 있습니다. 모두가 동시에 같은 시점에 있을 필요가 없는 게 핵심 — 그래서 global barrier가 사라집니다.
ConsumerGroupHeartbeat RPC
ConsumerHeartbeatRequestManager가 빌드하는 RPC는 다음 필드를 가집니다(전부 보내는 게 아니라 변경분만 보내는 점이 중요합니다).
| 필드 | 항상 | join 시 | 변경 시 |
|---|---|---|---|
GroupId, MemberId, MemberEpoch |
O | ||
InstanceId |
(있을 때만) | ||
RebalanceTimeoutMs |
O | 변경 시 | |
SubscribedTopicNames |
O | 구독 바뀔 때 | |
SubscribedTopicRegex |
O | 패턴 바뀔 때 | |
ServerAssignor |
O | group.remote.assignor 바뀔 때 |
|
TopicPartitions |
O | reconcile 끝나고 ack 시 | |
RackId |
O | (이후엔 안 보냄) |
MemberEpoch의 특수값 세 개를 기억해 두면 흐름이 잡힙니다.
0— 처음 join할 때. 응답에서 양수 epoch를 받아오면 JOINING → RECONCILING.-1— dynamic 멤버가 그룹을 떠날 때. 코디네이터는 즉시 멤버를 제거합니다.-2— static 멤버(group.instance.id있는)가 셧다운하지만 다음 인스턴스가 재진입할 자리는 보존하고 싶을 때.
heartbeat 간격은 broker가 응답의 heartbeatIntervalMs로 알려줍니다 — 컨슈머가 결정하지 않습니다. 옛 프로토콜에서 클라이언트 설정이던 heartbeat.interval.ms, session.timeout.ms가 이제 broker 설정 group.consumer.heartbeat.interval.ms(기본 5000), group.consumer.session.timeout.ms(기본 45000)로 이동했고 클라이언트 값은 무시됩니다.
멤버 상태머신
MemberState.java 10개 상태가 한 멤버의 일생입니다.
flowchart LR
UNS[UNSUBSCRIBED] --> JOIN[JOINING]
JOIN --> REC[RECONCILING]
REC --> ACK[ACKNOWLEDGING]
ACK --> ST[STABLE]
ST --> REC
ST --> PL[PREPARE_LEAVING]
PL --> LV[LEAVING]
ST --> FEN[FENCED]
FEN --> JOIN
ST --> STL[STALE]
STL --> JOIN
JOIN --> FAT[FATAL]
REC --> FAT
ST --> FAT
- UNSUBSCRIBED —
group.id는 있지만 구독이 비어 있다. 오프셋 commit은 가능하지만 그룹에 active 멤버는 아니다. - JOINING —
MemberEpoch=0으로 heartbeat를 보내며 양수 epoch를 기다린다. - RECONCILING — 새 target assignment가 도착했고, 자신의 current와 비교해서 revoke·assign 콜백을 처리하는 중.
- ACKNOWLEDGING — reconciliation은 끝났고 다음 heartbeat로 코디네이터에게 "ack" 신호를 보낼 준비.
- STABLE — 자기 epoch가 target epoch와 같다. 정기 heartbeat만 보낸다.
- FENCED — 코디네이터가
UNKNOWN_MEMBER_ID또는FENCED_MEMBER_EPOCH에러로 멤버를 잘라낸 상태. revoke 콜백 후 다시 JOINING. - PREPARE_LEAVING / LEAVING —
unsubscribe()/close()흐름. - STALE —
max.poll.interval.ms(기본 300000) 안에 user thread가poll()을 다시 부르지 않은 상태. 그룹을 떠났다가 다음 poll에서 재진입. - FATAL —
UNSUPPORTED_VERSION(broker가 KIP-848 미지원),UNRELEASED_INSTANCE_ID,FENCED_INSTANCE_ID같은 회복 불가 오류 후. 더는 요청을 보내지 않는다.
max.poll.interval.ms ↔ session timeout의 분리가 KIP-848에서도 그대로 유지됩니다. heartbeat는 background thread가 보내므로 user thread가 poll() 사이에서 오래 일을 해도 session은 살아 있고, 다만 user thread가 max.poll.interval.ms를 넘기면 STALE로 떨어져 자발적으로 그룹을 떠납니다 — 무거운 작업을 하는 컨슈머가 다른 멤버까지 끌고 죽는 일을 막기 위함입니다.
코디네이터 측 reconciliation
코디네이터는 멤버별로 두 개의 assignment를 들고 있습니다.
- Target assignment — assignor가 계산한 최종 목적지.
__consumer_offsets에TargetAssignment레코드로 영속화. - Current assignment — 멤버가 실제로 가진 partition. 멤버가 ack 한 후 갱신.
reconciliation은 3-phase로 진행됩니다.
flowchart TB
P0["target ≠ current"] --> P1["P1: send revoke list<br/>(intersect old, no longer in target)"]
P1 --> P2["member acks revoke<br/>via next heartbeat"]
P2 --> P3["P2: durably write<br/>new current = target"]
P3 --> P4["P3: send assign list<br/>via heartbeat resp"]
P4 --> P5["member ack-empties<br/>TopicPartitions"]
P5 --> P0
핵심 포인트 — 코디네이터는 revoke가 ack된 후에야 새 partition을 다른 멤버에게 넘깁니다. 두 멤버가 동시에 같은 partition을 가졌다고 생각하는 구간이 없는 게 fundamental safety guarantee.
또 하나 — 이 모든 상태 전이는 __consumer_offsets에 KIP-848이 추가한 새 record 종류(ConsumerGroupMetadataRecord, ConsumerGroupMemberMetadataRecord, ConsumerGroupPartitionMetadataRecord, ConsumerGroupTargetAssignmentRecord, ConsumerGroupCurrentMemberAssignmentRecord)로 영속화됩니다. 코디네이터가 죽었다 살아도 같은 epoch에서 이어 갈 수 있는 이유입니다.
한 사이클 예시 — 멤버 1개 추가
컨슈머 9개(M1..M9)가 partition 90개를 균등 분담하고 있다고 합시다. M10이 join할 때 어떤 일이 일어나는지를 epoch와 함께 따라가 봅니다.
초기 상태:
group epoch = 7, assignment epoch = 7
M1..M9 각 epoch = 7, 각자 10개 partition
t=0: M10이 join(MemberEpoch=0) heartbeat 전송
코디네이터: group epoch 8로 증가
assignor 실행 → 새 target: M1..M9 각 9개, M10에 9개
(각 멤버에서 1개씩 M10으로 이동)
assignment epoch = 8
t=heartbeat 1ms 후: M1 heartbeat 도착
resp: revoke partition X1
M1: RECONCILING, onPartitionsRevoked(X1) 콜백
M2..M9는 아직 STABLE (영향 없음, 다음 heartbeat까지 기다림)
t=interval 후: M1이 ack heartbeat 보냄(TopicPartitions 9개)
코디네이터: M1.current = target (epoch 8), durably persist
M1: STABLE
t=다음: M2 heartbeat → revoke X2 → ack → STABLE ...
...순차적으로...
t=마지막: M10 heartbeat → assign {X1..X9}
M10: STABLE, group reconciled
핵심 — M10이 join한 순간부터 M1만 잠깐 X1 처리를 멈춥니다. M2..M9는 동시에 멈추지 않습니다. 동일 시나리오에서 옛 eager 모드라면 90개 partition 전부가, cooperative 모드라면 9개 partition이 동시에 멈췄을 자리에서, KIP-848은 한 번에 1개씩 멈추고 풀립니다. 모든 멤버가 동시에 같은 epoch에 있지 않아도 일관성이 유지되는 게 declarative target + reconciliation loop 모델의 보장입니다.
서버 사이드 assignor
group.coordinator.rebalance.protocols에 consumer가 들어 있으면 broker는 두 assignor를 갖춥니다.
| Assignor | 동작 | 특징 |
|---|---|---|
uniform |
파티션을 균등 분배, 같은 토픽 안에서 컨슈머별 차이를 1 이내로 | sticky · rack-aware. 기본 권장. |
range |
토픽별로 partition 0..N을 컨슈머 순서대로 잘라 할당 | 여러 토픽을 같은 키로 co-partition하고 싶을 때. |
클라이언트는 group.remote.assignor로 둘 중 하나를 지정합니다. 안 지정하면 broker 설정 group.consumer.assignors의 첫 번째 항목을 씁니다.
assignor 인터페이스는 broker JAR(org.apache.kafka.coordinator.group.api.assignor)에 정의되어 있고, 사용자가 직접 구현해 broker classpath에 넣을 수도 있습니다. 옛 모델과 달리 컨슈머 JAR을 바꿔 배포할 필요가 없는 게 운영상 장점입니다.
클래식과 새 프로토콜이 공존하는 동안
KIP-848은 빅뱅 마이그레이션을 강요하지 않습니다. broker 측 group.coordinator.rebalance.protocols에 classic,consumer 둘 다 켜 두면, 같은 group.id의 그룹이 어떤 프로토콜로 운영될지는 첫 멤버의 group.protocol이 결정합니다(이미 살아 있는 그룹은 첫 멤버의 프로토콜을 유지). 그래서 일반적인 마이그레이션 순서는:
- broker를 4.0 이상으로 올린다.
group.coordinator.rebalance.protocols=classic,consumer설정. - consumer 애플리케이션 라이브러리를 4.0 이상으로 올린다.
group.protocol=classic(기본)으로 일단 유지 — 옛 동작 그대로.- 새 그룹부터 또는 컨슈머를 전부 한 번 셧다운 후
group.protocol=consumer로 켠다. - 안정화되면
group.coordinator.rebalance.protocols=consumer로 좁힌다.
같은 그룹 안에 classic 멤버와 consumer 멤버를 섞을 수는 없습니다. 코디네이터가 한 그룹은 하나의 프로토콜로만 운영합니다.
설정 매트릭스
| 설정 | 기본 | 위치 | 어디서 본다 |
|---|---|---|---|
group.protocol |
classic |
client | consumer JAR이 어느 RPC 흐름을 쓸지 결정 |
group.remote.assignor |
(null) | client | KIP-848 모드에서 사용할 assignor 이름 요청 |
group.coordinator.rebalance.protocols |
classic |
broker | 4.0에서 classic,consumer로 점진 전환 |
group.consumer.heartbeat.interval.ms |
5000 | broker | KIP-848 heartbeat 응답에 실어 보냄 |
group.consumer.session.timeout.ms |
45000 | broker | session 만료 임계 |
group.consumer.min.session.timeout.ms |
45000 | broker | 클라이언트가 요청한 값의 floor |
group.consumer.max.session.timeout.ms |
60000 | broker | 클라이언트가 요청한 값의 cap |
heartbeat.interval.ms |
3000 | client | KIP-848에선 무시(broker가 결정) |
session.timeout.ms |
45000 | client | KIP-848에선 무시 |
max.poll.interval.ms |
300000 | client | user thread 진행 보장 — 양 프로토콜 공통 |
partition.assignment.strategy |
RangeAssignor,CooperativeStickyAssignor |
client | classic 전용 |
fetch.min.bytes |
1 | client→broker | broker가 응답 누적 임계로 본다 |
fetch.max.wait.ms |
500 | client→broker | broker가 대기 최대치로 본다 |
max.partition.fetch.bytes |
1048576 | client→broker | partition별 상한(단일 메시지 예외) |
enable.auto.commit |
true | client | offset commit 백그라운드 자동화 |
auto.offset.reset |
latest | client | fetch position 없을 때 기준점 |
KIP-848로 전환하면 client 설정 5개(heartbeat.interval.ms, session.timeout.ms, group.min.session.timeout.ms, group.max.session.timeout.ms, partition.assignment.strategy)는 사실상 의미가 사라집니다. 클라이언트는 그저 broker가 시키는 대로 따릅니다.
운영 함정
KIP-848 도입 후에도 살아남는 함정과 새로 생기는 함정을 같이 정리합니다.
| 증상 | 원인 | 대응 |
|---|---|---|
| 한 멤버 추가했는데 그룹 전체가 1초 멈춤 | classic + RangeAssignor 기본값 |
CooperativeStickyAssignor로 옮기거나 group.protocol=consumer로 전환 |
| consumer 4.0 라이브러리로 올렸는데 변화 없음 | group.protocol=classic 기본값 유지 |
명시적으로 consumer로 설정 |
group.protocol=consumer인데 UNSUPPORTED_VERSION |
broker 측에 consumer 프로토콜 미허용 |
broker group.coordinator.rebalance.protocols에 consumer 추가 |
max.poll.interval.ms 안에 처리 못 끝내 그룹에서 잘림 |
KIP-848에서도 user thread 진행 보장은 그대로 적용 | 배치 처리량을 줄이거나 (max.poll.records 감소) 컨슈머를 늘림 |
| static membership 재시작인데 partition 재배정 발생 | MemberEpoch=-1로 떠남(dynamic leave) |
셧다운 시 MemberEpoch=-2(static leave with stay)로 떠나도록 graceful close 호출 |
| fetch에 partition 수 만 개인데 RPC가 무겁다 | classic full fetch 모드로 떨어졌을 수 있음 | broker 로그에서 INVALID_SESSION_ID 빈도, fetch-session-evictions 메트릭 확인 |
| 컨슈머가 STABLE인데 records가 안 옴 | broker가 fetch session을 evict 했고 클라이언트는 INCREMENTAL을 계속 보냄 | 일시적 — FetchSessionHandler가 다음 응답에서 FULL로 자동 복구. 빈번하면 broker max.incremental.fetch.session.cache.slots 상향 |
rebalance 후 일부 partition이 auto.offset.reset 적용돼 latest로 뛰어 메시지 누락 |
새 partition에 committed offset이 없는데 reset=latest | 명시적 reset 정책 결정, 신뢰가 필요한 토픽은 reset=earliest로 |
| Kafka Streams 그룹이 consumer 프로토콜로 안 옮겨감 | Streams는 별도 프로토콜(KIP-1071 진행 중) 필요 | 현재는 Streams 그룹은 classic 유지 |
정리
KIP-848 이전에는 컨슈머가 똑똑하고 코디네이터가 단순했습니다. assignor는 컨슈머가 들고 있었고, 합의는 모든 멤버가 모이는 JoinGroup-SyncGroup barrier로 잡혔으며, leader 컨슈머가 죽으면 그룹이 정지했습니다.
KIP-848 이후에는 반대입니다. 코디네이터가 무거워졌고, 컨슈머는 자기 epoch와 자기 partition만 알면 됩니다. ConsumerGroupHeartbeat 한 RPC가 join · sync · keepalive · leave 네 가지 역할을 모두 하고, 코디네이터는 멤버별로 revoke → ack → assign을 순차적으로 풀어내며 다른 멤버의 처리를 막지 않습니다. 1000개 멤버 그룹에서도 한 명이 추가될 때 멈추는 partition은 실제로 옮겨가는 일부뿐입니다.
데이터 평면(Fetcher · FetchSessionHandler)은 KIP-227 이후 거의 그대로 — partition 수가 많아져도 incremental fetch session으로 RPC 크기를 일정하게 유지하는 모델이 잘 굴러갑니다. 컨슈머의 변화는 거의 control plane에 집중되어 있다는 점이, KIP-848을 "rebalance protocol"이라고 부르는 이유입니다.
참고자료
- KIP-848: The Next Generation of the Consumer Rebalance Protocol — https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol
- KIP-227: Introduce Incremental FetchRequests — https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
- KIP-429: Kafka Consumer Incremental Rebalance Protocol — https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol
- Apache Kafka 4.1 Consumer Rebalance Protocol — https://kafka.apache.org/41/operations/consumer-rebalance-protocol/
- Apache Kafka Consumer Configs — https://kafka.apache.org/41/documentation/#consumerconfigs
MemberState.java(trunk) — https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.javaAsyncKafkaConsumer.java(trunk) — https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.javaConsumerHeartbeatRequestManager.java(trunk) — https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.javaFetchSessionHandler.java(trunk) — https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.javaFetcher.java(trunk) — https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java

