Aggregations
Aggregations turn an unbounded stream of individual events into running summaries: order counts per customer, revenue totals per product, average session length per device. In Kafka Streams these are stateful operations — they accumulate results in a local state store backed by a changelog topic, so the running totals survive restarts and rebalances. Getting aggregations right is what separates a toy stream from a production analytics pipeline.
Grouping before you aggregate
Every aggregation starts with a grouping step. You cannot call count() directly on a KStream; you must first declare the key that records will be aggregated under. Grouping guarantees that all records sharing a key land in the same partition (and therefore the same task instance), repartitioning the data through an internal topic when the key has changed. This co-location is what makes a local, per-key state store correct.
There are two grouping operators:
| Operator | When to use | Repartition cost |
|---|---|---|
| groupByKey() | The record key is already the aggregation key | None, as long as the key was not changed upstream (for example by selectKey() or map()); otherwise a repartition topic is created |
| groupBy((k, v) -> newKey) | You need to re-key by a value field | Always triggers a repartition topic |
Both return a KGroupedStream, the entry point to count(), reduce(), and aggregate(). Prefer groupByKey() when possible; groupBy() writes every record to an internal repartition topic, which adds network and disk overhead.
```java
KStream<String, Order> orders = builder.stream("orders",
    Consumed.with(Serdes.String(), orderSerde));

// Re-key by customerId so we aggregate per customer
KGroupedStream<String, Order> byCustomer = orders.groupBy(
    (key, order) -> order.customerId(),
    Grouped.with(Serdes.String(), orderSerde));
```
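To make the partitioning guarantee concrete, here is a minimal plain-Java model of what a repartition achieves. It is not the Kafka Streams API: the Order record and order IDs are hypothetical, and String.hashCode() is an assumed stand-in for the murmur2 hash that Kafka's default partitioner applies to the serialized key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the guarantee behind groupBy(); NOT the Kafka Streams API.
// Assumption: String.hashCode() stands in for the murmur2 hash that Kafka's
// default partitioner applies to the serialized key bytes.
class GroupingSketch {
    // Hypothetical order shape for illustration only.
    record Order(String orderId, String customerId, double amount) {}

    // Deterministic key -> partition mapping: equal keys always map to the same partition.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Route each order by its NEW key (customerId), as a repartition topic does,
    // so every order for one customer ends up in a single partition.
    static Map<Integer, List<Order>> repartition(List<Order> orders, int numPartitions) {
        Map<Integer, List<Order>> partitions = new TreeMap<>();
        for (Order o : orders) {
            int p = partitionFor(o.customerId(), numPartitions);
            partitions.computeIfAbsent(p, k -> new ArrayList<>()).add(o);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(
            new Order("o-1", "cust-42", 19.99),
            new Order("o-2", "cust-7", 120.00),
            new Order("o-3", "cust-42", 44.99));
        repartition(orders, 6).forEach((p, os) ->
            System.out.println("partition " + p + ": " + os));
    }
}
```

Because the mapping depends only on the key, both cust-42 orders always meet in one partition, which is why a single task can hold that customer's aggregate locally.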
Counting records
count() is the simplest aggregation: it produces a KTable<K, Long> holding the number of records seen per key. The result is an evolving table: each new record updates its key's count, and that update is forwarded downstream (subject to the record caching described below).
```java
KTable<String, Long> ordersPerCustomer = byCustomer.count(
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("orders-per-customer")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long()));
```
Naming the store via Materialized.as(...) is important: it gives the changelog topic a stable name and makes the store queryable through Interactive Queries.
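A minimal plain-Java model of count() helps show how the store and its changelog relate. This is a sketch, not Kafka Streams internals: it ignores record caching and uses a hypothetical application.id ("order-analytics"). The changelog name follows Kafka Streams' convention of `<application.id>-<storeName>-changelog`, which is why a stable store name matters.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of count(): a per-key store plus an append-only changelog.
// NOT the Kafka Streams API; record caching is ignored. "order-analytics"
// used in main() is a hypothetical application.id.
class CountSketch {
    final String changelogTopic;
    final Map<String, Long> store = new HashMap<>();                   // local state store
    final List<Map.Entry<String, Long>> changelog = new ArrayList<>(); // backing topic

    CountSketch(String applicationId, String storeName) {
        // Mirrors the "<application.id>-<storeName>-changelog" naming convention.
        this.changelogTopic = applicationId + "-" + storeName + "-changelog";
    }

    // Each record increments its key's count; the new value is appended to the
    // changelog and would also be forwarded downstream.
    long process(String key) {
        long updated = store.merge(key, 1L, Long::sum);
        changelog.add(Map.entry(key, updated));
        return updated;
    }

    // After a restart, replaying the changelog (latest value wins) rebuilds the store.
    static Map<String, Long> restore(List<Map.Entry<String, Long>> changelog) {
        Map<String, Long> rebuilt = new HashMap<>();
        for (Map.Entry<String, Long> e : changelog) {
            rebuilt.put(e.getKey(), e.getValue());
        }
        return rebuilt;
    }

    public static void main(String[] args) {
        CountSketch counts = new CountSketch("order-analytics", "orders-per-customer");
        for (String customer : List.of("cust-42", "cust-7", "cust-42")) {
            System.out.println(customer + " -> " + counts.process(customer));
        }
        System.out.println(counts.changelogTopic);                      // order-analytics-orders-per-customer-changelog
        System.out.println(restore(counts.changelog).equals(counts.store)); // true
    }
}
```

If the store name were auto-generated and changed after a topology edit, the restarted application would look for a different changelog topic and start counting from zero.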
Reducing values
reduce() combines values of the same type using an associative function. It is ideal when the aggregate has the same shape as the input — for example, summing numeric values or keeping the latest record.
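```java
// Re-key by product and extract the amount so reduce() sees values of one type.
// Assumes Order.amount() returns a double.
KStream<String, Double> revenuePerProduct = orders
    .selectKey((key, order) -> order.productId())
    .mapValues(order -> order.amount())
    .groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
    .reduce(
        (aggValue, newValue) -> aggValue + newValue,   // running sum per product
        Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("revenue-per-product")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Double()))
    .toStream();
```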
Note: reduce() never sees an initial value. The first record for a key becomes the seed, so the reducer is only invoked from the second record onward. If you need a starting value of a different type, use aggregate() instead.
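The seeding rule can be checked with a small plain-Java model of reduce() semantics. It is a sketch, not the Kafka Streams API; the product keys are hypothetical, and the values are chosen to be exactly representable as doubles.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of reduce() semantics; NOT the Kafka Streams API.
// The first value for a key is stored as-is; the reducer runs only from the
// second value onward.
class ReduceSketch {
    final Map<String, Double> store = new HashMap<>();
    final BinaryOperator<Double> reducer;
    int reducerCalls = 0; // how many times the reducer actually ran

    ReduceSketch(BinaryOperator<Double> reducer) {
        this.reducer = reducer;
    }

    double process(String key, double value) {
        Double current = store.get(key);
        double updated;
        if (current == null) {
            updated = value;                         // first record seeds the aggregate
        } else {
            reducerCalls++;
            updated = reducer.apply(current, value); // later records are folded in
        }
        store.put(key, updated);
        return updated;
    }

    public static void main(String[] args) {
        ReduceSketch revenue = new ReduceSketch(Double::sum);
        revenue.process("p-1", 10.0);
        revenue.process("p-1", 2.5);
        revenue.process("p-2", 4.0);
        System.out.println(revenue.store + ", reducer calls: " + revenue.reducerCalls);
    }
}
```

Three records produce only one reducer call: p-1's second value is the only record that meets an existing aggregate.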
Custom aggregation with aggregate()
aggregate() is the most general operator. It takes an initializer (the zero value), an aggregator (which folds each new record into the accumulator), and, usually via Materialized, a Serde for the accumulator type, which can differ from both the key and value types. This is how you build totals, averages, min/max trackers, or any custom rollup.
The example below computes per-customer running totals — order count and summed revenue — into a record accumulator.
```java
public record OrderStats(long count, double total) {
    public static OrderStats empty() { return new OrderStats(0, 0.0); }

    public OrderStats add(Order o) {
        return new OrderStats(count + 1, total + o.amount());
    }
}

KTable<String, OrderStats> stats = orders.groupBy(
        (key, order) -> order.customerId(),
        Grouped.with(Serdes.String(), orderSerde))
    .aggregate(
        OrderStats::empty,                        // Initializer
        (custId, order, agg) -> agg.add(order),   // Aggregator
        Materialized.<String, OrderStats, KeyValueStore<Bytes, byte[]>>as("customer-stats")
            .withKeySerde(Serdes.String())
            .withValueSerde(statsSerde));
```
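The initializer/aggregator contract can be modeled in plain Java to show how an accumulator of a different type yields an average. This is a sketch, not the Kafka Streams API: the Stats record mirrors OrderStats but takes a bare double amount, and average() plus the local Aggregator interface are hypothetical additions for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java model of aggregate(); NOT the Kafka Streams API.
// The initializer supplies the zero value the first time a key is seen, and the
// aggregator folds each record into it. The accumulator type (Stats) differs
// from the input value type (Double).
class AggregateSketch {
    record Stats(long count, double total) {
        static Stats empty() { return new Stats(0, 0.0); }
        Stats add(double amount) { return new Stats(count + 1, total + amount); }
        // Hypothetical helper: averages fall out of a (count, total) accumulator.
        double average() { return count == 0 ? 0.0 : total / count; }
    }

    // Hypothetical stand-in for Kafka Streams' Aggregator<K, V, VA>.
    interface Aggregator<K, V, A> { A apply(K key, V value, A agg); }

    static <K, V, A> Map<K, A> aggregate(List<Map.Entry<K, V>> records,
                                         Supplier<A> initializer,
                                         Aggregator<K, V, A> aggregator) {
        Map<K, A> store = new HashMap<>();
        for (Map.Entry<K, V> r : records) {
            A current = store.containsKey(r.getKey()) ? store.get(r.getKey()) : initializer.get();
            store.put(r.getKey(), aggregator.apply(r.getKey(), r.getValue(), current));
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Double>> orders = List.of(
            Map.entry("cust-42", 10.0), Map.entry("cust-7", 120.0), Map.entry("cust-42", 30.0));
        Map<String, Stats> stats = aggregate(orders, Stats::empty, (cust, amount, agg) -> agg.add(amount));
        stats.forEach((cust, s) -> System.out.println(cust + " avg=" + s.average()));
    }
}
```

Storing count and total rather than a running average keeps the aggregator a simple fold; the average is derived on read.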
Emitting results to a topic
A KTable is a changelog: every update produces a new record. To publish those updates downstream, convert it back to a stream with toStream() and write it out. Each emission is the latest aggregate for that key.
```java
stats.toStream()
    .peek((custId, s) -> log.info("{} -> {} orders, ${}", custId, s.count(), s.total()))
    .to("customer-stats-output",
        Produced.with(Serdes.String(), statsSerde));
```
Consuming the output topic shows the running aggregate evolving as new orders arrive:
```text
cust-42 -> {count=1, total=19.99}
cust-42 -> {count=2, total=64.98}
cust-7 -> {count=1, total=120.00}
cust-42 -> {count=3, total=84.97}
```
By default, Kafka Streams buffers and de-duplicates updates in a record cache before emitting them, so you typically see fewer intermediate records than inputs. Tune this with cache.max.bytes.buffering (deprecated in Kafka 3.4 in favor of statestore.cache.max.bytes) and commit.interval.ms; set the cache size to 0 only when you genuinely need every intermediate update, at a throughput cost.
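A simplified plain-Java model of the record cache shows why fewer updates reach the output. This is a sketch, not Kafka Streams internals: the real cache is sized in bytes and shared across a thread's stores, whereas here a hypothetical maxEntries limit stands in for the byte budget, with least-recently-used eviction and a full flush on commit.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the record cache; NOT Kafka Streams internals.
// Updates are buffered per key; a newer update overwrites an older one, so only
// the latest value per key is forwarded on flush (commit) or on eviction.
// maxEntries is a hypothetical stand-in for the byte-based cache size.
class RecordCacheSketch {
    private final int maxEntries;
    // accessOrder = true: re-putting a key moves it to the most-recently-used end.
    private final LinkedHashMap<String, Long> dirty = new LinkedHashMap<>(16, 0.75f, true);
    final List<String> emitted = new ArrayList<>();

    RecordCacheSketch(int maxEntries) {
        this.maxEntries = maxEntries;
    }

    void put(String key, long value) {
        if (maxEntries == 0) {                 // caching disabled: forward every update
            emitted.add(key + "=" + value);
            return;
        }
        dirty.put(key, value);                 // overwrite = de-duplication
        if (dirty.size() > maxEntries) {       // full: evict and forward the LRU entry
            Map.Entry<String, Long> eldest = dirty.entrySet().iterator().next();
            String k = eldest.getKey();
            Long v = eldest.getValue();
            dirty.remove(k);
            emitted.add(k + "=" + v);
        }
    }

    // Called on commit (every commit.interval.ms in the real system).
    void flush() {
        dirty.forEach((k, v) -> emitted.add(k + "=" + v));
        dirty.clear();
    }

    public static void main(String[] args) {
        RecordCacheSketch cache = new RecordCacheSketch(10);
        cache.put("cust-42", 1);
        cache.put("cust-42", 2);
        cache.put("cust-7", 1);
        cache.put("cust-42", 3);
        cache.flush();
        System.out.println(cache.emitted); // [cust-7=1, cust-42=3]
    }
}
```

Four input updates become two output records; constructing the model with maxEntries = 0 forwards all four, which is the trade-off the cache setting controls.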
Best Practices
- Always name your state stores via Materialized.as(...). Unnamed stores get auto-generated changelog topic names that can change between topology edits and break upgrades.
- Prefer groupByKey() over groupBy() when the key already matches, to avoid an unnecessary repartition topic.
- Use reduce() when input and output types match; reach for aggregate() only when you need a different accumulator type or an explicit initial value.
- Pick sensible values for commit.interval.ms and the cache size to balance output latency against the volume of intermediate updates emitted downstream.
- Make aggregators pure and associative: input records may be reprocessed after a failure, so side effects or order dependence can produce incorrect results.
- Provide robust Serdes for your accumulator type. A Serde that cannot read previously written accumulator bytes (for example after a schema change) fails during processing or state restoration, which is painful to recover from.
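The tuning knobs from these practices can be collected in one place. The sketch below uses plain string keys and java.util.Properties so it compiles without Kafka on the classpath; with kafka-streams available you would normally use the StreamsConfig constants and pass the Properties to new KafkaStreams(...). The application.id, broker address, and numeric values are hypothetical illustrations, not recommendations.

```java
import java.util.Properties;

// Configuration sketch using plain string keys (no Kafka dependency).
// All concrete values are hypothetical examples, not tuning advice.
class StreamsTuningSketch {
    static Properties tuning(long commitIntervalMs, long cacheBytes) {
        Properties props = new Properties();
        props.put("application.id", "order-analytics");   // prefixes changelog and repartition topic names
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        // The record cache is flushed downstream on every commit.
        props.put("commit.interval.ms", Long.toString(commitIntervalMs));
        // Kafka 3.4+ name; older versions use cache.max.bytes.buffering.
        props.put("statestore.cache.max.bytes", Long.toString(cacheBytes));
        return props;
    }

    public static void main(String[] args) {
        // Example: commit every second with a 10 MiB cache.
        Properties props = tuning(1_000L, 10L * 1024 * 1024);
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Lowering commit.interval.ms or the cache size emits updates sooner but produces more intermediate records; raising them does the opposite.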