Skip to main content

Command Palette

Search for a command to run...

Kafka Producer 내부 동작 — RecordAccumulator, Sender Thread, 그리고 Sticky Partitioner

Updated
13 min read

카프카 프로듀서의 send()는 왜 비동기이며, 그 사이에 어떤 자료구조와 스레드가 끼어 있는지를 OpenJDK 소스 기준으로 풀어냅니다. 카프카 입문 시리즈를 한 번이라도 본 분, batch.sizelinger.ms를 조정해 봤지만 두 값이 어디서 만나는지 모호한 분을 대상으로 합니다.

들어가며

카프카 프로듀서의 코드를 처음 보면 send()Future<RecordMetadata>를 반환한다는 사실이 살짝 어색합니다. 호출 시점에 네트워크로 나가지 않고, 또 즉시 반환되지도 않고, 어딘가에 잠시 쌓였다가 나갑니다.

이 "잠시 쌓이는 어딘가"가 RecordAccumulator입니다. 그리고 그것을 실제로 브로커로 밀어 넣는 별도의 백그라운드 스레드가 Sender입니다. 사용자가 호출하는 애플리케이션 스레드(이하 user thread)와 Sender 스레드, 두 스레드가 공유 자료구조 위에서 produce/consume 패턴으로 협력하는 구조입니다.

이 글은 다음을 같은 그림 위에서 정리합니다.

  • KafkaProducer.send()의 진입점부터 accumulator.append()까지의 user thread 흐름
  • RecordAccumulatorTopicPartitionDeque<ProducerBatch>로 데이터를 모으는 방식
  • BufferPoolbuffer.memory를 관리하는 방식과 max.block.ms의 의미
  • Sender.runOnce()ready()drain()sendProduceRequests()로 이어지는 메인 루프
  • inFlightBatchesguaranteeMessageOrder가 순서 보장에서 하는 역할
  • 파티셔너의 진화: KIP-480 Sticky Partitioner → KIP-794 Strictly Uniform Sticky Partitioner
  • batch.size/linger.ms/compression.type/buffer.memory 튜닝 포인트와 함정

송신 파이프라인 한 눈에

전체 그림을 먼저 한 장 펼쳐 두고 본문을 따라가는 편이 이해가 빠릅니다.

flowchart LR
    App[Application Thread] -->|send record| Producer[KafkaProducer.doSend]
    Producer -->|waitOnMetadata| Meta[Metadata Cache]
    Producer -->|serialize| Ser[Serializer]
    Producer -->|partition| Part[Partitioner]
    Producer -->|append| Acc[RecordAccumulator]
    Acc -->|allocate buffer| Pool[BufferPool]
    Acc -.->|wakeup| Sender[Sender Thread]
    Sender -->|ready| Acc
    Sender -->|drain| Acc
    Sender -->|ProduceRequest| NC[NetworkClient]
    NC -->|Selector| Brokers[(Brokers)]
    Brokers -.->|ProduceResponse| NC
    NC -.->|callback| App

세로로 보면 좌측 위에서 우측 아래로 흐릅니다. 좌측 절반은 user thread, 우측 절반은 Sender thread가 담당합니다. 중간의 RecordAccumulator가 두 스레드의 만남의 광장입니다.

user thread: send()에서 accumulator.append()까지

KafkaProducer.send(ProducerRecord)는 내부적으로 doSend(record, callback)을 호출합니다. doSend의 책임은 다섯 가지로 정리할 수 있습니다.

  1. 메타데이터 확보 — 해당 토픽의 파티션 수와 리더 정보를 알아야 보낼 곳을 정할 수 있습니다.
  2. 직렬화key.serializervalue.serializer를 거쳐 byte[]로 만듭니다.
  3. 파티션 선택 — 키가 있으면 hash 기반, 없으면 파티셔너에게 위임합니다.
  4. 누적기 적재RecordAccumulator.append()로 메모리 버퍼에 기록합니다.
  5. Sender 깨우기 — 누적기 상태에 따라 sender.wakeup()을 호출해 백그라운드 스레드를 즉시 깨웁니다.

각 단계를 차례로 살펴봅니다.

