
Seeking & Replaying

Kafka’s log is durable and immutable, so messages stay on the broker until retention expires regardless of whether a consumer has read them. That property lets you rewind a consumer to any point in the log and replay history — to recover from a buggy consumer that committed bad results, to rebuild a downstream cache or read model, or to backfill a new feature against months of past events. The mechanism is the seek family of methods plus offsetsForTimes, which together let you position the consumer at an arbitrary offset before the next poll.

How seeking works

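A consumer tracks a fetch position per assigned partition. Each poll() returns records starting at that position and advances it. seek() overrides the position in memory; it does not touch committed offsets and takes effect on the next poll(). Seeking only works on partitions the consumer currently owns; seeking any other partition throws IllegalStateException. With assign(), ownership is immediate, so you can seek right after assigning, as in the example below. With subscribe(), partitions arrive only once the group has assigned them, so seek inside the assignment callback, or after poll() has returned and consumer.assignment() contains the partition.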

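```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReplayFromOffset {
    public static void main(String[] args) {
        var props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "orders-replay");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit explicitly, if at all

        try (var consumer = new KafkaConsumer<String, String>(props)) {
            var tp = new TopicPartition("orders", 0);
            consumer.assign(List.of(tp));   // manual assignment, no group rebalance
            consumer.seek(tp, 1500L);       // next poll starts at offset 1500

            while (true) {
                var records = consumer.poll(Duration.ofMillis(500));
                records.forEach(r -> System.out.printf("%d -> %s%n", r.offset(), r.value()));
            }
        }
    }
}
```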

assign() vs subscribe()

There are two ways to get partitions. subscribe() joins a consumer group and lets Kafka assign partitions dynamically, with rebalances. assign() pins an explicit list of partitions to this consumer with no group coordination — ideal for deterministic replay where you want full control and no rebalances stealing partitions mid-seek.

| Aspect | subscribe(topics) | assign(partitions) |
|---|---|---|
| Partition ownership | Assigned by group coordinator | You choose explicitly |
| Rebalancing | Yes | No |
| Scales across instances | Yes (group splits work) | No (each instance reads only the partitions it lists) |
| Best for | Normal consumption | Targeted seek/replay, tooling |

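When you assign() instead of subscribe(), you opt out of the group’s automatic load balancing. Seeking with subscribe() works too, but a rebalance can revoke the partition you just repositioned, and whichever consumer owns it next resumes from the last committed offset, not from your seek. Performing the seek inside ConsumerRebalanceListener.onPartitionsAssigned applies it at the moment this consumer actually receives the partition.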
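A minimal sketch of that pattern, assuming kafka-clients is on the classpath. SeekOnAssign is a hypothetical helper, not part of the Kafka API: it takes the desired start offset per partition plus a seek function (normally consumer::seek) and repositions each partition only the first time this consumer instance receives it, so later rebalances fall back to resuming from committed offsets instead of restarting the replay.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper, not part of the Kafka API: seeks each requested partition
// the first time this consumer instance is assigned it.
final class SeekOnAssign implements ConsumerRebalanceListener {
    private final Map<TopicPartition, Long> pending;        // partitions not yet repositioned
    private final BiConsumer<TopicPartition, Long> seeker;  // normally consumer::seek

    SeekOnAssign(Map<TopicPartition, Long> startOffsets, BiConsumer<TopicPartition, Long> seeker) {
        this.pending = new HashMap<>(startOffsets);
        this.seeker = seeker;
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Invoked from inside poll() once this consumer owns `partitions`,
        // so seeking here is safe and overrides the committed starting point.
        for (TopicPartition tp : partitions) {
            Long start = pending.remove(tp);  // null: not requested, or already applied once
            if (start != null) {
                seeker.accept(tp, start);
            }
        }
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Nothing to do in this sketch; a real job would commit processed offsets here.
    }
}
```

Wiring it up might look like consumer.subscribe(List.of("orders"), new SeekOnAssign(Map.of(new TopicPartition("orders", 0), 1500L), consumer::seek)). Taking a BiConsumer instead of the consumer itself keeps the listener testable without a broker. Each instance keeps its own pending map, so with several group members a partition that moves to another instance mid-replay would be sought again there; if that matters, coordinate through committed offsets instead.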

seekToBeginning and seekToEnd

To replay an entire partition from the start, use seekToBeginning. To skip the backlog and consume only records produced from now on, use seekToEnd. Both are lazy: the actual offset lookup happens on the next poll() or position() call.

```java
consumer.assign(List.of(tp));
consumer.seekToBeginning(List.of(tp));   // full replay from earliest retained offset
// or:
consumer.seekToEnd(List.of(tp));         // jump to the tail, ignore backlog
```

Passing an empty collection seeks all currently assigned partitions, which is handy when you own many.
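Laziness means the target is resolved against the log's bounds when the consumer next polls, not when you make the call. The toy model below is plain Java, not the Kafka client, and its names are hypothetical: it only records a pending reset and resolves it when a position is requested, so a retention cleanup between the call and the poll changes where the replay actually starts. "Beginning" resolves to the log start offset (earliest retained record) and "end" to the log end offset, the offset the next produced record will get (with read_committed isolation the real client uses the last stable offset instead).

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Toy model of lazy seekToBeginning/seekToEnd for one consumer; not the Kafka client.
// Partitions are plain ints, and log bounds are supplied at resolution time.
final class LazyResetModel {
    private enum Reset { EARLIEST, LATEST }

    private final Set<Integer> assigned;
    private final Map<Integer, Reset> pending = new HashMap<>();   // requested, not yet resolved
    private final Map<Integer, Long> position = new HashMap<>();   // resolved fetch positions

    LazyResetModel(Set<Integer> assigned) {
        this.assigned = Set.copyOf(assigned);
    }

    // As with the real methods, an empty collection means "all assigned partitions".
    void seekToBeginning(Collection<Integer> partitions) { mark(partitions, Reset.EARLIEST); }

    void seekToEnd(Collection<Integer> partitions) { mark(partitions, Reset.LATEST); }

    private void mark(Collection<Integer> partitions, Reset reset) {
        for (int p : partitions.isEmpty() ? assigned : partitions) {
            if (!assigned.contains(p)) {
                throw new IllegalStateException("partition " + p + " is not assigned");
            }
            pending.put(p, reset);   // only remember the request; no offset lookup yet
            position.remove(p);
        }
    }

    // Stands in for the next poll()/position(): a pending reset is resolved against
    // the log bounds that exist now, not when the seek method was called.
    long resolvePosition(int p, long logStartOffset, long logEndOffset) {
        Reset reset = pending.remove(p);
        if (reset != null) {
            position.put(p, reset == Reset.EARLIEST ? logStartOffset : logEndOffset);
        }
        Long pos = position.get(p);
        if (pos == null) {
            throw new IllegalStateException("no position for partition " + p);
        }
        return pos;
    }
}
```

For example, calling seekToBeginning with an empty list and then letting retention move the log start from 0 to 400 before the poll yields a starting position of 400 for every assigned partition.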

offsetsForTimes — seek by timestamp

Most operational replays are time-bounded: “reprocess everything since 09:00 today.” offsetsForTimes maps a wall-clock timestamp (epoch millis) to the earliest offset whose record timestamp is greater than or equal to it. You then seek to that offset.

```java
// Continues the first example: `consumer` exists and `tp` is assigned.
// Extra imports: java.time.Instant, java.util.Map,
//                org.apache.kafka.clients.consumer.OffsetAndTimestamp
long since = Instant.parse("2026-06-01T09:00:00Z").toEpochMilli();
var query = Map.of(tp, since);

Map<TopicPartition, OffsetAndTimestamp> found = consumer.offsetsForTimes(query);
found.forEach((partition, ot) -> {
    if (ot != null) {                       // null = no record at/after that time
        consumer.seek(partition, ot.offset());
        System.out.printf("Replaying %s from offset %d (ts=%d)%n",
                partition, ot.offset(), ot.timestamp());
    } else {
        consumer.seekToEnd(List.of(partition));   // nothing newer; go to tail
    }
});
```

Output:

```text
Replaying orders-0 from offset 1842 (ts=1780304400000)
```

Timestamp lookups rely on the broker’s time index. By default Kafka uses CreateTime (the producer’s timestamp); if a topic is configured with message.timestamp.type=LogAppendTime, lookups resolve against broker append time instead.
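Because CreateTime is set by producers, timestamps within a partition need not increase with offset (clock skew between producers, or timestamps set explicitly). offsetsForTimes still returns the earliest offset whose timestamp is at or after the target, so records after that offset may carry older timestamps. The plain-Java sketch below models that rule with a linear scan over hypothetical in-memory (offset, timestamp) pairs; the broker uses its time index to avoid scanning the whole log, but the documented result is the same.

```java
import java.util.List;
import java.util.OptionalLong;

// Toy model of the offsetsForTimes rule for one partition; not the Kafka client.
final class TimeLookupModel {
    record Rec(long offset, long timestamp) {}

    // Earliest offset whose timestamp >= target, or empty (the real API returns null).
    static OptionalLong offsetForTime(List<Rec> logInOffsetOrder, long target) {
        for (Rec r : logInOffsetOrder) {
            if (r.timestamp() >= target) {
                return OptionalLong.of(r.offset());
            }
        }
        return OptionalLong.empty();
    }
}
```

With records (100, 1000), (101, 3000), (102, 2000), (103, 4000), a target of 2000 resolves to offset 101, not 102, and the replay then includes record 102 despite its earlier timestamp. The empty case corresponds to the null check in the offsetsForTimes example, where falling back to seekToEnd is the natural choice.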

Resetting a group’s offsets via CLI

For a whole consumer group, the cleanest way to replay is kafka-consumer-groups.sh --reset-offsets. The group must be inactive (all members stopped) for the reset to apply. Use --dry-run to preview, then --execute.

```shell
# Preview: reset every partition of a topic to earliest
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group orders-replay --topic orders \
  --reset-offsets --to-earliest --dry-run

# Execute: rewind to a point in time
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group orders-replay --topic orders \
  --reset-offsets --to-datetime 2026-06-01T09:00:00.000 --execute

# Other targets: --to-latest, --to-offset 5000, --shift-by -100, --by-duration PT1H
```

Output:

```text
GROUP          TOPIC   PARTITION  NEW-OFFSET
orders-replay  orders  0          1842
orders-replay  orders  1          1610
```
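Two of the relative targets in the "Other targets" comment reduce to simple arithmetic. --shift-by moves each partition's current committed offset by n, and the tool clamps a result that falls outside the partition's range to the log start or log end offset instead of failing. --by-duration subtracts the duration from the current time and then looks up offsets for that timestamp, as offsetsForTimes does. The sketch below only illustrates those two rules in plain Java; the class and method names are hypothetical and it does not invoke the tool.

```java
import java.time.Duration;
import java.time.Instant;

// Plain-Java illustration of two --reset-offsets rules; hypothetical names, not the tool.
final class CliTargetMath {
    // --shift-by n: move the committed offset by n, then clamp the result
    // into [log start offset, log end offset].
    static long shiftBy(long committed, long n, long logStartOffset, long logEndOffset) {
        long target = committed + n;
        return Math.max(logStartOffset, Math.min(logEndOffset, target));
    }

    // --by-duration PnDTnHnMnS: the timestamp that is then resolved to offsets
    // with the same "earliest offset at or after" rule as offsetsForTimes.
    static long byDurationTimestamp(Instant now, Duration duration) {
        return now.minus(duration).toEpochMilli();
    }
}
```

For instance, --by-duration PT1H run at 2026-06-01T10:00:00Z targets 2026-06-01T09:00:00Z, the same instant as the --to-datetime example.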

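The CLI rewrites committed offsets in __consumer_offsets, and the next time the group runs, it resumes from there. An in-code seek only moves the live fetch position; seek() itself never commits. The new position is persisted only when offsets are committed, either explicitly with commitSync() or commitAsync(), or by auto-commit when enable.auto.commit is true (the default) and the consumer keeps polling.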
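To make the distinction concrete, here is a toy model in plain Java (PositionModel and its methods are hypothetical, not the Kafka client): seek() and poll() change only the in-memory position, commit() copies positions to the durable committed offsets, roughly what commitSync() with no arguments does, and restart() discards the positions and resumes from what was committed. It assumes contiguous offsets, so polling n records advances the position by n.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of one consumer's fetch positions vs. committed offsets; not the Kafka client.
final class PositionModel {
    private final Map<Integer, Long> committed = new HashMap<>(); // stands in for __consumer_offsets
    private final Map<Integer, Long> position = new HashMap<>();  // in memory, lost on restart

    PositionModel(Map<Integer, Long> initialCommitted) {
        committed.putAll(initialCommitted);
        position.putAll(initialCommitted);   // a fresh consumer starts from committed offsets
    }

    void seek(int partition, long offset) {
        position.put(partition, offset);     // moves the position only; nothing is committed
    }

    // Pretend poll() returned n records: the position advances past them
    // (assumes contiguous offsets, i.e. no compaction or transaction markers).
    void poll(int partition, int n) {
        position.merge(partition, (long) n, Long::sum);
    }

    void commit() {
        committed.putAll(position);          // persist current positions
    }

    void restart() {
        position.clear();                    // in-memory state is gone
        position.putAll(committed);          // resume from committed offsets
    }

    long position(int partition) { return position.get(partition); }

    long committed(int partition) { return committed.get(partition); }
}
```

An uncommitted seek to 1500 is lost on restart and the consumer resumes at its old committed offset; committing after the seek makes the replay start point durable, which is the effect the CLI achieves without running a consumer.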

Best Practices

  • Use a dedicated group id for replays so you never disturb the offsets of the live production consumer group.
  • Make replay consumers idempotent — replaying re-delivers records, so downstream writes must tolerate duplicates (upserts, dedup keys).
  • Prefer assign() for deterministic, single-purpose replay jobs; reserve subscribe() for scaled-out steady-state consumption.
  • Always check offsetsForTimes for null results and fall back to seekToEnd or seekToBeginning.
  • Run CLI offset resets only on a stopped group, previewing with --dry-run before applying with --execute.
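  • Mind retention: seekToBeginning and old timestamps can only reach data that is still retained. Expired segments are gone, and seeking to an offset below the log start offset makes the consumer apply its auto.offset.reset policy instead.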
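One way to make a replay consumer tolerate re-delivery, sketched in plain Java under stated assumptions: writes are upserts keyed by the record key, so applying the same record twice leaves the same state, and each row remembers the source partition and offset so that replaying older records does not temporarily roll a key back. OrderReadModel and the in-memory map standing in for a database table are hypothetical; the sketch also assumes that records with the same key always come from the same partition, which holds when producers use key-based partitioning.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical idempotent read model: upsert by key, guarded by the source offset.
final class OrderReadModel {
    record Row(String value, int partition, long offset) {}

    private final Map<String, Row> rows = new HashMap<>();   // stands in for a database table

    // Re-applying a record yields the same row; an older offset from the same
    // partition is skipped so a replay never moves a key backwards.
    // Assumes all records for a key come from one partition.
    void apply(String key, String value, int partition, long offset) {
        Row current = rows.get(key);
        if (current != null && current.partition() == partition && offset < current.offset()) {
            return;                                        // stale during replay; keep newer row
        }
        rows.put(key, new Row(value, partition, offset));  // upsert: same input, same result
    }

    Map<String, Row> snapshot() {
        return Map.copyOf(rows);
    }
}
```

Because the guard uses "older than" rather than "older than or equal to", re-processing the latest record for a key still overwrites the row, so a replay run with fixed logic can correct bad results. In a real sink the row and its offset should be written in the same transaction.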
Last updated June 1, 2026