
Testing Streams

A Kafka Streams topology is just business logic wired together, and you should be able to test it the same way you test any pure function — fast, in-process, and without external dependencies. The TopologyTestDriver from kafka-streams-test-utils runs your entire topology in a single thread with no broker, no networking, and no real consumer group. You pipe records in, read records out, and inspect state stores directly, all in milliseconds. This makes it the foundation of a fast, deterministic test suite for stateless transformations, joins, windows, and aggregations alike.

Why TopologyTestDriver over an embedded broker

Integration tests against an embedded broker (or Testcontainers) are valuable, but they are slow, can be flaky under load, and exercise plumbing you do not own. For the logic in your topology (filters, maps, joins, windowed counts), TopologyTestDriver is the right tool. It does not let real time pass: stream time comes from the timestamps of the records you pipe in, and wall-clock time is a mock clock that moves only when you call advanceWallClockTime. Windowing and punctuation are therefore fully under your control; you advance time explicitly rather than waiting on a real clock.
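Because event time comes only from record timestamps, you can also let a TestInputTopic stamp records for you. As a minimal sketch (the class MockTimeDemo and the topics in/out are hypothetical), an input topic created with a start timestamp and an auto-advance step assigns each timestamp-less record the topic's current time, and advanceTime jumps ahead explicitly:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.test.TestRecord;

final class MockTimeDemo {

    // Pass-through topology: each output record keeps its input record's timestamp.
    static Topology passThrough() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("in", Consumed.with(Serdes.String(), Serdes.String()))
               .to("out", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    static List<Instant> outputTimes() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "mock-time-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); // never contacted

        Instant start = Instant.parse("2026-06-01T10:00:00Z");
        try (TopologyTestDriver driver = new TopologyTestDriver(passThrough(), props)) {
            // Records piped without a timestamp get the topic's current time,
            // which then auto-advances by 30 seconds.
            TestInputTopic<String, String> in = driver.createInputTopic(
                    "in", new StringSerializer(), new StringSerializer(),
                    start, Duration.ofSeconds(30));
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                    "out", new StringDeserializer(), new StringDeserializer());

            in.pipeInput("k", "a");                // stamped 10:00:00
            in.pipeInput("k", "b");                // stamped 10:00:30
            in.advanceTime(Duration.ofMinutes(5)); // topic time moves from 10:01:00 to 10:06:00
            in.pipeInput("k", "c");                // stamped 10:06:00

            return out.readRecordsToList().stream()
                      .map(TestRecord::getRecordTime)
                      .collect(Collectors.toList());
        }
    }
}
```

Nothing here depends on when the test runs, so the output timestamps are identical on every run.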

| Concern | TopologyTestDriver | Embedded broker / Testcontainers |
|---|---|---|
| Speed | Milliseconds per test | Seconds per test |
| Broker required | No | Yes |
| Time control | Deterministic (you advance it) | Real wall-clock |
| Best for | Topology logic, windows, aggregations | End-to-end wiring, serdes over the wire, config |

Adding the dependency

Pull in the test-utils artifact alongside your normal Streams dependency. Match the version to your kafka-streams version.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <version>3.9.0</version>
    <scope>test</scope>
</dependency>

The build-pipe-assert pattern

Every test follows the same three steps: build a Topology, create test topics from the driver, then pipe and assert. Crucially, you build the topology exactly the way production does — extract it into a reusable method so the test and the app share the same code.

Consider a simple topology that uppercases values on a words topic and writes them to WORDS:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public final class WordTopology {

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("words", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(String::toUpperCase)
               .to("WORDS", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }
}

The test wires this into the driver. Note the try-with-resources block — closing the driver releases its in-memory state stores so tests stay isolated.

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.jupiter.api.Test;

import java.util.Properties;

import static org.assertj.core.api.Assertions.assertThat;

class WordTopologyTest {

    @Test
    void uppercasesValues() {
        Properties props = new Properties();
        props.put("application.id", "word-test");
        props.put("bootstrap.servers", "dummy:9092"); // never contacted

        try (TopologyTestDriver driver = new TopologyTestDriver(WordTopology.build(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                    "words", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                    "WORDS", new StringDeserializer(), new StringDeserializer());

            in.pipeInput("k1", "hello");
            in.pipeInput("k2", "world");

            assertThat(out.readKeyValue()).extracting("key", "value")
                    .containsExactly("k1", "HELLO");
            assertThat(out.readValue()).isEqualTo("WORLD");
            assertThat(out.isEmpty()).isTrue();
        }
    }
}

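bootstrap.servers is never actually contacted; the driver short-circuits all network I/O. Older Kafka releases required both application.id and bootstrap.servers to pass config validation, but since Kafka 2.8 the driver fills in placeholder values for any you leave out, so setting them is optional.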
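Since Kafka 2.8, TopologyTestDriver also offers a constructor that takes only the Topology and supplies placeholder application.id and bootstrap.servers values itself. A minimal sketch, inlining the WordTopology wiring so it compiles on its own (the class NoPropsDriverDemo is hypothetical):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

final class NoPropsDriverDemo {

    static String uppercase(String value) {
        // Same wiring as WordTopology, inlined so this sketch is self-contained.
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("words", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(String::toUpperCase)
               .to("WORDS", Produced.with(Serdes.String(), Serdes.String()));

        // Topology-only constructor: no Properties object at all.
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build())) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                    "words", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                    "WORDS", new StringDeserializer(), new StringDeserializer());
            in.pipeInput("k", value);
            return out.readValue();
        }
    }
}
```

Keep passing Properties whenever the topology relies on configured defaults (for example default serdes), since the placeholders cover only the two required settings.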

Reading outputs

TestOutputTopic gives you several ways to drain results depending on what you want to assert:

| Method | Returns | Use when |
|---|---|---|
| readValue() | Next value | You only care about the value |
| readKeyValue() | KeyValue<K,V> | Key matters (e.g. re-keyed streams) |
| readRecord() | TestRecord with headers + timestamp | Asserting headers or event time |
| readValuesToList() | All remaining values | Bulk assertion on full output |
| readKeyValuesToMap() | Latest value per key | Changelog/upsert semantics (KTable) |
| isEmpty() | boolean | Verifying no extra records leaked |
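readRecord() is the one to reach for when metadata matters. A minimal sketch, assuming the same uppercase wiring as WordTopology; the class RecordMetadataDemo and the header name trace-id are hypothetical. It pipes a TestRecord carrying a header and an explicit event time, then reads all of it back:

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Properties;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.test.TestRecord;

final class RecordMetadataDemo {

    static TestRecord<String, String> roundTrip() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("words", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(String::toUpperCase)
               .to("WORDS", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "record-metadata-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                    "words", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                    "WORDS", new StringDeserializer(), new StringDeserializer());

            // Hypothetical header, used only to show that headers flow through mapValues.
            Headers headers = new RecordHeaders()
                    .add("trace-id", "abc-123".getBytes(StandardCharsets.UTF_8));
            Instant eventTime = Instant.parse("2026-06-01T10:00:00Z");
            in.pipeInput(new TestRecord<>("k1", "hello", headers, eventTime));

            // One call exposes key, value, headers and timestamp together.
            return out.readRecord();
        }
    }
}
```

A test can then assert on record.headers().lastHeader("trace-id") and record.getRecordTime() in addition to the value.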

Tip: readKeyValuesToMap() collapses to the latest value per key, which mirrors KTable update semantics — perfect for asserting the final state of an aggregation rather than every intermediate emission.
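To see the difference between draining every update and collapsing to final state, here is a minimal sketch built on a hypothetical running-total topology (the class TotalsDemo and the topics payments/totals are placeholders). The same three input records are read once as a list and once as a map, each in its own driver:

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;

final class TotalsDemo {

    // Running total per key: a KTable whose updates are streamed to "totals".
    static Topology totals() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("payments", Consumed.with(Serdes.String(), Serdes.Long()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
               .reduce(Long::sum)
               .toStream()
               .to("totals", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    static Properties props(String appId) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
        return props;
    }

    // Creates the topics, pipes the same three payments, and returns the output topic.
    static TestOutputTopic<String, Long> pipePayments(TopologyTestDriver driver) {
        TestInputTopic<String, Long> in = driver.createInputTopic(
                "payments", new StringSerializer(), new LongSerializer());
        TestOutputTopic<String, Long> out = driver.createOutputTopic(
                "totals", new StringDeserializer(), new LongDeserializer());
        in.pipeInput("alice", 5L);
        in.pipeInput("alice", 7L);
        in.pipeInput("bob", 3L);
        return out;
    }

    static List<KeyValue<String, Long>> everyUpdate() {
        try (TopologyTestDriver driver = new TopologyTestDriver(totals(), props("totals-list"))) {
            return pipePayments(driver).readKeyValuesToList();
        }
    }

    static Map<String, Long> finalState() {
        try (TopologyTestDriver driver = new TopologyTestDriver(totals(), props("totals-map"))) {
            return pipePayments(driver).readKeyValuesToMap();
        }
    }
}
```

everyUpdate() sees each intermediate total, (alice, 5), (alice, 12), (bob, 3), while finalState() keeps only {alice=12, bob=3}.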

Testing windowed and stateful logic deterministically

Because the driver's clocks move only when you move them, time-based logic becomes a pure function of the timestamps you supply. For windowed aggregations, advance stream time by setting each record's timestamp with pipeInput(key, value, timestamp). This is what makes window-boundary tests reliable instead of timing-dependent.

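import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.time.Instant;

import static org.assertj.core.api.Assertions.assertThat;

// Assumes `props` is a test-class field configured like the Properties in WordTopologyTest,
// and that ClickTopology counts "clicks" per key in 1-minute tumbling windows and writes
// plain String keys with Long counts to "click-counts".
@Test
void countsPerTumblingWindow() {
    Instant base = Instant.parse("2026-06-01T10:00:00Z");

    try (TopologyTestDriver driver = new TopologyTestDriver(ClickTopology.build(), props)) {
        TestInputTopic<String, String> in = driver.createInputTopic(
                "clicks", new StringSerializer(), new StringSerializer());
        TestOutputTopic<String, Long> out = driver.createOutputTopic(
                "click-counts", new StringDeserializer(), new LongDeserializer());

        // Two events inside the first 1-minute window [10:00, 10:01)
        in.pipeInput("user-1", "view", base);
        in.pipeInput("user-1", "view", base.plus(Duration.ofSeconds(30)));
        // One event in the next window [10:01, 10:02)
        in.pipeInput("user-1", "view", base.plus(Duration.ofSeconds(75)));

        // Each count update is emitted: 1 then 2 for the first window, 1 for the second
        assertThat(out.readValuesToList()).containsExactly(1L, 2L, 1L);
    }
}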
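The test relies on a ClickTopology that this page does not show. One minimal sketch that would satisfy it, assuming 1-minute tumbling windows with no grace period and a hypothetical store name click-counts-store (TimeWindows.ofSizeWithNoGrace requires Kafka 3.0 or later):

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

final class ClickTopology {

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("clicks", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               // Assumed window definition: 1-minute tumbling windows, no grace period.
               .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
               .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("click-counts-store")
                       .withKeySerde(Serdes.String())
                       .withValueSerde(Serdes.Long()))
               // Drop the window from the key so the output key is the plain String user id.
               .toStream((windowedKey, count) -> windowedKey.key())
               .to("click-counts", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }
}
```

Stripping the window from the key is what lets the test read keys with a plain StringDeserializer; if you keep Windowed<String> keys on the output topic, the test needs a windowed deserializer instead.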

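You can also reach straight into state stores to assert intermediate state without consuming output topics. This snippet assumes a plain, non-windowed count materialized as counts-store (for example via count(Materialized.as("counts-store"))), so user-1's three events add up to 3; a windowed store such as the one behind the click counts is read with driver.getWindowStore(...) instead:

import org.apache.kafka.streams.state.KeyValueStore;

KeyValueStore<String, Long> store = driver.getKeyValueStore("counts-store");
assertThat(store.get("user-1")).isEqualTo(3L);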
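A self-contained version of that store check, as a minimal sketch: the class CountStoreDemo and the clicks topic are illustrative, and the count is materialized under counts-store so that getKeyValueStore can find it. No output topic is read at all:

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

final class CountStoreDemo {

    // Non-windowed count per key, materialized under the name "counts-store".
    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("clicks", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")
                       .withKeySerde(Serdes.String())
                       .withValueSerde(Serdes.Long()));
        return builder.build();
    }

    // Pipes three clicks for user-1, then looks up `user` directly in the store.
    static Long countAfterThreeClicks(String user) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "count-store-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        try (TopologyTestDriver driver = new TopologyTestDriver(build(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                    "clicks", new StringSerializer(), new StringSerializer());
            for (int i = 0; i < 3; i++) {
                in.pipeInput("user-1", "view");
            }
            KeyValueStore<String, Long> store = driver.getKeyValueStore("counts-store");
            return store.get(user); // null for keys that were never counted
        }
    }
}
```

The DSL backs count() with a timestamped store; getKeyValueStore hands back a plain key-value view of it, so the assertion can compare against a bare Long.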

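To exercise Punctuator callbacks scheduled with PunctuationType.WALL_CLOCK_TIME, call driver.advanceWallClockTime(Duration.ofMinutes(1)); the driver then fires every wall-clock punctuation that has come due, deterministically. Stream-time punctuations and suppressed windows (for example suppress(Suppressed.untilWindowCloses(...))) are driven by stream time rather than wall-clock time, so advance them by piping a record whose timestamp lies past the window end plus the grace period.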
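As a minimal sketch of a wall-clock punctuator under test, assuming the Processor API from org.apache.kafka.streams.processor.api (Kafka 3.0+): the processor MinuteReporter, the key seen, and the topics events/report are all hypothetical. The processor only counts records, and its punctuator forwards the running total once per wall-clock minute:

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

final class HeartbeatTopology {

    // Hypothetical processor: counts incoming records and, once per wall-clock
    // minute, forwards the running total under the key "seen".
    static final class MinuteReporter implements Processor<String, String, String, Long> {
        private long seen;

        @Override
        public void init(ProcessorContext<String, Long> context) {
            context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME,
                    timestamp -> context.forward(new Record<>("seen", seen, timestamp)));
        }

        @Override
        public void process(Record<String, String> record) {
            seen++; // no output here; only the punctuator emits
        }
    }

    static Topology build() {
        ProcessorSupplier<String, String, String, Long> supplier = MinuteReporter::new;
        Topology topology = new Topology();
        topology.addSource("source",
                Serdes.String().deserializer(), Serdes.String().deserializer(), "events");
        topology.addProcessor("reporter", supplier, "source");
        topology.addSink("sink", "report",
                Serdes.String().serializer(), Serdes.Long().serializer(), "reporter");
        return topology;
    }
}
```

In a test, piping records produces no output on report; only after driver.advanceWallClockTime(Duration.ofMinutes(1)) does a single ("seen", total) record appear.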

Best Practices

  • Extract topology construction into a static build() method shared by production code and tests so you test the real wiring, not a copy.
  • Always use try-with-resources (or @AfterEach driver.close()) to release in-memory state and keep tests isolated.
  • Drive time explicitly with timestamped pipeInput and advanceWallClockTime — never rely on the real clock for window assertions.
  • Assert output.isEmpty() at the end to catch records you did not expect (leaks, double-emits).
  • Use readKeyValuesToMap() for KTable/aggregation results and readValuesToList() for ordered stream results.
  • Reserve embedded-broker or Testcontainers tests for serde-over-the-wire and config concerns; keep the bulk of coverage in fast TopologyTestDriver unit tests.
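When several tests share one topology, the @BeforeEach/@AfterEach variant keeps setup in one place. A minimal sketch, assuming the WordTopology class shown earlier and JUnit 5 with AssertJ as in WordTopologyTest; the class name WordTopologyLifecycleTest is hypothetical:

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

class WordTopologyLifecycleTest {

    private TopologyTestDriver driver;
    private TestInputTopic<String, String> in;
    private TestOutputTopic<String, String> out;

    @BeforeEach
    void setUp() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); // never contacted
        // A fresh driver per test method, built from the production wiring.
        driver = new TopologyTestDriver(WordTopology.build(), props);
        in = driver.createInputTopic("words", new StringSerializer(), new StringSerializer());
        out = driver.createOutputTopic("WORDS", new StringDeserializer(), new StringDeserializer());
    }

    @AfterEach
    void tearDown() {
        driver.close(); // runs even when the test fails, so no state leaks into the next test
    }

    @Test
    void uppercasesValues() {
        in.pipeInput("k1", "hello");
        assertThat(out.readValue()).isEqualTo("HELLO");
        assertThat(out.isEmpty()).isTrue(); // catches leaked or double-emitted records
    }

    @Test
    void noInputMeansNoOutput() {
        assertThat(out.isEmpty()).isTrue();
    }
}
```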
Last updated June 1, 2026