메타데이터 확보와 max.block.ms

프로듀서가 처음 어떤 토픽으로 보낼 때, 그 토픽이 몇 개의 파티션을 가지는지, 각 파티션의 리더가 어느 브로커인지 모르는 상태입니다. waitOnMetadata(topic, partition, nowMs, maxWaitMs)는 메타데이터가 준비될 때까지 대기합니다.

이때 사용되는 타임아웃이 max.block.ms입니다. 기본값은 60초로, 다음 두 경우의 대기 상한선을 정합니다.

  • 토픽 메타데이터 미확보 상태에서 첫 메시지를 보낼 때
  • BufferPool이 가득 차서 새 버퍼를 할당받지 못할 때

max.block.ms를 짧게 잡으면 메타데이터가 늦게 도착하는 환경에서 TimeoutException이 잘 납니다. 반대로 길게 잡으면 호출 스레드(예: HTTP 요청 처리 스레드)가 오래 잠들 수 있습니다. 동기 호출이 아닌 비동기 send()도 이 단계에서는 호출 스레드를 블록할 수 있다는 점이 자주 놓치는 함정입니다.

직렬화와 파티션 선택

직렬화는 단순합니다. byte[] serializedKey = keySerializer.serialize(topic, headers, key), byte[] serializedValue = valueSerializer.serialize(topic, headers, value)로 끝납니다.

파티션 선택은 두 갈래입니다.

  • record.partition()이 지정되어 있으면 그대로 사용
  • 그렇지 않으면 partitioner.partition(...) 또는 (3.3+ 기본) 내장 BuiltInPartitioner가 결정

키가 있으면 거의 모든 구현이 Utils.murmur2(serializedKey) % numPartitions로 동일 키를 동일 파티션에 묶습니다. 키 없는 경우의 분배 전략이 KIP-480, KIP-794를 거치며 진화해 왔는데, 이는 뒤에서 따로 다룹니다.

accumulator.append() 호출

직렬화된 키·값과 선택된 파티션을 들고 RecordAccumulator.append(TopicPartition, timestamp, key, value, headers, callbacks, ...)를 호출합니다. 반환값은 RecordAppendResult로, 다음 세 가지 신호를 포함합니다.

  • futureRecordMetadata를 받게 될 Future
  • batchIsFull — 호출 결과 해당 배치가 가득 찼는지
  • newBatchCreated — 새 배치가 생성되었는지

user thread는 이 두 boolean을 보고 Sender를 깨울지 결정합니다. 둘 중 하나라도 true이면 sender.wakeup()을 호출해 Selectorwakeup()을 발동시켜 백그라운드 I/O를 즉시 재개시킵니다. 그렇지 않으면 Sender는 linger.ms만큼 잠들어 있다가 일어납니다.

여기서 user thread의 책임은 끝납니다. 메시지는 메모리에만 들어갔고, 네트워크로는 아직 한 바이트도 나가지 않았습니다.

RecordAccumulator — 두 스레드의 공유 광장

RecordAccumulator의 핵심 자료구조는 토픽·파티션별 큐입니다. 대략 다음 형태로 생각해도 무리가 없습니다.

ConcurrentMap<String /* topic */, TopicInfo> topicInfoMap;

class TopicInfo {
    ConcurrentMap<Integer /* partition */, Deque<ProducerBatch>> batches;
    BuiltInPartitioner builtInPartitioner;
}

TopicPartition마다 ProducerBatch의 deque가 있고, 새 레코드는 deque의 마지막 배치(peekLast())에 덧붙여집니다. 마지막 배치가 가득 차면 새 배치를 만들어 deque에 추가합니다.

flowchart LR
    subgraph TopicA
        TA0[Part 0 deque] --> BA01[Batch] --> BA02[Batch]
        TA1[Part 1 deque] --> BA11[Batch]
    end
    subgraph TopicB
        TB0[Part 0 deque] --> BB01[Batch] --> BB02[Batch] --> BB03[Batch]
    end
    Append[append record] --> TA0
    Drain[drain batches] --> BA01
    Drain --> BB01

append()의 실제 흐름

