Request-Reply Messaging
Kafka is built for fire-and-forget, one-way streaming, but plenty of real systems need a response to a message: validate a payment, look up a price, or score a transaction and wait for the answer. Spring for Apache Kafka offers ReplyingKafkaTemplate, which sends a record to a request topic and returns a Future that completes when the matching reply lands on a separate reply topic. It is a clean abstraction, but it stitches together two asynchronous flows over a broker, so it carries trade-offs you must understand before reaching for it in production.
How request-reply works over Kafka
There is no native concept of a reply in Kafka. Spring layers one on using two ingredients: a correlation ID and a reply topic. When you send a request, ReplyingKafkaTemplate generates a unique correlation ID and writes it into the KafkaHeaders.CORRELATION_ID header along with a KafkaHeaders.REPLY_TOPIC header telling the server where to send the answer. The server processes the request and publishes a response carrying the same correlation ID. A dedicated listener container on the client side consumes the reply topic, matches the correlation ID to the pending Future, and completes it.
Client                            Broker                          Server
  | send(request)                   |                               |
  | CORRELATION_ID=abc123 ------> [requests topic] -----------> @KafkaListener
  | REPLY_TOPIC=replies             |                               | @SendTo
  |                                 |                               | process + reply
  | <----- reply container <----- [replies topic] <------------ CORRELATION_ID=abc123
  | Future completes                |                               |
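The matching step described above can be pictured without Kafka at all. The following is a minimal plain-Java sketch, not Spring's actual implementation: PendingReplies, register, expire, and onReply are hypothetical names, and an in-memory map stands in for the bookkeeping the reply container performs.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

/** Illustrative model of correlating replies to pending requests (hypothetical, not Spring code). */
class PendingReplies {

    // correlation ID -> future handed to the caller when the request was sent
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Called on send: allocate a correlation ID and remember the caller's future under it. */
    String register(CompletableFuture<String> future) {
        String correlationId = UUID.randomUUID().toString();
        pending.put(correlationId, future);
        return correlationId;
    }

    /** Called when a request times out: forget it so that a late reply no longer matches. */
    void expire(String correlationId) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future != null) {
            future.completeExceptionally(new TimeoutException("no reply for " + correlationId));
        }
    }

    /** Called for each record on the reply topic; returns false when nothing is waiting for it. */
    boolean onReply(String correlationId, String payload) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false; // unknown, late, or duplicate reply: nothing to complete
        }
        return future.complete(payload);
    }

    public static void main(String[] args) {
        PendingReplies replies = new PendingReplies();
        CompletableFuture<String> future = new CompletableFuture<>();
        String id = replies.register(future);

        System.out.println(replies.onReply("someone-else", "ignored")); // false
        System.out.println(replies.onReply(id, "59.97"));               // true
        System.out.println(future.join());                              // 59.97
        System.out.println(replies.onReply(id, "59.97"));               // false (duplicate)
    }
}
```

Removing the entry on both completion and expiry is the design choice that makes late and duplicate replies harmless: once the ID is gone, there is nothing left for them to complete.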
Client side: the ReplyingKafkaTemplate
ReplyingKafkaTemplate wraps a normal producer factory plus a listener container that owns the reply topic. The container must be subscribed to the reply topic and run independently so it can correlate responses as they arrive.
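import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;

@Configuration
public class ReplyConfig {

    // Listener container that consumes the reply topic on behalf of the template
    @Bean
    public ConcurrentMessageListenerContainer<String, PriceResponse> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, PriceResponse> factory) {
        ConcurrentMessageListenerContainer<String, PriceResponse> container =
                factory.createContainer("price-replies");
        // With a shared reply topic, give each client instance its own group id
        container.getContainerProperties().setGroupId("price-client");
        container.setAutoStartup(false); // started by the template
        return container;
    }

    @Bean
    public ReplyingKafkaTemplate<String, PriceRequest, PriceResponse> replyingKafkaTemplate(
            ProducerFactory<String, PriceRequest> pf,
            ConcurrentMessageListenerContainer<String, PriceResponse> repliesContainer) {
        ReplyingKafkaTemplate<String, PriceRequest, PriceResponse> template =
                new ReplyingKafkaTemplate<>(pf, repliesContainer);
        template.setDefaultReplyTimeout(Duration.ofSeconds(5));
        template.setSharedReplyTopic(true);
        return template;
    }
}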
The DTOs are plain records serialized as JSON (see the JSON serialization page for the producer/consumer factory setup):
public record PriceRequest(String sku, int quantity) {}
public record PriceResponse(String sku, BigDecimal totalPrice) {}
Sending a request and blocking for the answer:
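import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.stereotype.Service;

@Service
public class PriceClient {
    private final ReplyingKafkaTemplate<String, PriceRequest, PriceResponse> template;

    public PriceClient(ReplyingKafkaTemplate<String, PriceRequest, PriceResponse> template) {
        this.template = template;
    }

    public PriceResponse quote(String sku, int quantity)
            throws ExecutionException, InterruptedException, TimeoutException {
        ProducerRecord<String, PriceRequest> record =
                new ProducerRecord<>("price-requests", new PriceRequest(sku, quantity));
        // Tell the server where to send the reply
        record.headers().add(new RecordHeader(
                KafkaHeaders.REPLY_TOPIC, "price-replies".getBytes(StandardCharsets.UTF_8)));
        RequestReplyFuture<String, PriceRequest, PriceResponse> future =
                template.sendAndReceive(record);
        // Block until the reply arrives or five seconds pass
        ConsumerRecord<String, PriceResponse> reply = future.get(5, TimeUnit.SECONDS);
        return reply.value();
    }
}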
sendAndReceive returns a RequestReplyFuture. Calling get() with a timeout blocks the caller until the reply arrives or the timeout fires — that is what gives the synchronous feel.
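A bounded wait can end in more than one way, and callers usually want to tell those endings apart. The sketch below uses only the JDK: a plain CompletableFuture stands in for the reply future, and ReplyWait, Outcome, and await are hypothetical names introduced for illustration.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: wait for a reply with a bound and report how the wait ended. */
class ReplyWait {

    enum Outcome { REPLIED, TIMED_OUT, FAILED, INTERRUPTED }

    static Outcome await(CompletableFuture<String> reply, Duration timeout) {
        try {
            reply.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return Outcome.REPLIED;
        } catch (TimeoutException e) {
            return Outcome.TIMED_OUT;           // the caller's own wait expired first
        } catch (ExecutionException e) {
            return Outcome.FAILED;              // the future was completed with an error
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return Outcome.INTERRUPTED;
        }
    }

    public static void main(String[] args) {
        Duration shortWait = Duration.ofMillis(50);
        System.out.println(await(CompletableFuture.completedFuture("59.97"), shortWait)); // REPLIED
        System.out.println(await(new CompletableFuture<>(), shortWait));                   // TIMED_OUT
        System.out.println(await(
                CompletableFuture.failedFuture(new IllegalStateException("boom")), shortWait)); // FAILED
    }
}
```

One assumption worth checking against the library version in use: if the template's own reply timeout elapses before the timeout passed to get(), the future is likely completed exceptionally, so the caller would land in the FAILED branch rather than TIMED_OUT.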
Server side: @KafkaListener with @SendTo
On the responding service, the listener returns a value and @SendTo routes it back. When the request carries a REPLY_TOPIC header, Spring sends the response there and copies the correlation ID automatically — you do not manage it by hand.
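import java.math.BigDecimal;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Component
public class PriceServer {
    @KafkaListener(topics = "price-requests", groupId = "price-service")
    @SendTo // honors the REPLY_TOPIC header from the request
    public PriceResponse handle(PriceRequest request) {
        BigDecimal unit = lookupUnitPrice(request.sku());
        BigDecimal total = unit.multiply(BigDecimal.valueOf(request.quantity()));
        return new PriceResponse(request.sku(), total);
    }

    // Placeholder lookup: returns a fixed unit price for the example
    private BigDecimal lookupUnitPrice(String sku) {
        return new BigDecimal("19.99");
    }
}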
Output:
PriceClient.quote("SKU-42", 3) -> PriceResponse[sku=SKU-42, totalPrice=59.97]
If you specify a literal topic, as in @SendTo("price-replies"), every reply is routed to that topic and the REPLY_TOPIC header is not consulted. Use the bare @SendTo shown above when the reply destination should come from the request, which lets different clients use their own reply topics against one server.
Correlation and topic strategies
Two reply-topic strategies cover most cases.
| Strategy | How it works | When to use |
|---|---|---|
| Shared reply topic | All client instances share one reply topic; each instance matches replies by correlation ID, and setSharedReplyTopic(true) quiets the log entries for replies addressed to other instances | Few instances, simple topology |
| Per-instance reply topic | Each instance owns a unique reply topic (e.g. suffixed with the hostname) | Many instances; avoids cross-instance traffic |
With a shared topic, every instance consumes every reply but discards those whose correlation ID it did not issue. With many instances this wastes broker traffic, so prefer a unique reply topic per instance for horizontal scale. For a shared topic to work at all, each instance's reply container needs its own group.id; with a common group, Kafka would split the partitions among the instances and a reply could reach an instance that never sent the request.
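Deriving per-instance names is mostly string handling. The following sketch assumes the instance identifier comes from somewhere such as a hostname or pod name; ReplyNames, replyTopic, and uniqueGroupId are hypothetical helpers, not part of Spring for Apache Kafka.

```java
import java.util.UUID;

/** Hypothetical naming helper for per-instance reply topics and consumer groups. */
class ReplyNames {

    /** Builds a topic such as "price-replies.host-1" from a base topic and an instance identifier. */
    static String replyTopic(String baseTopic, String instanceId) {
        // Kafka topic names allow only letters, digits, '.', '_' and '-'
        String safeId = instanceId.replaceAll("[^A-Za-z0-9._-]", "-");
        return baseTopic + "." + safeId;
    }

    /** A random group id, so that each instance reads every record on a shared reply topic. */
    static String uniqueGroupId(String prefix) {
        return prefix + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        System.out.println(replyTopic("price-replies", "host 1/eu")); // price-replies.host-1-eu
        System.out.println(uniqueGroupId("price-client"));            // random suffix, differs per run
    }
}
```

A stable identifier is preferable for the topic name, since a random one would leave a new orphaned topic behind on every restart; a random suffix is less of a concern for the group id.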
Caveats: this is not truly synchronous
The blocking get() hides a fully asynchronous round trip. Keep these realities in mind:
- A timed-out request does not cancel server work. The server may still process and reply after your Future has already failed; that late reply is simply unmatched and dropped.
- Reply topics need partitions and retention like any topic. Short retention is fine since replies are consumed immediately, but the topics must exist before the template starts.
- Throughput is bounded by request latency because callers block a thread per call. Under load this can exhaust thread pools — size them deliberately.
- Ordering and once-only delivery still follow Kafka semantics; a redelivered request can produce a duplicate reply with the same correlation ID.
Request-reply is a convenience over an event log, not an RPC framework. If you need strict, low-latency synchronous calls, a direct REST/gRPC call is usually the better tool. Reach for Kafka request-reply when you want a reply and the durability, replay, and decoupling of the log.
Best practices
- Always set a sane defaultReplyTimeout and pass an explicit timeout to future.get(...); never block unbounded.
- Make request handlers idempotent so redelivered requests do not corrupt state or send conflicting replies.
- Use a unique reply topic per client instance for scale-out; reserve the shared reply topic for small deployments.
- Pre-create reply topics with appropriate partitions and short retention rather than relying on auto-creation.
- Handle TimeoutException explicitly and surface a meaningful error to the caller instead of hanging the request thread.
- Prefer JSON records with type-safe deserialization for requests and responses, and validate inputs on the server side.
- For high-throughput, latency-sensitive synchronous needs, evaluate REST/gRPC instead of forcing request-reply onto Kafka.
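The idempotency advice above can be sketched in a few lines. This is one possible approach under stated assumptions, not a prescribed pattern: IdempotentHandler is a hypothetical wrapper, the correlation ID is assumed to be available to the handler as a string, and the in-memory map is unbounded, so a real service would need eviction or a durable store.

```java
import java.math.BigDecimal;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical wrapper that answers a redelivered request with the reply it already computed. */
class IdempotentHandler<Q, R> {

    // correlation ID -> reply already produced for that request (unbounded in this sketch)
    private final Map<String, R> repliesByCorrelationId = new ConcurrentHashMap<>();
    private final Function<Q, R> delegate;

    IdempotentHandler(Function<Q, R> delegate) {
        this.delegate = delegate;
    }

    /** Runs the delegate at most once per correlation ID; later calls reuse the stored reply. */
    R handle(String correlationId, Q request) {
        return repliesByCorrelationId.computeIfAbsent(correlationId, id -> delegate.apply(request));
    }

    public static void main(String[] args) {
        AtomicInteger invocations = new AtomicInteger();
        IdempotentHandler<Integer, BigDecimal> handler = new IdempotentHandler<>(quantity -> {
            invocations.incrementAndGet();
            return new BigDecimal("19.99").multiply(BigDecimal.valueOf(quantity));
        });

        System.out.println(handler.handle("abc123", 3)); // 59.97
        System.out.println(handler.handle("abc123", 3)); // 59.97 (redelivery, served from the map)
        System.out.println(invocations.get());           // 1
    }
}
```

Because a redelivered request gets the same reply as the first delivery, a duplicate reply with the same correlation ID can never contradict the original.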