Testing Streams
A Kafka Streams topology is just business logic wired together, and you should be able to test it the same way you test any pure function — fast, in-process, and without external dependencies. The TopologyTestDriver from kafka-streams-test-utils runs your entire topology in a single thread with no broker, no networking, and no real consumer group. You pipe records in, read records out, and inspect state stores directly, all in milliseconds. This makes it the foundation of a fast, deterministic test suite for stateless transformations, joins, windows, and aggregations alike.
Why TopologyTestDriver over an embedded broker
Integration tests against an embedded broker (or Testcontainers) are valuable, but they are slow, prone to flakiness under load, and they exercise plumbing you do not own. For the logic in your topology (filters, maps, joins, windowed counts), TopologyTestDriver is the right tool. It mocks wall-clock time and takes event time from the timestamps on the records you pipe in, so windowing and punctuation are fully under your control: you advance time explicitly rather than waiting on a real clock.
| Concern | TopologyTestDriver | Embedded broker / Testcontainers |
|---|---|---|
| Speed | Milliseconds per test | Seconds per test |
| Broker required | No | Yes |
| Time control | Deterministic (you advance it) | Real wall-clock |
| Best for | Topology logic, windows, aggregations | End-to-end wiring, serdes over the wire, config |
Adding the dependency
Pull in the test-utils artifact alongside your normal Streams dependency. Match the version to your kafka-streams version.
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams-test-utils</artifactId>
  <version>3.9.0</version>
  <scope>test</scope>
</dependency>
The build-pipe-assert pattern
Every test follows the same three steps: build a Topology, create test topics from the driver, then pipe and assert. Crucially, you build the topology exactly the way production does — extract it into a reusable method so the test and the app share the same code.
Consider a simple topology that uppercases values on a words topic and writes them to WORDS:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public final class WordTopology {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("words", Consumed.with(Serdes.String(), Serdes.String()))
            .mapValues(String::toUpperCase)
            .to("WORDS", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }
}
The test wires this into the driver. Note the try-with-resources block: closing the driver closes its state stores and cleans up the local state directory it created, so tests stay isolated.
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.jupiter.api.Test;
import java.util.Properties;
import static org.assertj.core.api.Assertions.assertThat;

class WordTopologyTest {
    @Test
    void uppercasesValues() {
        Properties props = new Properties();
        props.put("application.id", "word-test");
        props.put("bootstrap.servers", "dummy:9092"); // never contacted
        try (TopologyTestDriver driver = new TopologyTestDriver(WordTopology.build(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                "words", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                "WORDS", new StringDeserializer(), new StringDeserializer());
            in.pipeInput("k1", "hello");
            in.pipeInput("k2", "world");
            assertThat(out.readKeyValue()).extracting("key", "value")
                .containsExactly("k1", "HELLO");
            assertThat(out.readValue()).isEqualTo("WORLD");
            assertThat(out.isEmpty()).isTrue();
        }
    }
}
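With the props-based constructor used here, application.id and bootstrap.servers are placeholders: the driver never opens a network connection, so the address is never contacted. Older releases required both settings to pass config validation; since Kafka 2.8 they are optional, and a TopologyTestDriver(Topology) constructor that takes no properties is also available.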
Reading outputs
TestOutputTopic gives you several ways to drain results depending on what you want to assert:
| Method | Returns | Use when |
|---|---|---|
| readValue() | Next value | You only care about the value |
| readKeyValue() | KeyValue<K,V> | Key matters (e.g. re-keyed streams) |
| readRecord() | TestRecord with headers + timestamp | Asserting headers or event time |
| readValuesToList() | All remaining values | Bulk assertion on full output |
| readKeyValuesToMap() | Latest value per key | Changelog/upsert semantics (KTable) |
| isEmpty() | boolean | Verifying no extra records leaked |
Tip: readKeyValuesToMap() collapses to the latest value per key, which mirrors KTable update semantics. That makes it a good fit for asserting the final state of an aggregation rather than every intermediate emission.
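To make the "latest value per key" collapse concrete, here is a minimal plain-Java model of it. This is a sketch of the semantics only, not the Kafka implementation; the class name LatestPerKey and the sample keys are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public final class LatestPerKey {
    private LatestPerKey() {}

    /** Folds an ordered list of updates into a map where later updates overwrite earlier ones. */
    public static <K, V> Map<K, V> collapse(List<Map.Entry<K, V>> updates) {
        Map<K, V> latest = new LinkedHashMap<>();
        for (Map.Entry<K, V> update : updates) {
            latest.put(update.getKey(), update.getValue());
        }
        return latest;
    }

    public static void main(String[] args) {
        // Three emissions from a hypothetical count aggregation, in output order.
        List<Map.Entry<String, Long>> emitted = List.of(
            Map.entry("user-1", 1L),
            Map.entry("user-2", 1L),
            Map.entry("user-1", 2L));
        System.out.println(collapse(emitted)); // prints {user-1=2, user-2=1}
    }
}
```

The intermediate count of 1 for user-1 disappears in the collapsed view. When the sequence of intermediate emissions is itself what you want to verify, read the output as a list instead.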
Testing windowed and stateful logic deterministically
Time-based logic becomes a pure function of the timestamps you supply. Windowed aggregations run on event time, so you move stream time forward by setting record timestamps via pipeInput(key, value, timestamp) rather than by waiting. This is what makes window-boundary tests reliable instead of timing-dependent.
import org.apache.kafka.common.serialization.LongDeserializer;
import java.time.Duration;
import java.time.Instant;

// ClickTopology is not shown. It is assumed to count clicks per key in 1-minute
// tumbling windows and to write String keys and Long counts to "click-counts".
@Test
void countsPerTumblingWindow() {
    Properties props = new Properties();
    props.put("application.id", "click-test");
    props.put("bootstrap.servers", "dummy:9092"); // never contacted
    Instant base = Instant.parse("2026-06-01T10:00:00Z");
    try (TopologyTestDriver driver = new TopologyTestDriver(ClickTopology.build(), props)) {
        TestInputTopic<String, String> in = driver.createInputTopic(
            "clicks", new StringSerializer(), new StringSerializer());
        TestOutputTopic<String, Long> out = driver.createOutputTopic(
            "click-counts", new StringDeserializer(), new LongDeserializer());
        // Two events inside the first 1-minute window
        in.pipeInput("user-1", "view", base);
        in.pipeInput("user-1", "view", base.plus(Duration.ofSeconds(30)));
        // One event in the next window
        in.pipeInput("user-1", "view", base.plus(Duration.ofSeconds(75)));
        var counts = out.readKeyValuesToList();
        assertThat(counts).extracting("value").containsExactly(1L, 2L, 1L);
    }
}
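The window assignment in that test can be checked with plain arithmetic. Kafka Streams aligns tumbling windows to the epoch, so a window's start is the record timestamp rounded down to a multiple of the window size. The sketch below is a standalone model of that rule rather than Kafka code; the class and method names are hypothetical, and the 1-minute size is the one the test comments assume.

```java
import java.time.Duration;
import java.time.Instant;

public final class TumblingWindowMath {
    private TumblingWindowMath() {}

    /** Start of the epoch-aligned tumbling window that contains the given timestamp. */
    public static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMs = size.toMillis();
        long tsMs = timestamp.toEpochMilli();
        return Instant.ofEpochMilli(tsMs - Math.floorMod(tsMs, sizeMs));
    }

    public static void main(String[] args) {
        Instant base = Instant.parse("2026-06-01T10:00:00Z");
        Duration size = Duration.ofMinutes(1);
        // Same offsets as the test: 0 s and 30 s share a window, 75 s lands in the next one.
        for (long offsetSeconds : new long[] {0, 30, 75}) {
            Instant ts = base.plusSeconds(offsetSeconds);
            System.out.println(ts + " -> window starting " + windowStart(ts, size));
        }
    }
}
```

Picking a base timestamp that sits exactly on a window boundary, as the test does, keeps the expected grouping easy to read off the offsets.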
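You can also reach straight into state stores to assert intermediate state without consuming output topics. The example below assumes a non-windowed count materialized as a key-value store named counts-store, checked after three events for user-1. Windowed aggregations are backed by a window store, which you fetch with driver.getWindowStore(...) instead.
// Requires: import org.apache.kafka.streams.state.KeyValueStore;
KeyValueStore<String, Long> store = driver.getKeyValueStore("counts-store");
assertThat(store.get("user-1")).isEqualTo(3L);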
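To exercise Punctuator callbacks scheduled with PunctuationType.WALL_CLOCK_TIME, call driver.advanceWallClockTime(Duration.ofMinutes(1)), which fires them deterministically. Stream-time punctuations and suppressed windows (for example suppress(untilWindowCloses(...))) do not react to the wall clock: they advance only when you pipe a record whose timestamp moves stream time past the punctuation interval or past the window end plus its grace period.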
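As an illustration of wall-clock punctuation under the driver, the following sketch registers a punctuator in a Processor API topology and triggers it by advancing the mock clock. The Heartbeat processor, the topic names events and heartbeats, and the 10-second interval are all hypothetical; it assumes a Kafka Streams version with the org.apache.kafka.streams.processor.api package and the TopologyTestDriver(Topology) constructor (2.8 or later).

```java
import java.time.Duration;
import java.util.List;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

public final class HeartbeatPunctuatorSketch {
    private HeartbeatPunctuatorSketch() {}

    /** Hypothetical processor: forwards a "tick" record on a 10-second wall-clock schedule. */
    static final class Heartbeat implements Processor<String, String, String, String> {
        @Override
        public void init(ProcessorContext<String, String> context) {
            context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME,
                now -> context.forward(new Record<>("heartbeat", "tick", now)));
        }

        @Override
        public void process(Record<String, String> record) {
            // Input records are ignored; only the punctuator emits.
        }
    }

    static Topology build() {
        ProcessorSupplier<String, String, String, String> supplier = Heartbeat::new;
        Topology topology = new Topology();
        topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "events");
        topology.addProcessor("heartbeat", supplier, "source");
        topology.addSink("sink", "heartbeats", new StringSerializer(), new StringSerializer(), "heartbeat");
        return topology;
    }

    /** Advances the mock wall clock by one interval and returns whatever was emitted. */
    public static List<String> run() {
        try (TopologyTestDriver driver = new TopologyTestDriver(build())) {
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                "heartbeats", new StringDeserializer(), new StringDeserializer());
            if (!out.isEmpty()) {
                throw new IllegalStateException("nothing should be emitted before time advances");
            }
            driver.advanceWallClockTime(Duration.ofSeconds(10));
            return out.readValuesToList();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

No input record is piped at all: the only thing that moves is the mock wall clock, which is why the same approach would not trigger a STREAM_TIME punctuator.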
Best Practices
- Extract topology construction into a static build() method shared by production code and tests so you test the real wiring, not a copy.
- Always use try-with-resources (or @AfterEach driver.close()) to release in-memory state and keep tests isolated.
- Drive time explicitly with timestamped pipeInput and advanceWallClockTime; never rely on the real clock for window assertions.
- Assert output.isEmpty() at the end to catch records you did not expect (leaks, double-emits).
- Use readKeyValuesToMap() for KTable/aggregation results and readValuesToList() for ordered stream results.
- Reserve embedded-broker or Testcontainers tests for serde-over-the-wire and config concerns; keep the bulk of coverage in fast TopologyTestDriver unit tests.