Change Data Capture with Debezium
Change Data Capture (CDC) turns a database’s write-ahead log into a stream of events: every insert, update, and delete that hits a table is captured and published to Kafka, in commit order, with almost no load on the source. Debezium is the de-facto open-source CDC platform, shipping as a family of Kafka Connect source connectors for MySQL, PostgreSQL, MongoDB, SQL Server, Oracle, and more. Unlike a JDBC source that polls and re-queries tables, Debezium reads the database’s own transaction log — MySQL’s binlog or Postgres’s WAL — so it sees deletes, never misses a change between polls, and captures the exact before/after state of every row. This page explains how log-based CDC works, the shape of a Debezium change event, snapshotting, and how to configure the MySQL and Postgres connectors.
Why log-based CDC beats polling
A polling source connector issues a SELECT ... WHERE updated_at > ? on a schedule. That approach can’t see hard deletes (the row is simply gone), misses intermediate updates that happen between two polls, and adds query load that grows with table size. Log-based CDC sidesteps all of this by tailing the structure the database already maintains for replication and crash recovery.
| Aspect | JDBC polling source | Debezium (log-based CDC) |
|---|---|---|
| Captures deletes | No | Yes |
| Captures every intermediate update | No (only latest at poll time) | Yes (every committed change) |
| Source load | Repeated SELECT queries | Reads the log, near-zero query load |
| Ordering | Per-poll, best-effort | Strict commit order |
| Requires schema column | Needs a monotonic/timestamp column | No application schema requirement |
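The difference in the table can be made concrete with a toy model. The following is a minimal sketch, assuming Java 17 or later; the class `PollingVsLogDemo` and its `Change` record are hypothetical names for illustration only and have nothing to do with Debezium's own code. It commits an insert, an update, and a delete between two polls and compares what each observer can see.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model: a table plus a commit log, observed by a poller and by a log reader. */
final class PollingVsLogDemo {
    // One committed change: op is "c", "u" or "d" (the same letters Debezium uses).
    record Change(String op, long id, String status) {}

    /** Applies a change to the table and appends it to the log, in commit order. */
    static void commit(Map<Long, String> table, List<Change> log, Change change) {
        if (change.op().equals("d")) {
            table.remove(change.id());
        } else {
            table.put(change.id(), change.status());
        }
        log.add(change);
    }

    /** What a poller can observe: only the rows that exist at poll time. */
    static Map<Long, String> poll(Map<Long, String> table) {
        return new HashMap<>(table);
    }

    public static void main(String[] args) {
        Map<Long, String> table = new HashMap<>();
        List<Change> log = new ArrayList<>();

        // Three commits happen between two polls.
        commit(table, log, new Change("c", 1001L, "ACTIVE"));
        commit(table, log, new Change("u", 1001L, "SUSPENDED"));
        commit(table, log, new Change("d", 1001L, null));

        System.out.println("poller sees rows: " + poll(table).size());
        System.out.println("log reader sees changes: " + log.size());
    }
}
```

In this model the poller finds an empty table and has no way to know the row ever existed, while the log still holds all three changes in commit order.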
Log-based CDC requires database-side configuration: MySQL needs binlog_format=ROW and binlog_row_image=FULL; Postgres needs wal_level=logical. Without these the connector cannot capture full row images and will fail to start.
The Debezium change-event envelope
Every captured change becomes a Kafka record whose value is a structured envelope. The key carries the row’s primary key; the value carries the before state, the after state, an op code, and source metadata describing where the change came from in the log.
{
  "before": null,
  "after": {
    "id": 1001,
    "email": "[email protected]",
    "status": "ACTIVE"
  },
  "source": {
    "db": "shop",
    "table": "customers",
    "ts_ms": 1717200000000,
    "pos": 154892,
    "snapshot": "false"
  },
  "op": "c",
  "ts_ms": 1717200000123
}
The op field is the most important part of the envelope:
| op | Meaning | before | after |
|---|---|---|---|
| c | Create (insert) | null | new row |
| u | Update | old row | new row |
| d | Delete | old row | null |
| r | Read (snapshot row) | null | row image |
A delete also emits a follow-up tombstone record — the same key with a null value — so log-compacted topics can physically remove the deleted key. You can disable tombstones with tombstones.on.delete=false when you don’t compact.
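To show how a consumer might act on the op codes and on tombstones, here is a minimal sketch that keeps an in-memory copy of a table up to date. It assumes Java 17 or later. `ChangeEventApplier` and its simplified `Envelope` record are hypothetical names that model only the fields described above; they are not Debezium classes, and op codes beyond the four in the table are rejected rather than handled.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: keep an in-memory copy of a table up to date from change events. */
final class ChangeEventApplier {
    // Hypothetical, simplified envelope: only the fields this sketch needs.
    record Envelope(String op, Map<String, Object> before, Map<String, Object> after) {}

    private final Map<Long, Map<String, Object>> rows = new HashMap<>();

    /** Applies one record; a null value is the tombstone that follows a delete. */
    void apply(Long key, Envelope value) {
        if (value == null) {
            rows.remove(key); // tombstone: make sure the key is gone
            return;
        }
        switch (value.op()) {
            case "c", "r", "u" -> rows.put(key, value.after()); // after holds the new row image
            case "d" -> rows.remove(key);                        // after is null on delete
            default -> throw new IllegalArgumentException("Unknown op: " + value.op());
        }
    }

    /** Current contents of the mirrored table, keyed by primary key. */
    Map<Long, Map<String, Object>> rows() {
        return rows;
    }
}
```

Treating create, read, and update identically as an upsert keeps the consumer idempotent, so replaying the same events after a restart leaves the copy in the same state.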
Snapshotting: capturing existing data
When a connector starts for the first time it has no log offset to resume from, and the historical rows that existed before CDC began are not in the log. Debezium solves this with a snapshot: it reads the current contents of the configured tables and emits them as op: "r" (“read”) events, then seamlessly switches to streaming live changes from the log position captured at snapshot start. The snapshot.mode setting controls this behaviour:
| snapshot.mode | Behaviour |
|---|---|
| initial (default) | Snapshot tables on first start, then stream |
| initial_only | Snapshot once, then stop |
| no_data (schema_only) | Capture schema only, stream changes from now on |
| when_needed | Snapshot only if no valid offset/log position exists |
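The handoff from snapshot to streaming can be illustrated with a toy model. This is a sketch under simplifying assumptions (Java 17 or later, a log held in a list, a log position that is just a list index); `SnapshotHandoffDemo` is a hypothetical name and the code is not how Debezium implements snapshots internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of the snapshot-then-stream handoff. */
final class SnapshotHandoffDemo {
    record Event(String op, long id, String status) {}

    /** Replays the first upTo log entries to get the table contents at that log position. */
    static Map<Long, String> tableAt(List<Event> log, int upTo) {
        Map<Long, String> table = new TreeMap<>();
        for (Event e : log.subList(0, upTo)) {
            if (e.op().equals("d")) {
                table.remove(e.id());
            } else {
                table.put(e.id(), e.status());
            }
        }
        return table;
    }

    /** Emits "r" events for rows present at snapshotPos, then every log entry from that position on. */
    static List<Event> snapshotThenStream(List<Event> log, int snapshotPos) {
        List<Event> out = new ArrayList<>();
        tableAt(log, snapshotPos).forEach((id, status) -> out.add(new Event("r", id, status)));
        out.addAll(log.subList(snapshotPos, log.size()));
        return out;
    }
}
```

With a log of two inserts followed by an update and a delete, calling `snapshotThenStream(log, 2)` yields two `r` events for the rows that existed at position 2, then the update and the delete. A consumer never sees the original inserts, yet ends up with the same final state as the table.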
Configuring a Debezium MySQL connector
You register the connector by POSTing JSON to the Connect REST API. Debezium needs unique topic.prefix (the logical server name) and database.server.id values, plus a Kafka topic to store schema-change history.
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "shop-mysql-cdc",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "tasks.max": "1",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "dbz-secret",
      "database.server.id": "184054",
      "topic.prefix": "shop",
      "database.include.list": "shop",
      "table.include.list": "shop.customers,shop.orders",
      "snapshot.mode": "initial",
      "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
      "schema.history.internal.kafka.topic": "schema-changes.shop",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "false"
    }
  }'
Records for the customers table land on the topic shop.shop.customers (<topic.prefix>.<database>.<table>).
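If you would rather register connectors from application code than from a shell, the same POST can be issued with the JDK's built-in HTTP client. The following is a minimal sketch assuming Java 11 or later; `ConnectorRegistrar` and its method names are hypothetical, and the Connect URL is whatever your worker exposes (the curl example uses http://localhost:8083).

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Sketch: register a connector from Java instead of curl. */
final class ConnectorRegistrar {
    /** Builds the same POST the curl command sends to <connectUrl>/connectors. */
    static HttpRequest buildRequest(String connectUrl, String connectorJson) {
        return HttpRequest.newBuilder(URI.create(connectUrl + "/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(connectorJson))
                .build();
    }

    /** Sends the request and returns the HTTP status code reported by Connect. */
    static int register(String connectUrl, String connectorJson) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(connectUrl, connectorJson), HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }
}
```

Keeping request construction separate from sending makes the payload easy to inspect or unit-test without a running Connect worker.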
Configuring a Debezium PostgreSQL connector
The Postgres connector reads logical decoding output from the WAL. The modern default plugin is pgoutput, which is built into Postgres and needs no extra extension. The connector also requires a replication slot, which persists its WAL position on the server.
{
  "name": "shop-postgres-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "dbz-secret",
    "database.dbname": "shop",
    "topic.prefix": "shop",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_shop",
    "publication.autocreate.mode": "filtered",
    "table.include.list": "public.customers,public.orders",
    "snapshot.mode": "initial"
  }
}
The replication slot retains WAL until Debezium consumes it. If the connector is stopped for a long time the WAL accumulates and can fill the disk. Monitor pg_replication_slots and drop unused slots, or the database can grind to a halt.
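One way to monitor a slot is to compare its restart_lsn from pg_replication_slots with the server's current WAL position from pg_current_wal_lsn(). Postgres can compute the difference itself with pg_wal_lsn_diff, but if your monitoring code only receives the two LSN strings, the arithmetic is simple. The sketch below assumes that situation; `WalLag` and the alert threshold are hypothetical and not part of Debezium or Postgres.

```java
/** Sketch: turn two pg_lsn values into a byte count of retained WAL. */
final class WalLag {
    /** Parses a pg_lsn such as "16/B374D848": high 32 bits, slash, low 32 bits, both hex. */
    static long parseLsn(String lsn) {
        String[] parts = lsn.split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Not an LSN: " + lsn);
        }
        return (Long.parseUnsignedLong(parts[0], 16) << 32) | Long.parseUnsignedLong(parts[1], 16);
    }

    /** Bytes of WAL between the slot's restart position and the server's current position. */
    static long retainedBytes(String currentLsn, String restartLsn) {
        return parseLsn(currentLsn) - parseLsn(restartLsn);
    }

    /** thresholdBytes is a hypothetical alerting limit chosen by the operator. */
    static boolean shouldAlert(String currentLsn, String restartLsn, long thresholdBytes) {
        return retainedBytes(currentLsn, restartLsn) > thresholdBytes;
    }
}
```

A sensible threshold depends on how much free disk the WAL volume has, so treat the alerting limit as something to tune per environment.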
Reading change events
The envelope is verbose. Most downstream consumers only want the after state, so Debezium ships the ExtractNewRecordState single-message transform (SMT), which flattens the envelope down to the new row and lets you choose how deletes are represented. Add it to the connector config (shown here in properties form; in the JSON payload each line becomes a key in config):
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
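To make the effect of the transform concrete, here is an illustrative approximation of what flattening does to an envelope when deletes are rewritten rather than dropped. It assumes Java 17 or later. `EnvelopeFlattener` and its `Envelope` record are hypothetical names, the real SMT operates on Kafka Connect records rather than maps, and carrying the `__deleted` marker as a string is an assumption of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative approximation of envelope flattening; not the SMT's real code. */
final class EnvelopeFlattener {
    // Hypothetical, simplified envelope: only the fields this sketch needs.
    record Envelope(String op, Map<String, Object> before, Map<String, Object> after) {}

    /**
     * Returns the flattened row. Deletes keep the last known row (before) and are
     * flagged through a __deleted marker, mirroring the "rewrite" handling mode.
     */
    static Map<String, Object> flatten(Envelope envelope) {
        boolean deleted = envelope.op().equals("d");
        Map<String, Object> row = new LinkedHashMap<>(deleted ? envelope.before() : envelope.after());
        row.put("__deleted", String.valueOf(deleted)); // assumption: marker carried as a string
        return row;
    }
}
```

The flattened record looks like an ordinary row to consumers, which is why the op code and source metadata are no longer available downstream unless you keep the full envelope.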
A Spring @KafkaListener can then consume the simplified record as a plain DTO record:
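import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;

public record CustomerChange(Long id, String email, String status) {}
// Tombstones arrive with a null value because drop.tombstones=false, so the payload is optional.
@KafkaListener(topics = "shop.shop.customers", groupId = "cdc-consumer")
public void onCustomerChange(@Payload(required = false) CustomerChange change) {
    if (change == null) return; // tombstone that follows a delete
    log.info("Customer {} now {}", change.id(), change.status());
}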
Output:
Customer 1001 now ACTIVE
Customer 1001 now SUSPENDED
Best Practices
- Configure the database first: binlog_format=ROW / binlog_row_image=FULL for MySQL, wal_level=logical for Postgres, before registering the connector.
- Give the CDC user least-privilege replication rights (REPLICATION SLAVE, REPLICATION CLIENT on MySQL; REPLICATION role on Postgres) rather than full admin.
- Use table.include.list to capture only the tables you need: narrower scope means smaller snapshots and less log traffic.
- Monitor Postgres replication-slot lag and disk usage; a stalled connector silently grows the WAL until the disk fills.
- Run one task per connector (tasks.max=1). Log readers are single-threaded by design; scale by adding connectors, not tasks.
- Use the ExtractNewRecordState SMT to flatten the envelope for consumers that don’t need before/after history, but keep the full envelope when you need auditability.
- Store schema-change history (MySQL) on a dedicated, replicated internal topic and never delete it, or the connector loses its ability to interpret older log positions.