Skip to content
Apache Kafka kf producers 4 min read

Producer Interceptors

A ProducerInterceptor lets you inject cross-cutting behavior into the producer’s send path without touching your business code. Every record that flows through the producer passes through the interceptor, making it the natural place for distributed tracing, auditing, payload mutation, and custom metrics. Because interceptors are configured declaratively, you can layer the same observability logic across many services by shipping a single library and a one-line config change.

How interceptors fit into the send path

When you call producer.send(record), the Kafka client runs the record through every configured interceptor before serialization and partition assignment. Interceptors form a chain: the output of one is fed as the input to the next, in the order they are listed in interceptor.classes. The final record returned by the chain is what actually gets serialized and queued.

The ProducerInterceptor<K, V> interface defines two callbacks:

MethodWhen it runsThreadTypical use
onSend(ProducerRecord<K,V>)Synchronously on the calling thread, before serializationYour application threadMutate the record, add headers, count sends
onAcknowledgement(RecordMetadata, Exception)When the broker acks or the send failsProducer I/O threadRecord latency, success/failure metrics

Both callbacks run on hot paths. onSend runs on your application thread and onAcknowledgement runs on the single producer I/O thread shared by all sends. Keep them fast and non-blocking — never call a remote service or take a lock that could stall, or you will throttle throughput for the entire producer.

A key detail: onSend may mutate and return a new ProducerRecord. Most interceptors should copy the original and add to it (for example, append a header) rather than dropping fields, because the next interceptor in the chain and the producer itself rely on the record being well-formed.

A tracing-and-metrics interceptor

The example below stamps a trace ID header onto every outbound record (so downstream consumers can correlate the message with the originating request) and counts successes and failures in onAcknowledgement.

package com.devcraftly.kafka;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.LongAdder;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;

public class TracingProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {

    public static final String TRACE_HEADER = "x-trace-id";

    private final LongAdder sent = new LongAdder();
    private final LongAdder acked = new LongAdder();
    private final LongAdder failed = new LongAdder();

    @Override
    public void configure(Map<String, ?> configs) {
        // Read custom config keys here if needed, e.g. configs.get("trace.service.name").
    }

    @Override
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        Headers headers = record.headers();
        if (headers.lastHeader(TRACE_HEADER) == null) {
            String traceId = UUID.randomUUID().toString();
            headers.add(TRACE_HEADER, traceId.getBytes(StandardCharsets.UTF_8));
        }
        sent.increment();
        // Returning the same record is fine; headers were mutated in place.
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            acked.increment();
        } else {
            failed.increment();
        }
    }

    @Override
    public void close() {
        System.out.printf("Producer interceptor closing: sent=%d acked=%d failed=%d%n",
                sent.sum(), acked.sum(), failed.sum());
    }
}

Wiring it up

Interceptors are activated through the interceptor.classes property — a comma-separated list of fully-qualified class names. With the plain kafka-clients API you set it on the producer config map:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        "com.devcraftly.kafka.TracingProducerInterceptor");

try (var producer = new KafkaProducer<String, String>(props)) {
    producer.send(new ProducerRecord<>("orders", "order-42", "{\"id\":42}"));
    producer.flush();
}

Output:

Producer interceptor closing: sent=1 acked=1 failed=0

In Spring Boot, expose the same property under spring.kafka.producer.properties so the auto-configured ProducerFactory picks it up:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        interceptor.classes: com.devcraftly.kafka.TracingProducerInterceptor

To chain multiple interceptors, list them in order — the first runs first:

interceptor.classes=com.devcraftly.kafka.TracingProducerInterceptor,com.devcraftly.kafka.AuditProducerInterceptor

Interceptors are instantiated by Kafka via the no-arg constructor and configured through configure(Map), so they are not Spring beans. You cannot inject other beans into them. If you need a shared collaborator (like a MeterRegistry), expose it through a static holder or read connection details from the config map passed to configure.

Error handling and isolation

Exceptions thrown from onSend or onAcknowledgement are caught and logged by the Kafka client; they do not propagate to your send() call and do not abort the send. This protects throughput, but it also means a buggy interceptor can silently swallow work. Guard your logic defensively and avoid letting one interceptor’s failure corrupt the record for the next one in the chain.

Best practices

  • Keep onSend and onAcknowledgement allocation-light and non-blocking; offload anything slow (network calls, disk writes) to an async queue.
  • Prefer mutating headers over rewriting the whole record, and never null out the key, value, topic, or partition.
  • Use thread-safe counters (LongAdder, AtomicLong) because onAcknowledgement runs on a shared I/O thread.
  • Make interceptors idempotent on retries — onSend can run again for the same logical message if you resend, so check for an existing header before adding one.
  • Ship reusable interceptors as a small library so every service enables tracing with a single interceptor.classes line.
  • Log and swallow internal errors rather than throwing, and add a metric for interceptor failures so silent breakage is visible.
  • Always implement close() to flush buffered metrics and release any resources.
Last updated June 1, 2026
Was this helpful?