
Stateless Operations

Stateless operators are the workhorses of the Kafka Streams DSL: they transform, filter, reshape, and route records one at a time without consulting any persisted state. Because each record is processed independently, these operators need no state stores, no changelog topics, and no repartitioning unless you deliberately change the key — which makes them cheap, easy to reason about, and the right default for most enrichment and cleansing logic. In production, the bulk of a topology is usually a chain of stateless steps feeding a smaller stateful core.

Throughout this page we use a running example: a stream of raw order events keyed by order id, where the value is a JSON payload we model as a Java record.

public record Order(String orderId, String country, String status, double amount) {}

Filtering records

filter keeps records for which a predicate returns true; filterNot keeps those for which it returns false. Both inspect key and value and never modify the record, so the key stays valid and no repartition is triggered.

KStream<String, Order> orders = builder.stream("orders");

KStream<String, Order> paidOrders =
    orders.filter((key, order) -> "PAID".equals(order.status()));

KStream<String, Order> nonZero =
    orders.filterNot((key, order) -> order.amount() == 0.0);

Mapping keys and values

mapValues transforms only the value and leaves the key untouched — this is the preferred mapping operator because Kafka Streams knows the key did not change and therefore avoids a costly repartition. map lets you change both key and value, which marks the stream for repartitioning on the next stateful operation (the key may now route to a different partition).

// Value-only: no repartition
KStream<String, Double> amounts =
    orders.mapValues(order -> order.amount());

// Key + value: flags downstream repartition
KStream<String, Order> byCountry =
    orders.map((key, order) -> KeyValue.pair(order.country(), order));

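Prefer mapValues and filter over map whenever the key is unchanged. Any key-changing operator, map included, marks the stream for repartitioning even when the key it emits is identical to the input key. An unnecessary map followed by a grouping or join therefore creates an internal repartition topic that adds latency, network traffic, and one more topic to operate.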
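
To see why a key change matters, recall that a record's partition is derived from its key. The sketch below is a plain-Java model rather than Kafka code: partitionFor is a hypothetical helper that uses String.hashCode() as a stand-in for the murmur2 hash the default producer partitioner applies to the serialized key, and the partition count of 4 is an assumption.

```java
import java.util.List;

public class PartitionModel {
    // Hypothetical stand-in for the default partitioner: hash(key) mod partitions.
    // Kafka hashes the serialized key bytes with murmur2; String.hashCode() is
    // used here only to keep the sketch dependency-free.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 4; // assumed partition count
        // Two orders from the same country, each as {orderId, country}.
        List<String[]> orders = List.of(
            new String[] {"ord-1", "US"},
            new String[] {"ord-2", "US"});
        for (String[] o : orders) {
            System.out.printf("%s: by orderId -> p%d, by country -> p%d%n",
                o[0], partitionFor(o[0], partitions), partitionFor(o[1], partitions));
        }
    }
}
```

Keyed by country, both orders map to the same partition, which is what a per-country aggregation needs. Keyed by order id there is no such guarantee, so the data has to be physically redistributed after re-keying.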

Re-keying with selectKey

When you only need to change the key, selectKey is clearer than map and expresses intent precisely. It still marks the stream for repartitioning before the next aggregation or join.

KStream<String, Order> rekeyed =
    orders.selectKey((key, order) -> order.country());

Flattening with flatMap

flatMap and flatMapValues emit zero, one, or many output records per input record. Use flatMapValues when the key is unchanged (no repartition) and flatMap when you also produce new keys.

// Split a comma-separated tags value into one record per tag
KStream<String, String> tagStream = builder.stream("order-tags");

KStream<String, String> exploded =
    tagStream.flatMapValues(value -> Arrays.asList(value.split(",")));

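// Emit per-line-item records with new keys.
// Assumes an Order variant that exposes items() returning List<LineItem>,
// and a LineItem type with a sku() accessor; neither is part of the record above.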
KStream<String, LineItem> lineItems =
    orders.flatMap((key, order) ->
        order.items().stream()
             .map(item -> KeyValue.pair(item.sku(), item))
             .toList());
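
One detail of the tag-splitting example worth checking is how String.split treats empty input, because it determines how many records flatMapValues emits. The helper below is a hypothetical plain-Java function (splitTags is not part of the Kafka Streams API) that trims tokens and drops blanks so that an empty or null value yields zero records. It is a sketch of one possible cleanup policy, not the only reasonable one.

```java
import java.util.Arrays;
import java.util.List;

public class TagSplitter {
    // Hypothetical helper, usable as the body of a flatMapValues lambda.
    // Returns an empty list for null or blank input so no record is emitted.
    static List<String> splitTags(String value) {
        if (value == null || value.isBlank()) {
            return List.of();
        }
        return Arrays.stream(value.split(","))
                     .map(String::trim)
                     .filter(tag -> !tag.isEmpty())
                     .toList();
    }

    public static void main(String[] args) {
        System.out.println(splitTags("gift, express,,fragile")); // [gift, express, fragile]
        System.out.println(splitTags(""));                       // []
        // For comparison, the raw split of an empty string keeps one empty token:
        System.out.println(Arrays.asList("".split(",")).size()); // 1
    }
}
```

With such a helper the call would read tagStream.flatMapValues(TagSplitter::splitTags). The key is still untouched, so no repartition is involved.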

Branching a stream

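split() routes records into multiple named branches based on predicates and is the replacement for the deprecated branch(...) method. Each record goes to the first branch whose predicate matches, and defaultBranch() captures the rest. The result is a Map whose keys are the Named prefix concatenated with each Branched name, which is why the branches below are looked up as "order-high", "order-medium", and "order-low".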

Map<String, KStream<String, Order>> branches =
    orders.split(Named.as("order-"))
          .branch((k, o) -> o.amount() >= 1000, Branched.as("high"))
          .branch((k, o) -> o.amount() >= 100,  Branched.as("medium"))
          .defaultBranch(Branched.as("low"));

branches.get("order-high").to("orders-high-value");
branches.get("order-medium").to("orders-medium-value");
branches.get("order-low").to("orders-low-value");
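
The first-match rule is easy to misread when predicates overlap, as the two amount thresholds above do. The following plain-Java model (BranchModel and route are hypothetical names, and no Kafka classes are involved) evaluates the same predicates in declaration order to show which branch a given amount would reach.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.DoublePredicate;

public class BranchModel {
    // Predicates in declaration order, mirroring the split() chain above.
    static final Map<String, DoublePredicate> BRANCHES = new LinkedHashMap<>();
    static {
        BRANCHES.put("order-high", amount -> amount >= 1000);
        BRANCHES.put("order-medium", amount -> amount >= 100);
    }

    // Returns the name of the first branch whose predicate matches,
    // falling back to the default branch name.
    static String route(double amount) {
        for (Map.Entry<String, DoublePredicate> e : BRANCHES.entrySet()) {
            if (e.getValue().test(amount)) {
                return e.getKey();
            }
        }
        return "order-low";
    }

    public static void main(String[] args) {
        System.out.println(route(1500.0)); // order-high (also >= 100, but first match wins)
        System.out.println(route(250.0));  // order-medium
        System.out.println(route(20.0));   // order-low
    }
}
```

Because evaluation stops at the first match, the most specific predicate should come first. Swapping the two branches would send every order of 100 or more to "order-medium".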

Merging streams

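merge combines two streams with the same key and value types into one. Records from the two inputs are interleaved with no ordering guarantee between the streams, although the relative order within each input stream is preserved. It is the inverse of branching and is handy for reuniting branches after independent processing.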

KStream<String, Order> reunited =
    branches.get("order-high").merge(branches.get("order-medium"));

Inspecting with peek and foreach

peek is a non-mutating side-effect operator: it lets you observe each record (log it, increment a metric) and then passes it through unchanged. foreach is a terminal operator — it consumes the stream and returns nothing, so nothing can follow it.

orders.peek((key, order) -> log.debug("Processing order {}", key))
      .filter((key, order) -> order.amount() > 0)
      .foreach((key, order) -> metrics.counter("orders.processed").increment());

peek and foreach give no delivery guarantees for their side effects across failures. Never use them for writes that must be exactly-once — emit to a topic with to and let a downstream consumer handle persistence instead.
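
The practical consequence is that a side effect can run more than once for the same record when records are reprocessed after a failure. The sketch below is a plain-Java simulation under that assumption (ReplayModel and sideEffect are hypothetical names): it delivers part of a batch twice and compares a naive counter with an idempotent, key-based record of what was seen.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ReplayModel {
    int naiveCount = 0;                           // incremented on every invocation
    final Set<String> seenKeys = new HashSet<>(); // idempotent: adding twice is a no-op

    // Stands in for the body of a peek/foreach lambda.
    void sideEffect(String orderId) {
        naiveCount++;
        seenKeys.add(orderId);
    }

    public static void main(String[] args) {
        ReplayModel model = new ReplayModel();
        List<String> batch = List.of("ord-1", "ord-2", "ord-3");
        batch.forEach(model::sideEffect);
        // Simulated failure before offsets were committed: the last two
        // records are delivered again after restart.
        batch.subList(1, 3).forEach(model::sideEffect);
        System.out.println(model.naiveCount);      // 5
        System.out.println(model.seenKeys.size()); // 3
    }
}
```

The naive counter overcounts by the number of replayed records, while the keyed set is unaffected. That is acceptable for approximate metrics but not for anything that must be exact.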

Writing output with to and through

to is a terminal sink that writes the stream to a Kafka topic. through wrote to a topic and continued processing from it; it has been deprecated since Kafka Streams 2.6, and repartition() is its replacement when you only need a repartitioned intermediate stream.

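// Locale.ROOT (java.util.Locale) keeps upper-casing independent of the JVM default locale
KStream<String, Order> normalized =
    orders.mapValues(o -> new Order(
        o.orderId(), o.country().toUpperCase(Locale.ROOT), o.status(), o.amount()));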

// Terminal write
normalized.to("orders-normalized", Produced.with(Serdes.String(), orderSerde));

// Continue from a materialized intermediate topic
KStream<String, Order> repartitioned =
    normalized.selectKey((k, o) -> o.country())
              .repartition(Repartitioned.with(Serdes.String(), orderSerde));

Running the normalized pipeline against sample input shows the per-record transformation:

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic orders-normalized --from-beginning --property print.key=true

Output:

ord-1   {"orderId":"ord-1","country":"US","status":"PAID","amount":250.0}
ord-2   {"orderId":"ord-2","country":"DE","status":"PAID","amount":1200.0}
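
Because stateless operators take plain lambdas, the per-record logic can also be checked without a broker by pulling it into a pure function. The sketch below does that for the normalization step. The normalize helper is hypothetical, the nested Order record repeats the shape defined at the top of this page, and the use of Locale.ROOT (so the result does not depend on the JVM default locale) is a choice made for this sketch.

```java
import java.util.Locale;

public class NormalizeOrder {
    // Same shape as the Order record defined at the top of this page.
    record Order(String orderId, String country, String status, double amount) {}

    // Hypothetical helper: the mapValues logic extracted into a pure function.
    static Order normalize(Order o) {
        return new Order(
            o.orderId(), o.country().toUpperCase(Locale.ROOT), o.status(), o.amount());
    }

    public static void main(String[] args) {
        Order in = new Order("ord-1", "us", "PAID", 250.0);
        Order out = normalize(in);
        System.out.println(out.country());                      // US
        System.out.println(out.orderId().equals(in.orderId())); // true
    }
}
```

In the topology the function would be passed as a method reference, for example orders.mapValues(NormalizeOrder::normalize), assuming the topology uses the same Order type.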

Operator reference

Operator           | Changes key? | May repartition? | Records out per in
filter / filterNot | No           | No               | 0 or 1
mapValues          | No           | No               | 1
map                | Yes          | Yes              | 1
selectKey          | Yes          | Yes              | 1
flatMapValues      | No           | No               | 0..N
flatMap            | Yes          | Yes              | 0..N
split / branch     | No           | No               | 1 (one branch)
merge              | No           | No               | 1
peek               | No           | No               | 1 (pass-through)
foreach            | No           | No               | none (terminal)
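
One way to read the "May repartition?" column is as a flag that key-changing operators set and that the next key-dependent operation checks. The following is a deliberately simplified plain-Java model of that idea. StreamModel and its methods are hypothetical and only borrow the names of the DSL operators; it is not how the library is implemented.

```java
public class StreamModel {
    private final boolean keyChanged;

    StreamModel(boolean keyChanged) { this.keyChanged = keyChanged; }

    // Key-preserving operators carry the flag through unchanged.
    StreamModel filter()        { return this; }
    StreamModel mapValues()     { return this; }
    StreamModel flatMapValues() { return this; }

    // Key-changing operators set the flag, even if the new key happens to be identical.
    StreamModel map()       { return new StreamModel(true); }
    StreamModel selectKey() { return new StreamModel(true); }
    StreamModel flatMap()   { return new StreamModel(true); }

    // A grouping or join needs records with equal keys on the same partition,
    // so it repartitions only if the flag is set.
    boolean groupByKeyNeedsRepartition() { return keyChanged; }

    public static void main(String[] args) {
        StreamModel source = new StreamModel(false);
        System.out.println(source.filter().mapValues().groupByKeyNeedsRepartition());    // false
        System.out.println(source.selectKey().mapValues().groupByKeyNeedsRepartition()); // true
    }
}
```

The flag is sticky: once a key-changing operator has run, later key-preserving operators do not clear it, so the cost is paid at the next grouping or join regardless of what sits in between.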

Best Practices

  • Default to mapValues, filter, and flatMapValues whenever the key is unchanged so Kafka Streams can skip repartitioning.
  • Reserve map and selectKey for genuine re-keying, and group all key changes immediately before the aggregation or join that needs them.
  • Use split() with named Branched entries instead of the deprecated branch(...); always provide a defaultBranch so records are never silently dropped.
  • Keep peek/foreach side effects idempotent and observability-only; route durable writes through to and a dedicated consumer.
  • Specify explicit Produced/Repartitioned serdes rather than relying on defaults, especially after a key change.
  • Chain stateless operators freely: consecutive key-preserving operators run in the same sub-topology with no intermediate topic, so they add negligible overhead compared to stateful steps.
Last updated June 1, 2026