Producer Interceptors
A ProducerInterceptor lets you inject cross-cutting behavior into the producer’s send path without touching your business code. Every record that flows through the producer passes through the interceptor, making it the natural place for distributed tracing, auditing, payload mutation, and custom metrics. Because interceptors are configured declaratively, you can layer the same observability logic across many services by shipping a single library and a one-line config change.
How interceptors fit into the send path
When you call producer.send(record), the Kafka client runs the record through every configured interceptor before serialization and partition assignment. Interceptors form a chain: the output of one is fed as the input to the next, in the order they are listed in interceptor.classes. The final record returned by the chain is what actually gets serialized and queued.
The ProducerInterceptor<K, V> interface defines two callbacks:
| Method | When it runs | Thread | Typical use |
|---|---|---|---|
onSend(ProducerRecord<K,V>) | Synchronously on the calling thread, before serialization | Your application thread | Mutate the record, add headers, count sends |
onAcknowledgement(RecordMetadata, Exception) | When the broker acks or the send fails | Producer I/O thread | Record latency, success/failure metrics |
Both callbacks run on hot paths.
onSendruns on your application thread andonAcknowledgementruns on the single producer I/O thread shared by all sends. Keep them fast and non-blocking — never call a remote service or take a lock that could stall, or you will throttle throughput for the entire producer.
A key detail: onSend may mutate and return a new ProducerRecord. Most interceptors should copy the original and add to it (for example, append a header) rather than dropping fields, because the next interceptor in the chain and the producer itself rely on the record being well-formed.
A tracing-and-metrics interceptor
The example below stamps a trace ID header onto every outbound record (so downstream consumers can correlate the message with the originating request) and counts successes and failures in onAcknowledgement.
package com.devcraftly.kafka;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.LongAdder;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
public class TracingProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {
public static final String TRACE_HEADER = "x-trace-id";
private final LongAdder sent = new LongAdder();
private final LongAdder acked = new LongAdder();
private final LongAdder failed = new LongAdder();
@Override
public void configure(Map<String, ?> configs) {
// Read custom config keys here if needed, e.g. configs.get("trace.service.name").
}
@Override
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
Headers headers = record.headers();
if (headers.lastHeader(TRACE_HEADER) == null) {
String traceId = UUID.randomUUID().toString();
headers.add(TRACE_HEADER, traceId.getBytes(StandardCharsets.UTF_8));
}
sent.increment();
// Returning the same record is fine; headers were mutated in place.
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception == null) {
acked.increment();
} else {
failed.increment();
}
}
@Override
public void close() {
System.out.printf("Producer interceptor closing: sent=%d acked=%d failed=%d%n",
sent.sum(), acked.sum(), failed.sum());
}
}
Wiring it up
Interceptors are activated through the interceptor.classes property — a comma-separated list of fully-qualified class names. With the plain kafka-clients API you set it on the producer config map:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
"com.devcraftly.kafka.TracingProducerInterceptor");
try (var producer = new KafkaProducer<String, String>(props)) {
producer.send(new ProducerRecord<>("orders", "order-42", "{\"id\":42}"));
producer.flush();
}
Output:
Producer interceptor closing: sent=1 acked=1 failed=0
In Spring Boot, expose the same property under spring.kafka.producer.properties so the auto-configured ProducerFactory picks it up:
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
interceptor.classes: com.devcraftly.kafka.TracingProducerInterceptor
To chain multiple interceptors, list them in order — the first runs first:
interceptor.classes=com.devcraftly.kafka.TracingProducerInterceptor,com.devcraftly.kafka.AuditProducerInterceptor
Interceptors are instantiated by Kafka via the no-arg constructor and configured through
configure(Map), so they are not Spring beans. You cannot inject other beans into them. If you need a shared collaborator (like aMeterRegistry), expose it through a static holder or read connection details from the config map passed toconfigure.
Error handling and isolation
Exceptions thrown from onSend or onAcknowledgement are caught and logged by the Kafka client; they do not propagate to your send() call and do not abort the send. This protects throughput, but it also means a buggy interceptor can silently swallow work. Guard your logic defensively and avoid letting one interceptor’s failure corrupt the record for the next one in the chain.
Best practices
- Keep
onSendandonAcknowledgementallocation-light and non-blocking; offload anything slow (network calls, disk writes) to an async queue. - Prefer mutating headers over rewriting the whole record, and never null out the key, value, topic, or partition.
- Use thread-safe counters (
LongAdder,AtomicLong) becauseonAcknowledgementruns on a shared I/O thread. - Make interceptors idempotent on retries —
onSendcan run again for the same logical message if you resend, so check for an existing header before adding one. - Ship reusable interceptors as a small library so every service enables tracing with a single
interceptor.classesline. - Log and swallow internal errors rather than throwing, and add a metric for interceptor failures so silent breakage is visible.
- Always implement
close()to flush buffered metrics and release any resources.