Skip to main content

Command Palette

Search for a command to run...

Java Stream API 내부 구조 — Spliterator와 ReferencePipeline의 게으른 평가

Updated
15 min read

Java 8에 등장한 Stream API는 이제 자바 개발자라면 누구나 매일같이 쓰는 도구입니다. 그러나 list.stream().filter(...).map(...).collect(...) 한 줄 안에서 실제로 어떤 객체들이 어떤 순서로 협력하는지를 머릿속에 그릴 수 있는 사람은 의외로 많지 않습니다. 이 글은 OpenJDK 소스 기준으로 Stream 파이프라인의 세 가지 핵심 부품(Spliterator, AbstractPipeline 그리고 Sink)이 어떻게 맞물려 "게으른 평가(lazy evaluation)"라는 약속을 지켜내는지를, 코드 한 줄이 실제로 어떤 호출 체인을 만들어 내는지까지 따라가며 정리합니다. Java 21 / OpenJDK master 브랜치 기준입니다.

Stream API에 익숙해진 뒤에도 다음과 같은 질문에는 의외로 답하기 어렵습니다.

  • stream().filter(...).map(...)만 부르고 collect를 부르지 않으면 정말 아무 일도 일어나지 않는가요?
  • filter 다음에 map이 오면, 컬렉션을 두 번 순회하는 것인가요, 한 번만 도는 것인가요?
  • 무한 스트림(Stream.iterate(...))에 sorted()를 붙이면 왜 멈추지 않을까요?
  • parallelStream()은 어떻게 일을 쪼갤 시점을 알까요?

이 모든 질문은 결국 하나의 사실로 귀결됩니다 — Stream은 자료구조가 아니라 파이프라인의 명세라는 사실입니다. 그리고 그 명세를 "실행"하는 건 단 한 번, 터미널 연산을 호출하는 순간 뿐입니다.

이 글에서는 다음 순서로 그 실행 모델을 따라갑니다.

  1. Stream을 구성하는 세 가지 부품 — Spliterator, Pipeline, Sink
  2. stream() 호출이 만드는 자료구조 — 두 방향으로 연결된 스테이지 리스트
  3. 터미널 연산이 트리거하는 실행 — wrapSinkcopyInto
  4. 한 번의 패스로 모든 중간 연산이 합쳐지는 이유
  5. Short-circuit이 동작하는 메커니즘 — cancellationRequested
  6. Stateful 연산 — 왜 sorted()는 모든 원소를 봐야 하는가
  7. 병렬 스트림 — trySplit와 Fork/Join

마지막에는 이 모델을 알고 나서야 보이는 실수 몇 가지를 정리합니다.

큰 그림 — Stream을 떠받치는 세 가지 부품

먼저 결론에 해당하는 그림을 봅시다. list.stream().filter(p).map(f).collect(...)이라는 한 줄은 내부적으로 다음 세 종류의 객체가 서로를 가리키고 있는 그래프입니다.

flowchart LR
    SRC[Spliterator<br/>over List] --> H[Head<br/>AbstractPipeline]
    H --> F[StatelessOp<br/>filter]
    F --> M[StatelessOp<br/>map]
    M --> T[TerminalOp<br/>collect]

    T -. wrapSink .-> SM[Sink: map adapter]
    SM -. downstream .-> SF[Sink: filter adapter]
    SF -. downstream .-> ST[Sink: collector]

세 부품을 한 줄로 요약하면 다음과 같습니다.

  • Spliterator — 원소를 하나씩 꺼낼 수도 있고, 둘로 쪼갤 수도 있는 "분할 가능한 Iterator"입니다. Stream의 데이터 소스 역할을 합니다.
  • AbstractPipeline / ReferencePipeline — 각 중간 연산을 표현하는 스테이지(stage) 객체입니다. filter, map 한 번 호출이 스테이지 하나에 해당합니다. 이 스테이지들은 양방향 연결 리스트로 이어져 파이프라인을 이룹니다.
  • Sink — 실제로 원소를 받아 다음 단계로 흘려보내는 "실행 시점의 콜백 체인"입니다. 터미널 연산을 호출하는 순간에야 비로소 만들어집니다.