append()는 동일 deque에 대해 동시 호출되더라도 안전해야 합니다. 큐 단위로 잠금을 잡고 다음 순서로 진행합니다.

  1. Deque<ProducerBatch> 획득(getOrCreateDeque)
  2. 잠금 안에서 tryAppend(timestamp, key, value, headers, callback, deque) 시도
  3. 마지막 배치에 자리가 있으면 거기 추가하고 RecordAppendResult 반환
  4. 자리가 없으면 잠금을 풀고 BufferPool.allocate(batchSize, maxBlockTimeMs)로 새 버퍼 확보
  5. 새 버퍼로 다시 잠금을 잡고 새 ProducerBatch를 만들어 deque에 addLast
  6. 새 배치에 tryAppend로 레코드 추가

BufferPool 할당은 잠금 밖에서 수행하는 것이 포인트입니다. 메모리가 부족해서 user thread가 max.block.ms까지 잠들더라도, 같은 파티션을 노리는 다른 user thread는 계속 진행할 수 있어야 하기 때문입니다.

여기서 미묘한 점은 4-5단계 사이에 다른 스레드가 먼저 새 배치를 만들었을 수 있다는 것입니다. 그래서 5단계로 다시 잠금을 잡았을 때 tryAppend를 한 번 더 시도하고, 거기서 성공하면 방금 할당한 버퍼는 BufferPool에 즉시 돌려줍니다. 흔히 "double-checked allocation"이라 부르는 패턴입니다.

ProducerBatchMemoryRecordsBuilder

ProducerBatch는 단순한 컬렉션이 아니라, 곧바로 와이어 포맷이 될 수 있는 MemoryRecordsBuilder를 안에 들고 있습니다. tryAppend는 다음 두 조건을 점검합니다.

  • MemoryRecordsBuilder.hasRoomFor(timestamp, key, value, headers) — 이 레코드가 들어갈 자리가 있는가
  • 배치 자체가 종료 상태인가(isClosed)

자리가 있으면 builder에 직접 쓰기 때문에, 별도 staging 영역 없이 곧바로 와이어 포맷(RecordBatch)으로 누적됩니다. 이 설계 덕분에 Sender가 보낼 시점에 다시 직렬화할 필요가 없습니다.

압축은 언제 일어나는가

compression.typenone이 아니면, 배치가 닫힐 때(close()) builder가 누적된 레코드들을 한꺼번에 압축합니다. 즉, 레코드 단위가 아니라 배치 단위 압축입니다. 이것이 batch.size를 키울수록 압축비가 좋아지는 이유입니다. 작은 배치는 압축 사전(dictionary)의 학습 기회가 적어 압축 효율이 떨어집니다.

지원되는 압축은 none, gzip, snappy, lz4, zstd입니다. CPU 비용과 압축비 사이의 트레이드오프가 다르며, 대부분의 워크로드에서는 lz4 또는 zstd가 좋은 균형점이 됩니다.

BufferPool — 고정 메모리 위의 버퍼 재사용

buffer.memory(기본 32MB)는 프로듀서가 사용할 수 있는 누적기 메모리의 상한선입니다. 이 값이 BufferPooltotalMemory로 들어가 다음 두 가지를 함께 책임집니다.

  • 전체 메모리 상한선 강제
  • batch.size와 동일한 크기의 ByteBuffer를 재사용하는 free deque 관리

두 종류의 메모리

BufferPool 안의 메모리는 두 영역으로 나뉩니다.

  • free — 정확히 poolableSize(= batch.size) 크기의 재사용 가능한 ByteBuffer 목록
  • availableMemory — 아직 할당되지 않은 가용 바이트 수

할당 요청이 들어오면 다음 순서로 시도합니다.

  1. 요청 크기가 poolableSize와 같으면 free.pollFirst()로 재사용 시도
  2. availableMemory >= size이면 그 자리에서 ByteBuffer.allocate(size)
  3. 둘 다 안 되면 Condition.await(maxBlockTimeMs)로 대기

