Skip to content
Apache Kafka kf spring 4 min read

Testing with EmbeddedKafka

Unit tests with mocks tell you that your code calls KafkaTemplate.send(), but they never prove that a record is actually serialized, routed to the right topic, and picked up by your @KafkaListener. For that you need a real broker. The spring-kafka-test module ships an in-memory Kafka broker you can spin up inside a JUnit test with a single annotation, giving you fast, deterministic integration tests that exercise the full produce-and-consume path without Docker or an external cluster.

Adding the test dependency

spring-kafka-test brings in the embedded broker, KafkaTestUtils, and JUnit support. It is test-scoped, so it never ships in your application artifact.

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>

Modern versions of the embedded broker run in KRaft mode by default — there is no ZooKeeper involved, and startup typically takes a couple of seconds.

How @EmbeddedKafka wires up

Annotating a @SpringBootTest class with @EmbeddedKafka starts a broker before the context loads and publishes its bootstrap address to the Spring Environment under the property spring.embedded.kafka.brokers. The trick is to point your auto-configured producer and consumer factories at that property so the application code under test connects to the embedded broker rather than localhost:9092.

AttributePurpose
partitionsPartitions created for each auto-created topic
topicsTopics to create up front so consumers can subscribe immediately
bootstrapServersPropertyProperty name to populate with the broker address
countNumber of broker nodes in the cluster
kraftWhether to run in KRaft mode (default true)

By setting bootstrapServersProperty = "spring.kafka.bootstrap-servers", Spring Boot’s Kafka auto-configuration picks up the embedded address transparently.

Asserting a listener consumed a record

The most common scenario is end-to-end: produce a record and verify that your real @KafkaListener was invoked. Because consumption is asynchronous, use a CountDownLatch to wait for the listener with a bounded timeout instead of sleeping.

package com.devcraftly.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest
@EmbeddedKafka(
        partitions = 1,
        topics = "orders",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class OrdersKafkaIntegrationTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private TestListener testListener;

    @Autowired
    private EmbeddedKafkaBroker broker;

    @Test
    void listenerConsumesPublishedRecord() throws Exception {
        kafkaTemplate.send("orders", "order-42", "CONFIRMED");

        boolean received = testListener.latch.await(5, TimeUnit.SECONDS);

        assertThat(received).isTrue();
        assertThat(testListener.payload.get()).isEqualTo("CONFIRMED");
    }

    @TestConfiguration
    static class TestListener {
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicReference<String> payload = new AtomicReference<>();

        @KafkaListener(topics = "orders", groupId = "test-group")
        void onMessage(String message) {
            payload.set(message);
            latch.countDown();
        }
    }
}

The await returns true only if the listener fired within the timeout, so the assertion fails fast on a misconfigured topic or serializer rather than hanging forever.

Consuming directly with KafkaTestUtils

Sometimes you only want to verify what your producer wrote, without involving a listener. KafkaTestUtils builds a throwaway consumer bound to the embedded broker and polls records for you. This is ideal for testing a service that publishes domain events.

@Test
void producerWritesExpectedRecord() {
    kafkaTemplate.send("orders", "order-99", "SHIPPED");

    Map<String, Object> consumerProps =
            KafkaTestUtils.consumerProps("verifier-group", "true", broker);
    consumerProps.put("key.deserializer",
            org.apache.kafka.common.serialization.StringDeserializer.class);
    consumerProps.put("value.deserializer",
            org.apache.kafka.common.serialization.StringDeserializer.class);

    try (Consumer<String, String> consumer =
                 new KafkaConsumer<>(consumerProps)) {
        broker.consumeFromAnEmbeddedTopic(consumer, "orders");

        ConsumerRecord<String, String> record =
                KafkaTestUtils.getSingleRecord(consumer, "orders",
                        Duration.ofSeconds(5));

        assertThat(record.key()).isEqualTo("order-99");
        assertThat(record.value()).isEqualTo("SHIPPED");
    }
}

For multiple records, use KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(5)), which returns a ConsumerRecords<K, V> collection you can iterate and assert over.

Output:

ConsumerRecord(topic = orders, partition = 0, offset = 0,
  key = order-99, value = SHIPPED)

Always set auto.offset.reset to earliest (the second argument of consumerProps does this) — otherwise a fresh consumer may join after your record was produced and read nothing.

Testcontainers as an alternative

@EmbeddedKafka is fast and dependency-free, but it runs a JVM-embedded broker that can differ subtly from a production Apache Kafka image. When you need exact parity — specific broker versions, real network behavior, or features like SASL — use Testcontainers with the KafkaContainer (or the official apache/kafka image), wiring its bootstrap servers in via @DynamicPropertySource.

@Testcontainers
@SpringBootTest
class OrdersTestcontainersTest {

    @Container
    static final KafkaContainer KAFKA =
            new KafkaContainer("apache/kafka:3.9.0");

    @DynamicPropertySource
    static void kafkaProps(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers",
                KAFKA::getBootstrapServers);
    }
}

Use embedded Kafka for the bulk of your fast feedback loop and reserve Testcontainers for a smaller suite of high-fidelity checks.

Best practices

  • Inject the embedded address via bootstrapServersProperty = "spring.kafka.bootstrap-servers" so application code uses Spring Boot’s normal auto-configuration unchanged.
  • Replace Thread.sleep with a CountDownLatch (or Awaitility) and a bounded timeout to keep tests deterministic and fast.
  • Set auto.offset.reset=earliest on test consumers so they never miss records produced before they subscribe.
  • Always close test consumers (use try-with-resources) to free broker connections between tests.
  • Give each test a unique groupId to avoid cross-test offset interference when sharing a class-level broker.
  • Pre-declare topics in @EmbeddedKafka(topics = ...) so listeners can subscribe immediately instead of racing topic auto-creation.
  • Promote to Testcontainers for the handful of tests that must match a specific broker version or security configuration.
Last updated June 1, 2026
Was this helpful?