Source Connectors
A source connector is the half of Kafka Connect that moves data into Kafka. It reads records from an external system (a relational database, a log file, an HTTP API) and produces them onto topics, while Connect tracks how far it has read so it can resume after a restart. In production this is how most pipelines begin: existing systems of record become a continuous stream of events without anyone hand-writing a producer. This page shows how to configure a JDBC and a FileStream source via the REST API, how polling and incrementing modes decide what gets pulled, how records map onto topics, and how to confirm the data actually landed.
How a source connector works
A source connector acts as a Kafka producer that you configure declaratively instead of writing code. Connect splits the connector into one or more tasks (up to tasks.max). Each task repeatedly polls the external system, converts each result into a Kafka record, and hands it to the runtime to be produced. Crucially, every record also carries a source offset, an opaque marker that means “this is the last position I read” (a database column value, a file byte offset, a log sequence number). In distributed mode Connect periodically commits these offsets to its internal offsets topic (offset.storage.topic, conventionally named connect-offsets). A crashed or rescheduled task therefore resumes from the last committed position rather than re-reading everything. Records produced after that commit may be sent again, which is why source connectors deliver at-least-once by default.
┌──────────────┐    poll     ┌──────────────────┐   produce   ┌────────┐
│  PostgreSQL  │────────────▶│   source task    │────────────▶│ topics │
│   (tables)   │             │  + offset store  │             └────────┘
└──────────────┘◀────────────└──────────────────┘
                 resume from last source offset
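The resume-from-offset behavior can be sketched in a few lines of Python. This is a toy model, not Connect's API. `OffsetStore`, `poll` and `run_task` are hypothetical names. A dict stands in for the offsets topic and a list stands in for the Kafka topic.

```python
from dataclasses import dataclass, field


@dataclass
class OffsetStore:
    """Hypothetical stand-in for Connect's offsets topic: source partition -> offset."""
    offsets: dict = field(default_factory=dict)

    def commit(self, partition: str, offset: int) -> None:
        self.offsets[partition] = offset

    def get(self, partition: str, default: int = 0) -> int:
        return self.offsets.get(partition, default)


def poll(rows: list, last_offset: int, batch_size: int) -> list:
    """Return up to batch_size (offset, row) pairs after last_offset (offsets are 1-based)."""
    pending = [(i + 1, row) for i, row in enumerate(rows) if i + 1 > last_offset]
    return pending[:batch_size]


def run_task(rows, store, topic, partition="orders", batch_size=2, max_polls=None):
    """Poll, append the batch to `topic` (the 'produce'), then commit its last offset.

    max_polls lets the demo stop early to imitate a crash or rebalance.
    """
    polls = 0
    while max_polls is None or polls < max_polls:
        batch = poll(rows, store.get(partition), batch_size)
        if not batch:
            break  # caught up; a real task would wait for the next poll interval
        topic.extend(row for _, row in batch)
        store.commit(partition, batch[-1][0])
        polls += 1


if __name__ == "__main__":
    rows, store, topic = ["a", "b", "c", "d", "e"], OffsetStore(), []
    run_task(rows, store, topic, max_polls=1)  # "crashes" after one batch
    run_task(rows, store, topic)               # restarts from committed offset 2
    print(topic, store.offsets)
```

The model commits after producing, which is what makes it at-least-once. If the process died between `topic.extend` and `store.commit`, the restarted task would emit that batch again.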
Configuring a JDBC source connector
The Confluent JDBC source connector reads rows from one or more tables and emits a record per row. You submit it as JSON to the Connect REST API. The example below ingests the orders and customers tables from the shop database and writes each to its own topic.
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "shop-jdbc-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "2",
"connection.url": "jdbc:postgresql://db:5432/shop",
"connection.user": "connect",
"connection.password": "secret",
"mode": "timestamp+incrementing",
"incrementing.column.name": "id",
"timestamp.column.name": "updated_at",
"table.whitelist": "orders,customers",
"poll.interval.ms": "5000",
"topic.prefix": "shop-",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
}'
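The same submission can be scripted. The sketch below uses only Python's standard library and targets PUT /connectors/<name>/config instead of POST /connectors. The PUT endpoint creates the connector or updates it in place, so re-running the script is safe. Some details are assumptions for illustration: the helper names `jdbc_source_config` and `put_config_request`, the password parameter, and the choice to cap tasks.max at the table count.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # same worker address as the curl example


def jdbc_source_config(tables, password, tasks_max=2):
    """Return the JDBC source config from the curl example for the given tables."""
    return {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        # More tasks than tables adds no parallelism, so cap it (a choice made here).
        "tasks.max": str(min(tasks_max, len(tables))),
        "connection.url": "jdbc:postgresql://db:5432/shop",
        "connection.user": "connect",
        "connection.password": password,  # passed in rather than hard-coded
        "mode": "timestamp+incrementing",
        "incrementing.column.name": "id",
        "timestamp.column.name": "updated_at",
        "table.whitelist": ",".join(tables),
        "poll.interval.ms": "5000",
        "topic.prefix": "shop-",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
    }


def put_config_request(name, config, base_url=CONNECT_URL):
    """Build PUT /connectors/<name>/config, which creates or updates the connector.

    Unlike POST /connectors, the body is the bare config map (no "name" wrapper).
    """
    return urllib.request.Request(
        f"{base_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
```

Against a running worker, `urllib.request.urlopen(put_config_request("shop-jdbc-source", jdbc_source_config(["orders", "customers"], "secret")))` submits it. The worker replies 201 when it creates the connector and 200 when it updates an existing one.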
Polling and incrementing modes
The mode setting is the heart of a JDBC source: it tells the connector how to decide which rows are new since the last poll. Choosing the wrong mode is the most common cause of missing or duplicated data.
| Mode | What it tracks | Catches inserts | Catches updates | Notes |
|---|---|---|---|---|
| bulk | nothing | yes (re-reads every row each poll) | yes (re-reads every row each poll) | Snapshots the whole table each interval; use for small, static lookup tables. |
| incrementing | a strictly increasing column | yes | no | Needs a monotonic key such as an auto-increment id. |
| timestamp | a last-modified column | yes | yes | Relies on updated_at being set on every change; rows that share a timestamp can be missed. |
| timestamp+incrementing | both columns | yes | yes | Most robust; the id breaks ties when rows share a timestamp. |
In the incremental modes, every poll.interval.ms the task runs a query bounded by the stored offset, e.g. WHERE updated_at > ? OR (updated_at = ? AND id > ?), then advances the offset to the largest value it saw. None of these modes emits an event when a row is deleted. For delete-aware capture you want log-based change data capture instead (see Debezium).
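To see why the tie-breaking clause matters, here is a minimal sketch of timestamp+incrementing polling against an in-memory SQLite table. It assumes an orders table with integer epoch timestamps, and `poll_once` and `make_demo_db` are hypothetical helpers. The query is the simplified form quoted above. The connector's real SQL may differ; for example, it may also bound updated_at from above.

```python
import sqlite3

# Simplified version of the bounded query described above.
POLL_SQL = """
    SELECT id, updated_at, status FROM orders
    WHERE updated_at > ? OR (updated_at = ? AND id > ?)
    ORDER BY updated_at, id
"""


def poll_once(conn, offset):
    """Run one poll; offset is (updated_at, id) of the last row already emitted."""
    last_ts, last_id = offset
    rows = conn.execute(POLL_SQL, (last_ts, last_ts, last_id)).fetchall()
    if rows:
        last_row = rows[-1]
        offset = (last_row[1], last_row[0])  # advance to the max (updated_at, id) seen
    return rows, offset


def make_demo_db():
    """In-memory table; updated_at is an integer epoch for simplicity."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE orders (id INTEGER PRIMARY KEY, updated_at INTEGER NOT NULL, status TEXT)"
    )
    conn.executemany(
        "INSERT INTO orders VALUES (?, ?, ?)", [(1, 100, "new"), (2, 100, "new")]
    )
    return conn


if __name__ == "__main__":
    conn = make_demo_db()
    rows, offset = poll_once(conn, (-1, -1))  # nothing read yet: picks up ids 1 and 2
    conn.execute("INSERT INTO orders VALUES (3, 100, 'new')")  # same timestamp as id 2
    conn.execute("UPDATE orders SET updated_at = 105, status = 'paid' WHERE id = 1")
    rows, offset = poll_once(conn, offset)
    print(rows, offset)
```

The second poll returns row 3 and the updated row 1. Row 3 is caught only because its id breaks the tie at updated_at = 100; a plain `updated_at > ?` filter would skip it. A deleted row would simply never show up in any poll.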
Do not use bulk mode against a large table on a short interval. It issues a full table scan every poll and re-emits every row, hammering your database and flooding the topic with duplicates. Reserve bulk for small reference data and use a timestamp/incrementing mode for anything that grows.
Topic mapping
The JDBC source builds each destination topic name as topic.prefix + table name. With topic.prefix: "shop-" and the orders and customers tables, records land in shop-orders and shop-customers. Connect does not auto-create topics unless the broker has auto.create.topics.enable=true or you enable Connect’s own topic creation (the topic.creation.default.* settings). In production you should create the topics ahead of time with the partition count and retention you want. By default the JDBC source leaves the record key null. To get a meaningful key, copy a primary-key column into it with the ValueToKey SMT, usually followed by ExtractField$Key to unwrap it. Log compaction and partitioning then behave sensibly.
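As a sketch, the naming rule and the keying setup can be written as Python data ready to merge into the connector config. ValueToKey and ExtractField$Key ship with Apache Kafka, and the topic.creation.default.* properties exist in Kafka 2.6 and later. The transform aliases, the partition and replication values, and the helper names are placeholders chosen for this example.

```python
def topic_for(prefix: str, table: str) -> str:
    """Destination topic name as described above: topic.prefix + table name."""
    return prefix + table


# Turns the id column into the record key. The aliases "copyIdToKey" and
# "extractId" are arbitrary names chosen for this sketch.
KEY_FROM_ID = {
    "transforms": "copyIdToKey,extractId",
    "transforms.copyIdToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.copyIdToKey.fields": "id",
    "transforms.extractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractId.field": "id",
}

# Optional Connect-side topic creation; these values are placeholders.
TOPIC_CREATION = {
    "topic.creation.default.partitions": "6",
    "topic.creation.default.replication.factor": "3",
}


def with_keys_and_topics(config: dict) -> dict:
    """Return a copy of config with the key SMTs and topic-creation defaults added."""
    return {**config, **KEY_FROM_ID, **TOPIC_CREATION}
```

With this chain, a row whose id is 42 gets the key 42. The StringConverter from the example then serializes that key as the string "42".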
Configuring a FileStream source connector
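For quick demos and local testing, the built-in FileStreamSourceConnector reads a text file line by line and produces each line as a record to a single topic. It ships with Apache Kafka itself (libs/connect-file-<version>.jar), so there is nothing to download. Since Kafka 3.2, however, that JAR is no longer on the worker's default classpath, so you may need to add it to plugin.path.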
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "file-source",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"tasks.max": "1",
"file": "/data/access.log",
"topic": "file-lines",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}'
The FileStream source is single-task by design (tasks.max above 1 is ignored) and stores the byte offset it has read so it resumes mid-file after a restart. It is excellent for learning Connect and verifying a cluster end-to-end, but it is not meant for production log shipping — use a purpose-built connector for that.
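The byte-offset idea behind that resume is easy to sketch. The Python below is an illustration, not the connector's code. The hypothetical `read_new_lines` returns only complete lines written after a stored byte position, together with the new position. It leaves a half-written trailing line for the next call.

```python
import os
import tempfile


def read_new_lines(path: str, position: int):
    """Return (complete lines after byte `position`, new byte position).

    A trailing partial line (no newline yet) is left for the next call, so the
    stored position always points at the start of an unread line.
    """
    with open(path, "rb") as f:
        f.seek(position)
        data = f.read()
    end = data.rfind(b"\n")
    if end == -1:
        return [], position  # nothing complete yet
    text = data[: end + 1].decode("utf-8")
    lines = [line.rstrip("\r") for line in text.split("\n")[:-1]]
    return lines, position + end + 1


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "access.log")
        with open(path, "w", encoding="utf-8") as f:
            f.write("GET / 200\nGET /hea")  # second line still being written
        lines, pos = read_new_lines(path, 0)
        print(lines, pos)
        with open(path, "a", encoding="utf-8") as f:
            f.write("lth 200\n")
        print(read_new_lines(path, pos))  # resumes at the stored byte offset
```

Persisting `pos` after each call, as Connect persists the offset, is what lets a restarted reader continue mid-file without re-emitting earlier lines.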
Verifying data lands in the topic
After submitting either connector, confirm that it is healthy and then read the topic. First check that the connector and its tasks are RUNNING:
curl -s http://localhost:8083/connectors/file-source/status
Output:
{"name":"file-source","connector":{"state":"RUNNING","worker_id":"10.0.0.4:8083"},
"tasks":[{"id":0,"state":"RUNNING","worker_id":"10.0.0.4:8083"}],"type":"source"}
Then append a line to the file and consume from the topic to see it appear:
echo "GET /health 200" >> /data/access.log
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic file-lines --from-beginning --timeout-ms 5000
Output:
GET /health 200
Processed a total of 1 messages
If the topic stays empty, inspect the task error with GET /connectors/<name>/status. A FAILED task includes the stack trace in its trace field. The cause is usually a bad connection URL, a JDBC driver JAR missing from the connector's directory under plugin.path, or a misnamed incrementing or timestamp column.
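When a task has failed, it helps to pull out just the headline of its trace and the endpoint that restarts it. The sketch below assumes the status response has already been parsed into a dict, and `failed_tasks` is a hypothetical helper. POST /connectors/<name>/tasks/<id>/restart is the Connect REST endpoint for restarting a single task.

```python
def failed_tasks(status: dict, base_url: str = "http://localhost:8083") -> list:
    """For each FAILED task: (task id, first line of its trace, restart URL).

    The first trace line holds the top-level exception and message; POST to the
    restart URL once the underlying cause has been fixed.
    """
    report = []
    for task in status.get("tasks", []):
        if task.get("state") != "FAILED":
            continue
        trace = task.get("trace", "")
        first_line = trace.splitlines()[0] if trace else ""
        restart_url = f"{base_url}/connectors/{status['name']}/tasks/{task['id']}/restart"
        report.append((task["id"], first_line, restart_url))
    return report
```

Restarting without fixing the cause usually just sends the task back to FAILED. Read the "Caused by" lines further down the full trace before retrying.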
Best Practices
- Pick timestamp+incrementing for JDBC sources so you capture both inserts and updates and survive ties on the timestamp column.
- Add a database index on the incrementing.column.name and timestamp.column.name columns so the bounded poll query stays fast as the table grows.
- Set tasks.max no higher than the number of tables. One task can own several tables, but the connector never creates more tasks than there are tables, so a larger value adds no parallelism.
- Pre-create destination topics with the right partitions and retention instead of relying on broker auto-creation.
- Give records a real key (a primary-key column copied in with the ValueToKey SMT) so partitioning and compaction work as intended.
- Reach for log-based CDC (Debezium) when you need deletes, exact ordering, or low-latency capture; query-based JDBC polling cannot see deleted rows.
- Keep FileStream and bulk mode for demos and small reference data only; never point them at high-volume production sources.
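The tasks.max advice follows from how the JDBC source divides tables among tasks. The sketch below models the grouping on Kafka's ConnectorUtils.groupPartitions helper, which the connector calls with min(tasks.max, number of tables) groups. Treat the exact split as an assumption about the connector's internals; `group_tables` is a hypothetical name.

```python
def group_tables(tables: list, max_tasks: int) -> list:
    """Split tables into min(max_tasks, len(tables)) contiguous, near-equal groups.

    Each group becomes one task's workload, so asking for more tasks than there
    are tables simply yields fewer groups.
    """
    num_groups = min(max_tasks, len(tables))
    if num_groups <= 0:
        return []
    per_group, leftover = divmod(len(tables), num_groups)
    groups, start = [], 0
    for i in range(num_groups):
        size = per_group + (1 if i < leftover else 0)  # first groups absorb the remainder
        groups.append(tables[start:start + size])
        start += size
    return groups
```

For example, `group_tables(["orders", "customers"], 8)` yields just two single-table groups. Five tables over two tasks split three and two.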