Change Data Capture (CDC)
Warning
Supported only for row-oriented tables. Support for column-oriented tables is currently under development.
Change Data Capture (CDC) captures changes to YDB table rows, uses these changes to generate a changefeed, writes them to distributed storage, and provides access to these records for further processing. It uses a topic as distributed storage to efficiently store the table change log.
When a table row is added, updated, or deleted, CDC generates a change record that contains the row's primary key and writes it to the topic partition corresponding to that key.
Guarantees
- Change records are sharded across topic partitions by primary key.
- Each change is only delivered once (exactly-once delivery).
- Changes by the same primary key are delivered to the same topic partition in the order they took place in the table.
- A change record is delivered to the topic partition only after the corresponding transaction in the table has been committed.
Limitations
- The number of topic partitions is fixed as of changefeed creation and remains unchanged (unlike tables, topics are not elastic).
- Changefeeds support records of the following types of operations:
  - Updates
  - Erases

  Adding a row is a special case of an update, and a record of a row addition in a changefeed looks the same as an update record.
Virtual timestamps
All changes in YDB tables are arranged according to the order in which transactions are performed. Each change is marked with a virtual timestamp which consists of two elements:
- Global coordinator time.
- Unique transaction ID.
Using these timestamps, you can order records from different topic partitions relative to each other or filter them (for example, to exclude outdated change records).
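Since a virtual timestamp is the pair (global coordinator time, transaction ID), comparing these pairs lexicographically is enough to order or filter records. A minimal Python sketch (the record shape follows the JSON format described later in this article; the sample data is made up):

```python
# Ordering and filtering change records by virtual timestamp.
# A virtual timestamp is the pair (step, txId); comparing these pairs
# lexicographically yields the global order of changes. Sample data is made up.

records = [
    {"key": [2], "update": {"v": "b"}, "ts": [1670792401000, 7]},
    {"key": [1], "update": {"v": "a"}, "ts": [1670792400890, 9]},
    {"key": [1], "update": {"v": "c"}, "ts": [1670792400890, 3]},
]

# Arrange records coming from different topic partitions relative to each other.
ordered = sorted(records, key=lambda r: tuple(r["ts"]))

# Drop records at or below a known virtual timestamp (e.g. already processed).
threshold = (1670792400890, 5)
fresh = [r for r in ordered if tuple(r["ts"]) > threshold]
```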
Note
By default, virtual timestamps are not uploaded to the changefeed. To enable them, use the appropriate parameter when creating a changefeed.
Initial table scan
By default, a changefeed contains records only for rows that changed after the changefeed was created. The initial table scan additionally exports to the changefeed the values of all rows that existed at the time of changefeed creation.
The scan runs in the background over a snapshot of the table. The following situations are possible:
- A row that has not yet been scanned changes in the table. The changefeed receives two records, one after the other: a record with the original value and a record with the update. On subsequent changes to the same row, only update records are exported.
- The scan reaches a row that has already changed. Nothing is exported to the changefeed, because the original value was already exported at the moment of the change (see the previous item).
- A row that has already been scanned changes in the table. Only an update record is exported to the changefeed.
This ensures that, for the same row (the same primary key), the source value is exported first, and then the updated value is exported.
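This guarantee can be illustrated with a short Python sketch: regardless of how the scan and concurrent updates interleave, the first record a consumer sees for each key carries the original value (the event stream below is made up):

```python
# Illustration of the initial-scan ordering guarantee: for every key, the
# record with the original value precedes any update records, so the first
# value observed per key is always the original one. Events are made up.

feed = [
    {"key": [1], "value": "original-1"},  # exported by the scan
    {"key": [1], "value": "updated-1"},   # update of an already-exported row
    {"key": [2], "value": "original-2"},  # exported right before key 2 changed
    {"key": [2], "value": "updated-2"},   # the update record that follows it
]

first_seen = {}
for record in feed:
    # setdefault keeps only the first value observed for each key
    first_seen.setdefault(tuple(record["key"]), record["value"])
```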
Note
The record with the source row value is labeled as an update record. When using virtual timestamps, records are marked by the snapshot's timestamp.
During the scan, depending on how frequently the table is updated, you might see an increased number of OVERLOADED errors: in addition to the update records, the records with the original row values also have to be delivered. Once the scan is complete, the changefeed switches back to normal operation.
Warning
Automatic partitioning processes are suspended in the table during the initial scan.
Record structure
Depending on the changefeed parameters, the structure of a record may differ.
JSON format
A JSON record has the following structure:
```json
{
    "key": [<key components>],
    "update": {<columns>},
    "erase": {},
    "newImage": {<columns>},
    "oldImage": {<columns>},
    "ts": [<step>, <txId>]
}
```
key
: An array of primary key component values. Always present.

update
: Update flag. Present if the record matches an update operation. In UPDATES mode, it also contains the names and values of the updated columns.

erase
: Erase flag. Present if the record matches an erase operation.

newImage
: Row snapshot after the change. Present in NEW_IMAGE and NEW_AND_OLD_IMAGES modes. Contains column names and values.

oldImage
: Row snapshot before the change. Present in OLD_IMAGE and NEW_AND_OLD_IMAGES modes. Contains column names and values.

ts
: Virtual timestamp. Present if the VIRTUAL_TIMESTAMPS setting is enabled. Contains the global coordinator time (step) and the unique transaction ID (txId).
Sample record of an update in UPDATES mode:
```json
{
    "key": [1, "one"],
    "update": {
        "payload": "lorem ipsum",
        "date": "2022-02-22"
    }
}
```
Record of an erase:
```json
{
    "key": [2, "two"],
    "erase": {}
}
```
Record with row snapshots:
```json
{
    "key": [1, 2, 3],
    "update": {},
    "newImage": {
        "textColumn": "value1",
        "intColumn": 101,
        "boolColumn": true
    },
    "oldImage": {
        "textColumn": null,
        "intColumn": 100,
        "boolColumn": false
    }
}
```
Record with virtual timestamps:
```json
{
    "key": [1],
    "update": {
        "created": "2022-12-12T00:00:00.000000Z",
        "customer": "Name123"
    },
    "ts": [1670792400890, 562949953607163]
}
```
Note
- The same record cannot contain the update and erase fields simultaneously, since these fields are operation flags (you can't update and erase a table row at the same time). However, each record contains one of these fields (any operation is either an update or an erase).
- In UPDATES mode, the update field for update operations serves both as an operation flag and as a container for the names and values of the updated columns.
- JSON object fields containing column names and values (newImage, oldImage, and update in UPDATES mode) do not include columns that are primary key components.
- If a record contains the erase field (indicating that the record matches an erase operation), it is always an empty JSON object ({}).
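Because every record carries exactly one of the update/erase flags, classifying records is straightforward. A minimal sketch in Python (the sample records are made up, the field names follow the JSON format above):

```python
import json

# Classifying changefeed records in the JSON format described above.
# Exactly one of the "update"/"erase" flags is present in every record,
# so checking for "erase" is sufficient. Sample records are made up.

def describe(record_json):
    record = json.loads(record_json)
    return {
        "op": "erase" if "erase" in record else "update",
        "key": record["key"],
        # In UPDATES mode the changed columns arrive inside "update";
        # in NEW_IMAGE/NEW_AND_OLD_IMAGES modes they arrive in "newImage".
        "columns": record.get("newImage") or record.get("update") or {},
    }

update_record = '{"key": [1, "one"], "update": {"payload": "lorem ipsum"}}'
erase_record = '{"key": [2, "two"], "erase": {}}'
```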
Debezium-compatible JSON format
A Debezium-compatible JSON record structure has the following format:
```json
{
    "payload": {
        "op": <op>,
        "before": {<columns>},
        "after": {<columns>},
        "source": {
            "connector": <connector>,
            "version": <version>,
            "ts_ms": <ts_ms>,
            "step": <step>,
            "txId": <txId>,
            "snapshot": <bool>
        }
    }
}
```
- op: The operation that was performed on the row:
  - c — create. Applicable only in NEW_AND_OLD_IMAGES mode.
  - u — update.
  - d — delete.
  - r — read from snapshot.
- before: Row snapshot before the change. Present in OLD_IMAGE and NEW_AND_OLD_IMAGES modes. Contains column names and values.
- after: Row snapshot after the change. Present in NEW_IMAGE and NEW_AND_OLD_IMAGES modes. Contains column names and values.
- source: Source metadata for the event:
  - connector: Connector name. The current name is ydb.
  - version: Version of the connector used to generate the record. The current version is 1.0.0.
  - ts_ms: Approximate time when the change was applied, in milliseconds.
  - step: Global coordinator time. Part of the virtual timestamp.
  - txId: Unique transaction ID. Part of the virtual timestamp.
  - snapshot: Whether the event is part of a snapshot.
When reading via the Kafka API, the primary key of the modified row is passed as the message key, in the following Debezium-compatible format:
```json
{
    "payload": {<columns>}
}
```
payload
: Key of the row that was changed. Contains the names and values of the columns that make up the primary key.
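A consumer can combine the message key and the Debezium-style envelope to summarize an event. A minimal Python sketch (the sample Kafka message is made up; the field names follow the format above):

```python
import json

# Interpreting a Debezium-compatible Kafka message: the message key carries
# the primary key, and payload.op encodes the operation. Sample data is made up.

OP_NAMES = {"c": "create", "u": "update", "d": "delete", "r": "snapshot read"}

def summarize(message_key, message_value):
    key = json.loads(message_key)["payload"]
    payload = json.loads(message_value)["payload"]
    return {
        "key": key,
        "op": OP_NAMES[payload["op"]],
        "snapshot": payload["source"]["snapshot"],
    }

message_key = '{"payload": {"id": 42}}'
message_value = json.dumps({
    "payload": {
        "op": "u",
        "after": {"status": "shipped"},
        "source": {
            "connector": "ydb",
            "version": "1.0.0",
            "ts_ms": 1670792400000,
            "step": 1670792400890,
            "txId": 562949953607163,
            "snapshot": False,
        },
    }
})
```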
Record retention period
By default, records are stored in the changefeed for 24 hours from the time they are sent. Depending on usage scenarios, the retention period can be reduced or increased up to 30 days.
Warning
Records whose retention time has expired are deleted, regardless of whether they were processed (read) or not.
Deleting records before they are processed by the client will cause offset skips, which means that the offsets of the last record read from the partition and the earliest available record will differ by more than one.
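Such a skip can be detected by comparing offsets. A small sketch (offset semantics as described above; the numbers are made up):

```python
# Detecting an offset skip after retention-based deletion: if the earliest
# available record is more than one offset ahead of the last record read,
# the records in between expired before they were processed.

def lost_records(last_read_offset, earliest_available_offset):
    gap = earliest_available_offset - last_read_offset - 1
    return max(gap, 0)

# Example with made-up offsets: records 42, 43, and 44 were deleted.
lost = lost_records(41, 45)
```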
To set up the record retention period, specify the RETENTION_PERIOD parameter when creating a changefeed.
Topic partitions
By default, the number of topic partitions is equal to the number of table partitions. The number of topic partitions can be redefined by specifying the TOPIC_MIN_ACTIVE_PARTITIONS parameter when creating a changefeed.
Note
Currently, the ability to explicitly specify the number of topic partitions is available only for tables whose first primary key component is of type Uint64 or Uint32.
Creating and deleting a changefeed
You can add a changefeed to an existing table or remove it using the ADD CHANGEFEED and DROP CHANGEFEED directives of the YQL ALTER TABLE statement. When a table is dropped, any changefeed added to it is deleted as well.
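As a sketch, the hypothetical helper below renders YQL for adding a changefeed with the parameters mentioned in this article (FORMAT, MODE, VIRTUAL_TIMESTAMPS, RETENTION_PERIOD, TOPIC_MIN_ACTIVE_PARTITIONS). The table and changefeed names are made up; check the YQL ALTER TABLE reference for the exact syntax supported by your YDB version before running the resulting statement with a YDB client.

```python
# Hypothetical helper that renders the YQL for adding a changefeed.
# Option names follow the parameters discussed in this article; the
# table/changefeed names in the example below are made up.

def add_changefeed_yql(table, feed, mode, fmt="JSON",
                       virtual_timestamps=False,
                       retention_period=None,
                       min_active_partitions=None):
    opts = [f"FORMAT = '{fmt}'", f"MODE = '{mode}'"]
    if virtual_timestamps:
        opts.append("VIRTUAL_TIMESTAMPS = TRUE")
    if retention_period is not None:
        opts.append(f"RETENTION_PERIOD = Interval('{retention_period}')")
    if min_active_partitions is not None:
        opts.append(f"TOPIC_MIN_ACTIVE_PARTITIONS = {min_active_partitions}")
    return f"ALTER TABLE {table} ADD CHANGEFEED {feed} WITH ({', '.join(opts)});"

statement = add_changefeed_yql("orders", "updates_feed", "UPDATES",
                               virtual_timestamps=True,
                               retention_period="PT6H")
```

Dropping the changefeed is the mirror statement: ALTER TABLE orders DROP CHANGEFEED updates_feed;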
CDC purpose and use
For information about using CDC when developing apps, see best practices.