Apache Airflow™
Интеграция YDB с Apache Airflow™ позволяет автоматизировать сложные рабочие процессы и управлять ими. Apache Airflow™ предоставляет возможности для планирования задач, мониторинга их выполнения и управления зависимостями между ними — оркестрацией. Использование Airflow для оркестрации задач, таких как загрузка данных в YDB, выполнение запросов и управление транзакциями, позволяет автоматизировать и оптимизировать операционные процессы. Это особенно важно для задач ETL, где данные больших объемов требуют регулярного извлечения, преобразования и загрузки.
Для работы под управлением Apache Airflow™ YDB предоставляет пакет провайдера apache-airflow-providers-ydb. Задания Apache Airflow™ представляют собой приложения на языке Python, состоящие из набора операторов Apache Airflow™ и их зависимостей, определяющих порядок выполнения.
Установка
Для корректной работы пакета apache-airflow-providers-ydb необходимо на всех хостах Apache Airflow™ выполнить следующие команды:
pip install ydb apache-airflow-providers-ydb
Для работы требуется Python версии не ниже, чем 3.8.
Объектная модель
Пакет airflow.providers.ydb содержит набор компонентов для взаимодействия с YDB:
- Оператор YDBExecuteQueryOperator для интеграции задач в планировщик Apache Airflow™.
- Хук YDBHook для прямого взаимодействия с YDB.
YDBExecuteQueryOperator
Для выполнения запросов к YDB используется Apache Airflow™ оператор YDBExecuteQueryOperator.
Обязательные аргументы:
task_id— название задания Apache Airflow™.sql— текст SQL-запроса, который необходимо выполнить в YDB.
Опциональные аргументы:
ydb_conn_id— идентификатор подключения с типомYDB, содержащий параметры соединения с YDB. Если не указан, то используется соединение с именемydb_default. Соединениеydb_defaultпредустанавливается в составе Apache Airflow™, отдельно его заводить не нужно.is_ddl— признак, что выполняется SQL DDL запрос. Если аргумент не указан, или установлен вFalse, то будет выполняться SQL DML запрос.params— словарь параметров запроса.
Пример:
ydb_operator = YDBExecuteQueryOperator(task_id="ydb_operator", sql="SELECT 'Hello, world!'")
В данном примере создается задание Apache Airflow™ с идентификатором ydb_operator, которое выполняет запрос SELECT 'Hello, world!'.
YDBHook
Для выполнения низкоуровневых команд в YDB используется Apache Airflow™ класс YDBHook.
Опциональные аргументы:
ydb_conn_id— идентификатор подключения с типомYDB, содержащий параметры соединения с YDB. Если не указан, то используется соединение с именемydb_default. Соединениеydb_defaultпредустанавливается в составе Apache Airflow™, отдельно его заводить не нужно.is_ddl— признак, что выполняется SQL DDL запрос. Если аргумент не указан, или установлен вFalse, то будет выполняться SQL DML запрос.
YDBHook поддерживает следующие методы:
bulk_upsert
Выполняет пакетную вставку данных в таблицы YDB.
Обязательные аргументы:
table_name— название таблицы YDB, куда будет выполняться вставка данных.rows— массив строк для вставки.column_types— описание типов колонок.
Пример:
hook = YDBHook(ydb_conn_id=...)
column_types = (
ydb.BulkUpsertColumns()
.add_column("pet_id", ydb.OptionalType(ydb.PrimitiveType.Int32))
.add_column("name", ydb.PrimitiveType.Utf8)
.add_column("pet_type", ydb.PrimitiveType.Utf8)
.add_column("birth_date", ydb.PrimitiveType.Utf8)
.add_column("owner", ydb.PrimitiveType.Utf8)
)
rows = [
{"pet_id": 3, "name": "Lester", "pet_type": "Hamster", "birth_date": "2020-06-23", "owner": "Lily"},
{"pet_id": 4, "name": "Quincy", "pet_type": "Parrot", "birth_date": "2013-08-11", "owner": "Anne"},
]
hook.bulk_upsert("pet", rows=rows, column_types=column_types)
В данном примере создается объект YDBHook, через который выполняется операция пакетной вставки данных bulk_upsert.
get_conn
Возвращает объект YDBConnection, реализующий интерфейс DbApiConnection для работы с данными. Класс DbApiConnection обеспечивает стандартизированный интерфейс для взаимодействия с базой данных, позволяющий выполнять такие операции, как подключение, выполнение SQL-запросов и управление транзакциями, независимо от конкретной системы управления базами данных.
Пример:
hook = YDBHook(ydb_conn_id=...)
# Выполняем SQL-запрос и получаем курсор
connection = hook.get_conn()
cursor = connection.cursor()
cursor.execute("SELECT * from pet;")
# Извлекаем результат и имена колонок
result = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
# Закрываем курсор и соединение
cursor.close()
connection.close()
В данном примере создается объект YDBHook, у созданного объекта запрашивается объект YDBConnection, через который выполняется чтение данных и получение списка колонок.
Подключение к YDB
Для подключения к YDB необходимо создать новое или отредактировать существующее подключение Apache Airflow™ с типом YDB.

