Mono & Flux
Mono and Flux are the two core types of Project Reactor, the reactive library that powers Spring WebFlux. Both are Publisher implementations from the Reactive Streams spec, but they differ in cardinality: a Mono emits at most one item, a Flux emits zero to many. You compose them with operators into declarative, non-blocking pipelines.
Mono vs Flux
| | Mono<T> | Flux<T> |
|---|---|---|
| Cardinality | 0 or 1 item | 0 to N items (possibly infinite) |
| Typical use | single result, void operations | collections, streams, SSE |
| Maps to | Optional<T> / single value | List<T> / Stream<T> |
| Completion | onComplete after 0–1 onNext | onComplete after N onNext |
Both arrive with the reactor-core dependency, which is pulled in transitively by spring-boot-starter-webflux.
Creating Mono and Flux
// Mono
Mono<String> just = Mono.just("hello");
Mono<String> empty = Mono.empty();
Mono<String> error = Mono.error(new IllegalStateException("boom"));
Mono<String> lazy = Mono.fromSupplier(() -> compute()); // deferred
Mono<User> async = Mono.fromFuture(callApiAsync()); // CompletableFuture
// Flux
Flux<Integer> range = Flux.range(1, 5); // 1,2,3,4,5
Flux<String> vals = Flux.just("a", "b", "c");
Flux<String> iter = Flux.fromIterable(List.of("x", "y"));
Flux<Long> ticks = Flux.interval(Duration.ofSeconds(1)); // 0,1,2,... forever
Warning: Prefer Mono.fromSupplier or Mono.defer over Mono.just(compute()) when the value is expensive to produce or has side effects. Mono.just evaluates its argument eagerly, while the pipeline is being assembled rather than when it is subscribed to, which defeats laziness.
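The difference between eager and deferred evaluation can be observed with a call counter. The following is a minimal sketch, assuming reactor-core is on the classpath; the class name, the counter, and the compute() body are illustrative stand-ins, not part of any real API.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

class EagerVsLazy {
    // Counts how often the (hypothetical) expensive computation runs.
    static final AtomicInteger CALLS = new AtomicInteger();

    static String compute() {
        CALLS.incrementAndGet();
        return "value";
    }

    // Mono.just: the argument is evaluated while the Mono is being built.
    static int callsAfterBuildingJust() {
        CALLS.set(0);
        Mono<String> eager = Mono.just(compute());
        return CALLS.get();
    }

    // Mono.fromSupplier: building the Mono does not call the supplier.
    static int callsAfterBuildingSupplier() {
        CALLS.set(0);
        Mono<String> lazy = Mono.fromSupplier(EagerVsLazy::compute);
        return CALLS.get();
    }

    // The supplier runs once the Mono is subscribed to (block() subscribes).
    static int callsAfterSubscribing() {
        CALLS.set(0);
        Mono<String> lazy = Mono.fromSupplier(EagerVsLazy::compute);
        lazy.block();
        return CALLS.get();
    }

    public static void main(String[] args) {
        System.out.println(callsAfterBuildingJust());     // 1
        System.out.println(callsAfterBuildingSupplier()); // 0
        System.out.println(callsAfterSubscribing());      // 1
    }
}
```

block() is used here only to force a subscription in a standalone program; it should not be called on an event-loop thread.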
The golden rule: nothing happens until you subscribe
Most Mono and Flux pipelines are cold publishers: a recipe, not a running computation. Declaring the pipeline does nothing. Work begins only when something subscribes, and each new subscriber runs the recipe again from the start. In a Spring controller, the framework subscribes for you; you almost never call subscribe() by hand in application code.
Flux<Integer> pipeline = Flux.range(1, 3)
.map(i -> i * 10)
.doOnNext(i -> System.out.println("emit " + i));
// Nothing has printed yet — no subscriber.
pipeline.subscribe(System.out::println);
Output:
emit 10
10
emit 20
20
emit 30
30
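The same pipeline can be used to check that assembly alone does no work and that every subscriber replays the recipe. This is a small sketch under the assumption that reactor-core is available; the ColdPipeline class and its log list are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

class ColdPipeline {
    // Records the side-effect log size after assembly, after one subscriber,
    // and after a second subscriber.
    static List<Integer> logSizes() {
        List<String> log = new ArrayList<>();
        List<Integer> sizes = new ArrayList<>();

        Flux<Integer> pipeline = Flux.range(1, 3)
                .map(i -> i * 10)
                .doOnNext(i -> log.add("emit " + i));
        sizes.add(log.size());   // assembled only: nothing has run

        pipeline.subscribe();    // first subscriber triggers the work
        sizes.add(log.size());

        pipeline.subscribe();    // second subscriber replays the recipe
        sizes.add(log.size());
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(logSizes()); // [0, 3, 6]
    }
}
```

Flux.range emits synchronously on the subscribing thread, which is why the log can be inspected immediately after each subscribe() call; an asynchronous source would need a blocking wait or a StepVerifier instead.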
Core operators
Operators transform the stream and return a new publisher (they are chainable and side-effect free on the source).
Flux.just("apple", "banana", "cherry")
.filter(s -> s.length() > 5) // keep banana, cherry
.map(String::toUpperCase) // transform each item, sync
.flatMap(s -> lookupAsync(s)) // async per item, flattened
.onErrorResume(ex -> Flux.just("FALLBACK"))
.subscribe(System.out::println);
| Operator | Purpose |
|---|---|
| map | synchronous 1:1 transform of each item |
| flatMap | async transform returning a Mono/Flux, flattened (interleaved) |
| concatMap | like flatMap but preserves order, one at a time |
| filter | drop items that fail a predicate |
| zip | combine items from multiple publishers pairwise |
| onErrorResume | switch to a fallback publisher on error |
| onErrorReturn | emit a single fallback value on error |
| switchIfEmpty | provide an alternative when the source is empty |
| defaultIfEmpty | emit a default value when empty |
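The ordering difference between flatMap and concatMap, and the empty-source fallback, can be sketched as follows. The slowEcho helper is a hypothetical stand-in for an asynchronous lookup whose latency grows with its input; the delays are arbitrary.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class OperatorOrdering {
    // Hypothetical async call: larger inputs take longer to answer.
    static Mono<Integer> slowEcho(int n) {
        return Mono.just(n).delayElement(Duration.ofMillis(n * 50L));
    }

    // concatMap subscribes to one inner publisher at a time,
    // so results keep the source order: 3, 1, 2.
    static List<Integer> withConcatMap() {
        return Flux.just(3, 1, 2)
                .concatMap(OperatorOrdering::slowEcho)
                .collectList()
                .block();
    }

    // flatMap subscribes to the inner publishers eagerly and emits results
    // as they arrive, so the order depends on timing.
    static List<Integer> withFlatMap() {
        return Flux.just(3, 1, 2)
                .flatMap(OperatorOrdering::slowEcho)
                .collectList()
                .block();
    }

    // defaultIfEmpty supplies a value when the source completes without one.
    static String emptyFallback() {
        return Mono.<String>empty().defaultIfEmpty("none").block();
    }

    public static void main(String[] args) {
        System.out.println(withConcatMap());
        System.out.println(withFlatMap());
        System.out.println(emptyFallback());
    }
}
```

With these delays flatMap will typically yield the fastest answer first, but that order is a consequence of timing rather than a guarantee. When order matters, concatMap trades throughput for determinism.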
map vs flatMap
This distinction trips up most newcomers. Use map for a plain, synchronous transformation. Use flatMap when the transformation is itself asynchronous and returns a publisher; with map you end up with a nested publisher such as Mono<Mono<T>> or Flux<Mono<T>>, and the inner publisher is never subscribed to.
// WRONG: nested publisher
Mono<Mono<User>> bad = userIdMono.map(id -> findUserById(id)); // findUserById returns Mono<User>
// RIGHT: flatMap unwraps the inner publisher
Mono<User> good = userIdMono.flatMap(id -> findUserById(id));
Combining with zip
Mono<User> user = userClient.findById(id);
Mono<Account> account = accountClient.findByUser(id);
Mono<Profile> profile = Mono.zip(user, account)
.map(tuple -> new Profile(tuple.getT1(), tuple.getT2()));
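zip subscribes to both sources eagerly, so the two calls proceed concurrently when the sources are asynchronous, and it emits the combined tuple once both have produced a value. If either source completes empty or fails, the zipped result is empty or fails as well. It is a common pattern for fanning out to several services at once.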
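A runnable variant of the zip pattern, including the case where one source is empty, might look like the sketch below. The delayed Monos are placeholders for real client calls, and the string concatenation stands in for building a Profile.

```java
import java.time.Duration;
import reactor.core.publisher.Mono;

class ZipSketch {
    // Both sources produce a value, so zip emits one combined tuple.
    static String combine() {
        Mono<String> name = Mono.just("ada").delayElement(Duration.ofMillis(50));
        Mono<Integer> balance = Mono.just(42).delayElement(Duration.ofMillis(30));
        return Mono.zip(name, balance)
                .map(t -> t.getT1() + ":" + t.getT2())
                .block();
    }

    // One source is empty, so zip completes empty and the default applies.
    static String combineWithEmpty() {
        Mono<String> name = Mono.just("ada");
        Mono<Integer> missing = Mono.empty();
        return Mono.zip(name, missing)
                .map(t -> t.getT1() + ":" + t.getT2())
                .defaultIfEmpty("no profile")
                .block();
    }

    public static void main(String[] args) {
        System.out.println(combine());          // ada:42
        System.out.println(combineWithEmpty()); // no profile
    }
}
```

If a missing value on one side should not cancel the whole combination, one option is to give that source its own defaultIfEmpty before passing it to zip.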
Error handling
Errors are first-class signals that travel down the pipeline like data. Handle them with operators rather than try/catch.
findUserById(id)
.map(User::email)
.switchIfEmpty(Mono.error(new UserNotFoundException(id)))
.onErrorResume(UserNotFoundException.class,
ex -> Mono.just("[email protected]"));
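The chain above can be exercised end to end with an in-memory lookup. This is an illustrative sketch only: the EMAILS map, findEmailById, the exception class, and the fallback address are all hypothetical stand-ins for a real repository and domain model.

```java
import java.util.Map;
import reactor.core.publisher.Mono;

class ErrorHandlingSketch {
    static class UserNotFoundException extends RuntimeException {
        UserNotFoundException(String id) {
            super("no user " + id);
        }
    }

    // Hypothetical data store with a single known user.
    static final Map<String, String> EMAILS = Map.of("1", "ada@example.com");

    // Completes empty when the id is unknown, as a reactive repository would.
    static Mono<String> findEmailById(String id) {
        return Mono.justOrEmpty(EMAILS.get(id));
    }

    static String emailOrFallback(String id) {
        return findEmailById(id)
                // Turn "empty" into an error signal; the supplier defers
                // creating the exception until it is actually needed.
                .switchIfEmpty(Mono.error(() -> new UserNotFoundException(id)))
                // Recover only from this exception type; others propagate.
                .onErrorResume(UserNotFoundException.class,
                        ex -> Mono.just("fallback@example.com"))
                .block();
    }

    public static void main(String[] args) {
        System.out.println(emailOrFallback("1")); // ada@example.com
        System.out.println(emailOrFallback("2")); // fallback@example.com
    }
}
```

Restricting onErrorResume to one exception class keeps unexpected failures visible instead of masking them with the fallback.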
Schedulers: subscribeOn and publishOn
By default the pipeline runs on whatever thread subscribes. Schedulers let you shift work to a different thread pool — essential for the rare case where you must call a blocking API from a reactive flow without stalling the event loop.
Mono.fromCallable(() -> legacyBlockingDao.load(id)) // blocking call
.subscribeOn(Schedulers.boundedElastic()) // run it off the event loop
.map(this::transform); // also runs on the boundedElastic thread; use publishOn to switch
| Operator | Effect |
|---|---|
| subscribeOn | chooses the thread on which subscription, and therefore the source, runs; it applies from the source onward wherever it appears in the chain, until a publishOn takes over |
| publishOn | switches the thread for operators downstream of it |
| Schedulers.boundedElastic() | bounded pool for blocking/long-running tasks |
| Schedulers.parallel() | fixed pool sized to CPU cores for CPU-bound work |
Tip: boundedElastic() is the safe escape hatch for unavoidable blocking calls. But wrapping JDBC in it does not make your app reactive; for true non-blocking SQL, use R2DBC.
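Which thread each stage runs on can be checked by recording thread names along the chain. The sketch below assumes Reactor's default scheduler thread naming (names beginning with boundedElastic and parallel); the class and helper names are illustrative.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class SchedulerThreads {
    static String current() {
        return Thread.currentThread().getName();
    }

    // Returns the thread names seen by: the source, the map before publishOn,
    // and the map after publishOn.
    static String[] trace() {
        return Mono.fromCallable(SchedulerThreads::current)  // source
                .subscribeOn(Schedulers.boundedElastic())    // moves the source off the caller
                .map(src -> src + "," + current())           // same thread as the source
                .publishOn(Schedulers.parallel())            // switch for what follows
                .map(seen -> seen + "," + current())         // now on a parallel thread
                .block()
                .split(",");
    }

    public static void main(String[] args) {
        for (String name : trace()) {
            System.out.println(name);
        }
    }
}
```

The first two entries name the same boundedElastic thread, which shows that operators after subscribeOn stay on that scheduler until a publishOn appears.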
Testing with StepVerifier
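Reactor provides the reactor-test artifact, added as a separate test-scoped dependency, whose StepVerifier subscribes to a publisher and asserts on the signals it emits, in order.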
StepVerifier.create(Flux.just("a", "b").map(String::toUpperCase))
.expectNext("A", "B")
.verifyComplete();
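StepVerifier can also assert on error signals and on time-based publishers without real waiting. The following sketch assumes reactor-test is on the classpath; the class and method names are illustrative, and in a real project these would usually be JUnit test methods.

```java
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

class StepVerifierSketch {
    // Expects two items followed by an error with a specific message.
    static void verifyErrorSignal() {
        Flux<Integer> failing = Flux.just(1, 2)
                .concatWith(Mono.error(new IllegalStateException("boom")));

        StepVerifier.create(failing)
                .expectNext(1, 2)
                .expectErrorMessage("boom")
                .verify();
    }

    // Virtual time lets a three-second interval be verified almost instantly.
    // The publisher must be created inside the supplier so that it picks up
    // the virtual scheduler.
    static void verifyWithVirtualTime() {
        StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofSeconds(1)).take(3))
                .expectSubscription()
                .thenAwait(Duration.ofSeconds(3))
                .expectNext(0L, 1L, 2L)
                .verifyComplete();
    }

    public static void main(String[] args) {
        verifyErrorSignal();
        verifyWithVirtualTime();
        System.out.println("all expectations met");
    }
}
```

Both methods throw an AssertionError if the observed signals differ from the expected sequence.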
Related Topics
- Reactive Programming: the non-blocking model and Reactive Streams.
- Spring WebFlux: using Mono/Flux in web handlers.
- Reactive REST APIs: operators applied to real endpoints.
- R2DBC (Reactive SQL): repositories that return Mono and Flux.
- Java Streams: the synchronous, pull-based cousin of Flux.
- Optional: the synchronous analogue of Mono.