Kafka Connect 내부 동작 — Worker, Task, 내부 토픽, 그리고 KIP-618 Exactly-Once
Apache Kafka의 두 번째 데이터 평면이라고 부를 만한 Kafka Connect의 내부 동작을 정리합니다. 외부 시스템과 카프카를 잇는 이 프레임워크가 Worker/Connector/Task 세 계층으로 책임을 나누는 방식, distributed 모드가 세 개의 내부 토픽 위에 세워지는 구조, KIP-415 incremental cooperative rebalancing이 정지 없는 스케일링을 만드는 원리, 그리고 KIP-618이 source connector까지 exactly-once를 끌어올린 방식을 같은 그림 위에 풀어냅니다.
왜 Connect가 필요한가
카프카로 데이터를 받거나 내보내는 가장 단순한 방법은 직접 KafkaProducer / KafkaConsumer를 들고 코드를 짜는 것입니다. 데이터베이스의 변경분을 토픽으로 흘려보내는 작업, 토픽의 메시지를 S3나 Elasticsearch에 적재하는 작업 모두 Producer/Consumer로 가능합니다.
문제는 이 코드들이 매번 똑같다는 점입니다. 누가 짜든:
- 외부 시스템의 커서/오프셋을 안전하게 저장해야 합니다(중복 적재 방지).
- 장애 시 재시작 후 마지막 지점부터 재개해야 합니다.
- 파티션 단위로 병렬도를 늘리고, 인스턴스가 늘어나면 작업을 다시 나눠야 합니다.
- 모니터링/REST 관리/설정 변경 같은 운영 표면을 갖춰야 합니다.
매번 이 인프라를 새로 짜는 대신, 외부 시스템과 카프카 사이의 데이터 이동을 표준화한 프레임워크가 Kafka Connect입니다. 사용자는 Connector 인터페이스에 해당 시스템과의 통신 로직만 채워 넣고, 위에 나열한 모든 인프라(병렬화, 오프셋 관리, 설정 분배, 장애 복구, REST API)는 Connect 프레임워크가 떠맡습니다.
flowchart LR
DB[(Source DB)] --> SC[Source Connector]
SC --> K[(Kafka Cluster)]
K --> SK[Sink Connector]
SK --> ES[(Elasticsearch)]
subgraph Connect["Kafka Connect Cluster"]
SC
SK
end
Connect는 별도의 JVM 프로세스 묶음으로 실행되며, 브로커와는 다른 컴포넌트입니다. 브로커가 메시지 저장과 전달에 집중한다면, Connect는 그 위에서 "외부 시스템과의 어댑터"를 표준화된 방식으로 호스팅합니다.
세 계층의 추상화 — Worker, Connector, Task
Connect의 핵심은 책임을 셋으로 쪼갠 추상화입니다.
Worker — 실행 프로세스
Worker는 Connect의 JVM 프로세스 그 자체입니다. REST 요청을 받고, 설정을 분배하고, 실제 데이터 이동 코드(Task)를 자기 안에서 스레드로 돌립니다. 사용자가 명령행에서 띄우는 것은 항상 Worker이며, Connect 클러스터는 Worker들의 모임입니다.
기본 REST 포트는 8083입니다. 어떤 Worker로 요청을 보내도 같은 결과가 나오며, 필요하면 Worker가 요청을 적절한 노드로 포워딩합니다.
Connector — 작업의 정의와 분할
Connector는 "어떤 외부 시스템과 어떻게 이야기할지"의 정의입니다. 실제 데이터를 옮기지는 않습니다. 대신 두 가지 책임을 갖습니다.
- 사용자 설정을 받아 검증한다.
- 그 설정을 N개의 Task 설정으로 쪼갠다(
taskConfigs(int maxTasks)).
예를 들어 JDBC Source Connector는 사용자가 준 테이블 목록을 보고 "Task 0은 이 4개 테이블을 폴링, Task 1은 저 4개 테이블을 폴링" 같은 식으로 작업을 나눕니다. 파일 시스템 Source라면 디렉토리를 나누고, S3 Sink라면 파티션을 나눕니다.
Connector는 클러스터 어딘가의 단 한 Worker에서만 실행됩니다(= leader Worker).
Task — 실제 데이터 이동
Task는 실제 바이트를 움직이는 코드입니다. Source면 외부 시스템에서 읽어 SourceRecord를 만들고, Sink면 카프카에서 받은 SinkRecord를 외부 시스템에 씁니다.
Task는 Worker들에 분산 배치됩니다. Worker 3개 × Task 6개라면 평균 Worker 1대가 Task 2개씩 잡고 도는 그림이 됩니다.
flowchart TB
subgraph W1["Worker 1 (JVM)"]
C[Connector A definition]
T1[Task A-0]
T2[Task A-1]
end
subgraph W2["Worker 2 (JVM)"]
T3[Task A-2]
T4[Task B-0]
end
subgraph W3["Worker 3 (JVM)"]
T5[Task B-1]
end
C -. taskConfigs .-> T1
C -. taskConfigs .-> T2
C -. taskConfigs .-> T3
이 분리가 중요한 이유는 Connector는 1개이지만 Task는 N개라는 점에 있습니다. 사용자는 "한 Connector에 1~`tasks.max`개의 Task를 띄워라"라고 선언하고, 작업을 쪼개는 책임은 Connector 클래스가, 띄우고 분배하는 책임은 Worker 그룹이 갖습니다.
Source vs Sink — 두 방향의 데이터 이동
SourceTask — pull 모델
SourceTask는 외부 시스템에서 데이터를 끌어와 카프카에 넣는 쪽입니다. 핵심 메서드는 다음과 같습니다.
public abstract class SourceTask implements Task {
public abstract void start(Map<String, String> props);
public abstract List<SourceRecord> poll() throws InterruptedException;
public void commitRecord(SourceRecord record,
RecordMetadata metadata) throws InterruptedException {}
public void commit() throws InterruptedException {}
public abstract void stop();
}
Worker는 별도 스레드에서 무한 루프로 poll()을 호출합니다. 데이터가 없으면 poll()은 null을 돌려주거나 잠시 블록하고, 호출 사이에 너무 오래 잡고 있지 않는 것이 권장됩니다(PAUSED 상태 전환에 응답해야 하기 때문). 받은 SourceRecord 묶음은 Worker 내부의 KafkaProducer로 카프카에 발행됩니다.
SourceRecord에는 payload(key, value, topic, partition) 와 함께 source partition / source offset 한 쌍이 같이 실립니다. 이 둘은 카프카 파티션이 아니라 "외부 시스템 입장에서의 커서"입니다. JDBC라면 { "table": "orders" } → { "id": 1024 }, 파일이라면 { "filename": "a.log" } → { "offset": 89456 } 같은 식입니다. Worker는 이 source offset을 따로 offsets 토픽에 저장합니다(뒤에서 다룹니다).
commitRecord()는 프로듀서가 해당 레코드에 대해 ACK 콜백을 받았을 때 호출되며, Task가 그 시점에 외부 시스템에 별도 커서를 찍어야 할 때 사용합니다. transformation으로 record가 필터링된 경우에도 호출됩니다(broker에는 도달하지 않더라도 framework는 이를 "처리 완료"로 간주합니다).
SinkTask — push 모델
SinkTask는 반대 방향입니다. 카프카 토픽을 컨슈머로 읽어 외부 시스템에 쓰는 쪽입니다.
public abstract class SinkTask implements Task {
public abstract void start(Map<String, String> props);
public abstract void put(Collection<SinkRecord> records);
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {}
public void open(Collection<TopicPartition> partitions) {}
public void close(Collection<TopicPartition> partitions) {}
public abstract void stop();
}
Worker는 내부 KafkaConsumer로 토픽을 폴링하고, 받은 레코드를 SinkRecord로 감싸 put()에 넘깁니다. SinkTask는 이를 외부 시스템에 기록합니다. 일정 주기마다 Worker는 flush()를 호출하고, 그 호출이 정상 반환되면 그 시점까지의 컨슈머 오프셋을 카프카에 커밋합니다.
여기서 한 가지 미묘한 점은, Sink connector의 컨슈머 오프셋은 일반 컨슈머와 똑같이 카프카의 __consumer_offsets 에 저장된다는 점입니다. Source 쪽 source offset은 별도 내부 토픽으로 가지만, Sink 쪽은 굳이 따로 둘 필요가 없습니다. 이미 카프카 컨슈머 그룹의 오프셋 메커니즘이 잘 동작하기 때문입니다.
flowchart LR
subgraph SRC["Source flow"]
EXT1[(External system)] --> P1["SourceTask.poll()"]
P1 --> PROD[Worker's Producer]
PROD --> TOPIC1[(Kafka topic)]
P1 -. source offsets .-> OFF[(offset.storage.topic)]
end
flowchart LR
subgraph SNK["Sink flow"]
TOPIC2[(Kafka topic)] --> CONS[Worker's Consumer]
CONS --> P2["SinkTask.put()"]
P2 --> EXT2[(External system)]
P2 -. consumer offsets .-> CO[(__consumer_offsets)]
end
데이터 변환 파이프라인 — Converter와 SMT
Connector가 만든 SourceRecord가 카프카에 닿기까지의 경로 위에는 두 개의 변환 단계가 더 있습니다.
SourceTask.poll()
→ Transformations (SMT chain)
→ Converter (Connect 내부 표현 → 직렬화된 바이트)
→ Producer → broker
Converter — 직렬화 책임
Converter는 Connect의 내부 데이터 모델(org.apache.kafka.connect.data.*의 Schema/Struct/Map/원시 타입)을 카프카로 보낼 바이트로 직렬화합니다. 반대 방향도 같습니다.
기본 제공되는 Converter는 다음과 같습니다.
| Converter | 형식 | 비고 |
|---|---|---|
StringConverter |
UTF-8 문자열 | 가장 단순 |
ByteArrayConverter |
원시 바이트 | 변환 없음 |
JsonConverter |
JSON | schemas.enable=true면 스키마 포함 |
AvroConverter (Confluent) |
Avro 바이너리 | Schema Registry 필요 |
ProtobufConverter (Confluent) |
Protobuf | Schema Registry 필요 |
JsonSchemaConverter (Confluent) |
JSON + 스키마 | Schema Registry 필요 |
key.converter와 value.converter는 별도로 지정할 수 있습니다. 예를 들어 키는 StringConverter, 값은 AvroConverter처럼 비대칭 설정이 흔합니다.
Schema Registry 기반 Converter들은 첫 직렬화 시 스키마를 Registry에 자동 등록하고, 메시지에는 5바이트짜리 magic byte + schema ID만 박아 넣습니다. 역직렬화 측은 ID로 스키마를 받아 와 디코딩합니다. 이 분리가 메시지 크기와 스키마 진화를 동시에 해결하는 핵심 트릭입니다.
SMT — 단일 메시지 변환 체인
Single Message Transforms(SMT)는 KIP-66에서 도입된, 메시지 한 건 단위의 변환 기능입니다. 컨버터 단계 이전에 끼어들어 SourceRecord/SinkRecord를 수정합니다.
transforms=insertKey,extractInt,mask
transforms.insertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.insertKey.fields=id
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=id
transforms.mask.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.mask.fields=ssn
transforms 설정값은 별칭의 순서 있는 리스트입니다. 첫 번째 transform이 record를 받아 수정해 돌려주면, 그 출력이 두 번째 transform의 입력이 되는 식으로 체인이 흐릅니다. 어떤 transform이 null을 돌려주면 그 record는 폐기됩니다(Source면 카프카에 발행되지 않고, Sink면 SinkTask에 전달되지 않습니다).
flowchart LR
R0[Record] --> T1[Transform 1]
T1 --> T2[Transform 2]
T2 --> T3[Transform 3]
T3 --> CONV[Converter]
CONV --> KAFKA[(Kafka)]
KIP-585에서 추가된 Predicate는 이 체인에 조건 분기를 더합니다. transforms.foo.predicate=topicMatches처럼 지정하면, 해당 transform은 predicate가 true인 record에만 적용됩니다.
Transformation 인터페이스 자체는 단순합니다.
public interface Transformation<R extends ConnectRecord<R>>
extends Configurable, Closeable {
R apply(R record);
ConfigDef config();
void close();
}
stateless가 권장되며, record 한 건만으로 결정 가능한 작업(필드 제거, 헤더 추가, 라우팅 변경, 마스킹 등)에 적합합니다. 여러 record를 모아 집계하는 작업은 SMT의 범위를 벗어나며, Kafka Streams나 ksqlDB가 다룰 영역입니다.
두 가지 실행 모드 — Standalone vs Distributed
Connect Worker는 두 가지 모드로 실행됩니다.
Standalone — 한 프로세스
connect-standalone.sh 스크립트로 띄우는 모드입니다. Worker 한 대가 Connector를 받고, Task를 자기 안에서 띄우고, source offset도 로컬 파일(offset.storage.file.filename)에 저장합니다. REST API도 그 한 Worker 안에서만 동작합니다.
이름 그대로 단일 노드 운영에 적합하며, 개발/테스트, 로컬 파일 기반 작업, 단순한 사이드카 패턴에 쓰입니다. 확장성과 가용성을 제공하지 않습니다. 그리고 뒤에서 다룰 KIP-618 exactly-once는 standalone 모드에서는 지원되지 않습니다.
Distributed — Worker 클러스터
connect-distributed.sh로 띄우는 운영용 모드입니다. 같은 group.id를 가진 Worker들이 카프카의 그룹 멤버십 프로토콜로 서로를 발견하고 Connect 클러스터를 이룹니다. 이 group.id는 컨슈머 그룹의 group.id와는 완전히 별개의 네임스페이스입니다.
distributed 모드의 모든 상태는 카프카 내부 토픽 세 개에 저장됩니다. 다음 절의 핵심입니다.
| 항목 | Standalone | Distributed |
|---|---|---|
| 설정 저장 | 로컬 properties 파일 | config.storage.topic |
| Source 오프셋 | 로컬 파일 | offset.storage.topic |
| 상태 | 메모리 | status.storage.topic |
| 스케일링 | 불가 | Worker 추가/제거 |
| 장애 복구 | Worker 죽으면 정지 | 다른 Worker로 재할당 |
| Exactly-once source | 미지원 | KIP-618로 지원 |
| 사용처 | 개발/테스트 | 운영 |
세 개의 내부 토픽 — Distributed 모드의 등뼈
distributed 모드 클러스터는 카프카 안에 세 개의 토픽을 만들어 그 위에 상태를 올립니다. 세 토픽 모두 cleanup.policy=compact 인 것이 핵심입니다. 같은 키에 대해 항상 최신 값만 유지되는 KV 스토어처럼 동작시킵니다.
config.storage.topic — 클러스터의 설정 평면
연결된 Connector들과 그들이 생성한 Task 설정이 들어갑니다. 사용자가 REST API로 connector 설정을 만들거나 바꿀 때, 또는 Connector가 reconfiguration을 요청할 때마다 새 레코드가 발행되며, 모든 Worker는 이 토픽을 컨슘해 같은 설정을 본다는 보장을 얻습니다.
권장 파티션 수는 1입니다. 작은 토픽이지만 모든 Worker가 동일한 순서로 읽어야 하기 때문입니다.
offset.storage.topic — Source connector의 커서
Source connector가 발행하는 SourceRecord의 source partition / source offset 한 쌍이 여기에 들어갑니다. 키는 [connector_name, source_partition], 값은 source_offset으로 직렬화됩니다.
권장 파티션 수는 25 또는 50 정도로 잡습니다. 여러 Connector × 여러 source partition의 업데이트가 동시에 들어가기 때문에 충분한 병렬도가 필요합니다. 복제 인자는 3 이상을 권장합니다.
status.storage.topic — 상태 평면
각 Connector와 Task의 현재 상태(RUNNING, PAUSED, FAILED, STOPPED)와 에러 메시지가 발행되는 토픽입니다. REST API의 /status 응답은 이 토픽의 최신 상태를 읽어 만들어집니다.
권장 파티션 수는 5 정도입니다. 상태 변경 빈도는 낮지만 모든 Worker가 빠르게 보고 보는 정도면 충분합니다.
flowchart TB
subgraph WORKERS["Connect Worker group (group.id=foo)"]
W1[Worker 1]
W2[Worker 2]
W3[Worker 3]
end
subgraph KAFKA["Kafka cluster"]
CFG[(config.storage.topic<br/>1 partition, compact)]
OFF[(offset.storage.topic<br/>25 partitions, compact)]
STAT[(status.storage.topic<br/>5 partitions, compact)]
end
W1 <--> CFG
W2 <--> CFG
W3 <--> CFG
W1 <--> OFF
W2 <--> OFF
W3 <--> OFF
W1 <--> STAT
W2 <--> STAT
W3 <--> STAT
이 그림이 distributed Connect의 본질입니다. Worker 클러스터의 상태는 카프카 토픽에 있고, Worker는 stateless에 가깝게 동작합니다. Worker 하나가 죽어도 다른 Worker가 같은 토픽들을 읽어 동일한 사실 위에서 다시 출발할 수 있습니다.
같은 Connect 클러스터에 속한다는 것의 정의는 곧 "같은 group.id, 같은 config.storage.topic / offset.storage.topic / status.storage.topic 이름을 공유한다"이고, 모든 distributed worker 설정은 이 네 값이 정확히 일치해야 합니다.
클러스터 협조 — group membership과 rebalance
distributed Worker들이 서로를 발견하고 작업을 나누는 메커니즘은, 컨슈머 그룹이 파티션을 나누는 것과 같은 카프카의 그룹 멤버십 프로토콜 위에 만들어졌습니다. 다만 자원이 "토픽 파티션"이 아니라 "Connector와 Task"이고, 그래서 별도의 assignor가 필요합니다.
Eager rebalance — 옛 방식의 stop-the-world
Apache Kafka 2.3 이전까지 Connect는 컨슈머와 같은 eager 프로토콜을 썼습니다. 멤버십에 변화가 생길 때마다(Worker 추가/제거, Connector 추가/제거, 설정 변경):
- 모든 Worker가 자신이 가진 모든 Connector/Task를 놓는다(revoke).
- Leader가 새 할당을 계산한다.
- 모든 Worker가 새로 받은 자원을 시작한다.
이 사이 클러스터 전체의 데이터 흐름은 정지합니다. 단 한 Worker만 추가/재시작해도 전체가 멈춥니다.
KIP-415 — Incremental Cooperative Rebalancing
2.3에서 도입된 incremental cooperative rebalancing(IC rebalancing)은 이 stop-the-world를 없앴습니다.
핵심 아이디어는 두 가지입니다.
- 변한 것만 옮긴다. 새 멤버가 들어왔다면, 기존 멤버는 자기가 잡고 있던 자원 중 새 멤버에게 넘겨야 할 것만 놓고, 나머지는 계속 돌립니다.
- 두 단계 rebalance. 첫 라운드에서 leader가 새 계획을 알리고, "넘겨야 할 자원이 있는 worker"만 그 자원을 revoke합니다. 두 번째 라운드에서 새 멤버가 그 자원을 받습니다. 그 외 worker들은 한 번도 일을 멈추지 않습니다.
- scheduled rebalance delay. worker가 잠시 사라졌다 다시 살아나는 흔한 경우(롤링 재시작, 짧은 GC pause)를 위해
scheduled.rebalance.max.delay.ms(기본 5분)만큼은 그 worker의 자원을 다른 worker에게 옮기지 않고 비워 둡니다. 죽었던 worker가 그 안에 돌아오면 자기 자원을 그대로 다시 잡습니다.
flowchart TB
subgraph BEFORE["Before (eager)"]
E1[Add Worker 4] --> E2["ALL Workers revoke"]
E2 --> E3["ALL Workers idle"]
E3 --> E4["Leader plans"]
E4 --> E5["ALL Workers restart all tasks"]
end
subgraph AFTER["After (incremental cooperative)"]
I1[Add Worker 4] --> I2["Leader plans delta"]
I2 --> I3["Only Workers losing tasks revoke them"]
I3 --> I4["Worker 4 picks them up"]
I4 --> I5["Other Workers never stopped"]
end
이 변화 덕분에 운영 중인 Connect 클러스터의 스케일 아웃이 데이터 흐름에 거의 영향을 주지 않게 되었습니다.
REST API — 클러스터의 단일 관리 표면
distributed 모드의 모든 운영은 8083 포트의 REST API로 합니다. 어떤 Worker로 요청을 보내도 같은 결과가 나오며, 필요한 경우 Worker가 leader로 요청을 포워딩합니다.
자주 쓰이는 엔드포인트는 다음과 같습니다.
| Method | Path | 용도 |
|---|---|---|
| GET | / |
Worker 정보, Kafka 클러스터 ID |
| GET | /connectors |
Connector 이름 목록 |
| POST | /connectors |
Connector 생성 |
| GET | /connectors/{name} |
Connector 설정 조회 |
| PUT | /connectors/{name}/config |
Connector 설정 변경 또는 생성 |
| DELETE | /connectors/{name} |
Connector 삭제 |
| GET | /connectors/{name}/status |
Connector + Task 상태 |
| GET | /connectors/{name}/tasks |
Task 목록 |
| GET | /connectors/{name}/tasks/{id}/status |
특정 Task 상태 |
| POST | /connectors/{name}/restart |
Connector(및 옵션으로 Task) 재시작 (KIP-745) |
| POST | /connectors/{name}/tasks/{id}/restart |
특정 Task만 재시작 |
| PUT | /connectors/{name}/pause |
일시정지 |
| PUT | /connectors/{name}/resume |
재개 |
| PUT | /connectors/{name}/stop |
Stopped 상태로 전환(KIP-875) |
| GET | /connectors/{name}/offsets |
Source/Sink 오프셋 조회 (KIP-875) |
| PATCH/DELETE | /connectors/{name}/offsets |
오프셋 변경/리셋 (KIP-875) |
| GET | /connector-plugins |
설치된 Connector/Transform 플러그인 |
| PUT | /connector-plugins/{name}/config/validate |
설정 검증 |
| GET | /admin/loggers |
로거 레벨 조회 |
| PUT | /admin/loggers/{name} |
로거 레벨 변경 |
KIP-745는 restart 엔드포인트에 includeTasks, onlyFailed 쿼리 파라미터를 추가해 "실패한 Task만 재시작" 같은 패턴을 한 번에 처리할 수 있게 했습니다. KIP-875는 source/sink offset을 REST로 조회·수정·리셋하는 1급 인터페이스를 추가해, 그동안 offset.storage.topic에 직접 쓰는 운영 트릭으로 다루던 일을 공식 API로 끌어올렸습니다.
KIP-618 — Source Connector Exactly-Once
Kafka Connect의 메시지 전달 보장은 오랫동안 비대칭이었습니다.
- Sink connector: 카프카에서 읽는 쪽이라 카프카 컨슈머의 일반 메커니즘으로 처리할 수 있고, 외부 시스템이 멱등하면 사실상 exactly-once였습니다.
- Source connector: 카프카에 쓰는 쪽인데, "외부 source offset을 카프카에 저장하는 일"과 "데이터를 카프카에 발행하는 일"이 별개의 produce였기 때문에 둘 사이에 장애가 나면 중복이 발생할 여지가 있었습니다. 즉 best effort at-least-once였습니다.
Apache Kafka 3.3(2022)에서 GA된 KIP-618은 source 쪽에도 exactly-once를 가져왔습니다. 핵심은 두 가지입니다.
1. 트랜잭셔널 프로듀서로 source offset과 payload를 묶기
Worker는 내부적으로 KafkaProducer로 source record를 발행합니다. KIP-618 이후, EOS가 켜진 source connector의 발행은 트랜잭션 안에서 이루어집니다.
producer.beginTransaction()
→ producer.send(SourceRecord 1..N)
→ producer.send(source offset to offset.storage.topic)
producer.commitTransaction()
같은 트랜잭션 안에서 사용자 토픽의 payload와 offset.storage.topic의 source offset이 함께 커밋되거나 함께 어보트됩니다. 컨슈머가 isolation.level=read_committed로 읽으면, 어보트된 트랜잭션의 메시지는 보이지 않으므로 중복이 사라집니다.
2. Zombie task fencing
Connect는 rebalance 도중 잠시 같은 Task의 두 인스턴스가 떠 있는 상황을 만들 수 있습니다(이전 Worker가 죽었다고 판단됐는데 사실은 살아 있던 경우). KIP-618은 이를 막기 위해 트랜잭셔널 ID와 producer epoch를 활용합니다. 새 Task 인스턴스가 자신을 register하면 producer epoch가 올라가고, 옛 Task의 produce는 broker에서 거절됩니다(InvalidProducerEpochException).
이 메커니즘이 동작하려면 connector 측에 트랜잭셔널 ID를 사용할 수 있는 ACL과 InitProducerId 같은 admin API 권한이 필요하며, KIP-618은 Connect framework가 워커 권한과 connector 권한을 분리해 사용하는 모델도 함께 정의했습니다.
활성화 방법과 제약
KIP-618을 켜려면 클러스터 전체에서 worker 설정 exactly.once.source.support를 단계적으로 올려야 합니다.
disabled(기본) — 기존 동작.preparing— 옛 worker와 새 worker의 호환 단계. exactly-once 트랜잭션 ID용 ACL을 사용 가능 상태로 두지만 아직 트랜잭션은 시작하지 않습니다.enabled— 모든 worker가 트랜잭셔널 모드로 동작.
그 위에 connector 단위로 exactly.once.support=requested(가능하면 사용) 또는 required(반드시, 안 되면 시작 거부)를 지정합니다. transaction.boundary=poll | interval | connector로 트랜잭션을 끊는 단위를 골라, 처리량과 latency를 트레이드오프할 수 있습니다.
다음 제약이 있습니다.
- distributed 모드 전용. standalone에서는 EOS source가 지원되지 않습니다.
- 클러스터 전체 설정. 한 클러스터 안에서 connector마다 켜고 끄는 식이 아닙니다(개별 connector는 EOS가 켜진 클러스터에서
required/requested로만 골라 씁니다). - Connector 책임. Source partition을 한 시점에 단 하나의 Task에만 배정해야 하고, source offset만 가지고 외부 시스템에서 재개할 수 있어야 합니다. 이 두 조건을 만족하지 못하는 connector는 EOS를 켜도 의미가 없습니다.
KIP-618이 도입된 뒤에야 비로소 Connect의 양방향 데이터 평면이 모두 exactly-once를 갖춘 1급 시스템이 되었다고 말할 수 있게 되었습니다.
한 번에 정리하기
Connect의 동작을 한 그림으로 요약하면 다음과 같습니다.
flowchart TB
subgraph EXT["External systems"]
DB[(Database)]
S3[(S3)]
end
subgraph CC["Connect cluster (Workers)"]
REST[REST :8083]
subgraph W["Worker JVM"]
ST[SourceTask]
SK[SinkTask]
PROD[Producer]
CONS[Consumer]
SMT[SMT chain]
CONV[Converter]
end
end
subgraph KK["Kafka cluster"]
UT[(user topics)]
CFG[(config.storage.topic)]
OFF[(offset.storage.topic)]
STAT[(status.storage.topic)]
CO[(__consumer_offsets)]
end
DB --> ST
ST --> SMT --> CONV --> PROD --> UT
PROD -. tx with payload .-> OFF
UT --> CONS --> SK --> S3
CONS -. commits .-> CO
REST -. read/write .-> CFG
REST -. read .-> STAT
W -. heartbeat / status .-> STAT
W -. consumes config .-> CFG
핵심을 다시 짚으면:
- 세 계층: Worker(프로세스), Connector(설정과 분할), Task(실제 IO).
- 두 방향: Source는
poll()로 끌어와 producer로 발행, Sink는 consumer로 받아put()으로 외부에 씀. - 파이프라인: SMT chain → Converter가 record 한 건의 변환 책임을 분리.
- 두 모드: standalone(로컬 파일, 단일 노드)과 distributed(세 내부 토픽 기반 클러스터).
- 세 내부 토픽: config(설정), offset(source 커서), status(상태). 모두 compact.
- KIP-415: incremental cooperative rebalancing으로 stop-the-world 제거.
- KIP-618: source connector의 카프카 발행과 offset 저장을 한 트랜잭션으로 묶어 exactly-once 달성.
- KIP-745 / KIP-875: 운영 표면(restart 세분화, offset 관리)을 REST 1급으로 끌어올림.
Connect는 카프카가 단순한 메시징 브로커를 넘어 데이터 이동의 표준 백본이 되도록 만든 핵심 부품입니다. 직접 producer/consumer를 짜는 코드 한 줄을 줄이는 대신, "외부 시스템과의 어댑터"를 표준 인터페이스로 떼어내 표준화된 운영·확장·내결함성·정확성을 한꺼번에 가져오는 것이 그 핵심입니다.
참고자료
- Apache Kafka 공식 문서, Connect 섹션 — https://kafka.apache.org/documentation/#connect
- Confluent Platform Connect Design — https://docs.confluent.io/platform/current/connect/design.html
- Confluent Platform Connect 개요 — https://docs.confluent.io/platform/current/connect/index.html
- Connect REST API Reference — https://docs.confluent.io/platform/current/connect/references/restapi.html
- Connect Worker Configuration Properties — https://docs.confluent.io/platform/current/connect/references/allconfigs.html
- Kafka Connect Deep Dive — Converters and Serialization Explained (Confluent Blog) — https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/
- KIP-66: Single Message Transforms for Kafka Connect — https://cwiki.apache.org/confluence/display/KAFKA/KIP-66%3A+Single+Message+Transforms+for+Kafka+Connect
- KIP-415: Incremental Cooperative Rebalancing in Kafka Connect — https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect
- KIP-618: Exactly-Once Support for Source Connectors — https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors
- KIP-745: Connect API to restart connector and tasks — https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308623
- KIP-875: First-class offsets support in Kafka Connect — https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect
- Confluent Blog: Incremental Cooperative Rebalancing in Apache Kafka — https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka/
- Confluent Blog: Kafka Connect Improvements in Apache Kafka 2.3 — https://www.confluent.io/blog/kafka-connect-improvements-in-apache-kafka-2-3/
- Apache Kafka source —
connect/api(SourceTask,SinkTask,Transformation) — https://github.com/apache/kafka/tree/trunk/connect/api/src/main/java/org/apache/kafka/connect