대기 큐는 가장 오래 기다린 스레드가 먼저 깨어나도록 FIFO입니다. 메모리가 일부만 회수되더라도 가장 앞 스레드가 자신의 요청 크기를 채울 때까지 누적해 받아 갑니다. 이 fairness 덕분에 큰 요청이 작은 요청들에 의해 영원히 starve 되지 않습니다.

반환과 재사용

배치가 전송 완료(또는 실패)되면 MemoryRecordsBuilder.close()가 호출되고, ProducerBatch가 자기 버퍼를 BufferPool.deallocate()로 돌려줍니다. 크기가 poolableSize와 같으면 free 끝에 다시 넣어 재사용하고, 그렇지 않으면 availableMemory로 회수합니다.

buffer.memory가 부족할 때

buffer.memory가 작거나 브로커가 느려서 배치가 잘 비워지지 않으면 user thread가 BufferPool.allocate() 내부에서 max.block.ms만큼 잠듭니다. 그 시간이 지나도 자리가 없으면 BufferExhaustedException이 던져집니다.

운영 환경에서 이 예외가 보이면 다음 셋 중 하나가 원인입니다.

  • 브로커가 느려 in-flight 배치가 누적되고 있다
  • 프로듀서가 너무 빠르게 적재해서 누적 속도가 송신 속도를 추월한다
  • buffer.memory가 처리량 대비 작게 잡혀 있다

Sender 스레드 — 백그라운드 I/O 엔진

SenderKafkaProducer가 생성될 때 단 한 개 만들어지는 백그라운드 스레드입니다. run()은 단순합니다.

public void run() {
    while (running)
        runOnce();
    // shutdown sequence
}

runOnce()는 다음 셋을 순서대로 실행합니다.

  1. (트랜잭션이 활성화된 경우) 트랜잭션 상태 처리
  2. sendProducerData(now) — 누적기에서 배치 모아 produce 요청 발행
  3. client.poll(timeout, now)Selector로 소켓 I/O 진행, 응답 처리

핵심은 2번 sendProducerData입니다.

sendProducerData의 단계

flowchart TD
    A[runOnce start] --> B[accumulator.ready clusterMetadata]
    B --> C{any node ready?}
    C -- no --> Z[poll timeout]
    C -- yes --> D[accumulator.drain ready nodes maxRequestSize]
    D --> E[add drained batches to inFlightBatches]
    E --> F[expire batches past deliveryTimeoutMs]
    F --> G[sendProduceRequests per node]
    G --> H[client.send ProduceRequest]
    H --> Z

ready(clusterMetadata, now) — 어느 브로커가 보낼 준비 됐나

TopicPartition별 deque의 첫 번째 배치를 보고 "이 배치가 지금 보내야 하는가"를 판단합니다. 다섯 가지 조건 중 하나라도 만족하면 ready입니다.

  • 배치가 가득 참(isFull())
  • linger.ms가 지남
  • BufferPool이 가득 차서 다른 사용자가 대기 중
  • 프로듀서가 닫히는 중(flush() 등)
  • 재시도 backoff가 끝남

이렇게 ready로 판정된 파티션들의 리더 노드 집합이 결과입니다. 같은 브로커가 여러 파티션의 리더이면 한 번에 묶입니다.

drain(readyNodes, maxRequestSize, now) — 배치 추출

ready인 노드들에 대해 deque에서 첫 배치를 꺼내(pollFirst()) maxRequestSize(기본 1MB) 한도 안에서 모읍니다. 노드 단위로 한 요청에 묶을 배치들이 결정됩니다.

이때 파티션 간 fairness를 위해 drainIndex로 시작 위치를 회전시킵니다. 그렇지 않으면 파티션 ID가 낮은 쪽이 항상 먼저 비워져 starvation이 생길 수 있습니다.

sendProduceRequests — 와이어로 내보내기

노드별로 ProduceRequest를 만들어 NetworkClient.send()로 전달합니다. 실제 소켓 쓰기는 이어지는 client.poll()에서 Selector가 비동기로 수행합니다. 응답이 돌아오면 handleProduceResponseProducerBatch.done(baseOffset, ...)을 호출해 RecordMetadata를 채우고 사용자 콜백을 실행합니다.

inFlightBatchesdelivery.timeout.ms

