Converters & Serialization
Kafka stores everything as opaque byte arrays, but Kafka Connect works with a richer, structured representation of records. Converters are the pluggable bridge between those two worlds: they serialize Connect’s internal data model into bytes on the way out and deserialize bytes back into that model on the way in. Misconfigured or mismatched converters are one of the most common causes of production Connect failures, so it pays to understand exactly what they do.
The role of a converter
A Connect worker runs source connectors, which write records into Kafka, and sink connectors, which read records from Kafka. Connectors do not serialize records themselves. Instead, a source connector produces SourceRecord objects and a sink connector consumes SinkRecord objects, with keys and values expressed in Connect’s data model. The converter sits at the edge:
- For a source connector: connector → Struct/value → converter serializes → bytes → Kafka topic.
- For a sink connector: Kafka topic → bytes → converter deserializes → Struct/value → connector.
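The two directions map onto two methods. The sketch below is a simplified, standard-library-only stand-in for that contract, not the real org.apache.kafka.connect.storage.Converter interface (which also takes a Schema on the way out and returns a SchemaAndValue on the way in). MiniConverter and Utf8StringConverter are hypothetical names used only for illustration.

```java
import java.nio.charset.StandardCharsets;

public class ConverterSketch {

    /** Hypothetical, simplified converter contract: one method per direction. */
    interface MiniConverter {
        // Source path: Connect-side value -> bytes written to the topic.
        byte[] fromConnectData(String topic, Object value);

        // Sink path: bytes read from the topic -> Connect-side value.
        Object toConnectData(String topic, byte[] bytes);
    }

    /** Illustrative implementation that treats every value as UTF-8 text. */
    static class Utf8StringConverter implements MiniConverter {
        @Override
        public byte[] fromConnectData(String topic, Object value) {
            // Null values stay null so that tombstone records survive the round trip.
            return value == null ? null : value.toString().getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public Object toConnectData(String topic, byte[] bytes) {
            return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        MiniConverter converter = new Utf8StringConverter();
        byte[] onTheWire = converter.fromConnectData("orders", "acme");
        Object restored = converter.toConnectData("orders", onTheWire);
        System.out.println(restored);
    }
}
```

The connector only ever sees the value passed to or returned from these methods; everything about the byte layout is the converter's concern.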
You configure converters with two keys that apply per worker (as defaults) or per connector (as overrides):
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
The key and value are handled by separate converters, so it is perfectly valid to use StringConverter for keys and AvroConverter for values.
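The default-then-override lookup can be pictured as a simple fallback. This is only an illustration of the lookup order, not Connect's actual configuration code; ConverterResolution and effectiveConverter are hypothetical names.

```java
import java.util.Map;

public class ConverterResolution {

    /**
     * Hypothetical helper: returns the connector-level setting when present,
     * otherwise the worker-level default.
     */
    static String effectiveConverter(Map<String, String> workerConfig,
                                     Map<String, String> connectorConfig,
                                     String key) {
        return connectorConfig.getOrDefault(key, workerConfig.get(key));
    }

    public static void main(String[] args) {
        // Worker-level defaults: JSON for both keys and values.
        Map<String, String> worker = Map.of(
                "key.converter", "org.apache.kafka.connect.json.JsonConverter",
                "value.converter", "org.apache.kafka.connect.json.JsonConverter");
        // This connector overrides only the key converter.
        Map<String, String> connector = Map.of(
                "key.converter", "org.apache.kafka.connect.storage.StringConverter");

        System.out.println(effectiveConverter(worker, connector, "key.converter"));
        System.out.println(effectiveConverter(worker, connector, "value.converter"));
    }
}
```

Here the key resolves to the connector's StringConverter override, while the value falls back to the worker's JsonConverter default.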
The Connect data model: Schema and Struct
Connect defines a serialization-independent type system in the org.apache.kafka.connect.data package. The two central abstractions are Schema (the shape and types of the data) and Struct (a concrete record conforming to a schema). This lets connectors describe data once and leaves it to the converter to decide how that data is encoded on the wire.
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
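// Describe the record shape once; "amount" is optional, so it may be left unset.
Schema orderSchema = SchemaBuilder.struct().name("Order")
    .field("id", Schema.INT64_SCHEMA)
    .field("customer", Schema.STRING_SCHEMA)
    .field("amount", Schema.OPTIONAL_FLOAT64_SCHEMA)
    .build();
// Build a concrete record; put() validates each value against the field's schema.
Struct order = new Struct(orderSchema)
    .put("id", 1001L)
    .put("customer", "acme")
    .put("amount", 249.99);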
A converter takes this Struct plus its Schema and turns it into bytes; on the consuming side it reconstructs them. Whether the schema actually travels with the data depends on which converter you pick.
Built-in and common converters
| Converter | Class | Schema carried? | Typical use |
|---|---|---|---|
| String | org.apache.kafka.connect.storage.StringConverter | No | Keys, plain text values |
| JSON | org.apache.kafka.connect.json.JsonConverter | Optional (envelope) | Schema-aware JSON, no registry |
| ByteArray | org.apache.kafka.connect.converters.ByteArrayConverter | No | Pass-through binary |
| Avro | io.confluent.connect.avro.AvroConverter | Yes (Schema Registry) | Compact, evolvable production data |
| Protobuf | io.confluent.connect.protobuf.ProtobufConverter | Yes (Schema Registry) | gRPC-style schemas |
| JSON Schema | io.confluent.connect.json.JsonSchemaConverter | Yes (Schema Registry) | Validated JSON |
StringConverter, JsonConverter, and ByteArrayConverter ship with Apache Kafka. The Avro, Protobuf, and JSON Schema converters come from Confluent: they must be installed on the worker (on its plugin path or classpath) and need a reachable Schema Registry service at runtime.
JsonConverter with and without schemas
JsonConverter has a critical toggle, schemas.enable. When true, every message is wrapped in an envelope containing both schema and payload, making the data self-describing but verbose:
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
Output:
{
  "schema": { "type": "struct", "fields": [
    { "field": "id", "type": "int64" },
    { "field": "customer", "type": "string" }
  ] },
  "payload": { "id": 1001, "customer": "acme" }
}
Setting schemas.enable=false emits plain JSON ({"id":1001,"customer":"acme"}) but discards type information, which downstream sinks may need.
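To get a feel for the overhead of the envelope, the sketch below compares the UTF-8 size of the two encodings of the same record. The envelope string is a compacted copy of the abridged example above; a real JsonConverter envelope carries additional schema attributes, so treat the result as a lower bound. EnvelopeOverhead is a hypothetical class name.

```java
import java.nio.charset.StandardCharsets;

public class EnvelopeOverhead {

    // Plain payload, as emitted with schemas.enable=false.
    static final String PLAIN = "{\"id\":1001,\"customer\":\"acme\"}";

    // Same payload wrapped in a schema/payload envelope (abridged schema).
    static final String ENVELOPE =
            "{\"schema\":{\"type\":\"struct\",\"fields\":["
            + "{\"field\":\"id\",\"type\":\"int64\"},"
            + "{\"field\":\"customer\",\"type\":\"string\"}]},"
            + "\"payload\":" + PLAIN + "}";

    static int utf8Length(String json) {
        return json.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        int plain = utf8Length(PLAIN);
        int envelope = utf8Length(ENVELOPE);
        System.out.println("plain:    " + plain + " bytes");
        System.out.println("envelope: " + envelope + " bytes");
        System.out.println("overhead: " + (envelope - plain) + " bytes per record");
    }
}
```

The overhead is paid on every record, which is why the envelope becomes costly for small, high-volume messages.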
Gotcha: A sink connector reading plain JSON written with schemas.enable=false cannot reconstruct a Schema. Sinks that need typed structures (e.g. JDBC sink building DDL) will fail or treat everything as a schemaless map. Keep schemas enabled when a sink relies on them.
AvroConverter with Schema Registry
Avro is a common choice for high-volume production pipelines. Instead of embedding the schema in every message, the converter registers it once in Schema Registry and prefixes each record’s Avro-encoded bytes with a compact schema ID. This yields small payloads plus enforced schema evolution.
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
For a source connector, the Avro schema is derived from the connector’s Connect Schema. For a sink, the converter looks up the schema ID, fetches the writer schema, and rebuilds the Struct.
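To make the schema ID in the record bytes concrete: Schema Registry serializers prepend a small header to the encoded payload, a magic byte of 0 followed by the schema ID as a 4-byte big-endian integer. The sketch below reproduces only that framing with the standard library. WireFormat, frame, and schemaIdOf are hypothetical helper names, and the payload bytes are placeholders rather than real Avro.

```java
import java.nio.ByteBuffer;

public class WireFormat {

    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema ID

    /** Prepends the magic byte and schema ID to an already-encoded payload. */
    static byte[] frame(int schemaId, byte[] encodedPayload) {
        return ByteBuffer.allocate(HEADER_SIZE + encodedPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId) // ByteBuffer writes big-endian by default
                .put(encodedPayload)
                .array();
    }

    /** Reads the schema ID back, rejecting records that lack the header. */
    static int schemaIdOf(byte[] record) {
        if (record == null || record.length < HEADER_SIZE) {
            throw new IllegalArgumentException("Record too short to carry a schema ID");
        }
        ByteBuffer buffer = ByteBuffer.wrap(record);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }

    public static void main(String[] args) {
        byte[] placeholderPayload = {1, 2, 3}; // stands in for Avro-encoded bytes
        byte[] record = frame(42, placeholderPayload);
        System.out.println("schema id = " + schemaIdOf(record));
    }
}
```

Because the header is only five bytes, the per-record cost of carrying schema information stays constant no matter how large the schema is.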
Matching converters across source and sink
The single most important rule: the converter that reads a topic must understand how the data was written. Bytes carry no inherent format, so a mismatch produces deserialization errors, not graceful degradation.
[Source: AvroConverter] --> topic "orders" (Avro + schema id) --> [Sink: JsonConverter]
|
v
DataException: failed to deserialize
If a producer (or another source connector) wrote Avro, the sink must use AvroConverter pointed at the same registry. The reverse mismatch, AvroConverter reading bytes that were not written in the Schema Registry format, typically surfaces as an “Unknown magic byte” error. Likewise, mixing schemaless JSON producers with a schema-expecting sink will fail. When a topic has heterogeneous producers, standardize on one serialization format first.
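Before attaching a sink to an unfamiliar topic, it can help to inspect the first bytes of a sample record. The heuristic below is only a sketch under stated assumptions: a leading zero byte suggests Schema Registry framing (which Avro, Protobuf, and JSON Schema serializers share, so it cannot tell them apart), and a leading { or [ suggests JSON text. FormatSniffer and Guess are hypothetical names, and a guess is not proof of the format.

```java
public class FormatSniffer {

    enum Guess { SCHEMA_REGISTRY_FRAMED, JSON_TEXT, UNKNOWN }

    /** Heuristic only: looks at the leading bytes of one sample record. */
    static Guess guess(byte[] record) {
        if (record == null || record.length == 0) {
            return Guess.UNKNOWN;
        }
        // Magic byte 0 followed by at least a 4-byte schema ID.
        if (record[0] == 0x0 && record.length >= 5) {
            return Guess.SCHEMA_REGISTRY_FRAMED;
        }
        // Otherwise skip leading whitespace and check for a JSON object or array.
        for (byte b : record) {
            if (b == ' ' || b == '\t' || b == '\n' || b == '\r') {
                continue;
            }
            return (b == '{' || b == '[') ? Guess.JSON_TEXT : Guess.UNKNOWN;
        }
        return Guess.UNKNOWN;
    }

    public static void main(String[] args) {
        byte[] framed = {0, 0, 0, 0, 42, 1, 2, 3};
        byte[] json = "{\"id\":1001}".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        System.out.println(guess(framed));
        System.out.println(guess(json));
    }
}
```

A result of UNKNOWN is common for plain strings or raw binary, in which case the producing application's configuration is the only reliable source of truth.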
You can override converters per connector via the REST API, which is essential when one worker hosts connectors consuming different formats:
{
  "name": "jdbc-sink-orders",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "orders",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}
Tip: Internal converters (internal.key.converter/internal.value.converter) are deprecated, and were removed entirely in Apache Kafka 3.0; Connect stores offsets and config as schemaless JSON regardless. Do not set them; they are unrelated to your data converters.
Best Practices
- Use Avro or Protobuf with Schema Registry for production data: compact payloads plus enforced, evolvable schemas beat self-describing JSON envelopes at scale.
- Keep key.converter and value.converter independent; StringConverter keys with structured values is a common, sensible combination.
- Always confirm that the writer’s format matches the reader’s converter before deploying a sink: most “magic byte” and DataException errors are converter mismatches.
- If you use JsonConverter, decide schemas.enable deliberately: enable it when a sink needs typed records, disable it only for lightweight schemaless flows.
- Override converters per connector rather than at the worker level when a cluster handles mixed formats.
- Point every Avro/Protobuf converter at the same Schema Registry URL and configure a sensible compatibility mode to prevent breaking consumers.