Connect Architecture
Kafka Connect is a framework for streaming data between Kafka and external systems without writing bespoke producer or consumer code. Understanding its runtime architecture (how workers host connectors, how connectors fan out into parallel tasks, and where offsets are stored) is the difference between a pipeline that scales and self-heals and one that silently stalls or duplicates data. This page breaks down those moving parts and contrasts standalone and distributed deployment modes so you can run Connect reliably in production.
The core abstractions
Connect is built from four layered concepts. Each connector you deploy ultimately resolves down to threads doing real I/O against Kafka and your external system.
| Concept | What it is | Lifecycle |
|---|---|---|
| Worker | A JVM process running the Connect runtime; the unit of deployment and scaling | Long-lived; you start/stop the process |
| Connector | A logical job (e.g. “pull this Postgres table”); decides how work is divided | Created/updated/deleted via REST or config |
| Task | A unit of parallel execution that actually moves records | Created by the framework from the connector’s plan |
| Offset | A position marker so a restarted task resumes where it left off | Committed periodically to an internal topic |
A connector’s job is not to move data itself — it is to produce a set of Task configurations. The framework then schedules those tasks across the available workers. A SourceConnector reading 8 partitions can hand back up to 8 task configs, and Connect will spread them out for parallelism, bounded by the connector’s tasks.max setting.
How tasks create parallelism
The connector computes how the source can be sharded, and tasks.max caps how many tasks run concurrently. A JDBC source might split by table; a file source might be single-task; a sink connector partitions consumption by Kafka topic partitions using a standard consumer group.
┌──────────────────────────── Connect cluster ────────────────────────────┐
│                                                                         │
│    Worker A                 Worker B                 Worker C           │
│   ┌─────────┐              ┌─────────┐              ┌─────────┐         │
│   │ Task 0  │              │ Task 1  │              │ Task 2  │         │
│   │ Task 3  │              │ Task 4  │              │         │         │
│   └────┬────┘              └────┬────┘              └────┬────┘         │
└────────┼────────────────────────┼────────────────────────┼──────────────┘
         │                        │                        │
┌────────▼────────────────────────▼────────────────────────▼──────────────┐
│                              Kafka cluster                              │
│  topics + internal: connect-configs / connect-offsets / connect-status  │
└─────────────────────────────────────────────────────────────────────────┘
         ▲                                                 │
external source (DB, files, API)                external sink (S3, ES)
Setting tasks.max higher than the connector can actually shard buys you nothing: a single-table JDBC connector with tasks.max=10 still runs one task. Match it to real source parallelism (partitions, tables, shards).
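The relationship between source shards and tasks.max can be sketched in a few lines. This is an illustrative Python model, not Connect's Java API: the function name `plan_task_configs` and the `task.id` and `tables` keys are hypothetical, and the contiguous grouping is an assumption about how a typical connector might split its work.

```python
def plan_task_configs(shards, tasks_max):
    """Split shards into at most tasks_max groups, one group per task."""
    if tasks_max < 1:
        raise ValueError("tasks_max must be at least 1")
    # A connector cannot usefully run more tasks than it has shards.
    num_tasks = min(tasks_max, len(shards))
    configs = []
    start = 0
    for i in range(num_tasks):
        # Spread the remainder over the first groups so sizes differ by at most 1.
        size = len(shards) // num_tasks + (1 if i < len(shards) % num_tasks else 0)
        configs.append({"task.id": i, "tables": shards[start:start + size]})
        start += size
    return configs


if __name__ == "__main__":
    # Eight shards capped at three tasks: groups of 3, 3, and 2.
    print(plan_task_configs([f"table_{n}" for n in range(8)], tasks_max=3))
    # One shard with a generous cap still yields a single task.
    print(plan_task_configs(["orders"], tasks_max=10))
```

In this model the cap only matters when there are more shards than tasks.max; below that, the shard count is the limit.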
Where offsets live
Source connectors track source offsets: an opaque map describing the last-read position in the external system (a file byte offset, a database SCN/LSN, a timestamp+id). In distributed mode these are committed to the internal connect-offsets topic. Sink connectors are ordinary consumers, so their progress is tracked as standard consumer-group offsets in the __consumer_offsets topic. Because both kinds of offset live in Kafka rather than on the worker, a crashed worker does not lose progress: a rebalance reassigns its tasks to a healthy worker, which reads the last committed offset and resumes from there. Anything processed after that last commit is processed again, so delivery is at-least-once by default.
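The commit-and-resume cycle can be illustrated with a small simulation. This is a sketch under simplifying assumptions, not Connect code: `OffsetStore` is a hypothetical in-memory stand-in for the offsets topic, the partition key and the `line` offset field are made up, and commits happen every N records here, whereas a real worker flushes offsets on a timer (offset.flush.interval.ms).

```python
class OffsetStore:
    """In-memory stand-in for the offsets topic: last commit per source partition."""

    def __init__(self):
        self._committed = {}

    def commit(self, source_partition, source_offset):
        self._committed[source_partition] = dict(source_offset)

    def read(self, source_partition):
        return self._committed.get(source_partition)


def run_task(lines, store, partition, commit_every, crash_after=None):
    """Emit lines starting at the committed position; return what was emitted."""
    committed = store.read(partition)
    position = committed["line"] if committed else 0
    emitted = []
    while position < len(lines):
        if crash_after is not None and len(emitted) == crash_after:
            return emitted  # simulated crash: progress since the last commit is lost
        emitted.append(lines[position])
        position += 1
        if position % commit_every == 0:
            store.commit(partition, {"line": position})
    store.commit(partition, {"line": position})  # final commit on clean completion
    return emitted


if __name__ == "__main__":
    store = OffsetStore()
    lines = list("abcdefg")
    print(run_task(lines, store, "file:demo.txt", commit_every=3, crash_after=5))
    print(run_task(lines, store, "file:demo.txt", commit_every=3))
```

The first run emits five records but only commits position 3 before the simulated crash, so the second run starts at position 3 and re-emits two records. That replay is the at-least-once behaviour a downstream consumer has to tolerate.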
Standalone vs distributed mode
Standalone runs a single worker with state stored in a local file — simple, but a single point of failure with no horizontal scaling. Distributed mode runs multiple workers that coordinate through Kafka itself (using the group membership protocol), storing config, offsets, and status in internal topics. This makes the cluster fault tolerant and elastically scalable, and connectors are managed exclusively through the REST API.
| Aspect | Standalone | Distributed |
|---|---|---|
| Processes | One worker | Many workers (a group) |
| State storage | Local file (offset.storage.file.filename) | Internal Kafka topics |
| Fault tolerance | None | Automatic rebalance on failure |
| Scaling | No | Add workers to the group |
| Management | Config file or REST | REST API only |
| Best for | Dev, demos, single-machine agents | Production |
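Since distributed mode is driven through REST, deploying a connector amounts to a POST to /connectors with a name and a config map. The sketch below builds that request with Python's standard library. The worker URL, connector name, file path, and topic are placeholders chosen for illustration, and the helper names are hypothetical; FileStreamSourceConnector is used only because it ships with Kafka as a demo connector.

```python
import json
import urllib.request


def build_create_request(base_url, name, config):
    """Build (but do not send) a POST /connectors request."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_connector(base_url, name, config, timeout=10):
    """Send the request to a worker and return the parsed JSON response."""
    request = build_create_request(base_url, name, config)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)


# Placeholder values for illustration only.
EXAMPLE_CONFIG = {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "/var/log/app.log",  # hypothetical source file
    "topic": "app-logs",         # hypothetical destination topic
}

if __name__ == "__main__":
    req = build_create_request("http://worker1.internal:8083", "file-source", EXAMPLE_CONFIG)
    print(req.get_method(), req.full_url)
```

Calling `create_connector` requires a reachable worker, so the example only builds the request. In standalone mode the same settings could instead be passed as a properties file on the command line.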
Running distributed workers
A distributed worker is configured with a group.id (all workers sharing it form one cluster) and the names of the three internal topics. Start two or more workers with the same group.id and identical internal-topic names and they automatically discover each other.
# connect-distributed.properties
bootstrap.servers=broker1:9092,broker2:9092,broker3:9092
group.id=connect-prod
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# Internal topics (auto-created; set RF >= 3 in production)
config.storage.topic=connect-configs
config.storage.replication.factor=3
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=5
rest.advertised.host.name=worker1.internal
plugin.path=/opt/kafka/plugins
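Because workers only form one cluster when group.id and the three internal-topic names match, it can be worth checking worker configs against each other before rollout. The following is a rough sketch of such a check. The function names are hypothetical, and the parser handles only plain key=value lines, not the full Java properties syntax.

```python
# Settings that must be identical on every worker in one Connect cluster.
SHARED_KEYS = (
    "group.id",
    "config.storage.topic",
    "offset.storage.topic",
    "status.storage.topic",
)


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def cluster_mismatches(worker_a, worker_b):
    """Return the shared keys whose values differ between two worker configs."""
    return [key for key in SHARED_KEYS if worker_a.get(key) != worker_b.get(key)]


if __name__ == "__main__":
    a = parse_properties("group.id=connect-prod\nconfig.storage.topic=connect-configs\n")
    b = parse_properties("group.id=connect-dev\nconfig.storage.topic=connect-configs\n")
    print(cluster_mismatches(a, b))
```

Per-worker settings such as rest.advertised.host.name are deliberately left out of the comparison, since they are expected to differ.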
Launch the worker:
connect-distributed.sh /etc/kafka/connect-distributed.properties
Output:
[INFO] Kafka Connect distributed worker initializing...
[INFO] Joining group connect-prod with generation 4
[INFO] Starting connectors and tasks using config offset 17
[INFO] REST server listening at http://worker1.internal:8083/
[INFO] Finished starting connectors and tasks
Inspect the internal topics that back the cluster’s state:
kafka-topics.sh --bootstrap-server broker1:9092 --list | grep '^connect-'
Output:
connect-configs
connect-offsets
connect-status
The config.storage.topic must be a single-partition, compacted topic, because Connect relies on total ordering of configuration changes. If you pre-create it, use --partitions 1 --config cleanup.policy=compact. Auto-creation handles this for you; manual creation with the wrong partition count will break the cluster.
Best Practices
- Run distributed mode in production with at least two workers (ideally three or more) and a replication factor of 3 on every internal topic so connector state survives broker loss.
- Set tasks.max to the real parallelism of the source (partitions/tables/shards), not an arbitrary high number.
- Give each environment its own unique group.id and distinct internal-topic names; sharing them across clusters corrupts state.
- Keep plugin.path clean (one directory per plugin) to avoid classloader conflicts between connector dependencies.
- Monitor task status via the REST API (/connectors/<name>/status) and alert on FAILED tasks; a failed task does not restart itself.
- Pre-create the config.storage.topic as single-partition and compacted if you manage topics by hand.
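The monitoring advice above can be sketched as a small check over a status document. The sample below is made up for illustration, including the connector name `jdbc-orders` and the worker IDs. The function name is hypothetical, and the restart path assumes the per-task restart endpoint of the Connect REST API, which is invoked with POST.

```python
def failed_task_restarts(status):
    """Return REST paths that would restart each FAILED task in a status document."""
    name = status["name"]
    return [
        f"/connectors/{name}/tasks/{task['id']}/restart"
        for task in status.get("tasks", [])
        if task.get("state") == "FAILED"
    ]


# Made-up sample in the shape returned by /connectors/<name>/status.
SAMPLE_STATUS = {
    "name": "jdbc-orders",
    "connector": {"state": "RUNNING", "worker_id": "worker1.internal:8083"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "worker1.internal:8083"},
        {"id": 1, "state": "FAILED", "worker_id": "worker2.internal:8083", "trace": "..."},
    ],
}

if __name__ == "__main__":
    for path in failed_task_restarts(SAMPLE_STATUS):
        print("would POST", path)
```

Note that the connector itself can report RUNNING while one of its tasks is FAILED, which is why the check looks at each task rather than the top-level state.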