SenderMap<TopicPartition, List<ProducerBatch>> 형태로 in-flight 상태인 배치들을 추적합니다. 응답이 오면 거기서 제거하고, 너무 오래 머무는 배치는 delivery.timeout.ms(기본 2분)를 기준으로 TimeoutException과 함께 실패시킵니다.

delivery.timeout.ms는 사용자가 send() 호출 시점부터 콜백 호출 시점까지의 상한선입니다. request.timeout.ms(기본 30초)는 단일 produce 요청에 대한 브로커 응답 대기 상한입니다. 두 값의 관계가 delivery.timeout.ms >= linger.ms + request.timeout.ms 라는 제약을 만듭니다.

guaranteeMessageOrder — 순서 보장 모드

max.in.flight.requests.per.connection이 1이거나 멱등 프로듀서가 활성화되어 있으면 Sender는 같은 파티션의 in-flight 배치 순서를 엄격히 보장해야 합니다. 이를 위해 drain 직후 해당 파티션을 mute()로 잠가, 응답이 돌아오기 전에는 같은 파티션에서 다음 배치를 뽑지 못하게 합니다.

응답이 도착해 unmute()되기까지 그 파티션은 ready 후보에서 제외됩니다. 이 짧은 잠금이 재시도 중 순서가 뒤집히는 시나리오를 차단합니다.

멱등 프로듀서(기본 활성)는 max.in.flight.requests.per.connection을 5 이하로 강제합니다. 브로커가 PID별·파티션별로 마지막 5개의 sequence number를 캐시해 중복·순서 어긋남을 검출하기 때문입니다.

파티셔너의 진화 — Sticky Partitioner를 둘러싼 두 KIP

파티션 선택은 외형은 단순하지만 처리량에 가장 큰 영향을 주는 부분 중 하나입니다. 키가 없는 메시지를 어떻게 나누느냐에 따라 배치가 얼마나 잘 채워지는지가 결정되기 때문입니다.

옛 기본: Round-Robin

초창기 DefaultPartitioner는 키 없는 메시지를 라운드로빈으로 흩뿌렸습니다. 100개의 파티션이 있다면, 100개의 레코드가 100개의 deque에 하나씩 들어갑니다. 결과적으로 100개의 1-record 배치가 만들어지고, 압축 효율도 네트워크 효율도 최악이 됩니다. linger.ms를 키워도 한 deque에 다음 레코드가 들어오기까지 시간이 오래 걸리니 의미가 옅습니다.

KIP-480: Sticky Partitioner

해결은 단순했습니다. 키가 없으면 일정 시간 한 파티션에 "딱 붙어 있는다"는 아이디어입니다. KIP-480이 도입한 UniformStickyPartitioner는 다음 규칙을 따릅니다.

  • 현재 sticky partition으로 계속 적재
  • RecordAccumulator가 그 파티션에 새 배치를 만들어야 할 때 sticky partition을 다시 뽑음

배치 단위로 sticky가 바뀌므로 한 배치는 통째로 한 파티션에 쌓이고, 시간이 지나면 결과적으로 파티션들이 골고루 채워집니다. 이 변경으로 동일 linger.ms에서 배치 크기가 극적으로 커지고, 처리량은 올라가며 평균 레이턴시는 오히려 내려가는 결과가 나왔습니다.

flowchart LR
    subgraph RoundRobin
        R1[Record 1] --> P1A[Part 0]
        R2[Record 2] --> P2A[Part 1]
        R3[Record 3] --> P3A[Part 2]
        R4[Record 4] --> P4A[Part 3]
    end
    subgraph Sticky
        S1[Record 1] --> P1B[Part 0]
        S2[Record 2] --> P1B
        S3[Record 3] --> P1B
        S4[New batch trigger] --> P3B[Part 2]
    end

KIP-480의 약점: 느린 브로커에 쏠리는 역설

운영 환경에서 KIP-480이 의도와 다르게 동작하는 케이스가 발견되었습니다. 한 브로커가 느려지면 그쪽 파티션의 배치가 늦게 비워집니다. 배치 단위로 sticky가 바뀌므로 배치 회전 주기가 길어지면 그 sticky 기간이 길어지고, 결과적으로 느린 브로커로 더 많은 레코드가 흐릅니다. 분배가 균일해지길 기대했는데 오히려 편향이 강해지는 역설입니다.

