Kafka Broker Log Storage 내부 구조 — Segment, Index, Compaction
카프카 브로커가 디스크 위에 메시지를 어떻게 쌓고, 어떻게 찾아 읽으며, 어떻게 비워 내는지를
apache/kafkatrunk 소스 기준으로 정리합니다. Producer/Consumer 측 동작 원리는 별도 글에서 다뤘으니 이 글은 브로커 안에서 한 파티션이 디스크에 닿는 그 지점만 깊게 봅니다. 카프카 운영을 하면서 segment 파일 이름이 왜 저렇게 생겼는지,cleanup.policy=compact로 바꿨는데 왜 줄어들지 않는지가 궁금했던 분을 대상으로 합니다.
왜 Broker Log 구조를 알아야 하는가
Producer는 RecordAccumulator에서 배치를 만들어 보내고, Consumer는 fetch.min.bytes 단위로 가져갑니다. 하지만 그 사이에서 브로커가 실제로 데이터를 디스크에 어떻게 저장하느냐가 카프카 성능과 운영 행동의 절반을 결정합니다.
특히 다음 질문은 모두 storage layer를 알아야 답할 수 있습니다.
retention.ms를 1시간으로 설정했는데 왜 2시간이 지난 메시지가 남아 있을까?cleanup.policy=compact로 바꿨는데 왜 디스크 사용량이 그대로일까?- segment 파일 개수가 수만 개가 되어 broker 시작이 느려졌다 — 무엇을 잘못 설정한 걸까?
- 토픽을 삭제했는데 디스크는 즉시 비워지지 않는다 — 어디에서 지연되는 걸까?
이 글은 LogSegment, OffsetIndex/TimeIndex, LogCleaner의 세 축으로 나눠 답을 정리합니다.
한 파티션이 디스크에 닿는 모습
브로커 호스트의 log.dirs (기본값 /tmp/kafka-logs) 아래에는 토픽-파티션마다 디렉토리가 하나 있습니다. my-topic의 파티션 0이라면 my-topic-0/입니다.
그 안에는 여러 개의 segment 파일 묶음이 있습니다. 한 segment는 항상 같은 base offset(그 segment의 첫 메시지 offset)을 prefix로 한 파일 4~5개로 구성됩니다.
my-topic-0/
├── 00000000000000000000.log
├── 00000000000000000000.index
├── 00000000000000000000.timeindex
├── 00000000000000000000.snapshot
├── 00000000000012345678.log <- baseOffset = 12345678
├── 00000000000012345678.index
├── 00000000000012345678.timeindex
├── 00000000000012345678.snapshot
├── 00000000000012345678.txnindex <- 트랜잭션 토픽일 때만
└── leader-epoch-checkpoint
.log— 실제 record batch가 append-only로 쌓이는 본문 파일.index— offset → 파일 내 byte position 매핑 (sparse).timeindex— timestamp → offset 매핑 (sparse, KIP-33).snapshot— producer ID/sequence/transaction 상태 스냅샷 (멱등 프로듀서 복구용).txnindex— 중단된 트랜잭션의 (firstOffset, lastOffset, lastStableOffset) 인덱스
각 파일 이름의 숫자는 그 segment의 base offset을 0으로 패딩한 20자리 정수입니다. 그래서 ls 결과만으로도 segment가 시간 순으로 정렬되어 보입니다.
가장 큰 base offset을 가진 segment가 active segment입니다. 새로 들어오는 record batch는 모두 active segment의 .log에 append 됩니다. 그 외 segment는 닫힌 채로 읽기 전용으로 남아 있습니다.
flowchart LR
P[Producer Request] --> AS["Active Segment<br/>(00000000000012345678.log)"]
AS -.append.-> AS
OLD1["Sealed Segment<br/>(00000000000000000000.log)"]
OLD2["Sealed Segment<br/>(00000000000005678901.log)"]
OLD1 --read-only--> C[Consumer Fetch]
OLD2 --read-only--> C
AS --read-only--> C
LogSegment — append와 read의 단위
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java가 그 한 묶음을 표현하는 클래스입니다. 핵심 필드만 보면 다음과 같습니다.
public class LogSegment {
private final FileRecords log; // .log
private final LazyIndex<OffsetIndex> lazyOffsetIndex; // .index
private final LazyIndex<TimeIndex> lazyTimeIndex; // .timeindex
private final TransactionIndex txnIndex; // .txnindex
private final long baseOffset;
private volatile TimestampOffset maxTimestampAndOffsetSoFar;
// ...
}
인덱스 두 종류가 LazyIndex로 감싸져 있는 점이 눈에 띕니다. mmap 비용을 아끼기 위해 처음 접근 시점에 매핑을 만들고, 닫힌 segment는 GC에 맡깁니다.
append()의 흐름
Producer가 보낸 ProduceRequest가 leader broker의 Partition을 거쳐 결국 LogSegment.append(long largestOffset, MemoryRecords records)로 떨어집니다.
public void append(long largestOffset, MemoryRecords records) throws IOException {
if (records.sizeInBytes() > 0) {
ensureOffsetInRange(largestOffset);
long physicalPosition = log.sizeInBytes();
log.append(records); // 1. .log에 그대로 append
for (RecordBatch batch : records.batches()) {
// 2. maxTimestamp 갱신
if (batch.maxTimestamp() > maxTimestampAndOffsetSoFar.timestamp) {
maxTimestampAndOffsetSoFar = new TimestampOffset(
batch.maxTimestamp(), batch.lastOffset());
}
// 3. log.index.interval.bytes(기본 4 KiB)마다 sparse 인덱스 추가
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
offsetIndex().append(batch.lastOffset(), physicalPosition);
timeIndex().maybeAppend(batch.maxTimestamp(), batch.lastOffset());
bytesSinceLastIndexEntry = 0;
}
bytesSinceLastIndexEntry += batch.sizeInBytes();
}
}
}
세 가지가 중요합니다.
.log에는 wire format 그대로 적힙니다. Producer가 보낸 record batch는 broker에서 다시 직렬화되지 않고 그대로 디스크에 떨어집니다. 압축도 producer가 적용한 그대로 유지됩니다. 이것이 카프카의 압축 효율과 zero-copy 전송을 모두 떠받치는 토대입니다.- 인덱스는 sparse합니다. 모든 메시지에 대해 인덱스를 만들지 않고,
log.index.interval.bytes(기본 4 KiB) 이상 진행했을 때만 새 entry를 추가합니다. 디스크/메모리에서 인덱스 비용을 작게 유지하기 위한 핵심 설계입니다. - TimeIndex는 OffsetIndex의 보조입니다. 시간 → offset → 위치의 2단 조회 구조입니다.
read()와 translateOffset
Consumer가 OFFSET 12380000부터 읽고 싶다고 요청하면 LogSegment.read(startOffset, maxSize, ...)이 호출됩니다. 본문 1줄만 보면 됩니다.
LogOffsetPosition startOffsetAndSize = translateOffset(startOffset);
translateOffset(offset)는 OffsetIndex로 가장 가까운 인덱스 entry를 찾고, 거기서부터 .log를 선형 스캔해서 정확한 batch boundary를 잡는 일을 합니다. sparse 인덱스이기 때문에 정확한 위치는 인덱스만으로 알 수 없고, 인덱스로 출발점만 찾은 뒤 .log의 batch header를 읽어 가며 도착점을 좁힙니다.
이렇게 찾은 byte position 이후를 FileRecords로 슬라이스해서 응답 객체에 담아 반환합니다. 응답을 클라이언트로 보낼 때는 일반 read() 대신 FileChannel.transferTo() (Linux sendfile(2))를 써서 유저 공간을 거치지 않고 page cache → socket buffer로 직접 흘려보냅니다. zero-copy의 정체가 바로 이것입니다.
OffsetIndex — 8 byte entry의 정체
OffsetIndex.java는 인덱스 한 entry를 8 byte로 정의합니다.
public class OffsetIndex extends AbstractIndex {
private static final int ENTRY_SIZE = 8;
// (4 bytes: relativeOffset) + (4 bytes: physicalPosition)
}
핵심 트릭은 relative offset 인코딩입니다. segment의 baseOffset이 50이고 어떤 메시지의 absolute offset이 55라면, 인덱스에는 그 차이인 5만 4-byte int로 저장됩니다.
baseOffset이 클수록 8-byte long을 그대로 적으면 8 byte를 다 쓰지만, 한 segment 안의 offset 차이는 대부분 Integer.MAX_VALUE 안에 들어옵니다. 그래서 4 byte로 줄여 인덱스 크기를 절반으로 만든 것이 이 설계의 의도입니다. segment rolling 트리거 중 하나인 "offset overflow 위험" 체크는 이 4-byte 범위를 넘는 케이스를 미리 잘라 내기 위해 존재합니다.
lookup — binary search 변형
flowchart LR
Q["Query offset = 12380000"] --> BS[Binary search in .index]
BS --> NEAR["Nearest entry ≤ 12380000<br/>e.g. (relOff=34322, pos=15728640)"]
NEAR --> SCAN[Linear scan .log<br/>from byte 15728640]
SCAN --> HIT["Exact batch at offset 12380000"]
binary search는 mmap된 인덱스 영역 위에서 직접 수행됩니다. mmap을 쓰기 때문에 OS가 알아서 인덱스 페이지를 page cache에 올려 두고, JVM은 그 영역을 마치 일반 메모리 배열처럼 접근합니다.
찾고자 하는 offset보다 같거나 작은 가장 큰 entry를 반환하는데, "보다 같거나 작은"이라는 조건 때문에 인덱스 entry가 하나도 없거나 모두 큰 경우 (baseOffset, 0)을 돌려줘서 segment 시작부터 스캔하게 합니다.
인덱스는 망가져도 된다
/* No attempt is made to checksum the contents of this file, in the event
* of a crash it is rebuilt. */
OffsetIndex 주석의 이 한 줄이 운영적으로 중요합니다. 인덱스 파일은 체크섬이 없고, broker가 비정상 종료되면 .log를 처음부터 읽어 인덱스를 다시 만듭니다. 그래서 broker 재시작이 오래 걸린다면 대체로 segment 개수 × 인덱스 재구축 시간이 원인입니다.
TimeIndex — KIP-33이 만든 보조 인덱스
.timeindex는 12 byte entry입니다.
- 8 byte: timestamp (epoch millis)
- 4 byte: relative offset (그 timestamp가 발생한 메시지의 offset)
timestamp → offset 매핑만 갖고 있고, 그 offset이 어디 있는지는 다시 .index를 거쳐 찾습니다. KIP-33 도입 이전에는 "어떤 시각 이후의 메시지를 다오"라는 요청을 segment의 mtime으로 추정해야 했지만, KIP-33 이후로는 정확한 매핑이 가능합니다.
flowchart LR
T["fetch from t=1715500000000"] --> TI[.timeindex binary search]
TI --> OFF["nearest entry: (ts, offset=12380000)"]
OFF --> OI[.index binary search]
OI --> POS["(offset, position)"]
POS --> LOG[.log scan]
여기서 운영 함정이 하나 숨어 있습니다. .timeindex 한 entry는 .index 한 entry보다 1.5배 큽니다 (12 vs 8). 즉 같은 segment.index.bytes 한도에서 .timeindex가 먼저 차고, TimeIndex가 가득 차는 시점이 segment rolling을 유발합니다. 이상하게 segment가 작은 크기로 자주 rolling 된다면 timestamp 분포와 segment.index.bytes(기본 10 MiB)를 함께 보세요.
Segment Rolling — 새 segment를 만드는 4가지 조건
LogSegment.shouldRoll(RollParams)이 한 자리에 모아 둔 네 가지 조건 중 하나라도 참이면 active segment가 닫히고 새 segment가 열립니다.
public boolean shouldRoll(RollParams rollParams) {
boolean reachedRollMs = timeWaitedForRoll(...) > rollParams.maxSegmentMs() - rollJitterMs;
return size > rollParams.maxSegmentBytes() - rollParams.messagesSize()
|| (size > 0 && reachedRollMs)
|| offsetIndex().isFull()
|| timeIndex().isFull()
|| !canConvertToRelativeOffset(rollParams.maxOffsetInMessages());
}
- 크기 한도 —
segment.bytes(기본 1 GiB)에 도달 - 시간 한도 —
segment.ms(기본 7일)에 도달 - 인덱스 가득 — OffsetIndex 또는 TimeIndex가
segment.index.bytes(기본 10 MiB) 초과 - offset overflow — relative offset이 4-byte 한도를 넘길 위험
여기서 rollJitterMs가 등장하는데, log.roll.jitter.ms로 설정 가능한 0~지정값 사이 무작위 지연입니다. 모든 토픽이 같은 절대 시각에 rolling 되어 fsync IO가 동시에 폭발하는 thundering herd를 막기 위한 장치입니다.
Retention — cleanup.policy=delete로 비우기
cleanup.policy=delete(기본값)는 가장 단순한 정책입니다. 오래된 segment를 통째로 지웁니다.
판정 기준은 두 가지입니다.
- 시간 —
retention.ms(기본 7일)보다 segment 안의 maxTimestamp가 더 오래되었으면 삭제 대상 - 용량 — 파티션 전체 크기가
retention.bytes를 초과하면 가장 오래된 segment부터 삭제
여기서 가장 헷갈리는 부분이 시간 기준입니다. 삭제 판정은 segment의 maxTimestamp 기준이지 개별 메시지 기준이 아닙니다. 즉 segment 하나에 1시간치 메시지가 들어 있고 retention.ms=1h라도, segment의 maxTimestamp가 30분 전이라면 그 segment는 아직 살아 있습니다. 결과적으로 "1시간 retention"이 실제로는 최대 retention.ms + segment.ms 까지 길어질 수 있습니다.
또 한 가지, active segment는 절대 삭제 대상이 아닙니다. active segment는 새 메시지가 계속 append 되므로 retention 대상에서 빠집니다. 트래픽이 거의 없는 토픽에서 retention이 영영 발동하지 않는 현상의 원인은 대부분 이것입니다 — segment.ms를 짧게 설정해서 주기적으로 rolling 되게 만들어야 합니다.
삭제 자체도 두 단계입니다.
flowchart LR
A["00000000000000000000.log"] --rename--> B["00000000000000000000.log.deleted"]
B --wait file.delete.delay.ms (default 60s)--> C[unlink]
- 파일 이름 끝에
.deleted를 붙여 rename file.delete.delay.ms(기본 60초) 후에 실제 unlink
이 1단계 rename은 이미 fetch 진행 중인 consumer 측 FileChannel이 깨지지 않게 하기 위함입니다. POSIX는 unlink 된 파일도 열려 있는 동안 계속 읽을 수 있게 해 주지만, 진행 중인 IO를 안전하게 끝낸 뒤 unlink 하는 편이 운영 가시성에 좋습니다.
Log Compaction — cleanup.policy=compact의 진짜 동작
cleanup.policy=compact는 의미가 완전히 다릅니다. 같은 key에 대해 가장 최근 value 하나만 남기고 그 이전 값을 모두 지웁니다. Kafka Streams의 changelog 토픽, __consumer_offsets, Connect의 config/status 토픽이 모두 이 정책으로 동작합니다.
LogCleaner — 별도 스레드 풀
compaction은 메인 broker IO 경로가 아니라 LogCleaner라는 별도 스레드 풀이 처리합니다. log.cleaner.threads(기본 1) 만큼 CleanerThread가 떠 있고, 각 스레드는 무한 루프에서 다음을 반복합니다.
flowchart TD
A[grabFilthiestCompactedLog] --> B{선택됨?}
B -- No --> Z[sleep & retry]
B -- Yes --> C[buildOffsetMap on dirty section]
C --> D[groupSegmentsBySize]
D --> E[cleanInto: 각 그룹을 새 segment로 다시 쓰기]
E --> F[swap .cleaned -> .log]
F --> A
dirty section과 cleanable point
핵심 개념이 first dirty offset / cleanable point입니다.
- first dirty offset — 아직 compaction이 닿지 않은 가장 오래된 offset. 직전 compaction이 어디까지 끝났는지 기억하는 값입니다. (
cleaner-offset-checkpoint파일에 저장) - cleanable point — 이번 compaction이 처리할 수 있는 가장 새로운 offset. 보통 active segment 직전까지입니다.
[ clean | dirty | active ]
0 D C N
^---- this run's target ----^
active segment가 cleanable 범위에서 빠지는 이유는 단순합니다 — 새 메시지가 계속 들어오는 segment를 건드리면 lock과 race가 끔찍해지기 때문입니다. 그래서 active segment에만 메시지가 쌓이는 저트래픽 토픽은 compaction이 영영 발동하지 않습니다. 이때도 해결은 segment.ms를 짧게 설정해 주기적으로 rolling 되게 만드는 것입니다.
또 다른 두 다이얼이 있습니다.
min.compaction.lag.ms— 메시지가 producer 시각으로 이 시간 이내에 producer 된 것이면 compaction 대상에서 제외. 0이 기본이라 즉시 compaction 가능합니다. 트랜잭션 환경에서 short-lived 메시지를 잠시 보호하고 싶을 때 씁니다.max.compaction.lag.ms— 반대로 이 시간이 지나면 강제로 compaction. 기본Long.MAX_VALUE라 사실상 무한. GDPR 같은 컴플라이언스 요구에 대응할 때 유한값으로 줄입니다.
dirtiest log를 어떻게 고르는가
LogCleanerManager.grabFilthiestCompactedLog가 후보 파티션 중 dirty ratio가 가장 높은 파티션을 하나 고릅니다. dirty ratio는 dirty bytes / (clean bytes + dirty bytes)로 정의되고, min.cleanable.dirty.ratio(기본 0.5) 이상인 후보만 통과합니다. 즉 dirty가 전체의 절반 이상일 때까지 기다렸다가 한 번에 정리하는 batch 전략입니다.
기본값 0.5의 의미를 풀어 보면, 같은 key 업데이트를 자주 하는 changelog 토픽일수록 디스크가 두 배 가까이 부풀어 있다가 한 번에 줄어드는 모양이 됩니다. 디스크 그래프에서 톱니파가 보인다면 정상입니다. 이 비율을 줄이면 자주 깎이는 대신 cleaner CPU 비용이 올라갑니다.
Offset Map — 키 → 최신 offset 사전
compaction의 핵심은 dirty 구간을 한 번 훑어서 key → 최신 offset 매핑을 만드는 일입니다. 이 매핑은 OffsetMap 인터페이스의 구현체인 SkimpyOffsetMap이 들고 있습니다.
SkimpyOffsetMap은 이름 그대로 검소합니다.
- 키 자체를 저장하지 않고 키의 cryptographic hash (MD5) 만 저장
- value는 8 byte long (offset)
- linear probing 기반 open-addressing hash table
키를 저장하지 않기 때문에 충돌이 났을 때 진짜로 같은 키인지는 확인 불가능합니다. 대신 강한 해시(MD5)를 써서 충돌 확률을 무시할 수 있을 만큼 낮춥니다. log.cleaner.io.buffer.load.factor(기본 0.9)로 채워지면 더 이상 추가하지 않고 현재 상태로 compaction에 들어갑니다.
메모리 사용량은 log.cleaner.dedupe.buffer.size(기본 128 MiB) 한 cleaner thread 전체가 나눠 씁니다. 한 entry는 24 byte (MD5 16 + offset 8) 정도이므로 128 MiB로 약 4~500만 키를 한 번에 처리할 수 있습니다. 한 파티션의 dirty key 수가 이를 넘으면 compaction이 분할되어 한 번에 끝나지 않습니다.
cleanInto — 새 segment로 다시 쓰기
Offset Map이 완성되면 dirty 구간의 segment들을 모아 그룹으로 묶고 (groupSegmentsBySize), 각 그룹을 cleanInto로 처리합니다.
cleanInto는 그룹 안의 segment들을 순회하면서 record 하나하나에 대해 결정을 내립니다.
flowchart TD
R["record (key, offset)"] --> Q1{tombstone?<br/>key=any, value=null}
Q1 -- No --> Q2{key in offset map?}
Q1 -- Yes --> Q3{deleteHorizonMs reached?}
Q2 -- Yes --> Q4{offset == latest?}
Q2 -- No --> KEEP[유지]
Q4 -- Yes --> KEEP
Q4 -- No --> DROP[버림]
Q3 -- No --> KEEP
Q3 -- Yes --> DROP
유지하기로 결정된 record는 새 .cleaned 확장자의 임시 segment에 다시 쓰입니다. 그룹 처리가 끝나면 임시 파일을 원래 segment 이름으로 swap하고 옛 파일을 삭제합니다. 도중에 broker가 죽어도 .cleaned 또는 .swap 확장자 단계에서 발견되어 안전하게 롤백되거나 재개됩니다.
여기서 알아 둘 것이 두 가지입니다.
- compaction은 in-place가 아닙니다. 임시로 새 segment를 만들고 swap 합니다. 그래서 compaction이 도는 동안 일시적으로 디스크가 두 배 가까이 필요할 수 있습니다.
- segment 단위로 묶어서 처리합니다. 묶음은
segment.bytes와segment.index.bytes를 동시에 만족하는 범위로 잘리며, 결과 segment가 너무 작아져 segment 수가 폭증하지 않게 합니다.
Tombstone — null value의 의미
key는 있지만 value가 null인 record는 tombstone으로 해석됩니다. compaction 시점에 "이 키를 삭제하라"는 의미입니다.
다만 tombstone은 즉시 사라지지 않습니다.
flowchart LR
A["First cleaning sees tombstone"] --> B["Mark deleteHorizonMs<br/>= now + delete.retention.ms"]
B --> C["Tombstone visible to consumers"]
C --> D["Subsequent cleaning sees<br/>now > deleteHorizonMs"]
D --> E["Drop tombstone"]
처음 compaction에서 tombstone을 만나면 deleteHorizonMs = now + delete.retention.ms(기본 24시간)를 segment 헤더에 기록합니다. 그동안은 그대로 디스크에 남아 있고 consumer도 읽을 수 있습니다. slow consumer가 tombstone을 읽고 자기 상태에서 key를 지울 시간을 주기 위함입니다.
운영 함정 하나: delete.retention.ms를 짧게(예: 1분) 설정하고 consumer가 잠깐 멈춰 있는 사이 compaction이 두 번 돌면, consumer는 tombstone을 영영 못 보게 됩니다. 그 결과 consumer 측 캐시에 deleted key가 남는 사고가 생깁니다. 기본값 24시간은 이런 사고를 막기 위한 보수적 기본값입니다.
트랜잭션 마커도 비슷한 처리
__transaction_state나 EOS를 쓰는 토픽의 .log에는 일반 record 외에 COMMIT/ABORT 트랜잭션 마커가 섞입니다. 마커도 tombstone과 같은 방식으로 deleteHorizonMs 기반 지연 삭제를 거치며, 그 사이 consumer는 read_committed 모드로 정확한 visibility를 보장받습니다.
Page Cache와 sendfile — 디스크가 빠른 이유
여기까지 보면 카프카는 모든 IO를 디스크에 쓰는 시스템입니다. 왜 빠를까요? 답은 카프카가 메모리를 직접 관리하지 않고 OS page cache에 위임한다는 데 있습니다.
- Producer가 보낸 데이터는 broker가
FileChannel.write로.log에 쓰면 일단 page cache로 들어갑니다. 디스크 fsync는 비동기로, 또는log.flush.interval.messages/log.flush.interval.ms기준으로 지연 실행됩니다. - Consumer가 fetch하면
FileChannel.transferTo가 호출되어 page cache 페이지를 그대로 socket buffer로 DMA 전송합니다. JVM heap에 한 번도 복사되지 않습니다.
flowchart LR
P[Producer] --> NIC1[NIC]
NIC1 --> SB1[Socket buffer]
SB1 --> PC[Page cache]
PC --DMA via sendfile--> SB2[Socket buffer]
SB2 --> NIC2[NIC]
NIC2 --> C[Consumer]
PC -.async flush.-> DISK[Disk]
이 모델 덕분에 카프카는 JVM heap을 작게 (보통 6 GiB 내외) 유지하고 나머지 RAM을 모두 page cache로 쓰는 패턴이 권장됩니다. heap을 키운다고 빨라지지 않고, 오히려 GC pause만 늘어납니다.
log.flush.interval.messages의 기본값은 Long.MAX_VALUE로 사실상 강제 flush를 끈 상태입니다. 카프카는 replication factor와 ISR로 내구성을 만들지, 단일 broker의 fsync에 의존하지 않습니다. 그래서 단일 broker가 죽어도 다른 broker의 page cache+log가 살아남으면 데이터는 안전합니다.
운영 관점 정리
지금까지 본 내용을 운영 증상-원인 표로 묶으면 다음과 같습니다.
| 증상 | 원인 | 대응 |
|---|---|---|
| retention이 발동하지 않음 | active segment에만 메시지가 쌓임 | segment.ms 짧게 (예: 1h) |
| compact 토픽이 줄어들지 않음 | 동일 — active segment에 dirty 모두 존재 | segment.ms 짧게 |
retention.ms보다 오래 메시지가 남음 |
maxTimestamp 기준 + segment.ms 가산 | 정상 동작 |
| broker 재시작이 매우 느림 | segment 수 폭증 → 인덱스 재구축 비용 | segment.bytes/segment.ms 키워서 segment 수 감소 |
| segment가 너무 작은 크기로 자주 rolling | TimeIndex가 OffsetIndex보다 먼저 참 | segment.index.bytes 상향 또는 timestamp 분포 점검 |
| compaction 중 디스크가 두 배로 부풂 | .cleaned 임시 파일 생성 |
디스크 여유 30% 이상 확보 |
| compaction 후에도 tombstone이 남음 | delete.retention.ms 만료 전 |
정상. 만료 후 다음 compaction에서 제거 |
| consumer 캐시에 deleted key가 남음 | tombstone을 못 본 채 compaction 두 번 진행 | delete.retention.ms 늘리고 consumer SLA 점검 |
| compaction CPU 사용량이 큼 | 큰 dirty 구간을 한 번에 빌드 | min.cleanable.dirty.ratio 낮춰 자주 작게 처리 |
마무리
카프카 broker의 storage layer는 세 개의 레이어로 떨어집니다.
- LogSegment — append-only
.log파일 + sparse 인덱스 두 개를 한 묶음으로 묶은 단위 - Rolling/Retention — segment 단위 lifecycle 관리. active segment만 쓰기, sealed segment만 retention 대상
- LogCleaner —
cleanup.policy=compact토픽에 대해 dirty 구간을 offset map으로 압축
이 세 레이어를 알면 "왜 줄지 않는가" "왜 빠른가" "왜 재시작이 느린가"가 모두 같은 그림 위에서 설명됩니다. Producer/Consumer는 결국 이 storage layer가 정의한 모양에 맞춰 동작하며, 운영 튜닝의 절반은 segment.ms / retention.ms / min.cleanable.dirty.ratio 세 가지를 토픽 특성에 맞게 조정하는 것으로 끝납니다.
참고자료
- Apache Kafka Documentation — Log Compaction: https://kafka.apache.org/documentation/#compaction
- Apache Kafka Documentation — Broker Configs: https://kafka.apache.org/documentation/#brokerconfigs
- Apache Kafka Documentation — Topic Configs: https://kafka.apache.org/documentation/#topicconfigs
- LogSegment.java (trunk): https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
- OffsetIndex.java (trunk): https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java
- TimeIndex.java (trunk): https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java
- LogConfig.java (trunk): https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
- LogCleaner.scala (trunk): https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogCleaner.scala
- LogCleanerManager.scala (trunk): https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogCleanerManager.scala
- KIP-33: Add a time based log index — https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index
- KIP-354: Add a Maximum Log Compaction Lag — https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Add+a+Maximum+Log+Compaction+Lag

