Перейти к основному содержимому
Перейти к основному содержимому

Интеграция Apache Beam и ClickHouse

ClickHouse Supported

Apache Beam — это открытый, унифицированный программный интерфейс, который позволяет разработчикам определять и исполнять как пакетные, так и потоковые (непрерывные) конвейеры обработки данных. Гибкость Apache Beam заключается в его способности поддерживать широкий спектр сценариев обработки данных, от операций ETL (Извлечение, Преобразование, Загрузка) до комплексной обработки событий и аналитики в реальном времени. Эта интеграция использует официальный JDBC-коннектор ClickHouse для основной вставки данных.

Интеграционный пакет

Интеграционный пакет, необходимый для интеграции Apache Beam и ClickHouse, поддерживается и разрабатывается в рамках Apache Beam I/O Connectors — набора интеграций множества популярных систем хранения данных и баз данных. Реализация org.apache.beam.sdk.io.clickhouse.ClickHouseIO расположена в репозитории Apache Beam.

Настройка пакета Apache Beam ClickHouse

Установка пакета

Добавьте следующую зависимость в вашу систему управления пакетами:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-clickhouse</artifactId>
    <version>${beam.version}</version>
</dependency>
Рекомендуемая версия Beam

Коннектор ClickHouseIO рекомендуется использовать начиная с версии Apache Beam 2.59.0. Более ранние версии могут не полностью поддерживать функциональность коннектора.

Артефакты можно найти в официальном репозитории maven.

Пример кода

Следующий пример считывает CSV-файл с именем input.csv как PCollection, преобразует его в объект Row (с использованием определенной схемы) и вставляет его в локальный экземпляр ClickHouse с использованием ClickHouseIO:


package org.example;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.clickhouse.ClickHouseIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.DateTime;

public class Main {

    public static void main(String[] args) {
        // Create a Pipeline object.
        Pipeline p = Pipeline.create();

        Schema SCHEMA =
                Schema.builder()
                        .addField(Schema.Field.of("name", Schema.FieldType.STRING).withNullable(true))
                        .addField(Schema.Field.of("age", Schema.FieldType.INT16).withNullable(true))
                        .addField(Schema.Field.of("insertion_time", Schema.FieldType.DATETIME).withNullable(false))
                        .build();

        // Apply transforms to the pipeline.
        PCollection<String> lines = p.apply("ReadLines", TextIO.read().from("src/main/resources/input.csv"));

        PCollection<Row> rows = lines.apply("ConvertToRow", ParDo.of(new DoFn<String, Row>() {
            @ProcessElement
            public void processElement(@Element String line, OutputReceiver<Row> out) {

                String[] values = line.split(",");
                Row row = Row.withSchema(SCHEMA)
                        .addValues(values[0], Short.parseShort(values[1]), DateTime.now())
                        .build();
                out.output(row);
            }
        })).setRowSchema(SCHEMA);

        rows.apply("Write to ClickHouse",
                        ClickHouseIO.write("jdbc:clickhouse://localhost:8123/default?user=default&password=******", "test_table"));

        // Run the pipeline.
        p.run().waitUntilFinish();
    }
}

Поддерживаемые типы данных

ClickHouseApache BeamПоддерживаетсяПримечания
TableSchema.TypeName.FLOAT32Schema.TypeName#FLOAT
TableSchema.TypeName.FLOAT64Schema.TypeName#DOUBLE
TableSchema.TypeName.INT8Schema.TypeName#BYTE
TableSchema.TypeName.INT16Schema.TypeName#INT16
TableSchema.TypeName.INT32Schema.TypeName#INT32
TableSchema.TypeName.INT64Schema.TypeName#INT64
TableSchema.TypeName.STRINGSchema.TypeName#STRING
TableSchema.TypeName.UINT8Schema.TypeName#INT16
TableSchema.TypeName.UINT16Schema.TypeName#INT32
TableSchema.TypeName.UINT32Schema.TypeName#INT64
TableSchema.TypeName.UINT64Schema.TypeName#INT64
TableSchema.TypeName.DATESchema.TypeName#DATETIME
TableSchema.TypeName.DATETIMESchema.TypeName#DATETIME
TableSchema.TypeName.ARRAYSchema.TypeName#ARRAY
TableSchema.TypeName.ENUM8Schema.TypeName#STRING
TableSchema.TypeName.ENUM16Schema.TypeName#STRING
TableSchema.TypeName.BOOLSchema.TypeName#BOOLEAN
TableSchema.TypeName.TUPLESchema.TypeName#ROW
TableSchema.TypeName.FIXEDSTRINGFixedBytesFixedBytes — это LogicalType, представляющий фиксированный
массив байтов, расположенный по адресу
org.apache.beam.sdk.schemas.logicaltypes
Schema.TypeName#DECIMAL
Schema.TypeName#MAP

Параметры ClickHouseIO.Write

Вы можете настроить конфигурацию ClickHouseIO.Write с помощью следующих функций-сеттеров:

Функция-сеттер параметраТип аргументаЗначение по умолчаниюОписание
withMaxInsertBlockSize(long maxInsertBlockSize)1000000Максимальный размер блока строк для вставки.
withMaxRetries(int maxRetries)5Максимальное количество повторных попыток для неудачных вставок.
withMaxCumulativeBackoff(Duration maxBackoff)Duration.standardDays(1000)Максимальная кумулятивная задержка для повтора.
withInitialBackoff(Duration initialBackoff)Duration.standardSeconds(5)Начальная задержка перед первой попыткой повторения.
withInsertDistributedSync(Boolean sync)trueЕсли true, синхронизирует операции вставки для распределенных таблиц.
withInsertQuorum(Long quorum)nullКоличество реплик, необходимых для подтверждения операции вставки.
withInsertDeduplicate(Boolean deduplicate)trueЕсли true, включена дедупликация для операций вставки.
withTableSchema(TableSchema schema)nullСхема целевой таблицы ClickHouse.

Ограничения

Пожалуйста, учитывайте следующие ограничения при использовании коннектора:

  • На сегодняшний день поддерживается только операция Sink. Коннектор не поддерживает операцию Source.
  • ClickHouse выполняет дедупликацию при вставке в ReplicatedMergeTree или Distributed таблицы, построенные на основе ReplicatedMergeTree. Без репликации вставка в обычный MergeTree может привести к дубликатам, если вставка завершается неудачей и затем успешно повторяется. Однако каждый блок вставляется атомарно, и размер блока можно настроить с помощью ClickHouseIO.Write.withMaxInsertBlockSize(long). Дедупликация достигается с использованием контрольных сумм вставленных блоков. Для получения дополнительной информации о дедупликации, пожалуйста, посетите Deduplication и Deduplicate insertion config.
  • Коннектор не выполняет никаких операций DDL; поэтому целевая таблица должна существовать перед вставкой.