비유하자면 Pipeline은 설계도, Sink는 공장 라인, Spliterator는 원료를 공급하는 컨베이어 벨트입니다. 설계도는 미리 그려 두지만, 라인을 깔고 컨베이어를 돌리는 건 마지막에 단 한 번뿐입니다.

스테이지 — 호출이 만드는 양방향 리스트

Stream.of(1, 2, 3).filter(...).map(...)을 부르는 동안 호출되는 메서드는 모두 ReferencePipeline에 정의돼 있습니다. OpenJDK의 실제 코드를 보면 filter는 다음과 같이 구현돼 있습니다.

public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<>(this, StreamShape.REFERENCE,
        StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }
                @Override
                public void accept(P_OUT u) {
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}

여기서 중요한 사실은 두 가지입니다.

첫째, filter 호출이 즉시 만든 것은 StatelessOp라는 새로운 스테이지일 뿐, 어떤 원소도 처리되지 않습니다. 즉 filter(...) 자체는 순수하게 자료구조를 한 칸 늘리는 동작입니다.

둘째, opWrapSink는 호출되지 않습니다. opWrapSink는 "이 스테이지가 실행되는 시점에, 다음 스테이지의 Sink를 감싸서 새 Sink를 만들어 주는 방법"을 정의해 둔 메서드일 뿐, 정의될 때 실행되지는 않습니다. 즉 filter메서드 디스패치 테이블을 한 칸 추가하는 동작에 가깝습니다.

map도 형태가 동일합니다.