Где:
Connection Id- название подключения Apache Airflow™.Host- протокол и адрес кластера YDB.Port- порт для подключения к кластеру YDB.Database name- название базы данных YDB.
Укажите реквизиты для одного из следующих способов аутентификации на кластере YDB:
LoginиPassword- укажите реквизиты пользователя для аутентификации по логину и паролю.Service account auth JSON- укажите значениеService Account Key.Service account auth JSON file path- укажите путь к файлу, содержащемуService Account Key.IAM token- укажите IAM токен.Use VM metadata- указание использовать метаданные виртуальной машины.
Соответствие YQL и Python-типов
Ниже приведены правила преобразования YQL-типов в Python-результаты. Типы, не указанные в списке ниже, не поддерживаются.
Скалярные типы
| YQL-тип | Python-тип | Пример в Python |
|---|---|---|
Int8, Int16, Int32, Uint8, Uint16, Uint32, Int64, Uint64 |
int |
647713 |
Bool |
bool |
True |
Float, float |
floatNaN и Inf представляются в виде None |
7.88731023None |
Decimal |
Decimal |
45.23410083 |
Utf8 |
str |
Текст строки |
String |
str |
Текст строки |
Сложные типы
| YQL-тип | Python-тип | Пример в Python |
|---|---|---|
Json, JsonDocument |
str (весь узел вставляется как строка) |
{"a":[1,2,3]} |
Date |
datetime.date |
2022-02-09 |
Datetime, Timestamp |
datetime.datetime |
2022-02-09 10:13:11 |
Опциональные типы
| YQL-тип | Python-тип | Пример в Python |
|---|---|---|
Optional |
Оригинальный тип или None | 1 |
Контейнеры
| YQL-тип | Python-тип | Пример в Python |
|---|---|---|
List<Type> |
list |
[1,2,3,4] |
Dict<KeyType, ValueType> |
dict |
{key1: "value1", key2: "value2"} |
Set<KeyType> |
set |
set(key_value1, key_value2) |
Tuple<Type1, Type2> |
tuple |
(element1, element2) |
Struct<Name:Utf8,Age:Int32> |
dict |
{ "Name": "value1", "Age": value2 } |
Специальные типы
| YQL-тип | Python-тип |
|---|---|
Void, Null |
None |
EmptyList |
[] |
EmptyDict |
{} |
Пример
Для выполнения запросов к YDB в составе пакета содержится оператор Apache Airflow™ YDBExecuteQueryOperator и хук YDBHook.
В примере ниже создается задание create_pet_table, создающее таблицу в YDB. После успешного создания таблицы вызывается задание populate_pet_table, заполняющее таблицу данными с помощью команд UPSERT, и задание populate_pet_table_via_bulk_upsert, заполняющее таблицу с помощью команд пакетной вставки данных bulk_upsert. После выполнения вставки данных выполняется операция чтения с помощью задания get_all_pets и задание для параметризованного чтения данных get_birth_date.

Для выполнения запросов к базе данных YDB используется предварительно созданное соединение c YDB типа YDB Connection c именем test_ydb_connection.
from __future__ import annotations
import datetime
import ydb
from airflow import DAG
from airflow.decorators import task
from airflow.providers.ydb.hooks.ydb import YDBHook
from airflow.providers.ydb.operators.ydb import YDBExecuteQueryOperator
@task
def populate_pet_table_via_bulk_upsert():
hook = YDBHook(ydb_conn_id="test_ydb_connection")
column_types = (
ydb.BulkUpsertColumns()
.add_column("pet_id", ydb.OptionalType(ydb.PrimitiveType.Int32))
.add_column("name", ydb.PrimitiveType.Utf8)
.add_column("pet_type", ydb.PrimitiveType.Utf8)
.add_column("birth_date", ydb.PrimitiveType.Utf8)
.add_column("owner", ydb.PrimitiveType.Utf8)
)
rows = [
{"pet_id": 3, "name": "Lester", "pet_type": "Hamster", "birth_date": "2020-06-23", "owner": "Lily"},
{"pet_id": 4, "name": "Quincy", "pet_type": "Parrot", "birth_date": "2013-08-11", "owner": "Anne"},
]
hook.bulk_upsert("pet", rows=rows, column_types=column_types)
with DAG(
dag_id="ydb_demo_dag",
start_date=datetime.datetime(2020, 2, 2),
schedule="@once",
catchup=False,
) as dag:
create_pet_table = YDBExecuteQueryOperator(
task_id="create_pet_table",
sql="""
CREATE TABLE pet (
pet_id INT,
name TEXT NOT NULL,
pet_type TEXT NOT NULL,
birth_date TEXT NOT NULL,
owner TEXT NOT NULL,
PRIMARY KEY (pet_id)
);
""",
is_ddl=True, # must be specified for DDL queries
ydb_conn_id="test_ydb_connection"
)
populate_pet_table = YDBExecuteQueryOperator(
task_id="populate_pet_table",
sql="""
UPSERT INTO pet (pet_id, name, pet_type, birth_date, owner)
VALUES (1, 'Max', 'Dog', '2018-07-05', 'Jane');
UPSERT INTO pet (pet_id, name, pet_type, birth_date, owner)
VALUES (2, 'Susie', 'Cat', '2019-05-01', 'Phil');
""",
ydb_conn_id="test_ydb_connection"
)
get_all_pets = YDBExecuteQueryOperator(task_id="get_all_pets", sql="SELECT * FROM pet;", ydb_conn_id="test_ydb_connection")
get_birth_date = YDBExecuteQueryOperator(
task_id="get_birth_date",
sql="SELECT * FROM pet WHERE birth_date BETWEEN 'not_var{{params.begin_date}}' AND 'not_var{{params.end_date}}'",
params={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
ydb_conn_id="test_ydb_connection"
)
(
create_pet_table
>> populate_pet_table
>> populate_pet_table_via_bulk_upsert()
>> get_all_pets
>> get_birth_date
)