Producer Transactions
The idempotent producer protects you against duplicate writes caused by retries, but it only guarantees exactly-once delivery to a single partition within a single producer session. Real pipelines often need more: writing to several topics atomically, or consuming a record, processing it, and producing a result as one indivisible unit. Kafka transactions extend the idempotent producer to give you atomic, all-or-nothing writes across multiple partitions and topics — the foundation of the exactly-once read-process-write pattern that powers stream processing.
What a transaction guarantees
A Kafka transaction lets a producer group multiple send calls (across any number of partitions and topics) into a single atomic commit. Either every record in the transaction becomes visible to read_committed consumers, or none of them do. If the producer crashes mid-transaction, the transaction coordinator aborts it, either when a restarted instance with the same transactional.id calls initTransactions() or when transaction.timeout.ms expires, so downstream read_committed consumers never see the partial output.
Transactions build directly on idempotence: setting a transactional.id requires idempotence (enabled by default) and acks=all, and the client rejects a configuration that contradicts either with a ConfigException. You get ordering and deduplication plus atomicity.
The transactional.id
The key to transactions is the transactional.id config. It is a stable, unique identifier that ties a producer to its transactional state on the broker. When a producer with a given transactional.id restarts and calls initTransactions(), the transaction coordinator fences off any older producer instance using the same ID and aborts any transaction that instance left open (or completes one that was already committing). This zombie fencing is what makes exactly-once safe across crashes.
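To make the fencing rule concrete, here is a toy model in plain Java. It is a hypothetical simplification, not Kafka's coordinator code: the class and method names are invented, and it tracks nothing but one epoch per transactional.id. Each initTransactions() call bumps the epoch, and any write carrying an older epoch is refused, which is roughly how a zombie instance ends up fenced.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of epoch-based zombie fencing.
// Not Kafka's coordinator: it tracks only one epoch per transactional.id.
class ToyCoordinator {
    private final Map<String, Integer> epochs = new HashMap<>();

    // Models initTransactions(): each new instance gets a strictly higher epoch,
    // which implicitly fences every older instance using the same ID.
    int initTransactions(String transactionalId) {
        return epochs.merge(transactionalId, 0, (old, ignored) -> old + 1);
    }

    // Models a write: accepted only if it carries the current epoch.
    boolean tryWrite(String transactionalId, int epoch) {
        Integer current = epochs.get(transactionalId);
        return current != null && current == epoch;
    }
}
```

If instance A initializes first it gets epoch 0; when a restarted instance B initializes with the same ID it gets epoch 1, after which A's writes are refused and B's are accepted.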
| Property | Value / meaning |
|---|---|
| transactional.id | Stable, unique string per logical producer; enables transactions and zombie fencing |
| enable.idempotence | Must be true (the default since Kafka 3.0); false together with a transactional.id is rejected |
| acks | Must be all; other values are rejected when a transactional.id is set |
| transaction.timeout.ms | Maximum time a transaction may stay open before the coordinator aborts it (default 60000) |
| isolation.level (consumer) | Set to read_committed to hide records from aborted and still-open transactions |
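The transactional.id must be stable across restarts but unique per producer instance: two live producers sharing one transactional.id will keep fencing each other. For partitioned read-process-write apps on clients or brokers older than 2.5, derive it deterministically from the input partition assignment (one producer per input partition). Since Kafka 2.5 (KIP-447), passing consumer.groupMetadata() to sendOffsetsToTransaction lets one stable ID per application instance do the same job safely.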
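A deterministic per-partition naming scheme, of the kind described above, might look like the sketch below. The ID format (application prefix, topic, partition) is a hypothetical convention, not something Kafka prescribes; what matters is that the same input partition always maps to the same ID, so whichever instance picks up that partition after a rebalance fences the previous owner.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical naming helper: one transactional.id per input partition.
class TxnIds {
    // The same (prefix, topic, partition) always yields the same ID across restarts.
    static String forPartition(String appPrefix, String topic, int partition) {
        return appPrefix + "-" + topic + "-" + partition;
    }

    // IDs for every partition currently assigned to this instance, in partition order.
    static List<String> forAssignment(String appPrefix, String topic, List<Integer> partitions) {
        return partitions.stream()
                .sorted()
                .map(p -> forPartition(appPrefix, topic, p))
                .collect(Collectors.toList());
    }
}
```

In this scheme each assigned partition gets its own transactional producer, which is exactly the overhead the groupMetadata-based approach removes.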
The transaction lifecycle
A transactional producer follows a strict sequence of API calls:
- initTransactions(): called once after construction. Registers the transactional.id, fences zombies, and recovers state.
- beginTransaction(): starts a new transaction.
- send(...): one or more sends enrolled in the current transaction.
- sendOffsetsToTransaction(...): (read-process-write only) commits consumer offsets as part of the transaction.
- commitTransaction(): atomically commits all sends and offsets.
- abortTransaction(): discards everything in the transaction on error.
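The required call order can be captured as a small state machine. The sketch below is a hypothetical, simplified model (the real client's internal state machine has more states, including error states); it only shows which calls are legal in which state, throwing IllegalStateException otherwise, and how commit and abort treat the enrolled sends.

```java
// Hypothetical, simplified model of the transactional producer's call order.
class TxnLifecycle {
    enum State { UNINITIALIZED, READY, IN_TRANSACTION }

    private State state = State.UNINITIALIZED;
    private int pendingSends = 0;   // sends enrolled in the open transaction
    private int committedSends = 0; // sends made visible by commits

    State state() { return state; }
    int committedSends() { return committedSends; }

    void initTransactions() { require(State.UNINITIALIZED, "initTransactions"); state = State.READY; }

    void beginTransaction() { require(State.READY, "beginTransaction"); state = State.IN_TRANSACTION; }

    void send() { require(State.IN_TRANSACTION, "send"); pendingSends++; }

    void sendOffsetsToTransaction() { require(State.IN_TRANSACTION, "sendOffsetsToTransaction"); }

    void commitTransaction() {
        require(State.IN_TRANSACTION, "commitTransaction");
        committedSends += pendingSends; // all enrolled sends become visible together
        pendingSends = 0;
        state = State.READY;
    }

    void abortTransaction() {
        require(State.IN_TRANSACTION, "abortTransaction");
        pendingSends = 0; // everything enrolled in the transaction is discarded
        state = State.READY;
    }

    private void require(State expected, String call) {
        if (state != expected) {
            throw new IllegalStateException("Cannot call " + call + " in state " + state);
        }
    }
}
```

Calling beginTransaction() before initTransactions() fails, an aborted transaction contributes nothing, and a committed one contributes all of its sends at once.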
Atomic writes across partitions
The simplest use is writing to multiple topics atomically — for example, an order and its audit event must both land or neither must.
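import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-writer-1"); // enables transactions and idempotence

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    producer.initTransactions();
    try {
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("orders", "order-42", "{\"id\":42}"));
        producer.send(new ProducerRecord<>("audit", "order-42", "created"));
        producer.commitTransaction(); // both records become visible together
        System.out.println("Committed both records atomically");
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // Fatal: this producer cannot recover; try-with-resources closes it
        System.out.println("Fatal: " + e.getMessage());
    } catch (KafkaException e) {
        // Any other error: discard everything sent in this transaction
        producer.abortTransaction();
        System.out.println("Aborted: " + e.getMessage());
    }
}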
Output:
Committed both records atomically
Read-process-write for exactly-once
The flagship use case is consuming, transforming, and producing in one atomic step. The trick is sendOffsetsToTransaction: instead of committing consumer offsets separately, you fold them into the producer transaction. The output records and the input offsets commit together, so each input record's output is committed exactly once, even if the record itself is processed more than once after a failure.
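import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

Properties cProps = new Properties();
cProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
cProps.put(ConsumerConfig.GROUP_ID_CONFIG, "etl-group");
cProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
cProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
cProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // offsets committed via the transaction
cProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // never read aborted records

Properties pProps = new Properties();
pProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
pProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
pProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
pProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "etl-processor-1");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cProps);
     KafkaProducer<String, String> producer = new KafkaProducer<>(pProps)) {
    consumer.subscribe(List.of("input"));
    producer.initTransactions();
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        if (records.isEmpty()) continue;
        producer.beginTransaction();
        try {
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (ConsumerRecord<String, String> rec : records) {
                producer.send(new ProducerRecord<>("output", rec.key(), rec.value().toUpperCase()));
                offsets.put(
                    new TopicPartition(rec.topic(), rec.partition()),
                    new OffsetAndMetadata(rec.offset() + 1));
            }
            // Commit input offsets as part of the same transaction
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            throw e; // fatal: try-with-resources closes both clients
        } catch (KafkaException e) {
            producer.abortTransaction(); // committed offsets are NOT advanced
            // The consumer's fetch position already moved past this batch, so
            // rewind each still-assigned partition to reprocess it
            for (TopicPartition tp : records.partitions()) {
                if (consumer.assignment().contains(tp)) {
                    consumer.seek(tp, records.records(tp).get(0).offset());
                }
            }
        }
    }
}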
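Always pass consumer.groupMetadata() (not just the group ID) so the broker can fence zombie instances across consumer-group rebalances. The committed input offsets advance only when the transaction commits, so an abort leaves them at the start of the failed batch. The consumer's in-memory fetch position, however, has already moved past that batch, so rewind it with consumer.seek() (or reset to the committed offsets) before polling again; otherwise the aborted records are skipped rather than replayed.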
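To see why folding offsets into the transaction matters, here is a hypothetical, Kafka-free simulation. Inputs are numbered records, one crash is injected after a record's output is written but before its offset is committed, and processing resumes from the last committed offset. The class and method names are invented for illustration; the "atomic" mode stands in for a transaction that commits output and offset together.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation (no Kafka involved) of why input offsets must commit
// together with outputs. The crash hits after the output for record crashAt
// is written but before its offset is committed.
class ReprocessingDemo {
    static List<Integer> run(int inputCount, int crashAt, boolean atomic) {
        List<Integer> visibleOutput = new ArrayList<>();
        int committedOffset = 0;
        boolean crashed = false;

        while (committedOffset < inputCount) {
            int rec = committedOffset; // after a restart, resume from the committed offset
            List<Integer> staged = new ArrayList<>();
            if (atomic) {
                staged.add(rec);        // output is staged inside the transaction
            } else {
                visibleOutput.add(rec); // output is visible immediately
            }
            if (rec == crashAt && !crashed) {
                crashed = true;         // crash before the offset commit;
                continue;               // staged output is lost, as in an aborted transaction
            }
            visibleOutput.addAll(staged); // commit: output and offset take effect together
            committedOffset = rec + 1;
        }
        return visibleOutput;
    }
}
```

With three inputs and a crash at record 1, separate commits yield the output [0, 1, 1, 2] (record 1 duplicated), while the atomic mode yields [0, 1, 2]: record 1 is processed twice, but its output is committed once.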
Why consumers need read_committed
By default consumers use isolation.level=read_uncommitted and will see records from open and aborted transactions. To get exactly-once semantics end to end, downstream consumers must set read_committed. Those consumers read only up to the Last Stable Offset (LSO), which is the first offset of the earliest still-open transaction in the partition (or the high watermark if none is open). A long-running transaction therefore delays visibility of everything after it in that partition, including records other producers have already committed.
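The visibility rule can be modeled in a few lines. This is a hypothetical sketch, not broker code: it assumes you already know the partition's high watermark, the first offset of each still-open transaction, and which offsets belong to aborted transactions (the real broker tracks aborted ranges with an index and control markers, which this model ignores). The LSO is the smaller of the high watermark and the earliest open-transaction start, and a read_committed reader sees offsets below it, minus aborted ones.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;

// Hypothetical model of read_committed visibility for a single partition.
class ReadCommittedModel {
    // LSO: first offset of the earliest still-open transaction,
    // or the high watermark if no transaction is open.
    static long lastStableOffset(long highWatermark, Collection<Long> openTxnFirstOffsets) {
        long lso = highWatermark;
        for (long first : openTxnFirstOffsets) {
            lso = Math.min(lso, first);
        }
        return lso;
    }

    // Offsets a read_committed consumer may return: below the LSO and not aborted.
    static List<Long> visibleOffsets(long highWatermark,
                                     Collection<Long> openTxnFirstOffsets,
                                     Set<Long> abortedOffsets) {
        long lso = lastStableOffset(highWatermark, openTxnFirstOffsets);
        List<Long> visible = new ArrayList<>();
        for (long offset = 0; offset < lso; offset++) {
            if (!abortedOffsets.contains(offset)) {
                visible.add(offset);
            }
        }
        return visible;
    }
}
```

With a high watermark of 10, an open transaction starting at offset 6, and offsets 2 and 3 aborted, a read_committed reader sees only 0, 1, 4, and 5. Offsets 6 through 9 stay hidden until that transaction ends, even if some of them were written by other producers whose transactions already committed.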
Best Practices
- Assign each logical producer a stable, unique transactional.id; never share one across concurrent instances.
- Set downstream consumers to isolation.level=read_committed, or you lose the atomicity guarantee on the read side.
- Keep transactions short: long transactions stall read_committed consumers at the LSO and risk hitting transaction.timeout.ms.
- For read-process-write, always use sendOffsetsToTransaction with consumer.groupMetadata() and disable auto-commit.
- Treat ProducerFencedException, OutOfOrderSequenceException, and AuthorizationException as fatal: close the producer and exit. For any other KafkaException, abort the transaction and retry.
- Prefer Spring Kafka's KafkaTransactionManager/@Transactional or Kafka Streams (processing.guarantee=exactly_once_v2) over hand-rolled loops when possible.
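For the Kafka Streams route, exactly-once is a single configuration switch. The sketch below uses the plain string config keys so it stands alone; "orders-etl" and "localhost:9092" are placeholder values, and exactly_once_v2 assumes Kafka Streams 3.0 or later (with brokers on 2.5 or newer).

```java
import java.util.Properties;

// Minimal Kafka Streams settings for exactly-once processing.
// "orders-etl" and "localhost:9092" are placeholder values.
class StreamsEosConfig {
    static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "orders-etl");           // also used as the consumer group ID
        props.put("bootstrap.servers", "localhost:9092");
        props.put("processing.guarantee", "exactly_once_v2"); // Streams manages transactions itself
        return props;
    }
}
```

Passing these properties to new KafkaStreams(topology, props) lets Streams handle initTransactions, offset commits, and aborts internally, and its internal consumers read with read_committed.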