Обращение к хранилищу Iceberg напрямую с помощью Python

В этом примере мы покажем, как обращаться к хранилищу Iceberg напрямую с помощью Python. Используя этот способ, вы сможете читать и редактировать существующие таблицы, создавать новые таблицы, в том числе на основе данных из существующих таблиц, и совершать другие операции непосредственно в хранилище Iceberg.

Для начала в ячейке типа SQL создадим тестовую таблицу с одним столбцом и запишем в строки 1 миллион чисел начиная с 1. Для этого используем функции unnest и generate_series.

CREATE OR REPLACE TABLE demo.iceberg_test (numbers BIGINT);

INSERT INTO demo.iceberg_test (numbers)
    SELECT unnest(generate_series(1,1000000));
Done in 2.9 sec

+---------+
| Count   |
+---------+
| 1000000 |
+---------+

Выведем получившуюся таблицу:

SELECT * FROM demo.iceberg_test
    ORDER BY numbers
Done in 2.1 sec

+---------+
| numbers |
+---------+
| 1       |
+---------+
| 2       |
+---------+
| 3       |
+---------+
| 4       |
+---------+
| 5       |
+---------+
| ...     |
+---------+
999+ rows

Предположим, что на основе этой таблицы нам нужно создать новую таблицу, в которой должны быть все столбцы из исходной таблицы и один новый столбец. В новый столбец мы должны записать данные, вычисленные с помощью описанной на Python функции, передавая ей в качестве аргументов значения из столбцов исходной таблицы.

В качестве такой тестовой функции опишем функцию check_odd, которая возвращает строку odd для нечетных чисел и строку even — для четных.

Все это мы сделаем, обращаясь напрямую в Iceberg с помощью Python:

import tngri
import pyarrow
import pyiceberg
from contextlib import suppress
import pandas


# Задаем имя исходной таблицы
source_table_name = 'demo.iceberg_test'

# Задаем имя целевой таблицы
target_table_name = 'demo.iceberg_test_target'

# Задаем имя колонки в целевой таблице для записи новых данных
target_column = 'odd_or_even'

print(f'Source table: {source_table_name}')
print(f'Target table: {target_table_name}')

# Загружаем исходную таблицу
source_table = catalog.load_table(source_table_name)

# Создаем целевую таблицу, копируем схему из исходной
with suppress(Exception):
    catalog.drop_table(target_table_name)
sink = catalog.create_table(target_table_name, source_table.schema())

# Добавляем колону для записи новых данных (обязательно указать тип данных)
with sink.update_schema() as tx:
    tx.add_column(target_column, pyiceberg.schema.StringType())

# Тестовая функция для вычисления новых данных
def check_odd(num):
    if num % 2:
        return 'odd'
    else:
        return 'even'

# Делим исходную таблицу на батчи, чтобы не загружать в память целиком
table_batches = source_table.scan().to_arrow_batch_reader()

# Записываем новые данные по батчам
step = 0
for batch in table_batches:
    batch: pyarrow.RecordBatch

    step += 1
    print(f'Step: {step}')

    # Преобразуем батч в DataFrame
    part_df = batch.to_pandas()

    # Записываем новые данные в новую колонку DataFrame с тем же именем, что в таблице
    part_df[target_column] = part_df['numbers'].apply(lambda num: check_odd(num))

    # Выводим размер DataFrame с новыми данными
    print(f'Result dataframe shape: {part_df.shape}')

    # Преобразуем DataFrame обратно в таблицу
    sink_part = pyarrow.Table.from_pandas(part_df)

    # Аппендим данный батч в целевую таблицу
    sink.append(sink_part)

# Выводим длину целевой таблицы через scan().count()
print(f'Result table length: {sink.scan().count()}')
# Выводим длину целевой таблицы через tngri.sql
print(tngri.sql(f'SELECT count(*) FROM {target_table_name}'))
Done in 11.9 sec

Source table: demo.iceberg_test
Target table: demo.iceberg_test_target
Step: 1
Result dataframe shape: (16960, 2)
Step: 2
Result dataframe shape: (122880, 2)
Step: 3
Result dataframe shape: (122880, 2)
Step: 4
Result dataframe shape: (122880, 2)
Step: 5
Result dataframe shape: (122880, 2)
Step: 6
Result dataframe shape: (122880, 2)
Step: 7
Result dataframe shape: (122880, 2)
Step: 8
Result dataframe shape: (122880, 2)
Step: 9
Result dataframe shape: (122880, 2)
Result table length: 1000000
shape: (1, 1)
+----------+
│ column_0 │
│ ---      │
│ i64      │
+----------+
│ 1000000  │
+----------+

Проверим целевую таблицу. Для этого в ячейке типа SQL выведем ее первые пять строк, упорядочив строки по столбцу numbers:

SELECT * FROM demo.iceberg_test_target
    ORDER BY numbers
    LIMIT 5
Done in 2.1 sec

+---------+-------------+
| numbers | odd_or_even |
+---------+-------------+
| 1       | odd         |
+---------+-------------+
| 2       | even        |
+---------+-------------+
| 3       | odd         |
+---------+-------------+
| 4       | even        |
+---------+-------------+
| 5       | odd         |
+---------+-------------+