Использование процесса подмерживания изменения для реализации SCD2 в YDB

В этой статье описывается реализация паттерна Slowly Changing Dimensions Type 2 (SCD2) в YDB с использованием процесса подмерживания изменений.

Используемые инструменты

Для поставки данных в SCD2 таблицу в данной статье будет использоваться следующая комбинация из доступной в YDB функциональности:

  1. Таблица-источник dimension_scd_changes, содержащая информацию об атрибутах, их значениях и моментах изменений данных.
  2. Таблица-приёмник dimension_scd2_final для хранения результирующих данных.
  3. Периодически внешнее приложение должно вызывать запрос, который будет подмерживать изменения данных, накопившиеся в таблице dimension_scd_changes, в таблицу dimension_scd2_final.
  4. Для поставки данных из строковых таблиц для хранения их в формате SCD2 удобно использовать встроенный в YDB механизм трансфера.

Примечание

Таблицы dimension_scd_changes, dimension_scd2_final приведены для иллюстрации. Для реальных запросов вам нужно скорректировать структуру таблиц и их атрибутов.

Создание таблицы для приёма всех изменений dimension_scd_changes

CREATE TABLE dimension_scd_changes (
    id Utf8 NOT NULL, -- Бизнес-ключ
    attribute1 Utf8,  -- Атрибут данных
    attribute2 Utf8,  -- Атрибут данных
    change_time Timestamp NOT NULL, -- Момент изменения данных
    operation Utf8, -- Тип изменений данных
    PRIMARY KEY (change_time, id)
)
PARTITION BY HASH(change_time, id)
WITH (
    STORE=COLUMN
)

Описание полей таблицы:

  • id — бизнес-ключ записи;
  • attribute1, attribute2 — атрибуты измерения;
  • change_time — момент времени изменения данных;
  • operation — тип изменения данных: CREATE, UPDATE, DELETE.

Первичный ключ создается как PRIMARY KEY (change_time, id), так как по одному и тому же бизнес-ключу данных может происходить множество изменений и все эти изменения по одному ключу важно сохранять. Подробнее про выбор первичного ключа и ключа партиционирования, можно прочесть в документации - выбор первичного ключа, выбор ключа партиционирования

Создание финальной SCD2 таблицы dimension_scd2_final

CREATE TABLE dimension_scd2_final (
    id Utf8 NOT NULL, -- Бизнес-ключ данных
    attribute1 Utf8,  -- Атрибут данных
    attribute2 Utf8,  -- Атрибут данных
    valid_from Timestamp NOT NULL, -- Момент времени, с которого данные актуальны
    valid_to Timestamp,            -- Момент времени, до которого данные актуальны.
                                   -- Если данные актуальны прямо сейчас, то в valid_to находится NULL
    is_current Uint8,              -- Признак, что данные актуальны прямо сейчас.
    is_deleted Uint8,              -- Признак, что данные были удалены. Если данные были удалены, то is_current = FALSE
    PRIMARY KEY (valid_from, id)
)
PARTITION BY HASH(valid_from, id)
WITH(
    STORE=COLUMN
)

Описание полей таблицы:

  • id — бизнес-ключ записи;
  • attribute1, attribute2 — атрибуты измерения;
  • valid_from — момент времени, с которого запись становится актуальной;
  • valid_to — момент времени, до которого запись была актуальной, или NULL для текущих записей;
  • is_current — флаг, указывающий, является ли запись текущей (1 - текущая запись) или (0 - запись историческая);
  • is_deleted — флаг, указывающий, была ли запись удалена (1 - запись была удалена) или (0 - запись не была удалена).

Первичный ключ создается как PRIMARY KEY (valid_from, id), так как по одному и тому же ключу данных может происходить множество изменений и все эти изменения по одному ключу важно сохранять.

Загрузка данных в таблицу изменений

Для загрузки данных в таблицу изменений можно использовать любой способ загрузки данных и автоматическую поставку изменений с помощью механизма трансфер.

Пример запроса для явной загрузки изменений:

UPSERT INTO dimension_scd_changes (id, attribute1, attribute2, change_time, operation)
VALUES ('CUSTOMER_1001', 'John Doe', 'Los Angeles', Unwrap(CAST('2025-08-22T17:00:00Z' as Timestamp)), 'CREATE');

UPSERT INTO dimension_scd_changes (id, attribute1, attribute2, change_time, operation)
VALUES ('CUSTOMER_1002', 'John Doe', 'New York', Unwrap(CAST('2025-08-22T17:00:00Z' as Timestamp)), 'CREATE');

