Project Reactor — Reactive Streams 위에서 backpressure가 흐르는 방법
Spring WebFlux, R2DBC, Reactor Kafka 같은 JVM 위의 비동기 스택은 모두 Project Reactor 위에 서 있습니다. 그런데
Flux와Mono는 도대체 어떻게 생긴 객체이고,.subscribe()를 호출하는 그 순간 안에서 무슨 일이 벌어지는지를 설명하기는 의외로 어렵습니다. 이 글은 Reactive Streams 명세 1.0.4와 Reactor Core 3.8.x를 기준으로, 네 개의 인터페이스에서 시작해 Reactor의 역방향 Subscriber 체인, Scheduler, 그리고 연산자 융합까지를 한 그림 위에서 풀어냅니다.
1. 왜 reactive인가
push의 함정
스트림을 다루는 가장 단순한 방법은 두 가지입니다.
- Pull (당기기): 소비자가 필요할 때
next()를 호출해서 한 건씩 가져온다.Iterator가 대표적입니다. - Push (밀기): 생산자가 데이터가 생기는 대로 소비자에게 전달한다.
Observer,Listener가 대표적입니다.
pull은 흐름 제어가 자연스럽습니다. 소비자가 호출하지 않으면 데이터는 생산자 쪽에 머물러 있기 때문입니다. 대신 소비자는 다음 데이터가 도착할 때까지 블로킹 됩니다.
push는 블로킹이 없는 대신 흐름 제어가 사라집니다. 생산자가 초당 10만 건을 만들고 소비자가 1만 건밖에 처리하지 못한다면, 둘 사이의 큐가 무한히 쌓이거나 어디선가 OOM이 발생합니다. 흔히 이야기하는 "fast producer, slow consumer" 문제입니다.
backpressure
push의 장점(비동기, non-blocking)을 유지하면서 흐름 제어를 되돌리는 방법이 backpressure입니다. 소비자가 생산자에게 "지금까지 내가 받아낼 수 있는 건수는 N건"이라고 신호를 보내고, 생산자는 그 demand만큼만 push 합니다. demand가 0이면 잠시 멈춥니다.
이 단순한 규약이 Reactive Streams 명세의 핵심입니다. push의 외형을 갖되, 흐름 제어는 소비자가 잡습니다.
flowchart LR
Producer[Publisher] -->|onNext x N| Consumer[Subscriber]
Consumer -->|request N| Producer
Consumer -->|cancel| Producer
같은 문제를 풀기 위한 흐름 제어가 네트워크의 TCP receive window, 메시지 큐의 prefetch, OS 파이프의 PIPE_BUF 한도에도 있습니다. Reactive Streams는 이런 흐름 제어를 애플리케이션 수준의 객체 호출 규약으로 표준화한 것입니다.
2. Reactive Streams — 네 개의 인터페이스
Reactive Streams는 2015년에 JVM·JavaScript·.NET을 가로지르는 표준으로 출범했고, JVM 명세는 2019년 1.0.3, 2022년 1.0.4까지 다듬어졌습니다. JVM 명세는 네 개의 인터페이스와 한 묶음의 규칙으로 정의됩니다. 코드 자체는 의외로 짧습니다.
Publisher — 데이터의 근원
public interface Publisher<T> {
void subscribe(Subscriber<? super T> s);
}
메서드는 단 하나, subscribe뿐입니다. 호출자가 자신을 구독하면 Publisher는 신호를 그 구독자에게 흘려보내기 시작합니다.
Publisher는 "원소를 가지고 있는 컬렉션"이 아닙니다. subscribe가 호출되기 전까지는 아무 일도 일어나지 않습니다. Reactor의 Flux.just(1, 2, 3)은 1, 2, 3을 담고 있는 자료구조가 아니라 "구독되면 1, 2, 3을 흘려보낼 의도"를 표현한 객체입니다. 이 게으른 평가는 뒤에 나올 연산자 체인 전체에 적용됩니다.
Subscriber — 데이터의 종착지
public interface Subscriber<T> {
void onSubscribe(Subscription s);
void onNext(T t);
void onError(Throwable t);
void onComplete();
}
구독자는 네 종류의 콜백을 받습니다.
onSubscribe(s)— 구독이 성립되면 가장 먼저 한 번 호출됩니다. 이 안에서 받은Subscription을 보관해 두었다가 demand를 신호합니다.onNext(t)— 데이터 한 건이 도착했을 때 호출됩니다.onError(t)— 오류로 종료됩니다. 더 이상 신호가 없습니다.onComplete()— 정상 종료됩니다. 더 이상 신호가 없습니다.
명세는 신호의 순서를 정규 표현식으로 못 박습니다.
onSubscribe onNext* (onError | onComplete)?
onSubscribe는 반드시 한 번, 그 뒤에 onNext가 0회 이상, 마지막에 종료 신호가 0회 또는 1회 도달합니다. 종료 신호 이후의 신호는 명세 위반입니다.
Subscription — 둘 사이의 다리
public interface Subscription {
void request(long n);
void cancel();
}
Subscription은 Publisher와 Subscriber가 서로를 모르는 채로 demand를 주고받게 하는 다리입니다. Subscriber는 자신이 받을 수 있는 건수를 request(n)으로 신호하고, 더 이상 받지 않을 때 cancel()로 끊습니다.
명세가 못 박은 몇 가지 규칙이 있습니다.
- 누적된 demand는 최대
Long.MAX_VALUE(2^63 - 1)까지 가능합니다. 사실상의 무한 demand에는request(Long.MAX_VALUE)를 보냅니다. request(n)에n <= 0이 들어오면Publisher는IllegalArgumentException을onError로 흘려보냅니다.Publisher가onNext로 흘려보낸 총 건수는 언제나 누적 demand 이하여야 합니다.
마지막 규칙이 backpressure의 본체입니다. 누적 demand는 단조 증가하는 숫자이고, onNext마다 1씩 줄어드는 잔여 demand가 0이 되면 Publisher는 흐름을 멈춰야 합니다.
Processor — 둘 다인 존재
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
Processor는 위쪽에서 들어오는 신호를 받아 가공해서 아래쪽으로 흘려보내는 중간 단계입니다. Reactive Streams 1.0.4 이후 직접적인 사용은 권장되지 않지만, 개념적으로는 "모든 operator는 Processor"라고 보면 됩니다. Reactor에서는 Sinks API가 이 자리를 대체합니다.
TCK — 명세 준수의 증거
말로 적힌 규칙만으로는 구현체가 명세를 지키는지 확인하기 어렵습니다. Reactive Streams는 그래서 TCK(Technology Compatibility Kit) 라는 테스트 묶음을 함께 배포합니다. reactive-streams-tck와 tck-flow 두 아티팩트가 있고, Reactor를 비롯해 RxJava, Akka Streams, Mutiny 같은 구현체는 모두 이 TCK를 통과합니다. 서로 다른 라이브러리가 한 파이프라인에 섞여 들어가도 신호 규약은 같다는 보장이 여기서 옵니다.
3. JDK 9의 java.util.concurrent.Flow
명세가 안정화되자 JDK 9가 이를 표준 API로 흡수했습니다. JEP 266이 도입한 java.util.concurrent.Flow 클래스는 네 개의 nested 인터페이스를 갖습니다.
public final class Flow {
public interface Publisher<T> { void subscribe(Subscriber<? super T> s); }
public interface Subscriber<T> { /* onSubscribe / onNext / onError / onComplete */ }
public interface Subscription { void request(long n); void cancel(); }
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
}
이름과 메서드 서명이 org.reactivestreams.*와 정확히 같습니다. JDK가 명세를 그대로 표준 라이브러리로 받아들였기 때문입니다.
Reactor는 두 세계를 잇는 어댑터 JdkFlowAdapter를 제공합니다.
Flow.Publisher<String> jdk = JdkFlowAdapter.publisherToFlowPublisher(fluxOfString);
Flux<String> flux = JdkFlowAdapter.flowPublisherToFlux(jdkPublisher);
같은 신호 규약을 따르므로 변환은 본질적으로 인터페이스 캐스팅에 가깝습니다. 두 표준을 같은 모양으로 유지한 보상입니다.
4. Project Reactor — Flux와 Mono
Project Reactor는 Reactive Streams 명세를 구현한 라이브러리이자, 그 위에 풍부한 operator를 얹은 도구상자입니다. 작성 시점 기준 최신 안정 버전은 Reactor Core 3.8.x이고, 베이스라인은 JDK 8입니다(JDK 9 / 21 multi-release JAR로 패키지됨).
핵심 타입은 두 가지입니다.
| 타입 | 의미 | Publisher 관점 |
|---|---|---|
Flux<T> |
0개에서 N개의 원소를 흘려보내는 스트림 | Publisher<T> |
Mono<T> |
0개 또는 1개의 원소를 흘려보내는 스트림 | Publisher<T> |
둘 다 Publisher<T>를 구현합니다. Mono는 단지 "원소가 0개 또는 1개"라는 추가 제약을 가진 Flux이고, 그 제약 덕분에 합성·조합 시 더 간결한 API를 가질 수 있습니다.
Reactor 내부에는 Publisher/Subscriber보다 약간 더 부유한 CorePublisher/CoreSubscriber 인터페이스가 있습니다. 표준 Publisher/Subscriber를 호환 가능하게 구현하면서 컨텍스트 전파(Context) 같은 Reactor 고유 기능을 지원하기 위한 확장입니다. 외부에서 보면 그냥 Publisher로 보입니다.
게으른 평가
Flux<Integer> flux = Flux.just(1, 2, 3)
.map(n -> n * 10)
.filter(n -> n > 10);
System.out.println("subscribe before");
// 아직 아무것도 출력되지 않음
flux.subscribe(System.out::println);
// 여기서야 20, 30 출력
.map이나 .filter는 새 Flux를 만들 뿐입니다. 실제로 1, 2, 3을 흘려보내는 일은 subscribe가 호출된 순간에 처음 일어납니다. Reactor에서 Publisher 선언은 곧 청사진이고, subscribe가 그 청사진을 실행하는 트리거입니다.
5. subscribe 안에서 일어나는 일 — 역방향 Subscriber 체인
operator를 chaining 하는 코드는 위에서 아래로 흐르는 것처럼 보입니다.
Flux.range(1, 100)
.map(n -> n * 2)
.filter(n -> n % 3 == 0)
.take(5)
.subscribe(System.out::println);
하지만 실제로 신호가 흐르는 방향은 정반대입니다. 구독 시점에 Reactor는 subscribe에서 시작해서 source 쪽으로 거슬러 올라가며 중간 Subscriber를 쌓습니다.
flowchart TD
Source["Flux.range"] -->|onNext| MapSub["MapSubscriber"]
MapSub -->|onNext| FilterSub["FilterSubscriber"]
FilterSub -->|onNext| TakeSub["TakeSubscriber"]
TakeSub -->|onNext| FinalSub["LambdaSubscriber"]
FinalSub -.->|request| TakeSub
TakeSub -.->|request| FilterSub
FilterSub -.->|request| MapSub
MapSub -.->|request| Source
구체적인 순서는 다음과 같습니다.
- 호출자가
subscribe를 호출하면 가장 아래쪽Flux(take결과)가 자신의subscribe(actual)를 실행합니다. take연산자는actual(원래 람다)을 감싸는TakeSubscriber를 만들고, 위쪽인filterFlux에subscribe(takeSubscriber)를 호출합니다.filter는takeSubscriber를 감싸는FilterSubscriber를 만들고 그 위로subscribe를 또 호출합니다.- 같은 방식으로
MapSubscriber까지 만들어지고, 마지막에Flux.range가 자신의 Subscription을 가장 위쪽 Subscriber에게onSubscribe로 건넵니다. - 그
onSubscribe는 체인을 따라 아래로 전파되고, 람다 Subscriber까지 도달했을 때 비로소request(n)이 발사됩니다.
request는 체인을 거꾸로 거슬러 올라가 source에 닿고, 그제야 첫 번째 onNext가 흘러내려옵니다.
이 구조의 핵심은 두 가지입니다.
- 모든 신호는 인접한 두 Subscriber 사이에서만 오갑니다. 다섯 단계 체인이라도 source는 자신이 어떤
take/filter/map의 일부인지 알 필요가 없습니다. - operator마다 자신만의 Subscriber 구현이 있습니다.
MapSubscriber는onNext를 가로채 mapper를 적용한 결과를 아래쪽onNext로 전달하는 식입니다. operator 코드를 따라 읽고 싶다면reactor.core.publisher.FluxMap,FluxFilter,FluxTake같은 클래스를 보면 됩니다.
6. Scheduler — 신호가 어느 스레드 위에서 흐르는가
Reactor는 기본적으로 현재 스레드에서 동기적으로 신호를 흘려보냅니다. 비동기와 multi-threading은 별개의 문제이고, Reactor는 후자를 별도의 도구로 분리해서 제공합니다.
네 가지 표준 Scheduler
| 팩토리 | 백킹 스레드 | 용도 |
|---|---|---|
Schedulers.parallel() |
CPU 코어 수만큼 고정 풀 | CPU 바운드 작업 |
Schedulers.boundedElastic() |
기본 상한 10 × CPU 코어, 워커당 큐 100,000건 | 블로킹 I/O |
Schedulers.single() |
단일 스레드 (전역 공유) | 순서가 보장돼야 하는 가벼운 작업 |
Schedulers.immediate() |
호출자 스레드 그대로 | 스케줄러 자리를 비우는 no-op |
parallel()과 boundedElastic()은 글로벌 싱글톤이라 operator의 기본 Scheduler로도 쓰입니다. Flux.interval, Mono.delay처럼 시간 기반 operator는 parallel() 위에서 기본 동작합니다.
이전 버전에 있던 Schedulers.elastic()은 상한 없이 무한히 스레드를 만들 수 있어 backpressure 문제를 가립니다. 그래서 deprecate 되었고 그 자리를 boundedElastic()이 대신합니다.
publishOn vs subscribeOn
스케줄러를 적용하는 두 operator의 차이는 Reactor를 처음 만날 때 가장 자주 혼동되는 부분입니다.
Flux.range(1, 5)
.map(this::cpuHeavy) // 어디서 실행?
.publishOn(Schedulers.parallel())
.map(this::moreCpu) // 어디서 실행?
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
규칙은 단순합니다.
publishOn(S)은 그 자리 이후의 신호를S위에서 흘립니다. 체인 안 어디에 두느냐에 따라 효과가 달라집니다. 위 예시에서 첫 번째map은boundedElastic위에서, 두 번째map은parallel위에서 실행됩니다.subscribeOn(S)은 source의 구독을S위에서 일으킵니다. 체인 어디에 두느냐와 무관하게 가장 위 source부터 영향을 줍니다. 두 번 이상 두면 가장 가까운 source 쪽의 것만 효과가 있습니다.
publishOn은 컨베이어 벨트 중간에 환적장을 설치하는 것이고, subscribeOn은 컨베이어 시동 자체를 어디서 거는지 정하는 것이라고 생각하면 헷갈리지 않습니다.
flowchart LR
Source[Flux.range] -->|elastic| Map1[map: cpuHeavy]
Map1 -->|publishOn parallel| Map2[map: moreCpu]
Map2 -->|parallel| Sink[subscribe]
블로킹 호출을 reactive 체인 안에서 어쩔 수 없이 써야 한다면 그 부분만 publishOn(Schedulers.boundedElastic())으로 격리하는 것이 표준 패턴입니다. parallel 풀 위에서 블로킹 호출을 하면 CPU 코어 수만큼만 있는 스레드가 모두 막혀 reactive 전체가 멈춥니다.
7. 연산자 융합 — Fuseable
긴 operator 체인은 단점도 있습니다. 매 단계마다 Subscriber 객체와 그 사이를 흐르는 onNext 호출이 생기기 때문입니다. 10단 체인이라면 한 원소가 10번의 가상 호출을 거쳐 간다는 뜻입니다.
Reactor는 이 비용을 줄이기 위해 operator fusion이라는 최적화를 적용합니다. 인접한 두 operator가 "융합 가능"하면 사실상 한 Subscriber로 합쳐서 호출 깊이를 줄입니다.
Fuseable.QueueSubscription
핵심 인터페이스는 Fuseable.QueueSubscription입니다.
public interface QueueSubscription<T> extends Queue<T>, Subscription {
int requestFusion(int requestedMode);
// ANY = 0, SYNC = 1, ASYNC = 2, NONE = 4
}
Subscription이면서 동시에 Queue<T>인 이 특수한 구독은 두 가지 융합 모드를 지원합니다.
- SYNC fusion: source가 동기적으로 원소를 다 가지고 있을 때(
Flux.just,Flux.fromIterable같은 경우). 다운스트림은onNext를 거치지 않고poll()로 직접 원소를 꺼냅니다. - ASYNC fusion: source가 비동기로 원소를 큐에 넣고(
UnicastProcessor,unicast Sinks같은 경우) 다운스트림은poll()로 가져갑니다. 큐 자체가 신호와 원소를 한 번에 표현합니다.
융합이 가능한 operator 짝을 만나면 Reactor는 둘을 하나의 Subscriber로 묶어 호출 비용을 줄입니다. 가능하지 않은 짝은 평범한 onNext 체인으로 떨어집니다.
이 최적화는 사용자가 직접 알아야 할 동작은 아닙니다. 다만 Reactor가 단순히 인터페이스를 충실히 구현한 라이브러리가 아니라 hot-path 비용을 깎으려는 의도가 깊게 박힌 라이브러리라는 사실을 보여주는 단면입니다.
8. Hot vs Cold publisher
마지막 한 갈래는 "한 Publisher를 두 명이 동시에 구독하면 어떻게 되는가"입니다.
Cold publisher는 구독자마다 새로운 데이터 시퀀스를 만들어 흘려보냅니다. Flux.range(1, 5)를 두 번 구독하면 두 구독자 모두 1부터 5까지를 처음부터 받습니다. HTTP 요청을 시작점으로 갖는 WebClient.get().retrieve() 같은 호출도 cold입니다. 매 구독마다 새 요청이 나갑니다.
Hot publisher는 구독 여부와 무관하게 흘러가는 시퀀스를 여러 구독자가 같이 듣습니다. 마이크 같은 식입니다. 구독이 늦으면 그 사이의 신호는 듣지 못합니다. Sinks.many().multicast()로 만든 sink가 대표적입니다.
cold를 hot으로 바꾸는 가장 흔한 방법은 share(), replay(), cache() operator입니다. 같은 source에 들어가는 비용이 큰 작업(HTTP 호출, DB 조회)을 한 번만 일으키고 결과를 여러 곳에서 듣고 싶을 때 씁니다.
flowchart TD
Cold[Cold Publisher] -->|subscribe 1| S1[Subscriber A]
Cold -->|subscribe 2| S2[Subscriber B]
Hot[Hot Publisher] -->|stream| Multicaster
Multicaster -->|fanout| H1[Subscriber A]
Multicaster -->|fanout| H2[Subscriber B]
명세 자체는 hot/cold 구분을 강제하지 않습니다. 같은 Publisher 인터페이스가 두 의미를 모두 표현할 수 있습니다. 이 자유로움은 가끔 디버깅을 어렵게 하지만, 모델의 일관성을 유지하기 위한 의도된 단순함입니다.
9. 한눈에 정리
지금까지의 그림을 한 장에 모으면 이렇습니다.
flowchart TB
subgraph Spec["Reactive Streams Spec 1.0.4"]
P[Publisher]
S[Subscriber]
Sub[Subscription]
Proc[Processor]
end
subgraph JDK["JDK 9 java.util.concurrent.Flow"]
FP[Flow.Publisher]
FS[Flow.Subscriber]
end
subgraph Reactor["Project Reactor 3.8.x"]
Flux
Mono
Sinks
Scheduler[Schedulers.parallel / boundedElastic / single]
Fuseable[Fuseable: QueueSubscription]
end
P --> FP
P --> Flux
P --> Mono
Flux --> Scheduler
Flux --> Fuseable
핵심을 다섯 줄로 줄이면 다음과 같습니다.
- Reactive Streams는 push 모델의 외형 위에 소비자가 demand를 신호하는 backpressure를 도입한 네 인터페이스 명세입니다.
- 모든 Publisher는
onSubscribe onNext* (onError | onComplete)?순서로 신호를 흘리고, demand는 누적Long.MAX_VALUE까지 가능합니다. - JDK 9의
java.util.concurrent.Flow는 같은 모양을 표준 라이브러리에 흡수한 것이고, Reactor는JdkFlowAdapter로 둘 사이를 잇습니다. - Reactor의
Flux/Mono는 게으른 청사진이고,subscribe가 호출되면 operator마다 만들어진 Subscriber가 source 쪽으로 거슬러 올라가며 체인을 이룹니다. 신호는 위에서 아래로, demand는 아래에서 위로 흐릅니다. - 스레드는
Schedulers로 따로 결정합니다.publishOn은 그 자리 이후의 신호 스레드를,subscribeOn은 source 구독이 일어나는 스레드를 정합니다.
이 다섯 줄을 기둥 삼아 WebFlux의 Netty event loop, R2DBC의 connection 획득, Reactor Kafka의 KafkaReceiver 같은 상위 스택을 따라 들어가면 같은 그림이 반복적으로 등장합니다. Spring 진영의 reactive 스택이 한 회사 한 라이브러리 위에서 같은 추상화를 공유한다는 사실은, 그래서 우연이 아닙니다.
참고자료
- Reactive Streams JVM Specification 1.0.4 (GitHub)
- Reactive Streams — Official Site
- JEP 266: More Concurrency Updates
java.util.concurrent.FlowJavadoc (Java SE 21)- Project Reactor — Reactor Core Reference Guide
- Reactor Core — Schedulers API
- Reactor Core — Fuseable API
- Reactor Core — GitHub
- Spring Blog — Flight of the Flux 3: Hopping Threads and Schedulers
- Hashnode Tags JSON