public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    Objects.requireNonNull(mapper);
    return new StatelessOp<>(this, StreamShape.REFERENCE,
        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<>(sink) {
                @Override
                public void accept(P_OUT u) {
                    downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}

filter와의 유일한 의미 있는 차이는 accept 안에서 mapper.apply를 호출해서 원소를 변환한 뒤 downstream으로 넘긴다는 점입니다. filter는 downstream을 호출하지 않을 수도 있고(predicate가 false면 그냥 버립니다), map은 반드시 호출합니다.

AbstractPipeline의 생성자를 보면 새 스테이지가 이전 스테이지와 어떻게 묶이는지가 드러납니다(개념적으로 다음과 같은 필드를 갖습니다).

abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {

    private final AbstractPipeline sourceStage;   // Head 스테이지
    private final AbstractPipeline previousStage; // 이전 스테이지
    private AbstractPipeline nextStage;           // 다음 스테이지 (다음 연산 추가 시 설정)
    private int depth;                            // sourceStage = 0, +1씩 증가
    private int sourceOrOpFlags;                  // 스테이지 단위 플래그
    ...
}

StatelessOp를 만들 때 이전 스테이지(previousStage)에 자기 자신을 nextStage로 연결합니다. 결과적으로 다음과 같은 양방향 연결 리스트가 만들어집니다.

Head ⇄ filter(StatelessOp) ⇄ map(StatelessOp) ⇄ ...
depth=0     depth=1            depth=2

이 구조 덕분에 나중에 터미널 연산이 시작될 때 가장 뒤쪽 스테이지에서 시작해 앞으로 거슬러 올라가며 Sink 체인을 만들 수 있습니다.

linkedOrConsumed — 두 번 쓰지 못하는 이유

스테이지 객체에는 매우 중요한 boolean이 하나 더 있습니다.

/**
 * True if this pipeline has been linked or consumed
 */
private boolean linkedOrConsumed;

protected void linkOrConsume() {
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;
}

이 플래그는 두 가지 시점에 true로 바뀝니다 — 이 스테이지 뒤에 새 스테이지가 붙을 때, 그리고 터미널 연산이 호출되어 이 스테이지가 소비될 때입니다. 한 번 true가 되면 다음 호출은 즉시 IllegalStateException을 던집니다.

이 한 줄짜리 가드 덕분에 "스트림은 한 번만 쓸 수 있다"라는 자바의 약속이 성립합니다. 이미 collect한 스트림 객체에 다시 map을 부르면 즉시 에러가 나는 이유도, 같은 스트림 변수에 두 번 터미널 연산을 부를 수 없는 이유도 모두 같은 메커니즘입니다.

Stream<Integer> s = list.stream().filter(x -> x > 0);
long c1 = s.count();
long c2 = s.count(); // IllegalStateException: stream has already been operated upon or closed

Spliterator — Iterator를 넘어선 분할 가능 순회자

스테이지가 "무엇을 할지"의 설계도라면, Spliterator는 "어디서 원소를 가져올지"의 책임을 집니다.

이름 자체가 split + iterator라는 합성어임에서 드러나듯, Spliterator는 두 가지 일을 합니다. 하나씩 꺼내는 일과, 둘로 쪼개는 일입니다. 핵심 메서드는 네 개입니다.

public interface Spliterator<T> {
    boolean tryAdvance(Consumer<? super T> action);
    Spliterator<T> trySplit();
    long estimateSize();
    int characteristics();

    default void forEachRemaining(Consumer<? super T> action) {
        do {
        } while (tryAdvance(action));
    }
}
  • tryAdvance — 다음 원소가 있으면 action에 넘기고 true를 반환, 없으면 false. Iterator의 hasNext+next를 하나로 합친 것에 해당합니다.
  • forEachRemaining — 기본 구현은 tryAdvance를 반복 호출하지만, 가능한 모든 Spliterator는 이를 오버라이드해서 더 빠른 일괄 순회를 제공합니다. 시퀀셜 스트림의 핫 패스는 사실상 이 메서드입니다.
  • trySplit — 자신이 가진 원소의 일부를 잘라 새 Spliterator로 반환. 부모는 나머지를 그대로 갖습니다. 더 쪼갤 수 없으면 null. 이게 병렬 스트림의 핵심입니다.
  • characteristics — 자신과 자신이 다루는 원소의 성격을 비트마스크로 알려 줍니다.

특성 플래그는 모두 8개로, Stream의 최적화 결정에 직접 영향을 줍니다.

플래그 의미
ORDERED 원소 순서가 의미 있음. Listtrue, HashSetfalse
DISTINCT 어떤 두 원소도 equals로 같지 않음
SORTED Comparator(기본은 자연순서)에 따라 정렬돼 있음
SIZED estimateSize()가 정확한 개수를 반환함
NONNULL 원소에 null이 없음
IMMUTABLE 순회 도중 소스가 변경되지 않음
CONCURRENT 동시 수정에 안전한 소스
SUBSIZED trySplit이 만들어 내는 자식들도 모두 SIZED

ArrayListspliterator()SIZED | SUBSIZED | ORDERED를 반환합니다. 크기가 정확하고, 둘로 잘라도 각각 정확한 크기이며, 원소에 순서가 있다는 뜻입니다.

HashSetspliterator()SIZED | DISTINCT만 반환합니다. 순서가 없으므로 ORDERED가 빠지고, 해시 테이블 분할 특성상 둘로 자른 결과의 크기를 정확히 알 수 없어 SUBSIZED도 빠집니다.

이 차이는 그저 정보가 아니라 실제 최적화 결정으로 이어집니다. 예컨대 ORDERED가 아닌 소스에 .findFirst()를 부르면, JDK는 그냥 첫 번째로 만난 원소를 반환해도 무방하다고 판단할 수 있습니다.

Sink — 실행 시점의 함수 체인

Pipeline이 설계도라면, Sink는 실행 시점에 만들어지는 실제 라인입니다. SinkConsumer를 상속한 인터페이스로, 데이터의 시작과 끝을 알리는 메서드를 추가로 갖습니다.

interface Sink<T> extends Consumer<T> {
    default void begin(long size) {}
    default void end() {}
    default boolean cancellationRequested() { return false; }

    @Override
    void accept(T t);
}

핵심은 begin → accept(n번) → end 의 라이프사이클이 약속돼 있다는 점입니다. begin은 데이터가 들어오기 직전에 한 번 호출되고, accept로 모든 원소를 처리한 뒤 end가 한 번 호출됩니다.

이 약속이 왜 필요한지는 sorted()collect(toList()) 같은 연산을 떠올리면 분명합니다. 정렬은 모든 원소를 모아 두었다가 마지막에 정렬해서 흘려보내야 하므로, accept에서는 모으기만 하고 실제 정렬은 end에서 합니다. toList()도 마찬가지로 begin에서 적절한 크기의 리스트를 준비하고, accept에서 채우고, end에서 결과를 마감합니다.

스테이지를 연결할 때 쓰는 보조 클래스가 Sink.ChainedReference입니다.

abstract static class ChainedReference<T, E_OUT> implements Sink<T> {
    protected final Sink<? super E_OUT> downstream;

    public ChainedReference(Sink<? super E_OUT> downstream) {
        this.downstream = Objects.requireNonNull(downstream);
    }

    @Override
    public void begin(long size) { downstream.begin(size); }

    @Override
    public void end() { downstream.end(); }

    @Override
    public boolean cancellationRequested() {
        return downstream.cancellationRequested();
    }
}

여기서 보이는 패턴은 데코레이터입니다. 각 중간 연산이 만드는 Sink는 자기 자신만의 변환 로직을 accept에 넣고, begin/end/cancellationRequested는 기본적으로 downstream으로 그대로 위임합니다. 그래서 위에서 본 filter 구현이 accept만 오버라이드하고 begin은 굳이 짧게만 손대고 있는 것입니다.

터미널 연산 — wrapSink와 copyInto

지금까지 본 모든 코드는 사실상 실행을 미루기 위한 코드였습니다. 실제 실행은 터미널 연산이 호출되는 순간 시작합니다. AbstractPipeline.evaluate가 그 진입점입니다.

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;
    return isParallel()
        ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
        : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

evaluate는 두 가지를 합니다. linkedOrConsumed를 세팅해서 이 스트림을 한 번만 쓰도록 잠그고, 시퀀셜이면 evaluateSequential로, 병렬이면 evaluateParallel로 분기합니다.

시퀀셜 경로의 핵심은 wrapAndCopyInto입니다.

final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink,
                                                      Spliterator<P_IN> spliterator) {
    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
    return sink;
}

이름이 모든 걸 말해 줍니다 — 터미널 Sink를 받아서, 중간 연산들로 한 겹씩 wrap(감싸고), 그 결과를 소스 Spliterator에 copyInto(부어 넣습니다).

wrapSink는 마지막 스테이지부터 거꾸로 거슬러 올라갑니다.

@Override
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    Objects.requireNonNull(sink);
    for (AbstractPipeline p = AbstractPipeline.this; p.depth > 0; p = p.previousStage) {
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;
}

이게 바로 앞서 정의해 둔 opWrapSink가 비로소 호출되는 시점입니다. map 스테이지의 opWrapSink는 터미널 Sink를 자신의 ChainedReference로 감싸고, filter 스테이지의 opWrapSink는 그 결과를 다시 한 번 감쌉니다. 최종 반환값은 소스 입력 타입을 받는 Sink입니다.

그 다음 copyInto가 실행됩니다.

final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);
    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        spliterator.forEachRemaining(wrappedSink);
        wrappedSink.end();
    }
    else {
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}

