Aggregations

Aggregations turn an unbounded stream of individual events into running summaries: order counts per customer, revenue totals per product, average session length per device. In Kafka Streams these are stateful operations — they accumulate results in a local state store backed by a changelog topic, so the running totals survive restarts and rebalances. Getting aggregations right is what separates a toy stream from a production analytics pipeline.

Grouping before you aggregate

Every aggregation starts with a grouping step. You cannot call count() directly on a KStream; you must first declare the key that records will be aggregated under. Grouping ensures that all records sharing a key land in the same partition (and therefore the same task instance), repartitioning the data only when the key has changed. That co-location is what makes a local, per-key state store correct.

There are two grouping operators:

Operator                     When to use                                      Repartition cost
groupByKey()                 The record key is already the aggregation key    None (uses existing partitioning)
groupBy((k, v) -> newKey)    You need to re-key by a value field              Triggers a repartition topic

Both return a KGroupedStream, the entry point to count(), reduce(), and aggregate(). Prefer groupByKey() when possible; groupBy() writes every record to an internal repartition topic, which adds network and disk overhead.

KStream<String, Order> orders = builder.stream("orders",
        Consumed.with(Serdes.String(), orderSerde));

// Re-key by customerId so we aggregate per customer
KGroupedStream<String, Order> byCustomer = orders.groupBy(
        (key, order) -> order.customerId(),
        Grouped.with(Serdes.String(), orderSerde));
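
To see why co-location by key matters, here is a minimal plain-Java sketch of key-based routing. It is a model, not the Kafka Streams API: PartitionSketch, partitionFor, and route are hypothetical names, and String.hashCode() stands in for the murmur2 hash of the serialized key that Kafka's default partitioner uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PartitionSketch {
    // Stand-in for the default partitioner: same key, same partition, every time.
    // (Kafka hashes the serialized key bytes with murmur2; hashCode() is an assumption here.)
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Routes each customer id to a partition, roughly what writing to a
    // repartition topic achieves after groupBy().
    static Map<Integer, List<String>> route(List<String> customerIds, int numPartitions) {
        Map<Integer, List<String>> byPartition = new TreeMap<>();
        for (String id : customerIds) {
            byPartition
                    .computeIfAbsent(partitionFor(id, numPartitions), p -> new ArrayList<>())
                    .add(id);
        }
        return byPartition;
    }
}
```

Because every record for a given customer id maps to one partition, the task that owns that partition can keep that customer's aggregate in its local store without coordinating with other tasks.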

Counting records

count() is the simplest aggregation: it produces a KTable<K, Long> of how many records have arrived per key. The result is an evolving table: each new record increments the count for its key, and updated counts are emitted downstream.

KTable<String, Long> ordersPerCustomer = byCustomer.count(
        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("orders-per-customer")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long()));

Naming the store via Materialized.as(...) is important: it gives the changelog topic a stable name and makes the store queryable through Interactive Queries.

Reducing values

reduce() combines values of the same type using an associative function. It is ideal when the aggregate has the same shape as the input — for example, summing numeric values or keeping the latest record.

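// reduce() needs input and output values of one type, so project each Order to its amount first
KStream<String, Double> revenuePerProduct = orders
        .map((key, order) -> KeyValue.pair(order.productId(), order.amount()))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
        .reduce(
                Double::sum,
                Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("revenue-per-product")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Double()))
        .toStream();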

Note: reduce() never sees an initial value. The first record for a key becomes the seed, so the reducer is only invoked from the second record onward. If you need a starting value of a different type, use aggregate() instead.
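
The seeding behaviour can be modelled in plain Java. The sketch below is not Kafka Streams code: ReduceSketch is a hypothetical class that uses Map.merge, which happens to follow the same rule (store the first value as-is, call the combining function only when a value already exists).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

final class ReduceSketch {
    int reducerCalls = 0;                           // how many times the reducer actually ran
    final Map<String, Double> table = new HashMap<>();

    // Models one record arriving at reduce(): the first value for a key is stored
    // unchanged; later values are combined with the current aggregate.
    void process(String key, double value, BinaryOperator<Double> reducer) {
        table.merge(key, value, (agg, v) -> {
            reducerCalls++;
            return reducer.apply(agg, v);
        });
    }
}
```

Feeding two records for one product and one record for another leaves reducerCalls at 1: only the second record for the first product reaches the reducer.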

Custom aggregation with aggregate()

aggregate() is the most general operator. It takes an initializer (the zero value), an aggregator (which folds each new record into the accumulator), and a Serde for the accumulator type, usually supplied through Materialized. The accumulator type can differ from both the key and value types. This is how you build totals, averages, min/max trackers, or any custom rollup.

The example below computes per-customer running totals — order count and summed revenue — into a record accumulator.

public record OrderStats(long count, double total) {
    public static OrderStats empty() { return new OrderStats(0, 0.0); }
    public OrderStats add(Order o) {
        return new OrderStats(count + 1, total + o.amount());
    }
}

KTable<String, OrderStats> stats = orders.groupBy(
                (key, order) -> order.customerId(),
                Grouped.with(Serdes.String(), orderSerde))
        .aggregate(
                OrderStats::empty,                       // Initializer
                (custId, order, agg) -> agg.add(order),  // Aggregator
                Materialized.<String, OrderStats, KeyValueStore<Bytes, byte[]>>as("customer-stats")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(statsSerde));
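
For averages, one common approach is to store only the count and the total and derive the average on read. The plain-Java sketch below models the initializer and aggregator steps without Kafka; AggregateSketch, its two-field Order stand-in, the average() method, and fold() are hypothetical additions for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AggregateSketch {
    // Minimal stand-in for the Order type used in the examples above.
    record Order(String customerId, double amount) {}

    record OrderStats(long count, double total) {
        static OrderStats empty() { return new OrderStats(0, 0.0); }
        OrderStats add(Order o) { return new OrderStats(count + 1, total + o.amount()); }
        // Derived on read; only count and total are stored.
        double average() { return count == 0 ? 0.0 : total / count; }
    }

    // Models aggregate(): look up the accumulator (or initialize it), then fold in the record.
    static Map<String, OrderStats> fold(List<Order> orders) {
        Map<String, OrderStats> table = new LinkedHashMap<>();
        for (Order o : orders) {
            OrderStats current = table.getOrDefault(o.customerId(), OrderStats.empty()); // initializer
            table.put(o.customerId(), current.add(o));                                   // aggregator
        }
        return table;
    }
}
```

Storing the average itself would not work, because a new order cannot be folded into an average without knowing how many orders it already covers.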

Emitting results to a topic

A KTable is a changelog: every update produces a new record. To publish those updates downstream, convert it back to a stream with toStream() and write it out. Each emission is the latest aggregate for that key.

stats.toStream()
     .peek((custId, s) -> log.info("{} -> {} orders, ${}", custId, s.count(), s.total()))
     .to("customer-stats-output",
         Produced.with(Serdes.String(), statsSerde));

Consuming the output topic shows the running aggregate evolving as new orders arrive:

Output:

cust-42 -> {count=1, total=19.99}
cust-42 -> {count=2, total=64.98}
cust-7  -> {count=1, total=120.00}
cust-42 -> {count=3, total=84.97}

By default Kafka Streams buffers updates in a record cache and compacts consecutive updates to the same key before emitting, so you see fewer intermediate records than inputs. Tune this with cache.max.bytes.buffering (deprecated in favor of statestore.cache.max.bytes since Kafka 3.4) and commit.interval.ms; set the cache to 0 only when you genuinely need every intermediate update (at a throughput cost).
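
As a rough sketch of the two ends of that trade-off, the helper below builds the relevant settings with java.util.Properties. CacheTuningSketch and its method names are hypothetical, the values are illustrative rather than recommendations, and the cache key is the one named in the text; check which key your Kafka version expects.

```java
import java.util.Properties;

final class CacheTuningSketch {
    // Cache size key as named in the text; newer releases use "statestore.cache.max.bytes".
    static final String CACHE_KEY = "cache.max.bytes.buffering";
    static final String COMMIT_KEY = "commit.interval.ms";

    static Properties settings(long cacheBytes, long commitIntervalMs) {
        Properties props = new Properties();
        props.setProperty(CACHE_KEY, Long.toString(cacheBytes));
        props.setProperty(COMMIT_KEY, Long.toString(commitIntervalMs));
        return props;
    }

    // Cache disabled: every intermediate update is forwarded downstream.
    static Properties everyUpdate() {
        return settings(0L, 100L);
    }

    // 10 MiB cache, 30 s commit interval: fewer, more consolidated updates.
    static Properties consolidated() {
        return settings(10L * 1024 * 1024, 30_000L);
    }
}
```

These entries would be merged into the Properties passed to the KafkaStreams constructor alongside the application id and bootstrap servers.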

Best Practices

  • Always name your state stores via Materialized.as(...) — unnamed stores get auto-generated changelog topic names that change between topology edits and break upgrades.
  • Prefer groupByKey() over groupBy() when the key already matches; avoid an unnecessary repartition topic.
  • Use reduce() when input and output types match; reach for aggregate() only when you need a different accumulator type or a non-trivial seed.
  • Pick a sensible commit.interval.ms and cache size to balance output latency against the volume of intermediate updates emitted downstream.
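  • Keep aggregators deterministic and free of side effects: under at-least-once processing, records can be reprocessed after a failure, and arrival order across repartitioned sources is not guaranteed, so side effects or order-dependence cause incorrect results.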
  • Provide robust, backward-compatible Serdes for your accumulator type; a serialization failure stops processing, and an incompatible format change can leave existing state and changelog data unreadable, which is painful to recover from.
Last updated June 1, 2026