Skip to content
Apache Kafka kf spring 4 min read

Consuming with @KafkaListener

The @KafkaListener annotation is the cornerstone of message consumption in Spring for Apache Kafka. You annotate a method on a Spring-managed bean, and the framework wires up a polling loop, deserialization, threading, and offset management behind the scenes — letting you focus on business logic instead of consumer plumbing. In production this is where the bulk of your event-processing code lives, so understanding how topics, consumer groups, and payload binding work together is essential to building reliable, horizontally scalable consumers.

How @KafkaListener works

When Spring detects @KafkaListener on a bean method, it registers a MessageListenerContainer for that method. The container owns a Kafka consumer that polls the broker, deserializes each record, and invokes your method once per message (or once per batch). The annotated bean must be a Spring component so the container factory can find it during startup.

At minimum you specify the topic(s) and a consumer group. The groupId determines how partitions are shared: every consumer instance using the same group splits the topic’s partitions between them, while consumers in different groups each receive a full copy of the stream.

@Component
public class OrderListener {

    @KafkaListener(topics = "orders", groupId = "order-processing")
    public void onMessage(OrderEvent event) {
        // Spring deserializes the value into OrderEvent automatically
        System.out.println("Received order: " + event.orderId());
    }
}

Payload and header binding

Spring binds the deserialized record value to the method parameter automatically. For finer control — or when you need Kafka metadata such as the topic, partition, offset, or message key — annotate parameters with @Payload and @Header. The constants in KafkaHeaders cover every standard piece of metadata.

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class OrderListener {

    @KafkaListener(topics = "orders", groupId = "order-processing")
    public void onMessage(
            @Payload OrderEvent event,
            @Header(KafkaHeaders.RECEIVED_KEY) String key,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset) {

        System.out.printf(
            "key=%s partition=%d offset=%d order=%s amount=%.2f%n",
            key, partition, offset, event.orderId(), event.amount());
    }
}

Assuming the OrderEvent record produced earlier:

public record OrderEvent(String orderId, String customerId, double amount) {}

Output:

key=ORD-1001 partition=0 offset=42 order=ORD-1001 amount=149.99
key=ORD-1002 partition=1 offset=17 order=ORD-1002 amount=89.50

Use KafkaHeaders.RECEIVED_KEY (not KafkaHeaders.KEY) on the consumer side. The RECEIVED_* constants represent metadata read from an inbound record; the non-prefixed ones are for outbound messages.

Receiving the full ConsumerRecord

When you need everything about a record in one object — including timestamps and the raw header set — declare a parameter of type ConsumerRecord<K, V>. Spring passes the native record straight through, giving you direct access to the underlying Kafka client API.

import org.apache.kafka.clients.consumer.ConsumerRecord;

@Component
public class OrderAuditListener {

    @KafkaListener(topics = "orders", groupId = "order-audit")
    public void audit(ConsumerRecord<String, OrderEvent> record) {
        System.out.printf("topic=%s ts=%d key=%s value=%s%n",
            record.topic(), record.timestamp(), record.key(), record.value());
    }
}

Listening to multiple topics

A single listener can subscribe to several topics by passing an array, or you can match a family of topics with topicPattern (a regex evaluated against broker metadata at refresh time). Pattern matching is handy for multi-tenant or sharded topic naming schemes.

@KafkaListener(topics = {"orders", "orders-priority"}, groupId = "order-processing")
public void onMessage(OrderEvent event) { /* ... */ }

@KafkaListener(topicPattern = "orders\\..*", groupId = "order-processing")
public void onTenantMessage(OrderEvent event) { /* ... */ }

id and containerFactory

Two attributes give you operational control over the listener. id names the container so you can pause, resume, or inspect it via the KafkaListenerEndpointRegistry. containerFactory selects a non-default ConcurrentKafkaListenerContainerFactory bean — useful when one application consumes value types that need different deserializers or concurrency settings.

AttributePurposeDefault
topicsExplicit topic names to subscribe tonone (required unless topicPattern)
topicPatternRegex matched against existing topicsnone
groupIdConsumer group; overrides factory-level configfactory group.id
idUnique container id for runtime lookup/controlgenerated
containerFactoryBean name of the container factory to usekafkaListenerContainerFactory
concurrencyNumber of consumer threads (partitions permitting)factory concurrency
@KafkaListener(
    id = "order-processor",
    topics = "orders",
    groupId = "order-processing",
    containerFactory = "orderListenerContainerFactory",
    concurrency = "3")
public void onMessage(OrderEvent event) { /* ... */ }

Setting concurrency higher than the topic’s partition count wastes threads — extra consumers in the group sit idle because partitions are the unit of parallelism. Size concurrency to your partitions, not the other way around.

Best Practices

  • Always set an explicit groupId per logical consumer so rebalancing and offset tracking behave predictably across deployments and scaling events.
  • Prefer typed payloads (records) plus @Header parameters over parsing the raw ConsumerRecord unless you genuinely need broker metadata or the native record.
  • Keep listener methods fast and non-blocking; offload slow I/O so you do not stall the poll loop and trigger a rebalance from max.poll.interval.ms expiry.
  • Give long-lived or operationally important listeners a stable id so you can pause/resume them through the KafkaListenerEndpointRegistry.
  • Use a dedicated containerFactory when different listeners need different deserializers, error handlers, or concurrency rather than overloading one factory.
  • Match concurrency to partition count, and remember that throwing from a listener triggers the configured error handler — design for retries and dead-letter routing explicitly.
Last updated June 1, 2026
Was this helpful?