세 줄짜리 핵심 경로가 보입니다 — begin 한 번, Spliterator의 forEachRemaining으로 일괄 순회하면서 accept n번, 마지막에 end 한 번. 이게 전부입니다.

한 번의 패스 — 게으름이 만드는 효율성

여기서 자주 놓치는 사실 하나를 짚어 봅시다. filter 다음에 map이 와도 컬렉션을 두 번 도는 게 아닙니다. 단 한 번 돕니다.

왜 그런지는 위에서 본 Sink 체인을 떠올리면 명확합니다. 각 원소는 다음과 같은 호출 체인을 따라갑니다.

flowchart TB
    A[Spliterator.forEachRemaining]
    B[filter Sink.accept x]
    C{predicate.test x}
    D[map Sink.accept x]
    E[terminal Sink.accept f x]

    A --> B
    B --> C
    C -- true --> D
    C -- false --> A
    D --> E
    E -- next element --> A

filter의 Sink는 predicate.test(u)falsedownstream.accept를 부르지 않습니다. 그러면 그 원소는 그 자리에서 사라지고, Spliterator는 곧장 다음 원소로 넘어갑니다. 컬렉션은 처음부터 끝까지 단 한 번만 순회되고, 살아남은 원소들만이 map까지 도달합니다.

이 동작 방식은 종종 operation fusion(연산 융합) 이라고도 표현됩니다. 자바의 공식 문서가 명시적으로 약속하는 효율의 근거이기도 합니다 — 인용하면, "filtering, mapping, and summing can be fused into a single pass on the data, with minimal intermediate state."