KIP-794: Strictly Uniform Sticky Partitioner

Kafka 3.3에 들어간 KIP-794는 이 문제를 두 갈래로 해결합니다.

(1) 균일 분배 보장

배치 크기에 의존하지 않고, 누적 바이트 수가 일정 임계치(batch.size의 일정 배수)에 도달하면 다음 파티션으로 이동합니다. 느린 브로커의 sticky 기간이 길어지더라도, 누적 바이트가 차면 강제로 다음 sticky로 넘어가므로 더 이상 편향이 축적되지 않습니다.

(2) 적응형 파티션 전환

partitioner.adaptive.partitioning.enabletrue(기본)이면 브로커의 지연을 추적해, 느린 브로커의 파티션을 sticky 후보에서 일시적으로 배제합니다. partitioner.availability.timeout.ms(기본 0, 즉 사용 안 함)를 설정하면 일정 시간 응답 없는 브로커를 더 적극적으로 회피합니다.

또한 KIP-794는 기존 DefaultPartitionerUniformStickyPartitioner를 deprecate 했습니다. 3.3 이후로는 partitioner.class를 명시하지 않는 것이 권장되며, 그 경우 새로운 내장 BuiltInPartitioner가 사용됩니다.

키 기반 파티셔닝은 영향받지 않습니다. 키가 있으면 여전히 murmur2(key) % numPartitions로 결정됩니다. 이는 의도적으로 유지된 호환성으로, 키-파티션 매핑에 의존하는 모든 컨슈머 로직을 깨지 않기 위해서입니다.

partitioner.ignore.keys

3.3 이전과 동일하게 "키를 무시하고 sticky로만 흘리고 싶다"는 요구가 있어 partitioner.ignore.keys=true가 추가되었습니다. 키 해시 매핑이 보장이 아니라 우연이었던 경우(예: 키는 로깅 용도로만 채우는 토픽)에 처리량을 더 끌어올릴 수 있습니다.

운영 튜닝 포인트

내부 구조를 이해하고 나면 흔히 만나는 튜닝 노브들의 의미가 일관성 있게 보입니다.

batch.sizelinger.ms의 만남

두 값은 "어느 쪽이 먼저 충족되면 보낼 것인가"의 OR 조건입니다.

  • batch.size(기본 16KB)는 배치 한 통의 상한선
  • linger.ms는 가득 차지 않은 배치를 기다리는 최대 시간

batch.size만 키우고 linger.ms를 0으로 두면, 트래픽이 한산할 때는 여전히 거의 매번 작은 배치가 나갑니다. 반대로 linger.ms만 키우고 batch.size를 작게 두면, 트래픽이 몰릴 때 작은 배치들이 빠르게 연달아 나갑니다. 둘은 짝으로 잡아야 의미가 있습니다.

처리량 최적화의 일반적 출발선은 batch.size를 64KB256KB로, linger.ms를 550ms로 올리는 것입니다. 정확한 값은 메시지 평균 크기와 도착 패턴에 따라 다릅니다.

운영 환경에서는 linger.ms를 5ms 정도로 설정해 작은 배치가 갖는 비효율을 줄이고 batching 효과를 얻는 것이 일반적입니다.

compression.type

배치 단위 압축임을 기억하면 선택이 단순해집니다. batch.size가 클수록 압축비가 좋아지므로, 압축을 켤 거라면 batch.size도 함께 키우는 편이 좋습니다. CPU 비용과 압축비의 균형점은 워크로드마다 다르지만, 일반적으로 lz4가 빠르고 충분한 압축비를 내며, zstd는 CPU를 조금 더 쓰는 대신 더 잘 줄입니다.

압축은 프로듀서에서 한 번, 컨슈머에서 한 번만 풉니다. 브로커는 압축된 배치를 그대로 디스크에 쓰고 그대로 컨슈머에 보냅니다(로그 컴팩션 같은 일부 경우 제외). 즉, 압축으로 줄어드는 비용은 네트워크와 디스크 양쪽 모두입니다.

