Message Headers & Metadata
Kafka records carry more than just a key and a value: each record has a set of headers, an ordered collection of key/byte-array pairs that travel alongside the payload. Headers are the idiomatic place to put cross-cutting metadata — correlation IDs, trace context, tenant identifiers, schema versions, or content types — without polluting your business payload. Spring for Apache Kafka exposes both the headers you set yourself and the metadata Kafka attaches automatically (topic, partition, offset, timestamp), making them trivial to read and write from a clean, annotation-driven API.
Why headers instead of the payload
Putting routing or tracing data inside the value couples infrastructure concerns to your domain model and forces every consumer to deserialize the full payload just to read a tracing ID. Headers let interceptors, error handlers, and observability tooling inspect metadata cheaply and independently. They are also what Spring’s own machinery (the JsonSerializer type-info headers, the dead-letter KafkaHeaders.DLT_* headers, and Micrometer trace propagation) relies on, so understanding them is essential in production.
Writing headers from a producer
The lowest-level approach uses the native ProducerRecord, whose constructor accepts an Iterable<Header>. Each RecordHeader takes a String key and a byte[] value.
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Arguments: topic, partition (null lets the partitioner choose), key, value, headers
ProducerRecord<String, OrderEvent> record = new ProducerRecord<>(
        "orders", null, "order-42", event,
        List.of(new RecordHeader("X-Correlation-Id",
                correlationId.getBytes(StandardCharsets.UTF_8))));
kafkaTemplate.send(record);
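Because a native header value is only a byte[], anything other than text needs an encoding that producer and consumer agree on. The following is a minimal JDK-only sketch of such a codec. The class name HeaderCodec and the 4-byte big-endian integer layout are assumptions made for illustration, not a Kafka or Spring convention.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of Kafka or Spring): converts header values
// to the byte[] form a RecordHeader carries, and back again.
final class HeaderCodec {

    private HeaderCodec() {
    }

    // Text headers: UTF-8 on both sides so the consumer reads what was written.
    static byte[] encodeText(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    static String decodeText(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Numeric headers: fixed 4-byte big-endian int (an assumed convention).
    static byte[] encodeInt(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    static int decodeInt(byte[] bytes) {
        if (bytes.length != Integer.BYTES) {
            throw new IllegalArgumentException("expected 4 bytes, got " + bytes.length);
        }
        return ByteBuffer.wrap(bytes).getInt();
    }
}
```

With a helper like this, a schema-version header could be attached as new RecordHeader("X-Schema-Version", HeaderCodec.encodeInt(3)), where the header name is again only an example.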
More commonly in Spring you build a Message<T> with MessageBuilder, which keeps headers as ordinary objects and lets Spring handle serialization. This is the cleanest way to send a payload plus metadata in one shot.
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.kafka.support.KafkaHeaders;

public void publish(OrderEvent event, String correlationId) {
    Message<OrderEvent> message = MessageBuilder
            .withPayload(event)
            .setHeader(KafkaHeaders.TOPIC, "orders")
            .setHeader(KafkaHeaders.KEY, event.orderId())
            .setHeader("X-Correlation-Id", correlationId)
            .setHeader("X-Source-Service", "checkout")
            .build();
    kafkaTemplate.send(message);
}
Custom headers set as plain Strings are serialized by the KafkaHeaderMapper. By default Spring maps String header values to UTF-8 bytes on the way out and back to String on the way in, so a header you write as text is read as text on the consumer.
Reading headers and metadata in a listener
On the consuming side, @KafkaListener methods can declare parameters annotated with @Header to pull individual headers, including the well-known metadata headers that Kafka and Spring populate for every record. The constants live in org.springframework.kafka.support.KafkaHeaders.
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

@KafkaListener(topics = "orders", groupId = "fulfilment")
public void onOrder(
        @Payload OrderEvent event,
        @Header(KafkaHeaders.RECEIVED_KEY) String key,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.OFFSET) long offset,
        @Header(name = "X-Correlation-Id", required = false) String correlationId) {
    log.info("topic={} partition={} offset={} key={} corrId={}",
            topic, partition, offset, key, correlationId);
}
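Setting required = false is important for optional application headers: if a header is absent and its parameter is required, Spring cannot resolve the argument, the listener method is never invoked, and the record is treated as an error. Kafka and Spring populate the metadata headers below for every record, with one caveat: RECEIVED_KEY is only set when the record has a non-null key, so declare it with required = false if keyless records are possible.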
| Constant | Type | Meaning |
|---|---|---|
| KafkaHeaders.RECEIVED_KEY | key type | The record key |
| KafkaHeaders.RECEIVED_TOPIC | String | Topic the record came from |
| KafkaHeaders.RECEIVED_PARTITION | int | Source partition |
| KafkaHeaders.OFFSET | long | Offset within the partition |
| KafkaHeaders.RECEIVED_TIMESTAMP | long | Record timestamp (epoch millis) |
| KafkaHeaders.GROUP_ID | String | Consumer group id |
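RECEIVED_TIMESTAMP arrives as epoch milliseconds, so it usually needs converting before it is useful in logs or metrics. The sketch below uses only the JDK to compute how old a record is at the moment a listener sees it. The RecordAge class is a hypothetical helper, and the clock is passed in explicitly so the calculation stays easy to test.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: turns the epoch-millis timestamp header into a record age.
final class RecordAge {

    private RecordAge() {
    }

    // receivedTimestamp is the value of KafkaHeaders.RECEIVED_TIMESTAMP (epoch millis);
    // now is supplied by the caller, e.g. Instant.now() inside a listener.
    static Duration between(long receivedTimestamp, Instant now) {
        return Duration.between(Instant.ofEpochMilli(receivedTimestamp), now);
    }
}
```

Whether the timestamp reflects producer create time or broker log-append time depends on the topic's message.timestamp.type setting, so the resulting age should be read with that in mind.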
Reading all headers at once with @Headers
When you need to forward or inspect the whole header set — for example to copy tracing headers onto a downstream message — inject the immutable MessageHeaders map with @Headers.
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;

@KafkaListener(topics = "orders", groupId = "audit")
public void audit(@Payload OrderEvent event, @Headers MessageHeaders headers) {
    String corrId = (String) headers.get("X-Correlation-Id");
    log.info("auditing order {} (corrId={})", event.orderId(), corrId);
}
Output:
auditing order order-42 (corrId=8f3c1d9e-2b71-4a55-9c0e-4f2b1a7d9e10)
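The cast in the listener above assumes the header value is already a String. A header written by a producer that bypasses Spring's header mapping (for example a native ProducerRecord) may instead arrive as a raw byte[], and an absent header yields null. The following JDK-only sketch reads a text header defensively. HeaderValues is a hypothetical helper, and it accepts a plain Map<String, Object>, which MessageHeaders implements.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper: reads a header as text whether it arrived as String or byte[].
final class HeaderValues {

    private HeaderValues() {
    }

    static Optional<String> text(Map<String, Object> headers, String name) {
        Object raw = headers.get(name);
        if (raw instanceof String) {
            return Optional.of((String) raw);
        }
        if (raw instanceof byte[]) {
            // Assumes the producer encoded the text as UTF-8.
            return Optional.of(new String((byte[]) raw, StandardCharsets.UTF_8));
        }
        // Absent header, or a type this sketch does not treat as text.
        return Optional.empty();
    }
}
```

In the audit listener this would read as HeaderValues.text(headers, "X-Correlation-Id").orElse("unknown"), where the fallback value is only an example.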
Use case: propagating a correlation ID
A correlation ID lets you stitch together logs across the producer, the broker, and every downstream consumer. The pattern is: generate (or accept) the ID at the edge, attach it as a header on send, read it on receive, and re-attach it to any messages you produce in turn. Because the ID lives in a header rather than the payload, no service needs to know your full domain schema to participate in tracing — and your error handlers and dead-letter routing can log the same ID for any failed record.
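The accept-or-generate and re-attach steps of this pattern can be sketched with plain maps, independent of any Kafka types. CorrelationPropagation and its method names are hypothetical, and the allow-list approach is one possible design rather than something Spring prescribes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical helper illustrating correlation ID propagation between hops.
final class CorrelationPropagation {

    static final String CORRELATION_ID = "X-Correlation-Id";

    private CorrelationPropagation() {
    }

    // Accept the inbound ID if it is a non-blank String, otherwise generate one.
    static String resolveCorrelationId(Map<String, Object> inbound) {
        Object existing = inbound.get(CORRELATION_ID);
        if (existing instanceof String && !((String) existing).isBlank()) {
            return (String) existing;
        }
        return UUID.randomUUID().toString();
    }

    // Copy only allow-listed headers, then attach the resolved correlation ID.
    static Map<String, Object> outboundHeaders(Map<String, Object> inbound,
                                               Set<String> allowList) {
        Map<String, Object> outbound = new LinkedHashMap<>();
        for (String name : allowList) {
            Object value = inbound.get(name);
            if (value != null) {
                outbound.put(name, value);
            }
        }
        outbound.put(CORRELATION_ID, resolveCorrelationId(inbound));
        return outbound;
    }
}
```

An allow-list is used here so that per-record metadata such as the received topic or offset is not copied onto the outbound message by accident. The resulting map could then be applied to an outgoing message with MessageBuilder.copyHeaders(...).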
Best Practices
- Reserve headers for metadata (tracing, routing, versioning); keep domain data in the payload.
- Read application headers with required = false and handle null, since older or external producers may not set them.
- Use the KafkaHeaders constants instead of hard-coded strings for metadata so your code survives library upgrades.
- Prefix custom headers consistently (e.g. X- or a company namespace) to avoid clashing with Spring’s reserved kafka_* and __TypeId__ headers.
- Propagate correlation/trace IDs through every hop, copying inbound headers onto outbound messages.
- Prefer Micrometer’s automatic context propagation over hand-rolled tracing headers when you already have observability wired up.
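The naming rule in the list above can be enforced at the point where headers are set. The sketch below is one way to do that with the JDK alone. CustomHeaderNames is a hypothetical helper, the X- prefix is taken from the example in the list, and the reserved check covers only the kafka_* and __TypeId__ names mentioned there, not every header Spring may use.

```java
// Hypothetical guard for custom header names; not part of Spring or Kafka.
final class CustomHeaderNames {

    // Assumed house prefix; a company namespace would work the same way.
    static final String CUSTOM_PREFIX = "X-";

    private CustomHeaderNames() {
    }

    // True for the reserved names called out above (kafka_* and __TypeId__).
    static boolean isReserved(String name) {
        return name.startsWith("kafka_") || name.equals("__TypeId__");
    }

    // Returns the name unchanged if it is acceptable, otherwise fails fast.
    static String requireCustom(String name) {
        if (isReserved(name) || !name.startsWith(CUSTOM_PREFIX)) {
            throw new IllegalArgumentException("not a valid custom header name: " + name);
        }
        return name;
    }
}
```

A producer could wrap each name, as in setHeader(CustomHeaderNames.requireCustom("X-Source-Service"), "checkout"), so that a clashing or unprefixed name fails at send time instead of silently overwriting metadata.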