UPSERT INTO dimension_scd_changes (id, attribute1, attribute2, change_time, operation)
VALUES ('CUSTOMER_1001', 'John Doe', 'San Francisco', Unwrap(CAST('2025-08-22T19:00:00Z' as Timestamp)), 'UPDATE');

UPSERT INTO dimension_scd_changes (id, attribute1, attribute2, change_time, operation)
VALUES ('CUSTOMER_1002', 'John Doe', 'New York', Unwrap(CAST('2025-08-22T21:00:00Z' as Timestamp)), 'DELETE');

Запрос для размещения изменений в формате SCD2

Чтобы преобразовать данные из таблицы изменений в формат SCD2 и загрузить их в финальную таблицу, используется специальный запрос. Этот запрос нужно запускать регулярно — с такой периодичностью, с какой вы хотите обновлять данные в финальной таблице. Для автоматического запуска можно воспользоваться интеграцию YDB с Apache Airflow™:

-- Шаг 1: Читаем все новые события из таблицы `dimension_scd_changes`.
-- Это именованное выражение ($changes) является исходным набором данных для всей последующей обработки в рамках этого запуска.
$changes = (
    SELECT
        id,
        attribute1,
        attribute2,
        change_time,
        String::AsciiToUpper(operation) AS op
    FROM dimension_scd_changes
);

-- Шаг 2: Фильтруем события, оставляя только те, которых еще нет в целевой таблице.
-- Цель этого шага - обеспечить идемпотентность на уровне чтения, чтобы не обрабатывать
-- уже загруженные данные в случае сбоя и перезапуска скрипта.
$unprocessed_data = (
    SELECT
        chg.id AS id,
        chg.attribute1 AS attribute1,
        chg.attribute2 AS attribute2,
        chg.change_time AS change_time,
        chg.op AS op
    FROM $changes AS chg
    LEFT JOIN dimension_scd2_final AS scd
        ON chg.id = scd.id AND chg.change_time = scd.valid_from -- Ищем записи по каждой сущности (id) и времени изменения
    WHERE scd.id IS NULL -- для исключения строк, которые уже были перенесены в таблицу dimension_scd2_final ранее
);

-- Шаг 3: Находим в целевой таблице активные записи (`is_current=1`), для которых пришли обновления.
-- Формируем для них "закрывающие" версии, устанавливая `valid_to` равным времени
-- самого первого изменения из новой пачки ($unprocessed_data).
$close_open_intervals = (
    SELECT
        target.id AS id,
        target.attribute1 as attribute1,
        target.attribute2 as attribute2,
        target.valid_from as valid_from,
        0ut AS is_current, -- Закрываемая запись больше не является текущей
        unprocessed_data.change_time AS valid_to,
        target.is_deleted as is_deleted
    FROM dimension_scd2_final AS target
    INNER JOIN (
        SELECT
            id,
            MIN(change_time) AS change_time
        FROM $unprocessed_data
        GROUP BY id
    ) AS unprocessed_data
    ON target.id = unprocessed_data.id
    WHERE target.is_current = 1ut
);

-- Шаг 4: Преобразуем поток необработанных событий в версионные записи (строки для вставки).
-- Здесь вычисляются все необходимые атрибуты для новых версий: `valid_to`, `is_current`, `is_deleted`.
$updated_data = (
    SELECT
        t.id AS id,
        t.attribute1 AS attribute1,
        t.attribute2 AS attribute2,
        t.is_deleted AS is_deleted,
        -- Логика флага `is_current`: он устанавливается в 1 только для последней
        -- записи в цепочке (`next_change_time IS NULL`), и только если это не
        -- операция удаления (`is_deleted == 0`).
        IF(t.next_change_time IS NOT NULL OR t.is_deleted == 1ut, 0ut, 1ut) AS is_current,
        t.change_time AS valid_from,
        t.next_change_time AS valid_to
    FROM (
        -- Подзапрос вычисляет для каждой строки флаг удаления (`is_deleted`)
        -- и временную метку следующего события (`next_change_time`) с помощью оконной функции LEAD.
        SELECT
            unprocessed_data.id AS id,
            unprocessed_data.attribute1 AS attribute1,
            unprocessed_data.attribute2 AS attribute2,
            unprocessed_data.op AS op,
            unprocessed_data.change_time AS change_time,
            IF(unprocessed_data.op = "DELETE", 1ut, 0ut) AS is_deleted,
            LEAD(unprocessed_data.change_time) OVER (PARTITION BY id ORDER BY unprocessed_data.change_time) AS next_change_time
        FROM $unprocessed_data AS unprocessed_data
    ) AS t
);

