Processor API
The Kafka Streams DSL (map, filter, aggregate, joins) covers the vast majority of streaming use cases, but it deliberately hides the underlying execution engine. When you need fine-grained control — emitting records on a timer, accessing multiple state stores from one node, or implementing logic the DSL cannot express — you drop down to the Processor API. It exposes the raw building blocks of a topology: processor nodes, state stores, and a ProcessorContext that lets you forward records and schedule work. In production, the Processor API is the escape hatch that turns “the DSL almost does what I want” into a clean, testable implementation.
Processor and ProcessorSupplier
A processor is a single node in the topology. You implement Processor<KIn, VIn, KOut, VOut>, which receives one record at a time through process(Record). One-time setup happens in init(ProcessorContext), where you capture the context and look up any state stores by name. A ProcessorSupplier hands a fresh processor instance to each stream task, so its get() method must return a new object on every call and instances must never share mutable state.
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class DedupeProcessor implements Processor<String, Order, String, Order> {
        private ProcessorContext<String, Order> context;
        private KeyValueStore<String, Long> seenStore; // order ID -> first-seen timestamp

        @Override
        public void init(ProcessorContext<String, Order> context) {
            this.context = context;
            this.seenStore = context.getStateStore("seen-orders");
        }

        @Override
        public void process(Record<String, Order> record) {
            String id = record.value().orderId();
            if (seenStore.get(id) == null) {
                seenStore.put(id, record.timestamp());
                context.forward(record); // pass downstream only if new
            }
            // duplicates are silently dropped
        }
    }
Record carries the key, value, timestamp, and headers. Note the generics: input types may differ from output types, which is how a processor can transform records, not just filter them.
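The fresh-instance rule is easy to break with a lambda that captures an existing object. The sketch below models the problem without Kafka on the classpath. CountingNode is a hypothetical stand-in for a processor with a mutable field, and runTwoTasks imitates two stream tasks that each call the supplier once. None of these names belong to Kafka Streams.

```java
import java.util.function.Supplier;

/** Shows why a supplier must create a new object per call. Not Kafka code. */
public class SupplierSketch {

    /** Hypothetical stand-in for a processor that keeps mutable state in a field. */
    static final class CountingNode {
        private long processed;

        void process(String value) {
            processed++;
        }

        long processed() {
            return processed;
        }
    }

    /** Imitates two tasks: each asks the supplier for a node and handles one record. */
    static long[] runTwoTasks(Supplier<CountingNode> supplier) {
        CountingNode taskA = supplier.get();
        CountingNode taskB = supplier.get();
        taskA.process("a");
        taskB.process("b");
        return new long[] {taskA.processed(), taskB.processed()};
    }

    public static void main(String[] args) {
        // Constructor reference: every call to get() builds a new node.
        long[] fresh = runTwoTasks(CountingNode::new);

        // Anti-pattern: the lambda returns the same captured node to every task.
        CountingNode shared = new CountingNode();
        long[] leaked = runTwoTasks(() -> shared);

        System.out.println(fresh[0] + "," + fresh[1]);   // 1,1
        System.out.println(leaked[0] + "," + leaked[1]); // 2,2
    }
}
```

With a constructor reference each task counts only its own record. With the captured instance both tasks see a count of 2, which is the kind of cross-task interference the supplier contract is meant to prevent.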
ProcessorContext and forward()
ProcessorContext is your handle to the runtime. The most important method is forward(Record), which sends a record to downstream processors. Unlike the DSL — where one operator produces exactly one output stream — a processor can call forward zero, one, or many times per input record, enabling fan-out, flat-mapping, and conditional emission.
When a node has multiple children, route to a specific one with context.forward(record, "child-name"). The context also exposes metadata about the record currently being processed: recordMetadata() returns an Optional holding the source topic, partition, and offset (useful for logging and dead-letter routing). The Optional is empty when no source record is in flight, for example inside a punctuator.
A record forwarded from a punctuator has no source record to inherit a timestamp from: it carries whatever timestamp you give it, typically the punctuation timestamp passed to the callback. Set it explicitly with record.withTimestamp(...) when downstream time semantics matter.
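The "zero, one, or many forwards per input" behavior can be modeled in plain Java. This is a minimal sketch that runs without Kafka: the Consumer<String> callback is a hypothetical stand-in for ProcessorContext.forward, and the comma-splitting rule is invented purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Models variable fan-out with a plain callback instead of ProcessorContext. */
public class FanOutSketch {

    /** Splits a comma-separated value; each non-blank part triggers one forward call. */
    static void process(String key, String value, Consumer<String> forward) {
        if (value == null) {
            return; // zero forwards: nothing to emit for a null value
        }
        for (String part : value.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                forward.accept(key + "=" + trimmed); // one forward per item
            }
        }
    }

    /** Collects everything forwarded for a single input, which is handy in tests. */
    static List<String> collect(String key, String value) {
        List<String> out = new ArrayList<>();
        process(key, value, out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collect("k", null));       // []
        System.out.println(collect("k", "a"));        // [k=a]
        System.out.println(collect("k", "a, b, ,c")); // [k=a, k=b, k=c]
    }
}
```

In a real processor the loop body would call context.forward(record.withValue(item)) instead of the callback, so each emitted record keeps the key, timestamp, and headers of its input.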
Scheduling punctuators
Punctuators let a processor run code on a schedule rather than only when records arrive — essential for emitting aggregates periodically, expiring stale state, or flushing buffers. Schedule one in init with context.schedule(interval, type, punctuator), which returns a Cancellable you can stop later.
| Punctuation type | Clock used | Fires when | Typical use |
|---|---|---|---|
| PunctuationType.STREAM_TIME | Event timestamps in the data | Stream time advances past the interval | Time-driven aggregation aligned to event time |
| PunctuationType.WALL_CLOCK_TIME | System wall-clock | Real elapsed time passes, even with no input | Heartbeats, idle-stream flushing, TTL cleanup |
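    // Fragment of a processor class; the context and counts fields are declared on it.
    @Override
    public void init(ProcessorContext<String, Long> context) {
        this.context = context;
        this.counts = context.getStateStore("counts");
        // Fire once a minute on the wall clock, whether or not records arrive.
        context.schedule(Duration.ofMinutes(1),
                         PunctuationType.WALL_CLOCK_TIME,
                         this::emitAndReset);
    }

    // Punctuator callback: forward every count stamped with the punctuation time, then clear it.
    private void emitAndReset(long timestamp) {
        try (var iter = counts.all()) { // closing the iterator releases store resources
            while (iter.hasNext()) {
                var entry = iter.next();
                context.forward(new Record<>(entry.key, entry.value, timestamp));
                counts.delete(entry.key);
            }
        }
    }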
Stream time only advances when records arrive, so a stream-time punctuator never fires while its task is idle. Wall-clock punctuation fires regardless, which is why TTL/cleanup logic should use it.
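The difference between the two clocks shows up clearly in a toy simulation. The sketch below is a simplified model, not Kafka's scheduler: Schedule and simulate are hypothetical names, stream time is taken as the highest record timestamp seen so far, and wall-clock time is a list of ticks passed in by hand so the run is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of stream-time versus wall-clock punctuation. Not Kafka code. */
public class PunctuationClockSketch {

    /** Fires when the clock it is fed reaches the next deadline. */
    static final class Schedule {
        private final long intervalMs;
        private long nextFireMs;
        final List<Long> firedAt = new ArrayList<>();

        Schedule(long startMs, long intervalMs) {
            this.intervalMs = intervalMs;
            this.nextFireMs = startMs + intervalMs;
        }

        void maybePunctuate(long nowMs) {
            if (nowMs >= nextFireMs) {
                firedAt.add(nowMs);
                // Skip missed slots rather than firing once for each of them.
                long missed = (nowMs - nextFireMs) / intervalMs;
                nextFireMs += (missed + 1) * intervalMs;
            }
        }
    }

    /** Returns [stream-time firings, wall-clock firings] for the given inputs. */
    public static List<List<Long>> simulate(long[] recordTimestamps,
                                            long[] wallClockTicks,
                                            long intervalMs) {
        Schedule streamTime = new Schedule(0, intervalMs);
        Schedule wallClock = new Schedule(0, intervalMs);

        long observedStreamTime = 0;
        for (long ts : recordTimestamps) {
            // Stream time moves only when a record arrives, and never backwards.
            observedStreamTime = Math.max(observedStreamTime, ts);
            streamTime.maybePunctuate(observedStreamTime);
        }
        for (long tick : wallClockTicks) {
            // Wall-clock time moves on its own, with or without input.
            wallClock.maybePunctuate(tick);
        }
        return List.of(streamTime.firedAt, wallClock.firedAt);
    }

    public static void main(String[] args) {
        // Two early records, then silence, while the wall clock keeps ticking.
        List<List<Long>> result =
            simulate(new long[] {10, 20}, new long[] {30, 60, 90, 120}, 60);
        System.out.println("stream-time fires: " + result.get(0)); // []
        System.out.println("wall-clock fires:  " + result.get(1)); // [60, 120]
    }
}
```

The stream-time schedule never fires because no record pushes stream time to 60, while the wall-clock schedule fires twice over the same period.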
Direct state store access
The Processor API gives unrestricted, read-write access to any state store the node was wired to. Register the store on the topology (or as a StoreBuilder in the DSL), then fetch it by name in init. You can hold several stores in one processor — something the DSL cannot do in a single operator — and iterate with range() or all() for batch logic inside a punctuator. Remember to close iterators to release resources, as shown above with try-with-resources.
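A store like seen-orders grows without bound unless something expires old entries, and a sweep with all() inside a wall-clock punctuator is one common way to do that. The sketch below models the idea on a java.util.Map so it runs without Kafka. SeenOrdersSketch, markIfNew, and expire are hypothetical names, and the TTL rule is an assumption, not something the dedupe example above implements.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * Dedupe-with-expiry logic modeled on a plain Map. In a real processor the map
 * would be the "seen-orders" KeyValueStore and expire() would run in a punctuator.
 */
public class SeenOrdersSketch {
    private final Map<String, Long> seen = new HashMap<>(); // stand-in for the state store
    private final long ttlMs;

    public SeenOrdersSketch(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    /** Returns true if the ID is new, meaning the record should be forwarded. */
    public boolean markIfNew(String orderId, long timestampMs) {
        return seen.putIfAbsent(orderId, timestampMs) == null;
    }

    /** Removes entries older than the TTL and returns how many were dropped. */
    public int expire(long nowMs) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = seen.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMs - it.next().getValue() > ttlMs) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    public int size() {
        return seen.size();
    }

    public static void main(String[] args) {
        SeenOrdersSketch store = new SeenOrdersSketch(1_000);
        System.out.println(store.markIfNew("o-1", 0));     // true
        System.out.println(store.markIfNew("o-1", 500));   // false
        System.out.println(store.markIfNew("o-2", 900));   // true
        System.out.println(store.expire(1_500));           // 1
        System.out.println(store.markIfNew("o-1", 1_600)); // true
    }
}
```

Keeping the decision logic in a small class like this also makes it easy to unit test; the processor itself then only translates between records, the store, and these calls.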
Mixing with the DSL via process()
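You rarely build an entire topology by hand. Instead, stay in the DSL for the easy parts and splice in a processor exactly where you need it. KStream.process(...) attaches a ProcessorSupplier and, in the typed API used here (Kafka Streams 3.3 and later), returns a new KStream, so you can keep chaining DSL operators; the older overload it replaces was terminal and returned void. Because process(...) may change the key, a later key-based operator such as an aggregation or join can trigger a repartition. processValues(...) takes a FixedKeyProcessorSupplier that cannot change the key, so the existing partitioning is preserved. Declare the stores the processor needs as the trailing argument so Streams wires them up.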
    StoreBuilder<KeyValueStore<String, Long>> store =
        Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("seen-orders"),
            Serdes.String(), Serdes.Long());
    builder.addStateStore(store);

    builder.stream("orders", Consumed.with(Serdes.String(), orderSerde()))
           .process(DedupeProcessor::new, "seen-orders")   // custom node
           .to("orders-deduped");                          // back in the DSL
This hybrid style keeps your topology readable while letting the Processor API handle the one piece the DSL can’t.
Best Practices
- Capture ProcessorContext and resolve state stores in init, never in the constructor; the context isn’t available until the task starts.
- Always wrap store iterators in try-with-resources; leaked iterators pin RocksDB resources and cause memory growth.
- Use WALL_CLOCK_TIME for TTL/cleanup and idle flushing; use STREAM_TIME for event-time-aligned emission.
- Prefer processValues over process when the key does not change; it keeps the stream's partitioning intact for downstream DSL operators.
- Keep processor instances stateless beyond their state stores so each task gets an independent, thread-safe copy from the supplier.
- Set explicit timestamps with record.withTimestamp(...) when forwarding from punctuators to keep downstream windowing correct.