이는 imperative 버전과 등가입니다.

// stream version
list.stream()
    .filter(x -> x > 0)
    .map(x -> x * 2)
    .collect(toList());

// 위와 동등한 imperative 코드
List<Integer> result = new ArrayList<>();
for (Integer x : list) {        // 한 번의 순회
    if (x > 0) {                 // filter 인라인
        Integer mapped = x * 2;  // map 인라인
        result.add(mapped);      // collect 인라인
    }
}

차이는 표현 방식뿐, 실제 실행되는 작업의 양은 같습니다.

Short-circuit — cancellationRequested의 역할

findFirst, anyMatch, limit 같은 연산은 "필요한 만큼만 보고 멈춰야" 합니다. 무한 스트림에 findFirst를 부르면 무한히 도는 게 아니라 첫 매칭이 나오면 끝나야 합니다.

이를 위해 Sink 인터페이스에는 cancellationRequested()가 있습니다. copyIntoSHORT_CIRCUIT 플래그를 보면 일반 경로 대신 copyIntoWithCancel을 부르는데, 그 안에서는 다음과 같이 매 원소마다 취소 여부를 확인합니다.

final boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    boolean cancelled;
    do { } while (!(cancelled = sink.cancellationRequested())
                  && spliterator.tryAdvance(sink));
    return cancelled;
}

tryAdvance로 원소를 한 개씩 가져오면서, 매 순회 직전에 Sink에게 "이제 그만 받고 싶니?"라고 묻습니다. findFirst의 Sink는 첫 매칭을 받는 순간부터 cancellationRequestedtrue를 반환하므로 루프가 끝납니다.

limit(n)의 Sink도 마찬가지로 n개를 받은 시점부터 true를 반환해서, 무한 스트림이라도 정확히 n개에서 멈추게 만듭니다.

이 메커니즘은 비-쇼트서킷 경로(forEachRemaining을 호출하는 빠른 일괄 순회)와 달리 원소당 호출 오버헤드가 더 크다는 점에서 트레이드오프가 있습니다. 그래서 OpenJDK는 파이프라인 플래그에 SHORT_CIRCUIT이 있을 때만 이 경로를 탑니다.

Stateful 연산 — 모든 원소를 봐야 끝나는 사람들

지금까지 본 filter, mapstateless 연산이었습니다. 직전 원소 한두 개의 결과만으로 현재 원소를 처리할 수 있습니다. 자바 공식 문서는 이를 명확히 구분합니다 — "Stateless operations, such as filter and map, retain no state from previously seen element."

반면 sorted, distinctstateful입니다. 마지막 원소가 도착하기 전에는 결과를 만들어 낼 수 없습니다. 가장 작은 원소가 마지막에 올 수도 있으니까요. 같은 원소가 마지막에 한 번 더 등장할 수도 있고요.

stateful 연산의 Sink는 보통 다음 모양을 띱니다.

// 개념적인 sorted Sink
abstract static class RefSortingSink<T> extends Sink.ChainedReference<T, T> {
    private List<T> buffer;
    private final Comparator<? super T> comparator;

    @Override
    public void begin(long size) {
        buffer = (size >= 0) ? new ArrayList<>((int) size) : new ArrayList<>();
        downstream.begin(size);
    }

    @Override
    public void accept(T t) {
        buffer.add(t);
    }