-- Шаг 5: Атомарно применяем все рассчитанные изменения к целевой таблице.
-- UPSERT обновит существующие записи (из $close_open_intervals) и вставит новые (из $updated_data).
UPSERT INTO dimension_scd2_final (id, attribute1, attribute2, is_current, is_deleted, valid_from, valid_to)
SELECT
    id,
    attribute1,
    attribute2,
    is_current,
    is_deleted,
    valid_from,
    valid_to
FROM $close_open_intervals
UNION ALL
SELECT
    id,
    attribute1,
    attribute2,
    is_current,
    is_deleted,
    valid_from,
    valid_to
FROM $updated_data;

-- Шаг 6: Очищает стейджинг-таблицу от обработанных записей.
DELETE FROM dimension_scd_changes ON
SELECT id, change_time FROM $changes;

Демонстрация работы

В примере ниже рассматривается сущность Customer:

  • бизнес-ключ — поле id,
  • атрибуты — attribute1 (полное имя) и attribute2 (город).

В момент времени 2025-08-22 17:00 создаются два клиента (John в Los Angeles - с id CUSTOMER_1001 и Judy в New York с id CUSTOMER_1002), в момент времени 2025-08-22 19:00 клиент CUSTOMER_1001 меняет город на San Francisco UPDATE, а в момент 2025-08-22 21:00 клиент CUSTOMER_1002 удаляется DELETE.

id attribute1 attribute2 change_time operation
CUSTOMER_1001 John Doe Los Angeles 2025-08-22 17:00 CREATE
CUSTOMER_1002 Judy Doe New York 2025-08-22 17:00 CREATE
CUSTOMER_1001 John Doe San Francisco 2025-08-22 19:00 UPDATE
CUSTOMER_1002 Judy Doe New York 2025-08-22 21:00 DELETE

Процесс SCD2 преобразует такие события в интервальные версии записей с полями valid_from и valid_to. Например, у CUSTOMER_1001 получится две последовательные версии: сначала с городом LA, затем с городом SF (актуальная запись, у которой valid_to = NULL). У CUSTOMER_1002 будет одна устаревшая версия и последняя запись с флагами is_deleted=1 и is_current=0, которая показывает, что пользователь удалён.

Ниже показаны исходные события и соответствующие им версии в финальной таблице.

id attribute1 attribute2 valid_from valid_to is_current is_deleted
CUSTOMER_1001 John Doe Los Angeles 2025-08-22 17:00 2025-08-22 19:00 0 0
CUSTOMER_1001 John Doe San Francisco 2025-08-22 19:00 NULL 1 0
CUSTOMER_1002 Judy Doe New York 2025-08-22 17:00 2025-08-22 21:00 0 0
CUSTOMER_1002 Judy Doe New York 2025-08-22 21:00 NULL 0 1

Получение данных из SCD2-таблицы

Получение актуальных данных

SELECT
    id,
    attribute1,
    attribute2,
    valid_from,
    valid_to
FROM dimension_scd2_final
WHERE is_current = 1ut;

Результат:

id attribute1 attribute2 valid_from valid_to is_current is_deleted
CUSTOMER_1001 John Doe San Francisco 2025-08-22 19:00 NULL 1 0

Получение данных на определённый момент времени

$as_of = Timestamp("2025-08-22T19:11:30.000000Z");

SELECT
    id,
    attribute1,
    attribute2,
    valid_from,
    valid_to
FROM dimension_scd2_final
WHERE valid_from <= $as_of
    AND (valid_to IS NULL OR valid_to > $as_of) -- Получаем записи, которые действовали в $as_of момент времени
    AND is_deleted = 0ut -- Только записи, которые не удалены

Результат:

id attribute1 attribute2 valid_from valid_to
CUSTOMER_1001 John Doe San Francisco 2025-08-22 19:00 NULL
CUSTOMER_1002 Judy Doe New York 2025-08-22 17:00 2025-08-22 21:00

Получение истории изменений для конкретной записи

SELECT
    id,
    attribute1,
    attribute2,
    valid_from,
    valid_to,
    is_current,
    is_deleted
FROM dimension_scd2_final
WHERE id = 'CUSTOMER_1001'
ORDER BY valid_from;

Результат:

id attribute1 attribute2 valid_from valid_to is_current is_deleted
CUSTOMER_1001 John Doe Los Angeles 2025-08-22 17:00 2025-08-22 19:00 0 0
CUSTOMER_1001 John Doe San Francisco 2025-08-22 19:00 NULL 1 0