Stateless Operations
Stateless operators are the workhorses of the Kafka Streams DSL: they transform, filter, reshape, and route records one at a time without consulting any persisted state. Because each record is processed independently, these operators need no state stores, no changelog topics, and no repartitioning unless you deliberately change the key — which makes them cheap, easy to reason about, and the right default for most enrichment and cleansing logic. In production, the bulk of a topology is usually a chain of stateless steps feeding a smaller stateful core.
Throughout this page we use a running example: a stream of raw order events keyed by order id, where the value is a JSON payload we model as a Java record.
public record Order(String orderId, String country, String status, double amount) {}
Filtering records
filter keeps records for which a predicate returns true; filterNot keeps those for which it returns false. Both inspect key and value and never modify the record, so the key stays valid and no repartition is triggered.
KStream<String, Order> orders = builder.stream("orders");
KStream<String, Order> paidOrders =
    orders.filter((key, order) -> "PAID".equals(order.status()));
KStream<String, Order> nonZero =
    orders.filterNot((key, order) -> order.amount() == 0.0);
Mapping keys and values
mapValues transforms only the value and leaves the key untouched — this is the preferred mapping operator because Kafka Streams knows the key did not change and therefore avoids a costly repartition. map lets you change both key and value, which marks the stream for repartitioning on the next stateful operation (the key may now route to a different partition).
// Value-only: no repartition
KStream<String, Double> amounts =
    orders.mapValues(order -> order.amount());
// Key + value: flags downstream repartition
KStream<String, Order> byCountry =
    orders.map((key, order) -> KeyValue.pair(order.country(), order));
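Prefer mapValues/filter over map whenever the key is unchanged. Using map unnecessarily flags the stream for repartitioning, so the next key-based operation (an aggregation or join) creates a repartition topic that adds latency, network traffic, and an extra internal topic to operate.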
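A key change matters because records are assigned to partitions by key. The following is a small plain-Java model of that assignment, not Kafka Streams code. The class, the partitionFor and layout helpers, and the third sample order are hypothetical, and String.hashCode() stands in for the hash that the real partitioner applies to the serialized key, so the partition numbers are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Plain-Java model of key-based partitioning; all names here are illustrative.
final class RekeyPartitionSketch {

    record Order(String orderId, String country, String status, double amount) {}

    // Assumption: String.hashCode() stands in for the partitioner's real hash
    // of the serialized key bytes, so the numbers differ from a real cluster.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Groups order ids by the partition that the chosen key maps to.
    static Map<Integer, List<String>> layout(
            List<Order> orders, Function<Order, String> keyFn, int numPartitions) {
        Map<Integer, List<String>> byPartition = new TreeMap<>();
        for (Order o : orders) {
            int p = partitionFor(keyFn.apply(o), numPartitions);
            byPartition.computeIfAbsent(p, ignored -> new ArrayList<>()).add(o.orderId());
        }
        return byPartition;
    }

    // Hypothetical sample data for the sketch.
    static List<Order> sampleOrders() {
        return List.of(
            new Order("ord-1", "US", "PAID", 250.0),
            new Order("ord-2", "DE", "PAID", 1200.0),
            new Order("ord-3", "US", "PAID", 75.0));
    }

    public static void main(String[] args) {
        List<Order> orders = sampleOrders();
        // Keyed by order id: records for one country may sit on different partitions.
        System.out.println("by orderId: " + layout(orders, Order::orderId, 4));
        // Keyed by country: every record for a country maps to one partition,
        // which is what a per-country aggregation or join requires.
        System.out.println("by country: " + layout(orders, Order::country, 4));
    }
}
```

Moving records so that the new key decides their partition is the work a repartition topic does, which is why it is worth avoiding when the key has not actually changed.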
Re-keying with selectKey
When you only need to change the key, selectKey is clearer than map and expresses intent precisely. It still marks the stream for repartitioning before the next aggregation or join.
KStream<String, Order> rekeyed =
    orders.selectKey((key, order) -> order.country());
Flattening with flatMap
flatMap and flatMapValues emit zero, one, or many output records per input record. Use flatMapValues when the key is unchanged (no repartition) and flatMap when you also produce new keys.
// Split a comma-separated tags value into one record per tag
KStream<String, String> tagStream = builder.stream("order-tags");
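KStream<String, String> exploded =
    tagStream.flatMapValues(value -> Arrays.asList(value.split(",")));
// Emit per-line-item records with new keys.
// Assumes Order also exposes List<LineItem> items() and that LineItem has a
// sku() accessor; the Order record defined earlier declares neither.
KStream<String, LineItem> lineItems =
    orders.flatMap((key, order) ->
        order.items().stream()
            .map(item -> KeyValue.pair(item.sku(), item))
            .toList());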
Branching a stream
split() routes records into multiple named branches based on predicates; it replaces the branch(...) method, deprecated since Kafka 2.8. Each record goes to the first branch whose predicate matches, and defaultBranch() captures the rest. The result is a Map whose keys are the Named prefix concatenated with each Branched name (here "order-high", "order-medium", and "order-low").
Map<String, KStream<String, Order>> branches =
    orders.split(Named.as("order-"))
        .branch((k, o) -> o.amount() >= 1000, Branched.as("high"))
        .branch((k, o) -> o.amount() >= 100, Branched.as("medium"))
        .defaultBranch(Branched.as("low"));
branches.get("order-high").to("orders-high-value");
branches.get("order-medium").to("orders-medium-value");
branches.get("order-low").to("orders-low-value");
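The first-match rule is easiest to see with an amount that satisfies more than one predicate. The following plain-Java model is a sketch of the routing rule, not the Kafka Streams implementation. It mirrors the thresholds and map keys used above; the class name and the branchFor helper are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.DoublePredicate;

// Plain-Java model of first-match branch routing; names are illustrative.
final class FirstMatchBranchSketch {

    // Insertion order matters: predicates are tried in the order they were added.
    private static final Map<String, DoublePredicate> BRANCHES = new LinkedHashMap<>();
    static {
        BRANCHES.put("order-high", amount -> amount >= 1000);
        BRANCHES.put("order-medium", amount -> amount >= 100);
    }

    // Returns the name of the first branch whose predicate matches,
    // falling back to the default branch when none does.
    static String branchFor(double amount) {
        for (Map.Entry<String, DoublePredicate> branch : BRANCHES.entrySet()) {
            if (branch.getValue().test(amount)) {
                return branch.getKey();
            }
        }
        return "order-low";
    }

    public static void main(String[] args) {
        // 1500 satisfies both predicates but is routed only to the first match.
        System.out.println(branchFor(1500.0)); // order-high
        System.out.println(branchFor(250.0));  // order-medium
        System.out.println(branchFor(40.0));   // order-low
    }
}
```

Swapping the two predicates would send every amount of 100 or more to the medium branch, which is why the most specific predicate goes first.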
Merging streams
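merge combines two streams with the same key and value types into one. Records from the two inputs are interleaved with no ordering guarantee between the streams, although the relative order within each input is preserved. It is the inverse of branching and is handy for re-uniting branches after independent processing.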
KStream<String, Order> reunited =
    branches.get("order-high").merge(branches.get("order-medium"));
Inspecting with peek and foreach
peek is a non-mutating side-effect operator: it lets you observe each record (log it, increment a metric) and then passes it through unchanged. foreach is a terminal operator — it consumes the stream and returns nothing, so nothing can follow it.
orders.peek((key, order) -> log.debug("Processing order {}", key))
    .filter((key, order) -> order.amount() > 0)
    .foreach((key, order) -> metrics.counter("orders.processed").increment());
peek and foreach give no delivery guarantees for their side effects across failures: a record that is reprocessed after a failure triggers the side effect again. Never use them for writes that must be exactly-once; emit to a topic with to and let a downstream consumer handle persistence instead.
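One way to picture the risk is a batch that is processed a second time after a failure and restart. The following plain-Java sketch is not Kafka Streams code, and SideEffectReplaySketch and its methods are hypothetical. It feeds the same batch through twice and compares a naive counter with a side effect made idempotent by remembering the order ids it has already seen.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java model of a side effect that may run more than once per record.
final class SideEffectReplaySketch {
    private int naiveCount = 0;                               // increments on every call
    private final Set<String> seenOrderIds = new HashSet<>(); // one entry per distinct id

    // Stands in for the body of a peek/foreach callback.
    void onRecord(String orderId) {
        naiveCount++;
        seenOrderIds.add(orderId); // adding an id that is already present changes nothing
    }

    int naiveCount() { return naiveCount; }

    int idempotentCount() { return seenOrderIds.size(); }

    // Feeds the same batch through twice, as could happen when records are
    // reprocessed after a failure.
    static SideEffectReplaySketch replayTwice(List<String> batch) {
        SideEffectReplaySketch effects = new SideEffectReplaySketch();
        batch.forEach(effects::onRecord);
        batch.forEach(effects::onRecord);
        return effects;
    }

    public static void main(String[] args) {
        SideEffectReplaySketch effects = replayTwice(List.of("ord-1", "ord-2"));
        System.out.println("naive: " + effects.naiveCount());           // naive: 4
        System.out.println("idempotent: " + effects.idempotentCount()); // idempotent: 2
    }
}
```

The in-memory set only illustrates the idea. In a real application the deduplication would have to live in the external system, since local memory does not survive a restart.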
Writing output with to and through
to is a terminal sink that writes the stream to a Kafka topic. through writes to a topic and continues processing from it, but it has been deprecated since Kafka 2.6 in favor of repartition(), which writes to an internal topic managed by Kafka Streams. Use repartition() when you only need a repartitioned intermediate stream.
KStream<String, Order> normalized =
    orders.mapValues(o -> new Order(o.orderId(), o.country().toUpperCase(), o.status(), o.amount()));
// Terminal write
normalized.to("orders-normalized", Produced.with(Serdes.String(), orderSerde));
// Continue from a materialized intermediate topic
KStream<String, Order> repartitioned =
    normalized.selectKey((k, o) -> o.country())
        .repartition(Repartitioned.with(Serdes.String(), orderSerde));
Running the normalized pipeline against sample input shows the per-record transformation:
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic orders-normalized --from-beginning --property print.key=true
Output:
ord-1 {"orderId":"ord-1","country":"US","status":"PAID","amount":250.0}
ord-2 {"orderId":"ord-2","country":"DE","status":"PAID","amount":1200.0}
Operator reference
| Operator | Changes key? | May repartition? | Output records per input record |
|---|---|---|---|
| filter / filterNot | No | No | 0 or 1 |
| mapValues | No | No | 1 |
| map | Yes | Yes | 1 |
| selectKey | Yes | Yes | 1 |
| flatMapValues | No | No | 0..N |
| flatMap | Yes | Yes | 0..N |
| split / branch | No | No | 1 (one branch) |
| merge | No | No | 1 |
| peek | No | No | 1 (pass-through) |
| foreach | n/a | No | terminal |
Best Practices
- Default to mapValues, filter, and flatMapValues whenever the key is unchanged so Kafka Streams can skip repartitioning.
- Reserve map and selectKey for genuine re-keying, and group all key changes immediately before the aggregation or join that needs them.
- Use split() with named Branched entries instead of the deprecated branch(...); always provide a defaultBranch so records are never silently dropped.
- Keep peek/foreach side effects idempotent and observability-only; route durable writes through to and a dedicated consumer.
- Specify explicit Produced/Repartitioned serdes rather than relying on defaults, especially after a key change.
- Chain stateless operators freely; they fuse into a single sub-topology and add negligible overhead compared to stateful steps.