    @Override
    public void end() {
        buffer.sort(comparator);
        if (!cancellationRequested()) {
            downstream.begin(buffer.size());
            for (T t : buffer) downstream.accept(t);
        }
        downstream.end();
        buffer = null;
    }
}

accept에서 downstream으로 넘기지 않고 버퍼에 모으기만 한다는 점이 핵심입니다. 그래서 무한 스트림(Stream.iterate(0, i -> i+1).sorted().limit(10))에 sorted를 붙이면 end가 영원히 호출되지 않고, 결과적으로 프로그램이 멈추지 않습니다. limit가 sorted보다 에 와도 동작하지 않습니다. sorted가 모든 원소를 봐야 한다는 사실이 limit으로 깎이지 않기 때문입니다.

이건 단순한 구현 결과가 아니라 의미상으로도 옳습니다. 정렬은 전체에 대한 연산이지, 부분 시퀀스에 대해 정의되지 않습니다.

병렬 스트림 — trySplit와 Fork/Join

parallelStream()을 부르거나 .parallel()을 부르면 evaluateevaluateParallel 경로로 갑니다. 시퀀셜 경로가 Sink 체인 한 줄을 만들어 forEachRemaining으로 흘려보냈다면, 병렬 경로는 같은 Sink 체인을 여러 개 만들어 여러 스레드에서 동시에 흘립니다.

작업 분할의 책임은 Spliterator의 trySplit이 집니다. ForkJoinTask로 감싼 작업이 다음 패턴을 따릅니다(개념 코드):

void compute() {
    Spliterator<P_IN> rs = spliterator, ls;
    long sizeEstimate = rs.estimateSize();
    long sizeThreshold = ...; // 보통 (total / (workers * 4))
    while (sizeEstimate >= sizeThreshold
           && (ls = rs.trySplit()) != null) {
        AbstractTask<P_IN, P_OUT, R, K> leftChild = makeChild(ls);
        leftChild.fork();
        rs = rs.trySplit() != null ? rs : rs; // 갱신
        sizeEstimate = rs.estimateSize();
    }
    // 더 못 쪼개면 시퀀셜 처리
    setLocalResult(doLeaf());
}

ArrayList의 Spliterator는 내부 배열을 인덱스 범위로 다루기 때문에 trySplit이 O(1)에 가깝게 동작합니다. 단순히 [from, mid)를 새 Spliterator로 떼어 주고, 자신은 [mid, to)를 갖습니다. 반면 LinkedList의 Spliterator는 분할이 거의 의미가 없어서 병렬 성능이 잘 나오지 않습니다.

분할이 더 이상 의미가 없을 정도로 작아지면 각 leaf 작업은 시퀀셜 경로와 동일하게 wrapAndCopyInto를 부릅니다. 즉 leaf 한 개의 처리 방식은 시퀀셜 스트림과 똑같습니다.

각 leaf의 결과는 부모 작업에서 combine으로 합쳐집니다. reduce는 결합 법칙(associative)을 갖는 reducer 덕분에 그냥 두 값을 합치면 되고, collect(toList())는 두 리스트를 concat하면 됩니다. 그래서 reducer/collector에는 항상 결합 법칙이 요구됩니다. 결합 법칙이 깨지면 순서에 따라 결과가 달라집니다.

flowchart TB
    R[Root Task<br/>spliterator] --> L1[Left split]
    R --> R1[Right split]
    L1 --> L2[Left.Left]
    L1 --> L3[Left.Right]
    R1 --> R2[Right.Left]
    R1 --> R3[Right.Right]
    L2 --> CL[combine LL+LR]
    L3 --> CL
    R2 --> CR[combine RL+RR]
    R3 --> CR
    CL --> ROOT[combine L+R]
    CR --> ROOT

병렬 스트림이 기본적으로 사용하는 풀은 공용 ForkJoinPool(ForkJoinPool.commonPool())입니다. 별도 설정 없이 쓰면 모든 병렬 스트림이 같은 풀을 공유하므로, IO 블로킹이 섞인 작업을 병렬 스트림에 태우면 풀이 빨려 들어가서 다른 병렬 스트림 작업까지 굶을 수 있습니다.

