Message Headers & Metadata

Kafka records carry more than a key and a value: each record also has headers, an ordered collection of String-key/byte-array pairs that travel alongside the payload. Headers are the idiomatic place for cross-cutting metadata (correlation IDs, trace context, tenant identifiers, schema versions, content types) that should not pollute your business payload. Spring for Apache Kafka exposes both the headers you set yourself and the metadata Kafka attaches automatically (topic, partition, offset, timestamp), so you can read and write them through @Header-annotated listener parameters and the Message<T> API.

Why headers instead of the payload

Putting routing or tracing data inside the value couples infrastructure concerns to your domain model and forces every consumer to deserialize the full payload just to read a tracing ID. Headers let interceptors, error handlers, and observability tooling inspect metadata cheaply and independently. They are also what Spring’s own machinery (the JsonSerializer type-info headers, the dead-letter KafkaHeaders.DLT_* headers, and Micrometer trace propagation) relies on, so understanding them is essential in production.

Writing headers from a producer

The lowest-level approach uses the native ProducerRecord, whose constructor accepts an Iterable<Header>. Each RecordHeader takes a String key and a byte[] value.

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import java.nio.charset.StandardCharsets;
import java.util.List;

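// A null partition lets the producer's partitioner choose one, normally by hashing the key.
ProducerRecord<String, OrderEvent> record = new ProducerRecord<>(
        "orders", null, "order-42", event,
        // Header values are raw bytes, so text must be encoded explicitly.
        List.of(new RecordHeader("X-Correlation-Id",
                correlationId.getBytes(StandardCharsets.UTF_8))));

kafkaTemplate.send(record);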

More commonly in Spring you build a Message<T> with MessageBuilder, which keeps headers as ordinary objects and lets Spring handle serialization. This is the cleanest way to send a payload plus metadata in one shot.

import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.kafka.support.KafkaHeaders;

public void publish(OrderEvent event, String correlationId) {
    Message<OrderEvent> message = MessageBuilder
            .withPayload(event)
            .setHeader(KafkaHeaders.TOPIC, "orders")
            .setHeader(KafkaHeaders.KEY, event.orderId())
            .setHeader("X-Correlation-Id", correlationId)
            .setHeader("X-Source-Service", "checkout")
            .build();

    kafkaTemplate.send(message);
}

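Custom headers set as plain Strings are serialized by the KafkaHeaderMapper. With the default JSON-capable mapper, String header values are written as UTF-8 bytes on the way out and restored to String on the way in, so a header you write as text through Spring is read as text by a Spring consumer. The mapper records each header's Java type in a companion spring_json_header_types header to make that round trip possible; a header written by a raw ProducerRecord or a non-Spring producer carries no such type information and reaches the consumer as a byte[].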
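As a minimal sketch of what "text as UTF-8 bytes" means at the byte level, the plain-Java helper below encodes and decodes a header value by hand. HeaderText is a hypothetical name used only for illustration; it is not part of Spring or the Kafka client, and it covers only the String case described above.

```java
import java.nio.charset.StandardCharsets;

/** Illustrative helper (an assumption, not a Spring or Kafka class): text header values as UTF-8 bytes. */
final class HeaderText {

    private HeaderText() {
    }

    /** Encodes a text header value the way a raw RecordHeader expects it: as UTF-8 bytes. */
    static byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes header bytes back to text; returns null when the header was absent. */
    static String decode(byte[] raw) {
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }
}
```

A helper like this is mainly useful when a consumer has to read headers written by producers that bypass Spring's header mapping.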

Reading headers and metadata in a listener

On the consuming side, @KafkaListener methods can declare parameters annotated with @Header to pull individual headers, including the well-known metadata headers that Kafka and Spring populate for every record. The constants live in org.springframework.kafka.support.KafkaHeaders.

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

@KafkaListener(topics = "orders", groupId = "fulfilment")
public void onOrder(
        @Payload OrderEvent event,
        @Header(KafkaHeaders.RECEIVED_KEY) String key,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.OFFSET) long offset,
        @Header(name = "X-Correlation-Id", required = false) String correlationId) {

    log.info("topic={} partition={} offset={} key={} corrId={}",
            topic, partition, offset, key, correlationId);
}

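Setting required = false matters for optional application headers: if a required header is absent, argument resolution fails before the listener method runs and the record is handed to the container's error handler. Of the metadata headers below, topic, partition, offset, and timestamp are present on every record; RECEIVED_KEY is populated only when the record has a non-null key, so declare it with required = false on topics that carry unkeyed records.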

Constant                          Type      Meaning
KafkaHeaders.RECEIVED_KEY         key type  The record key
KafkaHeaders.RECEIVED_TOPIC       String    Topic the record came from
KafkaHeaders.RECEIVED_PARTITION   int       Source partition
KafkaHeaders.OFFSET               long      Offset within the partition
KafkaHeaders.RECEIVED_TIMESTAMP   long      Record timestamp (epoch millis)
KafkaHeaders.GROUP_ID             String    Consumer group id
Reading all headers at once with @Headers

When you need to forward or inspect the whole header set — for example to copy tracing headers onto a downstream message — inject the immutable MessageHeaders map with @Headers.

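import java.nio.charset.StandardCharsets;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;

@KafkaListener(topics = "orders", groupId = "audit")
public void audit(@Payload OrderEvent event, @Headers MessageHeaders headers) {
    // Headers written through Spring's mapper arrive as String; headers set by a raw
    // ProducerRecord or a non-Spring producer arrive as byte[], so handle both.
    Object raw = headers.get("X-Correlation-Id");
    String corrId = raw instanceof byte[] bytes
            ? new String(bytes, StandardCharsets.UTF_8)
            : (String) raw;
    log.info("auditing order {} (corrId={})", event.orderId(), corrId);
}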

Output:

auditing order order-42 (corrId=8f3c1d9e-2b71-4a55-9c0e-4f2b1a7d9e10)

Use case: propagating a correlation ID

A correlation ID lets you stitch together logs from the producer and every downstream consumer. The pattern is: generate (or accept) the ID at the edge, attach it as a header on send, read it on receive, and re-attach it to any messages you produce in turn. Because the ID lives in a header rather than the payload, no service needs to know your full domain schema to participate in tracing, and your error handlers and dead-letter routing can log the same ID for any failed record.
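The pattern above can be sketched in plain Java, with a Map<String, byte[]> standing in for a record's headers. CorrelationHeaders and its methods are hypothetical names, and generating a random UUID when no ID arrives is an assumption; other ID schemes would work equally well.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical helper illustrating the pattern; the maps stand in for Kafka record headers. */
final class CorrelationHeaders {

    static final String CORRELATION_ID = "X-Correlation-Id";

    private CorrelationHeaders() {
    }

    /** Returns the inbound correlation ID, or generates a fresh one when none was supplied. */
    static String resolve(Map<String, byte[]> inbound) {
        byte[] raw = inbound.get(CORRELATION_ID);
        if (raw == null || raw.length == 0) {
            return UUID.randomUUID().toString();
        }
        return new String(raw, StandardCharsets.UTF_8);
    }

    /** Builds outbound headers that carry the same correlation ID to the next hop. */
    static Map<String, byte[]> outbound(Map<String, byte[]> inbound) {
        Map<String, byte[]> out = new LinkedHashMap<>();
        out.put(CORRELATION_ID, resolve(inbound).getBytes(StandardCharsets.UTF_8));
        return out;
    }
}
```

In a Spring listener the resolved value would typically be passed to MessageBuilder.setHeader("X-Correlation-Id", ...) on the outgoing message, as in the publish example earlier.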

Best Practices

  • Reserve headers for metadata (tracing, routing, versioning); keep domain data in the payload.
  • Read application headers with required = false and handle null, since older or external producers may not set them.
  • Use the KafkaHeaders constants instead of hard-coded strings for metadata so your code survives library upgrades.
  • Prefix custom headers consistently (e.g. X- or a company namespace) to avoid clashing with Spring’s reserved kafka_* and __TypeId__ headers.
  • Propagate correlation/trace IDs through every hop, copying inbound headers onto outbound messages.
  • Prefer Micrometer’s automatic context propagation over hand-rolled tracing headers when you already have observability wired up.
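One way to enforce the naming rule from the list above is a small guard that rejects header names colliding with the reserved ones mentioned there. This is a sketch only: HeaderNames is a hypothetical helper, and it checks just the kafka_ prefix and __TypeId__ named in this article, not every header Spring may reserve.

```java
/** Hypothetical guard for custom header names; not part of Spring or Kafka. */
final class HeaderNames {

    private HeaderNames() {
    }

    /** True for names this article lists as reserved: the kafka_ prefix and __TypeId__. */
    static boolean isReserved(String name) {
        return name.startsWith("kafka_") || name.equals("__TypeId__");
    }

    /** Returns the name unchanged, or throws if it is blank or clashes with a reserved header. */
    static String requireCustom(String name) {
        if (name == null || name.isBlank()) {
            throw new IllegalArgumentException("header name must not be blank");
        }
        if (isReserved(name)) {
            throw new IllegalArgumentException("reserved header name: " + name);
        }
        return name;
    }
}
```

Calling requireCustom at the point where headers are attached surfaces a clash at send time instead of as a confusing value on the consumer.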
Last updated June 1, 2026