Kafka Producer 내부 동작 — RecordAccumulator, Sender Thread, 그리고 Sticky Partitioner
카프카 프로듀서의
send()는 왜 비동기이며, 그 사이에 어떤 자료구조와 스레드가 끼어 있는지를 OpenJDK 소스 기준으로 풀어냅니다. 카프카 입문 시리즈를 한 번이라도 본 분,batch.size와linger.ms를 조정해 봤지만 두 값이 어디서 만나는지 모호한 분을 대상으로 합니다.
들어가며
카프카 프로듀서의 코드를 처음 보면 send()가 Future<RecordMetadata>를 반환한다는 사실이 살짝 어색합니다. 호출 시점에 네트워크로 나가지 않고, 또 즉시 반환되지도 않고, 어딘가에 잠시 쌓였다가 나갑니다.
이 "잠시 쌓이는 어딘가"가 RecordAccumulator입니다. 그리고 그것을 실제로 브로커로 밀어 넣는 별도의 백그라운드 스레드가 Sender입니다. 사용자가 호출하는 애플리케이션 스레드(이하 user thread)와 Sender 스레드, 두 스레드가 공유 자료구조 위에서 produce/consume 패턴으로 협력하는 구조입니다.
이 글은 다음을 같은 그림 위에서 정리합니다.
KafkaProducer.send()의 진입점부터accumulator.append()까지의 user thread 흐름RecordAccumulator가TopicPartition별Deque<ProducerBatch>로 데이터를 모으는 방식BufferPool이buffer.memory를 관리하는 방식과max.block.ms의 의미Sender.runOnce()가ready()→drain()→sendProduceRequests()로 이어지는 메인 루프inFlightBatches와guaranteeMessageOrder가 순서 보장에서 하는 역할- 파티셔너의 진화: KIP-480 Sticky Partitioner → KIP-794 Strictly Uniform Sticky Partitioner
batch.size/linger.ms/compression.type/buffer.memory튜닝 포인트와 함정
송신 파이프라인 한 눈에
전체 그림을 먼저 한 장 펼쳐 두고 본문을 따라가는 편이 이해가 빠릅니다.
flowchart LR
App[Application Thread] -->|send record| Producer[KafkaProducer.doSend]
Producer -->|waitOnMetadata| Meta[Metadata Cache]
Producer -->|serialize| Ser[Serializer]
Producer -->|partition| Part[Partitioner]
Producer -->|append| Acc[RecordAccumulator]
Acc -->|allocate buffer| Pool[BufferPool]
Acc -.->|wakeup| Sender[Sender Thread]
Sender -->|ready| Acc
Sender -->|drain| Acc
Sender -->|ProduceRequest| NC[NetworkClient]
NC -->|Selector| Brokers[(Brokers)]
Brokers -.->|ProduceResponse| NC
NC -.->|callback| App
세로로 보면 좌측 위에서 우측 아래로 흐릅니다. 좌측 절반은 user thread, 우측 절반은 Sender thread가 담당합니다. 중간의 RecordAccumulator가 두 스레드의 만남의 광장입니다.
user thread: send()에서 accumulator.append()까지
KafkaProducer.send(ProducerRecord)는 내부적으로 doSend(record, callback)을 호출합니다. doSend의 책임은 다섯 가지로 정리할 수 있습니다.
- 메타데이터 확보 — 해당 토픽의 파티션 수와 리더 정보를 알아야 보낼 곳을 정할 수 있습니다.
- 직렬화 —
key.serializer와value.serializer를 거쳐byte[]로 만듭니다. - 파티션 선택 — 키가 있으면 hash 기반, 없으면 파티셔너에게 위임합니다.
- 누적기 적재 —
RecordAccumulator.append()로 메모리 버퍼에 기록합니다. - Sender 깨우기 — 누적기 상태에 따라
sender.wakeup()을 호출해 백그라운드 스레드를 즉시 깨웁니다.
각 단계를 차례로 살펴봅니다.
메타데이터 확보와 max.block.ms
프로듀서가 처음 어떤 토픽으로 보낼 때, 그 토픽이 몇 개의 파티션을 가지는지, 각 파티션의 리더가 어느 브로커인지 모르는 상태입니다. waitOnMetadata(topic, partition, nowMs, maxWaitMs)는 메타데이터가 준비될 때까지 대기합니다.
이때 사용되는 타임아웃이 max.block.ms입니다. 기본값은 60초로, 다음 두 경우의 대기 상한선을 정합니다.
- 토픽 메타데이터 미확보 상태에서 첫 메시지를 보낼 때
BufferPool이 가득 차서 새 버퍼를 할당받지 못할 때
max.block.ms를 짧게 잡으면 메타데이터가 늦게 도착하는 환경에서 TimeoutException이 잘 납니다. 반대로 길게 잡으면 호출 스레드(예: HTTP 요청 처리 스레드)가 오래 잠들 수 있습니다. 동기 호출이 아닌 비동기 send()도 이 단계에서는 호출 스레드를 블록할 수 있다는 점이 자주 놓치는 함정입니다.
직렬화와 파티션 선택
직렬화는 단순합니다. byte[] serializedKey = keySerializer.serialize(topic, headers, key), byte[] serializedValue = valueSerializer.serialize(topic, headers, value)로 끝납니다.
파티션 선택은 두 갈래입니다.
record.partition()이 지정되어 있으면 그대로 사용- 그렇지 않으면
partitioner.partition(...)또는 (3.3+ 기본) 내장BuiltInPartitioner가 결정
키가 있으면 거의 모든 구현이 Utils.murmur2(serializedKey) % numPartitions로 동일 키를 동일 파티션에 묶습니다. 키 없는 경우의 분배 전략이 KIP-480, KIP-794를 거치며 진화해 왔는데, 이는 뒤에서 따로 다룹니다.
accumulator.append() 호출
직렬화된 키·값과 선택된 파티션을 들고 RecordAccumulator.append(TopicPartition, timestamp, key, value, headers, callbacks, ...)를 호출합니다. 반환값은 RecordAppendResult로, 다음 세 가지 신호를 포함합니다.
future—RecordMetadata를 받게 될FuturebatchIsFull— 호출 결과 해당 배치가 가득 찼는지newBatchCreated— 새 배치가 생성되었는지
user thread는 이 두 boolean을 보고 Sender를 깨울지 결정합니다. 둘 중 하나라도 true이면 sender.wakeup()을 호출해 Selector의 wakeup()을 발동시켜 백그라운드 I/O를 즉시 재개시킵니다. 그렇지 않으면 Sender는 linger.ms만큼 잠들어 있다가 일어납니다.
여기서 user thread의 책임은 끝납니다. 메시지는 메모리에만 들어갔고, 네트워크로는 아직 한 바이트도 나가지 않았습니다.
RecordAccumulator — 두 스레드의 공유 광장
RecordAccumulator의 핵심 자료구조는 토픽·파티션별 큐입니다. 대략 다음 형태로 생각해도 무리가 없습니다.
ConcurrentMap<String /* topic */, TopicInfo> topicInfoMap;
class TopicInfo {
ConcurrentMap<Integer /* partition */, Deque<ProducerBatch>> batches;
BuiltInPartitioner builtInPartitioner;
}
각 TopicPartition마다 ProducerBatch의 deque가 있고, 새 레코드는 deque의 마지막 배치(peekLast())에 덧붙여집니다. 마지막 배치가 가득 차면 새 배치를 만들어 deque에 추가합니다.
flowchart LR
subgraph TopicA
TA0[Part 0 deque] --> BA01[Batch] --> BA02[Batch]
TA1[Part 1 deque] --> BA11[Batch]
end
subgraph TopicB
TB0[Part 0 deque] --> BB01[Batch] --> BB02[Batch] --> BB03[Batch]
end
Append[append record] --> TA0
Drain[drain batches] --> BA01
Drain --> BB01
append()의 실제 흐름
append()는 동일 deque에 대해 동시 호출되더라도 안전해야 합니다. 큐 단위로 잠금을 잡고 다음 순서로 진행합니다.
Deque<ProducerBatch>획득(getOrCreateDeque)- 잠금 안에서
tryAppend(timestamp, key, value, headers, callback, deque)시도 - 마지막 배치에 자리가 있으면 거기 추가하고
RecordAppendResult반환 - 자리가 없으면 잠금을 풀고
BufferPool.allocate(batchSize, maxBlockTimeMs)로 새 버퍼 확보 - 새 버퍼로 다시 잠금을 잡고 새
ProducerBatch를 만들어 deque에addLast - 새 배치에
tryAppend로 레코드 추가
BufferPool 할당은 잠금 밖에서 수행하는 것이 포인트입니다. 메모리가 부족해서 user thread가 max.block.ms까지 잠들더라도, 같은 파티션을 노리는 다른 user thread는 계속 진행할 수 있어야 하기 때문입니다.
여기서 미묘한 점은 4-5단계 사이에 다른 스레드가 먼저 새 배치를 만들었을 수 있다는 것입니다. 그래서 5단계로 다시 잠금을 잡았을 때 tryAppend를 한 번 더 시도하고, 거기서 성공하면 방금 할당한 버퍼는 BufferPool에 즉시 돌려줍니다. 흔히 "double-checked allocation"이라 부르는 패턴입니다.
ProducerBatch와 MemoryRecordsBuilder
ProducerBatch는 단순한 컬렉션이 아니라, 곧바로 와이어 포맷이 될 수 있는 MemoryRecordsBuilder를 안에 들고 있습니다. tryAppend는 다음 두 조건을 점검합니다.
MemoryRecordsBuilder.hasRoomFor(timestamp, key, value, headers)— 이 레코드가 들어갈 자리가 있는가- 배치 자체가 종료 상태인가(
isClosed)
자리가 있으면 builder에 직접 쓰기 때문에, 별도 staging 영역 없이 곧바로 와이어 포맷(RecordBatch)으로 누적됩니다. 이 설계 덕분에 Sender가 보낼 시점에 다시 직렬화할 필요가 없습니다.
압축은 언제 일어나는가
compression.type이 none이 아니면, 배치가 닫힐 때(close()) builder가 누적된 레코드들을 한꺼번에 압축합니다. 즉, 레코드 단위가 아니라 배치 단위 압축입니다. 이것이 batch.size를 키울수록 압축비가 좋아지는 이유입니다. 작은 배치는 압축 사전(dictionary)의 학습 기회가 적어 압축 효율이 떨어집니다.
지원되는 압축은 none, gzip, snappy, lz4, zstd입니다. CPU 비용과 압축비 사이의 트레이드오프가 다르며, 대부분의 워크로드에서는 lz4 또는 zstd가 좋은 균형점이 됩니다.
BufferPool — 고정 메모리 위의 버퍼 재사용
buffer.memory(기본 32MB)는 프로듀서가 사용할 수 있는 누적기 메모리의 상한선입니다. 이 값이 BufferPool의 totalMemory로 들어가 다음 두 가지를 함께 책임집니다.
- 전체 메모리 상한선 강제
batch.size와 동일한 크기의ByteBuffer를 재사용하는 free deque 관리
두 종류의 메모리
BufferPool 안의 메모리는 두 영역으로 나뉩니다.
free— 정확히poolableSize(=batch.size) 크기의 재사용 가능한ByteBuffer목록availableMemory— 아직 할당되지 않은 가용 바이트 수
할당 요청이 들어오면 다음 순서로 시도합니다.
- 요청 크기가
poolableSize와 같으면free.pollFirst()로 재사용 시도 availableMemory >= size이면 그 자리에서ByteBuffer.allocate(size)- 둘 다 안 되면
Condition.await(maxBlockTimeMs)로 대기
대기 큐는 가장 오래 기다린 스레드가 먼저 깨어나도록 FIFO입니다. 메모리가 일부만 회수되더라도 가장 앞 스레드가 자신의 요청 크기를 채울 때까지 누적해 받아 갑니다. 이 fairness 덕분에 큰 요청이 작은 요청들에 의해 영원히 starve 되지 않습니다.
반환과 재사용
배치가 전송 완료(또는 실패)되면 MemoryRecordsBuilder.close()가 호출되고, ProducerBatch가 자기 버퍼를 BufferPool.deallocate()로 돌려줍니다. 크기가 poolableSize와 같으면 free 끝에 다시 넣어 재사용하고, 그렇지 않으면 availableMemory로 회수합니다.
buffer.memory가 부족할 때
buffer.memory가 작거나 브로커가 느려서 배치가 잘 비워지지 않으면 user thread가 BufferPool.allocate() 내부에서 max.block.ms만큼 잠듭니다. 그 시간이 지나도 자리가 없으면 BufferExhaustedException이 던져집니다.
운영 환경에서 이 예외가 보이면 다음 셋 중 하나가 원인입니다.
- 브로커가 느려 in-flight 배치가 누적되고 있다
- 프로듀서가 너무 빠르게 적재해서 누적 속도가 송신 속도를 추월한다
buffer.memory가 처리량 대비 작게 잡혀 있다
Sender 스레드 — 백그라운드 I/O 엔진
Sender는 KafkaProducer가 생성될 때 단 한 개 만들어지는 백그라운드 스레드입니다. run()은 단순합니다.
public void run() {
while (running)
runOnce();
// shutdown sequence
}
runOnce()는 다음 셋을 순서대로 실행합니다.
- (트랜잭션이 활성화된 경우) 트랜잭션 상태 처리
sendProducerData(now)— 누적기에서 배치 모아 produce 요청 발행client.poll(timeout, now)—Selector로 소켓 I/O 진행, 응답 처리
핵심은 2번 sendProducerData입니다.
sendProducerData의 단계
flowchart TD
A[runOnce start] --> B[accumulator.ready clusterMetadata]
B --> C{any node ready?}
C -- no --> Z[poll timeout]
C -- yes --> D[accumulator.drain ready nodes maxRequestSize]
D --> E[add drained batches to inFlightBatches]
E --> F[expire batches past deliveryTimeoutMs]
F --> G[sendProduceRequests per node]
G --> H[client.send ProduceRequest]
H --> Z
ready(clusterMetadata, now) — 어느 브로커가 보낼 준비 됐나
각 TopicPartition별 deque의 첫 번째 배치를 보고 "이 배치가 지금 보내야 하는가"를 판단합니다. 다섯 가지 조건 중 하나라도 만족하면 ready입니다.
- 배치가 가득 참(
isFull()) linger.ms가 지남BufferPool이 가득 차서 다른 사용자가 대기 중- 프로듀서가 닫히는 중(
flush()등) - 재시도 backoff가 끝남
이렇게 ready로 판정된 파티션들의 리더 노드 집합이 결과입니다. 같은 브로커가 여러 파티션의 리더이면 한 번에 묶입니다.
drain(readyNodes, maxRequestSize, now) — 배치 추출
ready인 노드들에 대해 deque에서 첫 배치를 꺼내(pollFirst()) maxRequestSize(기본 1MB) 한도 안에서 모읍니다. 노드 단위로 한 요청에 묶을 배치들이 결정됩니다.
이때 파티션 간 fairness를 위해 drainIndex로 시작 위치를 회전시킵니다. 그렇지 않으면 파티션 ID가 낮은 쪽이 항상 먼저 비워져 starvation이 생길 수 있습니다.
sendProduceRequests — 와이어로 내보내기
노드별로 ProduceRequest를 만들어 NetworkClient.send()로 전달합니다. 실제 소켓 쓰기는 이어지는 client.poll()에서 Selector가 비동기로 수행합니다. 응답이 돌아오면 handleProduceResponse가 ProducerBatch.done(baseOffset, ...)을 호출해 RecordMetadata를 채우고 사용자 콜백을 실행합니다.
inFlightBatches와 delivery.timeout.ms
Sender는 Map<TopicPartition, List<ProducerBatch>> 형태로 in-flight 상태인 배치들을 추적합니다. 응답이 오면 거기서 제거하고, 너무 오래 머무는 배치는 delivery.timeout.ms(기본 2분)를 기준으로 TimeoutException과 함께 실패시킵니다.
delivery.timeout.ms는 사용자가 send() 호출 시점부터 콜백 호출 시점까지의 상한선입니다. request.timeout.ms(기본 30초)는 단일 produce 요청에 대한 브로커 응답 대기 상한입니다. 두 값의 관계가 delivery.timeout.ms >= linger.ms + request.timeout.ms 라는 제약을 만듭니다.
guaranteeMessageOrder — 순서 보장 모드
max.in.flight.requests.per.connection이 1이거나 멱등 프로듀서가 활성화되어 있으면 Sender는 같은 파티션의 in-flight 배치 순서를 엄격히 보장해야 합니다. 이를 위해 drain 직후 해당 파티션을 mute()로 잠가, 응답이 돌아오기 전에는 같은 파티션에서 다음 배치를 뽑지 못하게 합니다.
응답이 도착해 unmute()되기까지 그 파티션은 ready 후보에서 제외됩니다. 이 짧은 잠금이 재시도 중 순서가 뒤집히는 시나리오를 차단합니다.
멱등 프로듀서(기본 활성)는 max.in.flight.requests.per.connection을 5 이하로 강제합니다. 브로커가 PID별·파티션별로 마지막 5개의 sequence number를 캐시해 중복·순서 어긋남을 검출하기 때문입니다.
파티셔너의 진화 — Sticky Partitioner를 둘러싼 두 KIP
파티션 선택은 외형은 단순하지만 처리량에 가장 큰 영향을 주는 부분 중 하나입니다. 키가 없는 메시지를 어떻게 나누느냐에 따라 배치가 얼마나 잘 채워지는지가 결정되기 때문입니다.
옛 기본: Round-Robin
초창기 DefaultPartitioner는 키 없는 메시지를 라운드로빈으로 흩뿌렸습니다. 100개의 파티션이 있다면, 100개의 레코드가 100개의 deque에 하나씩 들어갑니다. 결과적으로 100개의 1-record 배치가 만들어지고, 압축 효율도 네트워크 효율도 최악이 됩니다. linger.ms를 키워도 한 deque에 다음 레코드가 들어오기까지 시간이 오래 걸리니 의미가 옅습니다.
KIP-480: Sticky Partitioner
해결은 단순했습니다. 키가 없으면 일정 시간 한 파티션에 "딱 붙어 있는다"는 아이디어입니다. KIP-480이 도입한 UniformStickyPartitioner는 다음 규칙을 따릅니다.
- 현재 sticky partition으로 계속 적재
RecordAccumulator가 그 파티션에 새 배치를 만들어야 할 때 sticky partition을 다시 뽑음
배치 단위로 sticky가 바뀌므로 한 배치는 통째로 한 파티션에 쌓이고, 시간이 지나면 결과적으로 파티션들이 골고루 채워집니다. 이 변경으로 동일 linger.ms에서 배치 크기가 극적으로 커지고, 처리량은 올라가며 평균 레이턴시는 오히려 내려가는 결과가 나왔습니다.
flowchart LR
subgraph RoundRobin
R1[Record 1] --> P1A[Part 0]
R2[Record 2] --> P2A[Part 1]
R3[Record 3] --> P3A[Part 2]
R4[Record 4] --> P4A[Part 3]
end
subgraph Sticky
S1[Record 1] --> P1B[Part 0]
S2[Record 2] --> P1B
S3[Record 3] --> P1B
S4[New batch trigger] --> P3B[Part 2]
end
KIP-480의 약점: 느린 브로커에 쏠리는 역설
운영 환경에서 KIP-480이 의도와 다르게 동작하는 케이스가 발견되었습니다. 한 브로커가 느려지면 그쪽 파티션의 배치가 늦게 비워집니다. 배치 단위로 sticky가 바뀌므로 배치 회전 주기가 길어지면 그 sticky 기간이 길어지고, 결과적으로 느린 브로커로 더 많은 레코드가 흐릅니다. 분배가 균일해지길 기대했는데 오히려 편향이 강해지는 역설입니다.
KIP-794: Strictly Uniform Sticky Partitioner
Kafka 3.3에 들어간 KIP-794는 이 문제를 두 갈래로 해결합니다.
(1) 균일 분배 보장
배치 크기에 의존하지 않고, 누적 바이트 수가 일정 임계치(batch.size의 일정 배수)에 도달하면 다음 파티션으로 이동합니다. 느린 브로커의 sticky 기간이 길어지더라도, 누적 바이트가 차면 강제로 다음 sticky로 넘어가므로 더 이상 편향이 축적되지 않습니다.
(2) 적응형 파티션 전환
partitioner.adaptive.partitioning.enable이 true(기본)이면 브로커의 지연을 추적해, 느린 브로커의 파티션을 sticky 후보에서 일시적으로 배제합니다. partitioner.availability.timeout.ms(기본 0, 즉 사용 안 함)를 설정하면 일정 시간 응답 없는 브로커를 더 적극적으로 회피합니다.
또한 KIP-794는 기존 DefaultPartitioner와 UniformStickyPartitioner를 deprecate 했습니다. 3.3 이후로는 partitioner.class를 명시하지 않는 것이 권장되며, 그 경우 새로운 내장 BuiltInPartitioner가 사용됩니다.
키 기반 파티셔닝은 영향받지 않습니다. 키가 있으면 여전히 murmur2(key) % numPartitions로 결정됩니다. 이는 의도적으로 유지된 호환성으로, 키-파티션 매핑에 의존하는 모든 컨슈머 로직을 깨지 않기 위해서입니다.
partitioner.ignore.keys
3.3 이전과 동일하게 "키를 무시하고 sticky로만 흘리고 싶다"는 요구가 있어 partitioner.ignore.keys=true가 추가되었습니다. 키 해시 매핑이 보장이 아니라 우연이었던 경우(예: 키는 로깅 용도로만 채우는 토픽)에 처리량을 더 끌어올릴 수 있습니다.
운영 튜닝 포인트
내부 구조를 이해하고 나면 흔히 만나는 튜닝 노브들의 의미가 일관성 있게 보입니다.
batch.size와 linger.ms의 만남
두 값은 "어느 쪽이 먼저 충족되면 보낼 것인가"의 OR 조건입니다.
batch.size(기본 16KB)는 배치 한 통의 상한선linger.ms는 가득 차지 않은 배치를 기다리는 최대 시간
batch.size만 키우고 linger.ms를 0으로 두면, 트래픽이 한산할 때는 여전히 거의 매번 작은 배치가 나갑니다. 반대로 linger.ms만 키우고 batch.size를 작게 두면, 트래픽이 몰릴 때 작은 배치들이 빠르게 연달아 나갑니다. 둘은 짝으로 잡아야 의미가 있습니다.
처리량 최적화의 일반적 출발선은 batch.size를 64KB256KB로, 50ms로 올리는 것입니다. 정확한 값은 메시지 평균 크기와 도착 패턴에 따라 다릅니다.linger.ms를 5
운영 환경에서는 linger.ms를 5ms 정도로 설정해 작은 배치가 갖는 비효율을 줄이고 batching 효과를 얻는 것이 일반적입니다.
compression.type
배치 단위 압축임을 기억하면 선택이 단순해집니다. batch.size가 클수록 압축비가 좋아지므로, 압축을 켤 거라면 batch.size도 함께 키우는 편이 좋습니다. CPU 비용과 압축비의 균형점은 워크로드마다 다르지만, 일반적으로 lz4가 빠르고 충분한 압축비를 내며, zstd는 CPU를 조금 더 쓰는 대신 더 잘 줄입니다.
압축은 프로듀서에서 한 번, 컨슈머에서 한 번만 풉니다. 브로커는 압축된 배치를 그대로 디스크에 쓰고 그대로 컨슈머에 보냅니다(로그 컴팩션 같은 일부 경우 제외). 즉, 압축으로 줄어드는 비용은 네트워크와 디스크 양쪽 모두입니다.
buffer.memory와 max.block.ms
buffer.memory(기본 32MB)는 누적기 전체 메모리 상한입니다. 처리량을 올리려면 같이 키워야 합니다. 그렇지 않으면 user thread가 BufferPool에서 자주 잠들고, max.block.ms가 짧으면 BufferExhaustedException을 만나기 시작합니다.
대략적인 가늠은 buffer.memory ~= batch.size * numPartitions * 안전계수 정도입니다. 모든 파티션에 한 배치씩 동시에 채워 둘 여유가 있어야 sticky 회전이 매끄럽기 때문입니다.
acks, enable.idempotence, retries
Kafka 3.0부터 acks=all, enable.idempotence=true가 기본입니다. 이 기본 조합은 다음을 함께 잠금니다.
max.in.flight.requests.per.connection을 5 이하로 강제retries를Integer.MAX_VALUE로 강제- 모든 ISR이 복제할 때까지 ack 지연
acks=1로 낮추면 리더만 받자마자 ack가 떨어져 지연은 줄지만, 리더 장애 시 메시지 손실 가능성이 생깁니다. acks=all의 지연 부담은 대부분 워크로드에서 측정 가능한 만큼 크지 않다는 것이 KIP-679의 결정 근거였습니다.
delivery.timeout.ms, request.timeout.ms
두 값의 차이를 한 문장으로 정리하면 다음과 같습니다.
request.timeout.ms(기본 30s) — 단일 produce 요청에 대해 브로커 응답을 기다리는 상한delivery.timeout.ms(기본 120s) — 사용자가send()를 호출한 순간부터 콜백이 호출되기까지의 상한
delivery.timeout.ms가 짧으면 재시도 여유가 적어 일시적 장애에서 실패가 빨리 납니다. 길게 잡으면 콜백이 늦게 호출되어 호출자가 오해할 수 있습니다. 일반적으로 delivery.timeout.ms >= linger.ms + request.timeout.ms + 안전여유 관계를 유지합니다.
자주 만나는 실수
producer.send()만 호출하고 flush()/close()를 안 합니다.
JVM이 종료되면 Sender 스레드는 daemon으로 함께 죽고, 누적기에 남은 배치는 그대로 사라집니다. 프로세스 종료 직전 close(Duration) 또는 flush()로 비워야 합니다.
send()의 반환 Future.get()을 즉시 호출합니다.
비동기 설계가 무너져 동기 호출이 됩니다. 콜백을 등록하거나, Future를 다른 스레드/단계로 미루는 편이 의도에 맞습니다.
max.in.flight.requests.per.connection을 5 초과로 올립니다.
멱등 프로듀서가 활성화된 상태에서는 클라이언트가 ConfigException을 던집니다. 비활성화해서 강제로 6 이상으로 올리면 재시도 중 순서가 뒤집힐 수 있습니다.
batch.size만 키우고 linger.ms는 0으로 둡니다.
앞서 본 대로 한산한 시간에는 효과가 없습니다. 짝으로 조정해야 합니다.
buffer.memory는 그대로 두고 batch.size만 크게 키웁니다.
파티션 수가 많은 토픽에 보낼 때 동시에 채워야 할 배치 수가 많아져 BufferPool이 빨리 차고, user thread가 자주 블록됩니다.
Callback 안에서 producer.send()를 다시 호출하면서 get()을 부릅니다.
콜백은 Sender 스레드에서 실행됩니다. 그 안에서 Future.get()을 부르면 Sender 자기 자신을 기다리는 데드락이 됩니다.
마무리
카프카 프로듀서는 "user thread는 메모리에 쓰고, Sender thread는 네트워크로 비운다"는 단순한 producer/consumer 모델 위에 서 있습니다. 그 사이를 RecordAccumulator와 BufferPool이 잇고, 노브들(batch.size, linger.ms, compression.type, buffer.memory, max.block.ms, delivery.timeout.ms)은 모두 이 두 스레드의 만남을 조율합니다.
파티셔너의 KIP-480 → KIP-794 진화는 같은 추상화 안에서 어떻게 분배 정책이 처리량을 좌우하는지를 보여주는 좋은 예입니다. 작은 변경 한 줄이 배치 크기를 두 자리 수로 키울 수 있고, 또 그 변경이 다른 환경에서는 반대로 작동할 수도 있다는 점이 인상적입니다.
배치 크기, 압축, 그리고 sticky의 회전 주기를 함께 머리에 그릴 수 있다면, 같은 클러스터에서도 워크로드별로 합리적인 시작점을 잡고 측정·조정해 갈 수 있습니다.
참고자료
- Apache Kafka Producer Configs
- Apache Kafka Documentation — Producer
- KafkaProducer.java (apache/kafka trunk)
- RecordAccumulator.java (apache/kafka trunk)
- Sender.java (apache/kafka trunk)
- BufferPool.java (apache/kafka trunk)
- KIP-480: Sticky Partitioner
- KIP-794: Strictly Uniform Sticky Partitioner
- KIP-679: Producer will enable the strongest delivery guarantee by default