buffer.memorymax.block.ms

buffer.memory(기본 32MB)는 누적기 전체 메모리 상한입니다. 처리량을 올리려면 같이 키워야 합니다. 그렇지 않으면 user thread가 BufferPool에서 자주 잠들고, max.block.ms가 짧으면 BufferExhaustedException을 만나기 시작합니다.

대략적인 가늠은 buffer.memory ~= batch.size * numPartitions * 안전계수 정도입니다. 모든 파티션에 한 배치씩 동시에 채워 둘 여유가 있어야 sticky 회전이 매끄럽기 때문입니다.

acks, enable.idempotence, retries

Kafka 3.0부터 acks=all, enable.idempotence=true가 기본입니다. 이 기본 조합은 다음을 함께 잠금니다.

  • max.in.flight.requests.per.connection을 5 이하로 강제
  • retriesInteger.MAX_VALUE로 강제
  • 모든 ISR이 복제할 때까지 ack 지연

acks=1로 낮추면 리더만 받자마자 ack가 떨어져 지연은 줄지만, 리더 장애 시 메시지 손실 가능성이 생깁니다. acks=all의 지연 부담은 대부분 워크로드에서 측정 가능한 만큼 크지 않다는 것이 KIP-679의 결정 근거였습니다.

delivery.timeout.ms, request.timeout.ms

두 값의 차이를 한 문장으로 정리하면 다음과 같습니다.

  • request.timeout.ms(기본 30s) — 단일 produce 요청에 대해 브로커 응답을 기다리는 상한
  • delivery.timeout.ms(기본 120s) — 사용자가 send()를 호출한 순간부터 콜백이 호출되기까지의 상한

delivery.timeout.ms가 짧으면 재시도 여유가 적어 일시적 장애에서 실패가 빨리 납니다. 길게 잡으면 콜백이 늦게 호출되어 호출자가 오해할 수 있습니다. 일반적으로 delivery.timeout.ms >= linger.ms + request.timeout.ms + 안전여유 관계를 유지합니다.

자주 만나는 실수

producer.send()만 호출하고 flush()/close()를 안 합니다. JVM이 종료되면 Sender 스레드는 daemon으로 함께 죽고, 누적기에 남은 배치는 그대로 사라집니다. 프로세스 종료 직전 close(Duration) 또는 flush()로 비워야 합니다.

send()의 반환 Future.get()을 즉시 호출합니다. 비동기 설계가 무너져 동기 호출이 됩니다. 콜백을 등록하거나, Future를 다른 스레드/단계로 미루는 편이 의도에 맞습니다.

max.in.flight.requests.per.connection을 5 초과로 올립니다. 멱등 프로듀서가 활성화된 상태에서는 클라이언트가 ConfigException을 던집니다. 비활성화해서 강제로 6 이상으로 올리면 재시도 중 순서가 뒤집힐 수 있습니다.

batch.size만 키우고 linger.ms는 0으로 둡니다. 앞서 본 대로 한산한 시간에는 효과가 없습니다. 짝으로 조정해야 합니다.

buffer.memory는 그대로 두고 batch.size만 크게 키웁니다. 파티션 수가 많은 토픽에 보낼 때 동시에 채워야 할 배치 수가 많아져 BufferPool이 빨리 차고, user thread가 자주 블록됩니다.

Callback 안에서 producer.send()를 다시 호출하면서 get()을 부릅니다. 콜백은 Sender 스레드에서 실행됩니다. 그 안에서 Future.get()을 부르면 Sender 자기 자신을 기다리는 데드락이 됩니다.

마무리

카프카 프로듀서는 "user thread는 메모리에 쓰고, Sender thread는 네트워크로 비운다"는 단순한 producer/consumer 모델 위에 서 있습니다. 그 사이를 RecordAccumulatorBufferPool이 잇고, 노브들(batch.size, linger.ms, compression.type, buffer.memory, max.block.ms, delivery.timeout.ms)은 모두 이 두 스레드의 만남을 조율합니다.