흔히 만나는 함정들

내부 모델을 알고 나면, 자주 보는 버그 몇 가지의 원인이 한 줄로 보입니다.

1. peek로 디버깅이 안 된다

list.stream()
    .peek(x -> System.out.println("seen: " + x))
    .filter(x -> x > 0)
    .findFirst();

peek는 중간 연산이라 단독으로는 아무 일도 안 합니다. 위 코드는 정상 동작하지만, findFirst를 빼면 출력은 한 줄도 안 나옵니다. 또한 findFirst가 짧게 끊고 끝나면 살아남은 원소만 peek됩니다. peek는 디버깅용으로 흔히 쓰이지만, "이 시점에서 원소가 모두 흘러간다"를 보장하지 않는다는 점에서 사실은 위험합니다.

2. 같은 스트림을 두 번 쓸 수 없다

Stream<Integer> s = list.stream();
long c = s.count();
List<Integer> l = s.toList(); // IllegalStateException

앞서 본 linkedOrConsumed가 원인입니다. 스트림은 변수에 담아 두지 말고, 매번 list.stream()을 새로 부르세요.

3. 무한 스트림 + sorted = 영원한 대기

Stream.iterate(0, i -> i + 1)
      .sorted()        // 모든 원소를 봐야 하므로 절대 끝나지 않음
      .limit(10)
      .toList();

sorted는 stateful이라 begin → 모든 accept → end의 사이클이 끝나야 결과를 흘려보냅니다. 무한 소스에서는 그 사이클이 닫히지 않습니다. limit를 앞에 두면 해결됩니다.

Stream.iterate(0, i -> i + 1)
      .limit(10)
      .sorted()        // 10개만 정렬
      .toList();

4. 병렬 스트림에 mutable 누적자 쓰기

List<Integer> result = new ArrayList<>();
list.parallelStream()
    .filter(x -> x > 0)
    .forEach(result::add);   // ArrayList는 thread-safe하지 않음

forEach의 콜백은 여러 스레드에서 동시에 호출됩니다. Collections.synchronizedList를 쓰거나, 아예 .collect(toList())로 바꿔야 합니다. collect는 내부적으로 thread-local 누적자를 쓰고 combine으로 합치므로 안전합니다.

5. 람다 안에서 외부 변수 변형

int[] sum = {0};
list.stream().forEach(x -> sum[0] += x);  // 동작은 하지만 의도가 어긋남

시퀀셜에서는 동작하지만, 같은 코드의 parallelStream은 데이터 레이스로 결과가 깨집니다. 더 근본적으로, Stream의 라이프사이클(begin/accept/end)을 활용하지 못합니다. reduce(0, Integer::sum)이나 mapToInt(...).sum()을 쓰세요.

정리

stream().filter(...).map(...).collect(...)는 다음 순서로 실행됩니다.

  1. stream() — 소스 Spliterator를 감싸는 Head 스테이지 생성
  2. filter(...) — StatelessOp 스테이지 추가, 이전 스테이지의 linkedOrConsumed를 true로 셋
  3. map(...) — 또 하나의 StatelessOp 스테이지 추가
  4. collect(...)evaluate 호출
    • 마지막 스테이지부터 거슬러 올라가며 wrapSink로 Sink 체인 구성
    • 소스 Spliterator의 forEachRemaining으로 모든 원소를 Sink 체인 입구에 흘려 넣음
    • 각 원소는 filter Sink → map Sink → collector Sink 순으로 통과
    • 통과한 원소만 다음 단계로, 매 단계는 직전 단계의 downstream을 호출

세 부품의 협력 — 분할 가능한 데이터 소스(Spliterator), 양방향 연결된 스테이지(Pipeline), 데코레이터로 쌓인 콜백 체인(Sink) — 이 합쳐져 다음 세 가지 약속을 만들어 냅니다.

  • Lazy evaluation — 터미널 연산 전까지는 어떤 원소도 처리되지 않습니다.
  • Operation fusion — 모든 중간 연산이 한 번의 소스 순회에 융합됩니다.
  • Short-circuitcancellationRequested를 통해 무한/큰 소스에서도 필요한 만큼만 봅니다.

