Testing with EmbeddedKafka
Unit tests with mocks tell you that your code calls KafkaTemplate.send(), but they never prove that a record is actually serialized, routed to the right topic, and picked up by your @KafkaListener. For that you need a real broker. The spring-kafka-test module ships an in-memory Kafka broker you can spin up inside a JUnit test with a single annotation, giving you fast, deterministic integration tests that exercise the full produce-and-consume path without Docker or an external cluster.
Adding the test dependency
spring-kafka-test brings in the embedded broker, KafkaTestUtils, and JUnit support. It is test-scoped, so it never ships in your application artifact.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
Modern versions of the embedded broker run in KRaft mode by default — there is no ZooKeeper involved, and startup typically takes a couple of seconds.
How @EmbeddedKafka wires up
Annotating a @SpringBootTest class with @EmbeddedKafka starts a broker before the context loads and publishes its bootstrap address to the Spring Environment under the property spring.embedded.kafka.brokers. The trick is to point your auto-configured producer and consumer factories at that property so the application code under test connects to the embedded broker rather than localhost:9092.
| Attribute | Purpose |
|---|---|
partitions | Partitions created for each auto-created topic |
topics | Topics to create up front so consumers can subscribe immediately |
bootstrapServersProperty | Property name to populate with the broker address |
count | Number of broker nodes in the cluster |
kraft | Whether to run in KRaft mode (default true) |
By setting bootstrapServersProperty = "spring.kafka.bootstrap-servers", Spring Boot’s Kafka auto-configuration picks up the embedded address transparently.
Asserting a listener consumed a record
The most common scenario is end-to-end: produce a record and verify that your real @KafkaListener was invoked. Because consumption is asynchronous, use a CountDownLatch to wait for the listener with a bounded timeout instead of sleeping.
package com.devcraftly.kafka;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.assertj.core.api.Assertions.assertThat;
@SpringBootTest
@EmbeddedKafka(
partitions = 1,
topics = "orders",
bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class OrdersKafkaIntegrationTest {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private TestListener testListener;
@Autowired
private EmbeddedKafkaBroker broker;
@Test
void listenerConsumesPublishedRecord() throws Exception {
kafkaTemplate.send("orders", "order-42", "CONFIRMED");
boolean received = testListener.latch.await(5, TimeUnit.SECONDS);
assertThat(received).isTrue();
assertThat(testListener.payload.get()).isEqualTo("CONFIRMED");
}
@TestConfiguration
static class TestListener {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<String> payload = new AtomicReference<>();
@KafkaListener(topics = "orders", groupId = "test-group")
void onMessage(String message) {
payload.set(message);
latch.countDown();
}
}
}
The await returns true only if the listener fired within the timeout, so the assertion fails fast on a misconfigured topic or serializer rather than hanging forever.
Consuming directly with KafkaTestUtils
Sometimes you only want to verify what your producer wrote, without involving a listener. KafkaTestUtils builds a throwaway consumer bound to the embedded broker and polls records for you. This is ideal for testing a service that publishes domain events.
@Test
void producerWritesExpectedRecord() {
kafkaTemplate.send("orders", "order-99", "SHIPPED");
Map<String, Object> consumerProps =
KafkaTestUtils.consumerProps("verifier-group", "true", broker);
consumerProps.put("key.deserializer",
org.apache.kafka.common.serialization.StringDeserializer.class);
consumerProps.put("value.deserializer",
org.apache.kafka.common.serialization.StringDeserializer.class);
try (Consumer<String, String> consumer =
new KafkaConsumer<>(consumerProps)) {
broker.consumeFromAnEmbeddedTopic(consumer, "orders");
ConsumerRecord<String, String> record =
KafkaTestUtils.getSingleRecord(consumer, "orders",
Duration.ofSeconds(5));
assertThat(record.key()).isEqualTo("order-99");
assertThat(record.value()).isEqualTo("SHIPPED");
}
}
For multiple records, use KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(5)), which returns a ConsumerRecords<K, V> collection you can iterate and assert over.
Output:
ConsumerRecord(topic = orders, partition = 0, offset = 0,
key = order-99, value = SHIPPED)
Always set
auto.offset.resettoearliest(the second argument ofconsumerPropsdoes this) — otherwise a fresh consumer may join after your record was produced and read nothing.
Testcontainers as an alternative
@EmbeddedKafka is fast and dependency-free, but it runs a JVM-embedded broker that can differ subtly from a production Apache Kafka image. When you need exact parity — specific broker versions, real network behavior, or features like SASL — use Testcontainers with the KafkaContainer (or the official apache/kafka image), wiring its bootstrap servers in via @DynamicPropertySource.
@Testcontainers
@SpringBootTest
class OrdersTestcontainersTest {
@Container
static final KafkaContainer KAFKA =
new KafkaContainer("apache/kafka:3.9.0");
@DynamicPropertySource
static void kafkaProps(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers",
KAFKA::getBootstrapServers);
}
}
Use embedded Kafka for the bulk of your fast feedback loop and reserve Testcontainers for a smaller suite of high-fidelity checks.
Best practices
- Inject the embedded address via
bootstrapServersProperty = "spring.kafka.bootstrap-servers"so application code uses Spring Boot’s normal auto-configuration unchanged. - Replace
Thread.sleepwith aCountDownLatch(or Awaitility) and a bounded timeout to keep tests deterministic and fast. - Set
auto.offset.reset=earlieston test consumers so they never miss records produced before they subscribe. - Always close test consumers (use try-with-resources) to free broker connections between tests.
- Give each test a unique
groupIdto avoid cross-test offset interference when sharing a class-level broker. - Pre-declare topics in
@EmbeddedKafka(topics = ...)so listeners can subscribe immediately instead of racing topic auto-creation. - Promote to Testcontainers for the handful of tests that must match a specific broker version or security configuration.