파티셔너의 KIP-480 → KIP-794 진화는 같은 추상화 안에서 어떻게 분배 정책이 처리량을 좌우하는지를 보여주는 좋은 예입니다. 작은 변경 한 줄이 배치 크기를 두 자리 수로 키울 수 있고, 또 그 변경이 다른 환경에서는 반대로 작동할 수도 있다는 점이 인상적입니다.

배치 크기, 압축, 그리고 sticky의 회전 주기를 함께 머리에 그릴 수 있다면, 같은 클러스터에서도 워크로드별로 합리적인 시작점을 잡고 측정·조정해 갈 수 있습니다.

참고자료

More from this blog

JVM은 컨테이너의 CPU와 메모리 한계를 어떻게 알아낼까

8코어 노드에 컨테이너를 띄웠는데 ForkJoinPool이 스레드를 한두 개만 만들어요. 메모리는 넉넉히 줬는데 컨테이너가 자꾸 OOMKilled로 죽고요. 분명히 같은 JAR인데 로컬에서는 멀쩡하다가 쿠버네티스에만 올리면 이상해져요. 이 글은 "왜 컨테이너 속 JVM은 다르게 행동하는가"를 cgroup이라는 진짜 경계선과, JVM이 그 경계를 읽어내는 내

May 21, 202615 min read

ThreadPoolExecutor는 언제 스레드를 새로 만들까 — execute()의 3단계

Executors.newFixedThreadPool(10) 한 줄을 쓰면서도, 11번째 작업이 오면 스레드가 11개로 늘어날 거라고 막연히 기대해 본 적 없으신가요. 실제로는 큐가 먼저 무한히 쌓이고 스레드는 영원히 10개에 머물러요. 이 글은 ThreadPoolExecutor가 작업을 받았을 때 "스레드를 새로 만들지, 큐에 넣을지, 거부할지"를 결정하는

May 21, 202617 min read

자바 synchronized는 어떻게 동작할까 — 모니터, 락 인플레이션, 그리고 사라진 biased locking

synchronized 키워드 하나로 스레드 안전을 얻는 동안, JVM 안에서는 객체 헤더의 비트를 뒤집고, 스택에 락 레코드를 쌓고, 경합이 생기면 네이티브 모니터로 승격하는 일이 벌어져요. 이 글은 그 한 번의 잠금이 객체 헤더부터 ObjectMonitor까지 어떤 경로를 거치는지, 그리고 한때 있었다가 JDK 18에서 사라진 biased locking

May 19, 202616 min read

JVM 객체 할당의 비밀 — TLAB, Bump-the-Pointer, 그리고 할당이 거의 공짜인 이유

Java에서 new를 호출하면 무슨 일이 벌어질까요? "힙에 메모리를 잡는다"는 한 문장 뒤에는 스레드마다 자기만의 분양 구역을 나눠 갖는 정교한 설계가 숨어 있어요. 이 글은 HotSpot JVM이 객체 할당을 어떻게 "거의 공짜"로 만드는지 그 내부를 따라가 보려는 글이에요. JVM 메모리 동작 원리에 관심 있는 분께 권해요. 자바를 쓰다 보면 객체를

May 15, 202614 min read

Java Zero-Copy — FileChannel.transferTo, sendfile, 그리고 Kafka가 디스크를 네트워크로 흘려보내는 방법

"파일을 읽어서 소켓으로 보낸다." 한 줄짜리 요구사항이에요. 그런데 이 한 줄 뒤에서 데이터는 메모리를 네 번이나 복사하고, CPU는 커널과 유저 공간을 네 번이나 들락거려요. Kafka처럼 초당 수십만 건을 흘려보내야 하는 시스템에서 이 비용은 그냥 넘길 수가 없어요. 이 글은 그 복사를 한 겹씩 벗겨내는 zero-copy의 동작 원리를 따라가요. 전통

May 15, 202617 min read

끄적끄적 테크 블로그

165 posts

물류 회사에 다니고 있는 개발자 블로그입니다. 개발을 너무 좋아해서 정신없이 작업하다가 중간에 끄적거리며 내용들을 몇개 적어봅니다 ㅎㅎ