
AdminClient API

Shell scripts like kafka-topics.sh are great for one-off tasks, but real operational tooling needs to manage a cluster from code. The AdminClient is Kafka’s official Java API for administrative operations: creating and deleting topics, reading and altering configs, managing ACLs, and inspecting consumer groups. It speaks the same admin protocol the CLI tools use, so anything you can do from bin/ you can do programmatically — inside provisioning jobs, health checks, CI pipelines, or self-service platform APIs. Because every operation is asynchronous and returns a future, AdminClient fits naturally into automation that must create resources on demand and verify the result before proceeding.

Creating an AdminClient

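AdminClient is a thread-safe, long-lived object. Build one from a Properties object (or a Map), reuse it across operations, and close it when done. Only bootstrap.servers is strictly required; in production you also supply security settings. The examples use Admin, the interface that AdminClient implements; Admin.create returns the same client.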

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import java.util.Properties;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000);

try (Admin admin = Admin.create(props)) {
    // run admin operations here
}

Every method returns a *Result object whose accessors hand back KafkaFuture instances. Call .get() to block until the broker responds, or chain non-blocking callbacks. Failures surface as an ExecutionException wrapping a Kafka error such as TopicExistsException or UnknownTopicOrPartitionException.
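The blocking pattern described above can be wrapped in a small helper so that call sites catch the underlying error directly instead of unwrapping ExecutionException each time. This is a minimal sketch, not part of the Kafka API: Futures.await is a hypothetical name. It accepts any java.util.concurrent.Future, which KafkaFuture implements, and the demo uses CompletableFuture as a stand-in so it runs without a broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper, not part of the Kafka API.
final class Futures {
    private Futures() {}

    // Blocks until the future completes. If it failed, rethrows the wrapped
    // cause (for AdminClient calls, typically a Kafka exception) instead of
    // the ExecutionException wrapper.
    static <T> T await(Future<T> future) throws Exception {
        try {
            return future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw e; // Errors and missing causes stay wrapped
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        // CompletableFuture stands in for a KafkaFuture here.
        System.out.println(await(CompletableFuture.completedFuture("ok")));

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("topic exists"));
        try {
            await(failed);
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

With a real client the call would look like Futures.await(admin.createTopics(List.of(orders)).all()), followed by a catch clause for TopicExistsException.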

Creating and listing topics

createTopics takes a collection of NewTopic, each describing a name, partition count, and replication factor. Wrapping .get() in a try/catch lets you treat “already exists” as success — a common pattern in idempotent provisioning.

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.errors.TopicExistsException;
import java.util.*;
import java.util.concurrent.ExecutionException;

NewTopic orders = new NewTopic("orders", 3, (short) 1)
        .configs(Map.of(
            "cleanup.policy", "delete",
            "retention.ms", "604800000"));

CreateTopicsResult result = admin.createTopics(List.of(orders));
try {
    result.all().get();
    System.out.println("Created topic 'orders'");
} catch (ExecutionException e) {
    if (e.getCause() instanceof TopicExistsException) {
        System.out.println("Topic 'orders' already exists");
    } else {
        throw e;
    }
}

Set<String> topics = admin.listTopics().names().get();
System.out.println("Topics: " + topics);

Output:

Created topic 'orders'
Topics: [orders]

By default listTopics() hides internal topics such as __consumer_offsets. Pass new ListTopicsOptions().listInternal(true) to include them.

Describing and deleting topics

describeTopics returns a TopicDescription per topic, exposing each partition’s leader, replicas, and in-sync replica (ISR) set — exactly what you need to verify replication health after a create.

import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

Map<String, TopicDescription> described =
        admin.describeTopics(List.of("orders")).allTopicNames().get();

TopicDescription desc = described.get("orders");
System.out.println("Partitions: " + desc.partitions().size());
for (TopicPartitionInfo p : desc.partitions()) {
    System.out.printf("  p%d leader=%d isr=%d%n",
        p.partition(), p.leader().id(), p.isr().size());
}

Output:

Partitions: 3
  p0 leader=1 isr=1
  p1 leader=1 isr=1
  p2 leader=1 isr=1

Deleting is just as direct. deleteTopics marks the topics for deletion; the call returns once the controller accepts it, though physical log removal happens asynchronously.

admin.deleteTopics(List.of("orders")).all().get();

Topic deletion is irreversible and asynchronous. After deleteTopics().all().get() succeeds, there is a brief window in which the topic still appears in metadata. If you immediately recreate a topic with the same name, retry on TopicExistsException until the deletion has completed.
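The retry described above could be sketched as a generic helper with exponential backoff. Everything here is an assumption for illustration: Retry.withBackoff is a hypothetical name, and the attempt count and delays are arbitrary. Because AdminClient failures arrive wrapped in ExecutionException, the helper tests the wrapped cause against the caller's predicate.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;

// Hypothetical helper, not part of the Kafka API.
final class Retry {
    private Retry() {}

    // Runs the action up to maxAttempts times. A failure is retried only if
    // the retryable predicate accepts it; the delay doubles after each attempt.
    static <T> T withBackoff(Callable<T> action, Predicate<Throwable> retryable,
                             int maxAttempts, long initialDelayMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delayMs = initialDelayMs;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                // Look through the ExecutionException wrapper, if present.
                Throwable root = e;
                if (e instanceof ExecutionException && e.getCause() != null) {
                    root = e.getCause();
                }
                if (attempt >= maxAttempts || !retryable.test(root)) {
                    throw e; // out of attempts, or not a retryable failure
                }
                Thread.sleep(delayMs);
                delayMs *= 2;
            }
        }
    }
}

// Usage with a real client (assumes the admin and orders variables from the
// topic creation example):
//   Retry.withBackoff(
//       () -> admin.createTopics(List.of(orders)).all().get(),
//       t -> t instanceof TopicExistsException,
//       10, 200L);
```

Capping the number of attempts matters: if the topic was recreated by someone else in the meantime, TopicExistsException would never go away and an unbounded loop would spin forever.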

Reading and altering configs

Configs are addressed by ConfigResource, which pairs a type (TOPIC, BROKER) with a name. Use describeConfigs to read effective settings and incrementalAlterConfigs — the modern, additive API — to change them. Avoid the deprecated alterConfigs, which overwrites the entire config set and silently drops anything you omit.

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;

ConfigResource topicRes = new ConfigResource(ConfigResource.Type.TOPIC, "orders");

// Read one config value
Config cfg = admin.describeConfigs(List.of(topicRes)).all().get().get(topicRes);
System.out.println("retention.ms = " + cfg.get("retention.ms").value());

// Change retention without disturbing other settings
AlterConfigOp op = new AlterConfigOp(
        new ConfigEntry("retention.ms", "259200000"), OpType.SET);
admin.incrementalAlterConfigs(Map.of(topicRes, List.of(op))).all().get();

OpType     Effect
SET        Set or overwrite a single config key
DELETE     Remove an override, reverting to the broker default
APPEND     Add values to a list-type config (e.g. cleanup.policy)
SUBTRACT   Remove values from a list-type config

Inspecting consumer groups

AdminClient is the programmatic equivalent of kafka-consumer-groups.sh. listConsumerGroups enumerates groups, describeConsumerGroups reports state and members, and listConsumerGroupOffsets returns committed offsets per partition — the inputs for a lag calculation.

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;

for (ConsumerGroupListing g : admin.listConsumerGroups().all().get()) {
    System.out.println("Group: " + g.groupId());
}

ConsumerGroupDescription cg = admin.describeConsumerGroups(List.of("order-processors"))
        .all().get().get("order-processors");
System.out.printf("State: %s, members: %d%n", cg.state(), cg.members().size());

Map<TopicPartition, OffsetAndMetadata> offsets =
        admin.listConsumerGroupOffsets("order-processors")
             .partitionsToOffsetAndMetadata().get();
offsets.forEach((tp, om) ->
        System.out.printf("  %s-%d committed=%d%n", tp.topic(), tp.partition(), om.offset()));

Output:

Group: order-processors
State: Stable, members: 2
  orders-0 committed=120
  orders-1 committed=98
  orders-2 committed=205

To compute lag, pair these committed offsets with the partitions’ end offsets from admin.listOffsets(...) using OffsetSpec.latest(), then subtract.
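The subtraction itself can be sketched as a small pure function. This is an illustration under stated assumptions: LagCalculator is a hypothetical helper, generic over the key type so that it would accept TopicPartition keys once the committed offsets (om.offset()) and end offsets (the offset() of each listOffsets result) have been unpacked into Long values. The committed offsets in the demo reuse the sample output above; the end offsets are made-up numbers.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the Kafka API.
final class LagCalculator {
    private LagCalculator() {}

    // lag = end offset - committed offset, per partition.
    // Partitions without a known end offset are skipped. The result is
    // clamped at zero because the two reads are not atomic: a commit can
    // land after the end offsets were fetched.
    static <K> Map<K, Long> lag(Map<K, Long> committed, Map<K, Long> endOffsets) {
        Map<K, Long> result = new LinkedHashMap<>();
        committed.forEach((partition, offset) -> {
            Long end = endOffsets.get(partition);
            if (end != null) {
                result.put(partition, Math.max(0L, end - offset));
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> committed = new LinkedHashMap<>();
        committed.put("orders-0", 120L);
        committed.put("orders-1", 98L);
        committed.put("orders-2", 205L);

        // Illustrative end offsets, not real measurements.
        Map<String, Long> endOffsets =
                Map.of("orders-0", 150L, "orders-1", 98L, "orders-2", 210L);

        System.out.println(lag(committed, endOffsets));
        // prints {orders-0=30, orders-1=0, orders-2=5}
    }
}
```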

Best Practices

  • Create one AdminClient per application and reuse it; it is thread-safe and manages its own broker connections. Close it at shutdown, or use try-with-resources in short-lived tools.
  • Treat provisioning as idempotent — catch TopicExistsException on create and UnknownTopicOrPartitionException on delete so reruns are safe.
  • Prefer incrementalAlterConfigs over the deprecated alterConfigs to avoid accidentally wiping configs you did not explicitly set.
  • Set REQUEST_TIMEOUT_MS_CONFIG and add retry handling; admin calls can fail transiently during controller failover.
  • Never log raw credentials — load security configs (SASL/SSL) from a secrets manager, not hard-coded Properties.
  • Restrict who can call destructive operations with ACLs; an unprotected AdminClient can delete every topic in the cluster.
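
The advice on credentials could be applied as follows: build the client properties from the process environment (populated by a secrets manager) and log only a redacted copy. This is a sketch under assumptions. AdminProps and the environment variable names are hypothetical, and the SASL_SSL and SCRAM-SHA-512 defaults are illustrative choices. The property keys themselves (bootstrap.servers, security.protocol, sasl.mechanism, sasl.jaas.config, request.timeout.ms) are standard Kafka client settings.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical helper; the environment variable names are illustrative only.
final class AdminProps {
    private AdminProps() {}

    // Builds AdminClient properties from an environment-style map so that
    // credentials never appear in source code.
    static Properties fromEnv(Map<String, String> env) {
        Properties props = new Properties();
        props.put("bootstrap.servers", required(env, "KAFKA_BOOTSTRAP_SERVERS"));
        props.put("security.protocol", env.getOrDefault("KAFKA_SECURITY_PROTOCOL", "SASL_SSL"));
        props.put("sasl.mechanism", env.getOrDefault("KAFKA_SASL_MECHANISM", "SCRAM-SHA-512"));
        props.put("sasl.jaas.config", required(env, "KAFKA_SASL_JAAS_CONFIG"));
        props.put("request.timeout.ms", "30000");
        return props;
    }

    // Returns a copy that is safe to log: the JAAS entry carries the
    // password, so its value is masked. The original is left untouched.
    static Properties redacted(Properties props) {
        Properties copy = new Properties();
        copy.putAll(props);
        if (copy.containsKey("sasl.jaas.config")) {
            copy.put("sasl.jaas.config", "[redacted]");
        }
        return copy;
    }

    // Fails fast when a mandatory setting is absent or blank.
    private static String required(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isBlank()) {
            throw new IllegalStateException("Missing required setting: " + name);
        }
        return value;
    }
}
```

A caller would pass AdminProps.fromEnv(System.getenv()) to Admin.create and print AdminProps.redacted(...) when logging the effective configuration.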
Last updated June 1, 2026