Pause, Resume & Flow Control

A Kafka consumer pulls records as fast as poll() is called, but downstream systems — databases, external APIs, rate-limited services — rarely keep up at the same rate. When processing falls behind, you need a way to slow ingestion without abandoning the partition or triggering a rebalance. Kafka gives you two complementary levers: per-partition pause()/resume() for dynamic back-pressure, and fetch-sizing configuration (max.poll.records, fetch.max.bytes) for steady-state memory and throughput control. Used together, they let a consumer apply precise flow control while staying alive in its group.

Why flow control matters

A consumer in a group must call poll() at least once every max.poll.interval.ms (default 5 minutes). If it misses that deadline, the client considers itself stalled and leaves the group, and the group coordinator rebalances its partitions to the remaining members. The naive fix for slow processing, simply not polling, is therefore dangerous: heartbeats are sent from a background thread and do not count as progress, so a long gap between polls still gets the consumer evicted.

The correct pattern is to keep polling (which keeps the consumer alive) but stop fetching new records for partitions you cannot currently process. That is exactly what pause() does: a paused partition is skipped during fetches, yet the consumer continues to poll, heartbeat, and participate in the group normally.

Pausing and resuming partitions

pause(Collection<TopicPartition>) tells the consumer to stop returning records for those partitions on subsequent poll() calls; resume(...) reverses it. Both accept only partitions currently assigned to the consumer and throw IllegalStateException otherwise. The pause state is local to the consumer instance and, per the client documentation, is not preserved across a rebalance, so re-apply it from a rebalance listener if it must survive reassignment.
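One way to re-apply pause state after a rebalance is to track the partitions you want paused separately from the consumer, then intersect that set with each new assignment. The sketch below is plain Java with no Kafka dependency. `PauseTracker` and its method names are hypothetical, and the type parameter `P` stands in for `TopicPartition`. In a real consumer, `onAssigned` would presumably be called from `ConsumerRebalanceListener.onPartitionsAssigned`, with the result passed to `consumer.pause(...)`.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper: remembers which partitions should stay paused across rebalances. */
final class PauseTracker<P> {
    // Desired pause state, kept outside the consumer so a rebalance cannot clear it.
    private final Set<P> wantPaused = new HashSet<>();

    /** Record that a partition should be paused (call alongside consumer.pause). */
    void markPaused(P partition) {
        wantPaused.add(partition);
    }

    /** Record that a partition may flow again (call alongside consumer.resume). */
    void markResumed(P partition) {
        wantPaused.remove(partition);
    }

    /** Returns the subset of the given assignment that must be paused again. */
    Set<P> onAssigned(Collection<P> assigned) {
        Set<P> toPause = new HashSet<>(assigned);
        toPause.retainAll(wantPaused);
        return toPause;
    }
}
```

Whether a pause should survive reassignment at all depends on whether the local backlog that caused it also survives. If the buffer for a revoked partition is discarded, clearing its entry with `markResumed` is likely the better choice.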

A common use case is back-pressure against a slow downstream: when an in-flight buffer or bounded queue fills up, pause; when it drains below a threshold, resume.

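import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FlowControlledConsumer {

    /** Hypothetical downstream buffer; not part of the Kafka client API. */
    interface BoundedSink {
        void offer(TopicPartition tp, ConsumerRecord<String, String> record);
        int pendingFor(TopicPartition tp);
        // Next offset to commit per partition, covering only fully processed records.
        Map<TopicPartition, OffsetAndMetadata> completedOffsets();
    }

    private static final int HIGH_WATERMARK = 5_000;
    private static final int LOW_WATERMARK = 1_000;

    void run(KafkaConsumer<String, String> consumer, BoundedSink sink) {
        consumer.subscribe(List.of("orders"));

        while (true) {
            // Paused partitions return no records, but poll() still keeps the consumer in the group.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));

            for (ConsumerRecord<String, String> record : records) {
                sink.offer(new TopicPartition(record.topic(), record.partition()), record);
            }

            Set<TopicPartition> paused = consumer.paused();
            for (TopicPartition tp : consumer.assignment()) {
                int pending = sink.pendingFor(tp);
                if (!paused.contains(tp) && pending >= HIGH_WATERMARK) {
                    consumer.pause(Collections.singleton(tp));   // back off
                } else if (paused.contains(tp) && pending <= LOW_WATERMARK) {
                    consumer.resume(Collections.singleton(tp));  // catch up
                }
            }

            // Commit only what the sink has finished; committing everything polled
            // would skip records still queued in the sink after a crash.
            Map<TopicPartition, OffsetAndMetadata> done = sink.completedOffsets();
            if (!done.isEmpty()) {
                consumer.commitSync(done);
            }
        }
    }
}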

Even while a partition is paused, the loop keeps calling poll(), so the consumer never violates max.poll.interval.ms and never drops out of the group.

Tip: pause() and resume() are idempotent and cheap. Re-applying pause() to an already-paused partition is a no-op, so you can drive them straight from a watermark check on every loop iteration without tracking extra state.
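The watermark check in the loop can also be pulled out into a small pure function, which makes the hysteresis easy to unit-test without a broker. The following is a minimal sketch with no Kafka dependency; `WatermarkGate` and `Action` are hypothetical names, and the caller is assumed to translate the returned action into `consumer.pause(...)` or `consumer.resume(...)`.

```java
/** Hypothetical pure-logic helper: decides pause/resume from a backlog size with hysteresis. */
final class WatermarkGate {
    enum Action { PAUSE, RESUME, NONE }

    private final int low;
    private final int high;

    WatermarkGate(int low, int high) {
        if (low < 0 || low >= high) {
            throw new IllegalArgumentException("require 0 <= low < high");
        }
        this.low = low;
        this.high = high;
    }

    /** Pause at or above the high mark, resume at or below the low mark, otherwise hold state. */
    Action decide(int pending, boolean paused) {
        if (!paused && pending >= high) {
            return Action.PAUSE;
        }
        if (paused && pending <= low) {
            return Action.RESUME;
        }
        return Action.NONE;
    }
}
```

The gap between the two marks is what prevents flapping: a backlog hovering between them leaves the current state unchanged in either direction.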

Fetch sizing for memory control

Back-pressure handles bursts; fetch sizing handles the baseline. The fetch.* settings cap how much data each fetch pulls into the consumer's buffer, which bounds heap usage. max.poll.records caps how many of those buffered records a single poll() returns, which bounds the time each batch takes to process.

Setting                      Default              Controls
max.poll.records             500                  Max records returned by a single poll() call
fetch.max.bytes              52428800 (50 MiB)    Max data the broker returns per fetch request
max.partition.fetch.bytes    1048576 (1 MiB)      Max data returned per partition per fetch
fetch.min.bytes              1                    Min data the broker waits to accumulate before responding
max.poll.interval.ms         300000 (5 min)       Max delay between polls before eviction
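To get a feel for how the two byte limits interact, a rough estimate of the data one round of fetches can bring in may help. The sketch below assumes one fetch request per broker and at most one buffered response per partition. `FetchMemoryEstimate` is a hypothetical helper, and the result is a planning figure rather than a guarantee: neither limit is strictly enforced when a single record batch exceeds it, and decompressed records take more heap than their wire size.

```java
/** Hypothetical planning helper; the formula is a rough estimate, not a client guarantee. */
final class FetchMemoryEstimate {

    /**
     * Estimates the bytes one round of fetches can return:
     * the smaller of (brokers * fetch.max.bytes) and (partitions * max.partition.fetch.bytes).
     */
    static long roughUpperBoundBytes(int brokers, int partitions,
                                     long fetchMaxBytes, long maxPartitionFetchBytes) {
        long perRequestCap = Math.multiplyExact((long) brokers, fetchMaxBytes);
        long perPartitionCap = Math.multiplyExact((long) partitions, maxPartitionFetchBytes);
        return Math.min(perRequestCap, perPartitionCap);
    }
}
```

For example, 12 assigned partitions spread over 3 brokers with fetch.max.bytes=10485760 and max.partition.fetch.bytes=1048576 give an estimate of 12582912 bytes (12 MiB), because the per-partition limit is the tighter of the two.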

max.poll.records is the most important knob for slow processing. If each record takes 200 ms to process and you fetch 500 per poll, a single batch needs 100 seconds — still under the 5-minute limit, but tighten the count if processing is slower or more variable. Lowering it shrinks the batch, keeps the poll loop responsive, and reduces the risk of exceeding max.poll.interval.ms.
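The arithmetic above can be turned into a quick sizing check. This is a minimal sketch, assuming a roughly constant per-record processing time. `PollBudget` and the safety percentage are illustrative choices, not Kafka settings.

```java
/** Hypothetical sizing helper for max.poll.records; assumes a steady per-record cost. */
final class PollBudget {

    /** Worst-case time to process one full batch, in milliseconds. */
    static long batchMillis(int maxPollRecords, long perRecordMillis) {
        return Math.multiplyExact((long) maxPollRecords, perRecordMillis);
    }

    /**
     * Largest max.poll.records that keeps one batch within safetyPercent of
     * max.poll.interval.ms. Never returns less than 1.
     */
    static int maxRecords(long perRecordMillis, long maxPollIntervalMillis, int safetyPercent) {
        if (perRecordMillis <= 0 || safetyPercent <= 0 || safetyPercent > 100) {
            throw new IllegalArgumentException("invalid inputs");
        }
        long budgetMillis = maxPollIntervalMillis * safetyPercent / 100;
        long records = budgetMillis / perRecordMillis;
        return (int) Math.max(1, Math.min(Integer.MAX_VALUE, records));
    }
}
```

With 200 ms per record, `batchMillis(500, 200)` is 100000 ms, matching the 100 seconds above, and `maxRecords(200, 300000, 50)` is 750, the largest batch that uses at most half of the default interval. If processing slows to 2 seconds per record, the same call yields 75.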

# Conservative settings for a slow, memory-sensitive consumer
max.poll.records=50
fetch.max.bytes=10485760
max.partition.fetch.bytes=1048576
max.poll.interval.ms=600000
fetch.min.bytes=16384
fetch.max.wait.ms=500

In Spring for Apache Kafka, the same intent is expressed declaratively, and the listener container manages the poll loop and pausing for you.

spring:
  kafka:
    consumer:
      max-poll-records: 50
      properties:
        fetch.max.bytes: 10485760
        max.poll.interval.ms: 600000
    listener:
      type: single
      idle-between-polls: 0

Spring exposes pause/resume at the container level. You can pause a specific listener container — and Spring re-applies the paused state across rebalances — without writing your own loop.

import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class ConsumerThrottle {

    private final KafkaListenerEndpointRegistry registry;

    public ConsumerThrottle(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    public void pause(String listenerId) {
        MessageListenerContainer container = registry.getListenerContainer(listenerId);
        if (container != null) {
            container.pause();   // finishes current batch, then stops fetching
        }
    }

    public void resume(String listenerId) {
        MessageListenerContainer container = registry.getListenerContainer(listenerId);
        if (container != null && container.isPauseRequested()) {
            container.resume();
        }
    }
}

Illustrative log output (exact wording and log levels depend on the Spring for Apache Kafka version):

INFO  o.s.k.l.KafkaMessageListenerContainer - Pausing container; partitions [orders-0, orders-1]
INFO  o.s.k.l.KafkaMessageListenerContainer - Paused consumption from: [orders-0, orders-1]
INFO  o.s.k.l.KafkaMessageListenerContainer - Resuming container; partitions [orders-0, orders-1]

Warning: Pausing does not commit offsets or stop the consumer — lag grows on paused partitions while peers keep consuming. Pause for back-pressure, not as a long-term off switch; monitor consumer lag so a stuck resume() does not silently accumulate an unbounded backlog.

Best practices

  • Keep calling poll() even when paused — it is what preserves group membership; never use a sleep or blocked thread to slow down.
  • Drive pause/resume from explicit high/low watermarks with hysteresis to avoid rapidly flapping between states on every loop.
  • Re-apply pause state in a ConsumerRebalanceListener (plain client) or rely on Spring’s container, since raw pause state is cleared on reassignment.
  • Tune max.poll.records down before raising max.poll.interval.ms; smaller batches keep the loop tight and bound per-record latency better than a longer timeout.
  • Size fetch.max.bytes and max.partition.fetch.bytes to your heap and record size so a single fetch cannot trigger memory pressure.
  • Always alert on consumer lag for paused topics so an unintended or forgotten pause surfaces quickly.
Last updated June 1, 2026