다음에 .stream()을 칠 때, 머릿속에 떠올릴 그림은 단순한 컬렉션 변환이 아니라 세 종류의 객체로 짜인 게으른 파이프라인입니다. 그 파이프라인은 마지막 한 줄, 터미널 연산이 호출되는 그 순간에 비로소 살아 움직입니다.

참고자료

More from this blog

JVM은 컨테이너의 CPU와 메모리 한계를 어떻게 알아낼까

8코어 노드에 컨테이너를 띄웠는데 ForkJoinPool이 스레드를 한두 개만 만들어요. 메모리는 넉넉히 줬는데 컨테이너가 자꾸 OOMKilled로 죽고요. 분명히 같은 JAR인데 로컬에서는 멀쩡하다가 쿠버네티스에만 올리면 이상해져요. 이 글은 "왜 컨테이너 속 JVM은 다르게 행동하는가"를 cgroup이라는 진짜 경계선과, JVM이 그 경계를 읽어내는 내

May 21, 202615 min read

ThreadPoolExecutor는 언제 스레드를 새로 만들까 — execute()의 3단계

Executors.newFixedThreadPool(10) 한 줄을 쓰면서도, 11번째 작업이 오면 스레드가 11개로 늘어날 거라고 막연히 기대해 본 적 없으신가요. 실제로는 큐가 먼저 무한히 쌓이고 스레드는 영원히 10개에 머물러요. 이 글은 ThreadPoolExecutor가 작업을 받았을 때 "스레드를 새로 만들지, 큐에 넣을지, 거부할지"를 결정하는

May 21, 202617 min read

자바 synchronized는 어떻게 동작할까 — 모니터, 락 인플레이션, 그리고 사라진 biased locking

synchronized 키워드 하나로 스레드 안전을 얻는 동안, JVM 안에서는 객체 헤더의 비트를 뒤집고, 스택에 락 레코드를 쌓고, 경합이 생기면 네이티브 모니터로 승격하는 일이 벌어져요. 이 글은 그 한 번의 잠금이 객체 헤더부터 ObjectMonitor까지 어떤 경로를 거치는지, 그리고 한때 있었다가 JDK 18에서 사라진 biased locking

May 19, 202616 min read

JVM 객체 할당의 비밀 — TLAB, Bump-the-Pointer, 그리고 할당이 거의 공짜인 이유

Java에서 new를 호출하면 무슨 일이 벌어질까요? "힙에 메모리를 잡는다"는 한 문장 뒤에는 스레드마다 자기만의 분양 구역을 나눠 갖는 정교한 설계가 숨어 있어요. 이 글은 HotSpot JVM이 객체 할당을 어떻게 "거의 공짜"로 만드는지 그 내부를 따라가 보려는 글이에요. JVM 메모리 동작 원리에 관심 있는 분께 권해요. 자바를 쓰다 보면 객체를

May 15, 202614 min read

Java Zero-Copy — FileChannel.transferTo, sendfile, 그리고 Kafka가 디스크를 네트워크로 흘려보내는 방법

"파일을 읽어서 소켓으로 보낸다." 한 줄짜리 요구사항이에요. 그런데 이 한 줄 뒤에서 데이터는 메모리를 네 번이나 복사하고, CPU는 커널과 유저 공간을 네 번이나 들락거려요. Kafka처럼 초당 수십만 건을 흘려보내야 하는 시스템에서 이 비용은 그냥 넘길 수가 없어요. 이 글은 그 복사를 한 겹씩 벗겨내는 zero-copy의 동작 원리를 따라가요. 전통

May 15, 202617 min read

끄적끄적 테크 블로그

165 posts

물류 회사에 다니고 있는 개발자 블로그입니다. 개발을 너무 좋아해서 정신없이 작업하다가 중간에 끄적거리며 내용들을 몇개 적어봅니다 ㅎㅎ