Перейти к основному содержимому
Перейти к основному содержимому

Коннектор Spark

Этот коннектор использует специфические оптимизации ClickHouse, такие как продвинутое разделение и предикатный спуск, чтобы улучшить производительность запросов и обработку данных. Коннектор основан на официальном JDBC коннекторе ClickHouse и управляет своим собственным каталогом.

Перед Spark 3.0 в Spark отсутствовала концепция встроенного каталога, поэтому пользователи обычно полагались на внешние каталоги, такие как Hive Metastore или AWS Glue. С этими внешними решениями пользователям приходилось вручную регистрировать свои таблицы источника данных перед доступом к ним в Spark. Однако с момента введения концепции каталога в Spark 3.0 Spark теперь может автоматически обнаруживать таблицы, регистрируя плагины каталога.

По умолчанию каталог Spark называется spark_catalog, а таблицы идентифицируются по формату {catalog name}.{database}.{table}. С новой функцией каталога теперь возможно добавлять и работать с несколькими каталогами в одном приложении Spark.

Требования

  • Java 8 или 17
  • Scala 2.12 или 2.13
  • Apache Spark 3.3 или 3.4 или 3.5

Матрица совместимости

ВерсияСовместимые версии SparkВерсия ClickHouse JDBC
mainSpark 3.3, 3.4, 3.50.6.3
0.8.1Spark 3.3, 3.4, 3.50.6.3
0.8.0Spark 3.3, 3.4, 3.50.6.3
0.7.3Spark 3.3, 3.40.4.6
0.6.0Spark 3.30.3.2-patch11
0.5.0Spark 3.2, 3.30.3.2-patch11
0.4.0Spark 3.2, 3.3Не зависит от
0.3.0Spark 3.2, 3.3Не зависит от
0.2.1Spark 3.2Не зависит от
0.1.2Spark 3.2Не зависит от

Установка и настройка

Для интеграции ClickHouse с Spark есть несколько вариантов установки, которые подходят для различных настроек проектов. Вы можете добавить ClickHouse Spark коннектор в качестве зависимости прямо в файл сборки вашего проекта (например, в pom.xml для Maven или build.sbt для SBT). Кроме того, вы можете положить необходимые JAR-файлы в папку $SPARK_HOME/jars/, или передать их непосредственно как опцию Spark, используя флаг --jars в команде spark-submit. Оба подхода гарантируют, что коннектор ClickHouse доступен в вашей среде Spark.

Импорт как зависимость

<dependency>
  <groupId>com.clickhouse.spark</groupId>
  <artifactId>clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}</artifactId>
  <version>{{ stable_version }}</version>
</dependency>
<dependency>
  <groupId>com.clickhouse</groupId>
  <artifactId>clickhouse-jdbc</artifactId>
  <classifier>all</classifier>
  <version>{{ clickhouse_jdbc_version }}</version>
  <exclusions>
    <exclusion>
      <groupId>*</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>

Добавьте следующий репозиторий, если хотите использовать SNAPSHOT-версию.

<repositories>
  <repository>
    <id>sonatype-oss-snapshots</id>
    <name>Sonatype OSS Snapshots Repository</name>
    <url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
  </repository>
</repositories>

Загрузка библиотеки

Шаблон имени бинарного JAR:

clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar

Вы можете найти все доступные выпущенные JAR-файлы в Maven Central Repository и все ежедневные сборки SNAPSHOT JAR файлов в Sonatype OSS Snapshots Repository.

к сведению

Важно включить clickhouse-jdbc JAR с классификатором "all", так как коннектор зависит от clickhouse-http и clickhouse-client — оба из которых собраны в clickhouse-jdbc:all. В качестве альтернативы, вы можете добавить clickhouse-client JAR и clickhouse-http отдельно, если вы предпочитаете не использовать полный пакет JDBC.

В любом случае, убедитесь, что версии пакетов совместимы в соответствии с Матрицей совместимости.

Регистрация каталога (обязательная)

Для доступа к вашим таблицам ClickHouse необходимо настроить новый каталог Spark с помощью следующих параметров:

ПараметрЗначениеЗначение по умолчаниюОбязательно
spark.sql.catalog.<catalog_name>com.clickhouse.spark.ClickHouseCatalogN/AДа
spark.sql.catalog.<catalog_name>.host<clickhouse_host>localhostНет
spark.sql.catalog.<catalog_name>.protocolhttphttpНет
spark.sql.catalog.<catalog_name>.http_port<clickhouse_port>8123Нет
spark.sql.catalog.<catalog_name>.user<clickhouse_username>defaultНет
spark.sql.catalog.<catalog_name>.password<clickhouse_password>(пустая строка)Нет
spark.sql.catalog.<catalog_name>.database<database>defaultНет
spark.<catalog_name>.write.formatjsonarrowНет

Эти настройки могут быть заданы с помощью одного из следующих способов:

  • Редактировать/Создать spark-defaults.conf.
  • Передать конфигурацию вашей команде spark-submit (или вашим командам spark-shell/spark-sql).
  • Добавить конфигурацию при инициализации вашего контекста.
к сведению

При работе с кластером ClickHouse вам необходимо задать уникальное имя каталога для каждого экземпляра. Например:

spark.sql.catalog.clickhouse1                com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse1.host           10.0.0.1
spark.sql.catalog.clickhouse1.protocol       https
spark.sql.catalog.clickhouse1.http_port      8443
spark.sql.catalog.clickhouse1.user           default
spark.sql.catalog.clickhouse1.password
spark.sql.catalog.clickhouse1.database       default
spark.sql.catalog.clickhouse1.option.ssl     true

spark.sql.catalog.clickhouse2                com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse2.host           10.0.0.2
spark.sql.catalog.clickhouse2.protocol       https
spark.sql.catalog.clickhouse2.http_port      8443
spark.sql.catalog.clickhouse2.user           default
spark.sql.catalog.clickhouse2.password
spark.sql.catalog.clickhouse2.database       default
spark.sql.catalog.clickhouse2.option.ssl     true

Таким образом, вы сможете получить доступ к таблице clickhouse1 <ck_db>.<ck_table> из Spark SQL по clickhouse1.<ck_db>.<ck_table>, и получить доступ к таблице clickhouse2 <ck_db>.<ck_table> по clickhouse2.<ck_db>.<ck_table>.

Настройки ClickHouse Cloud

При подключении к ClickHouse Cloud убедитесь, что включен SSL и установлен соответствующий режим SSL. Например:

spark.sql.catalog.clickhouse.option.ssl        true
spark.sql.catalog.clickhouse.option.ssl_mode   NONE

Чтение данных

public static void main(String[] args) {
        // Create a Spark session
        SparkSession spark = SparkSession.builder()
                .appName("example")
                .master("local[*]")
                .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
                .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
                .config("spark.sql.catalog.clickhouse.protocol", "http")
                .config("spark.sql.catalog.clickhouse.http_port", "8123")
                .config("spark.sql.catalog.clickhouse.user", "default")
                .config("spark.sql.catalog.clickhouse.password", "123456")
                .config("spark.sql.catalog.clickhouse.database", "default")
                .config("spark.clickhouse.write.format", "json")
                .getOrCreate();

        Dataset<Row> df = spark.sql("select * from clickhouse.default.example_table");

        df.show();

        spark.stop();
    }

Запись данных

public static void main(String[] args) throws AnalysisException {

       // Create a Spark session
       SparkSession spark = SparkSession.builder()
               .appName("example")
               .master("local[*]")
               .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
               .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
               .config("spark.sql.catalog.clickhouse.protocol", "http")
               .config("spark.sql.catalog.clickhouse.http_port", "8123")
               .config("spark.sql.catalog.clickhouse.user", "default")
               .config("spark.sql.catalog.clickhouse.password", "123456")
               .config("spark.sql.catalog.clickhouse.database", "default")
               .config("spark.clickhouse.write.format", "json")
               .getOrCreate();

       // Define the schema for the DataFrame
       StructType schema = new StructType(new StructField[]{
               DataTypes.createStructField("id", DataTypes.IntegerType, false),
               DataTypes.createStructField("name", DataTypes.StringType, false),
       });

       List<Row> data = Arrays.asList(
               RowFactory.create(1, "Alice"),
               RowFactory.create(2, "Bob")
       );

       // Create a DataFrame
       Dataset<Row> df = spark.createDataFrame(data, schema);

       df.writeTo("clickhouse.default.example_table").append();

       spark.stop();
   }

DDL операции

Вы можете выполнять DDL операции на вашем экземпляре ClickHouse, используя Spark SQL, при этом все изменения немедленно сохраняются в ClickHouse. Spark SQL позволяет писать запросы точно так же, как вы бы сделали это в ClickHouse, поэтому вы можете напрямую выполнять команды, такие как CREATE TABLE, TRUNCATE и другие — без модификации, например:

примечание

При использовании Spark SQL можно выполнять только одно выражение за раз.

USE clickhouse; 

CREATE TABLE test_db.tbl_sql (
  create_time TIMESTAMP NOT NULL,
  m           INT       NOT NULL COMMENT 'part key',
  id          BIGINT    NOT NULL COMMENT 'sort key',
  value       STRING
) USING ClickHouse
PARTITIONED BY (m)
TBLPROPERTIES (
  engine = 'MergeTree()',
  order_by = 'id',
  settings.index_granularity = 8192
);

Приведенные выше примеры демонстрируют запросы Spark SQL, которые вы можете выполнять в своем приложении, используя любой API — Java, Scala, PySpark или оболочку.

Конфигурации

Следующие параметры конфигурации доступны в коннекторе:


КлючПо умолчаниюОписаниеС момента
spark.clickhouse.ignoreUnsupportedTransformfalseClickHouse поддерживает использование сложных выражений в качестве ключей шардирования или значений партиций, например, cityHash64(col_1, col_2), которые в настоящее время не поддерживаются Spark. Если true, игнорировать неподдерживаемые выражения, в противном случае быстро выходить с исключением. Обратите внимание, что при включении spark.clickhouse.write.distributed.convertLocal игнорирование неподдерживаемых ключей шардирования может привести к повреждению данных.0.4.0
spark.clickhouse.read.compression.codeclz4Кодек, используемый для декомпрессии данных при чтении. Поддерживаемые кодеки: none, lz4.0.5.0
spark.clickhouse.read.distributed.convertLocaltrueПри чтении распределенной таблицы считывать локальную таблицу вместо самой себя. Если true, игнорировать spark.clickhouse.read.distributed.useClusterNodes.0.1.0
spark.clickhouse.read.fixedStringAsbinaryЧтение типа FixedString ClickHouse как указанный тип данных Spark. Поддерживаемые типы: binary, string0.8.0
spark.clickhouse.read.formatjsonФормат сериализации для чтения. Поддерживаемые форматы: json, binary0.6.0
spark.clickhouse.read.runtimeFilter.enabledfalseВключить фильтр времени выполнения для чтения.0.8.0
spark.clickhouse.read.splitByPartitionIdtrueЕсли true, конструировать фильтр входной партиции по виртуальной колонке _partition_id, вместо значения партиции. Известны проблемы с составлением SQL предикатов по значению партиции. Эта функция требует ClickHouse Server v21.6+0.4.0
spark.clickhouse.useNullableQuerySchemafalseЕсли true, пометить все поля схемы запроса как допускающие null значения при выполнении CREATE/REPLACE TABLE ... AS SELECT ... для создания таблицы. Обратите внимание, что эта конфигурация требует SPARK-43390 (доступен в Spark 3.5), без этой патча она всегда ведет себя как true.0.8.0
spark.clickhouse.write.batchSize10000Количество записей на партию при записи в ClickHouse.0.1.0
spark.clickhouse.write.compression.codeclz4Кодек, используемый для сжатия данных при записи. Поддерживаемые кодеки: none, lz4.0.3.0
spark.clickhouse.write.distributed.convertLocalfalseПри записи в распределенную таблицу записывать локальную таблицу вместо самой себя. Если true, игнорировать spark.clickhouse.write.distributed.useClusterNodes.0.1.0
spark.clickhouse.write.distributed.useClusterNodestrueЗаписывать на все узлы кластера при записи в распределенную таблицу.0.1.0
spark.clickhouse.write.formatarrowФормат сериализации для записи. Поддерживаемые форматы: json, arrow0.4.0
spark.clickhouse.write.localSortByKeytrueЕсли true, выполнять локальную сортировку по ключам сортировки перед записью.0.3.0
spark.clickhouse.write.localSortByPartitionзначение spark.clickhouse.write.repartitionByPartitionЕсли true, выполнять локальную сортировку по партиции перед записью. Если не задано, то равно spark.clickhouse.write.repartitionByPartition.0.3.0
spark.clickhouse.write.maxRetry3Максимальное количество попыток записи, которые мы будем повторять для одной неудачной записи партии с кодами, допускающими повтор.0.1.0
spark.clickhouse.write.repartitionByPartitiontrueНеобходимость повторной партиции данных по ключам партиционной ClickHouse, чтобы соответствовать распределению таблицы ClickHouse перед записью.0.3.0
spark.clickhouse.write.repartitionNum0Повторная партиция данных для соответствия распределению таблицы ClickHouse необходима перед записью, используйте эту конфигурацию для указания числа повторной партиции, значение меньше 1 означает отсутствие требования.0.1.0
spark.clickhouse.write.repartitionStrictlyfalseЕсли true, Spark строго распределит входящие записи по партициям, чтобы удовлетворить требуемое распределение перед передачей записей в таблицу источника данных на запись. В противном случае Spark может применить определенные оптимизации для ускорения запроса, но нарушить требование по распределению. Обратите внимание, что эта конфигурация требует SPARK-37523 (доступен в Spark 3.4), без этой патча она всегда ведет себя как true.0.3.0
spark.clickhouse.write.retryInterval10sИнтервал в секундах между попытками повторной записи.0.1.0
spark.clickhouse.write.retryableErrorCodes241Код ошибки, допускаемые для повторной записи, возвращаемые сервером ClickHouse при неудаче записи.0.1.0

Поддерживаемые типы данных

В этом разделе описывается сопоставление типов данных между Spark и ClickHouse. Таблицы ниже предоставляют быстрые ссылки для преобразования типов данных при чтении из ClickHouse в Spark и при вставке данных из Spark в ClickHouse.

Чтение данных из ClickHouse в Spark

Тип данных ClickHouseТип данных SparkПоддерживаемыйЯвляется примитивнымПримечания
NothingNullTypeДа
BoolBooleanTypeДа
UInt8, Int16ShortTypeДа
Int8ByteTypeДа
UInt16,Int32IntegerTypeДа
UInt32,Int64, UInt64LongTypeДа
Int128,UInt128, Int256, UInt256DecimalType(38, 0)Да
Float32FloatTypeДа
Float64DoubleTypeДа
String, JSON, UUID, Enum8, Enum16, IPv4, IPv6StringTypeДа
FixedStringBinaryType, StringTypeДаКонтролируется конфигурацией READ_FIXED_STRING_AS
DecimalDecimalTypeДаТочность и масштаб до Decimal128
Decimal32DecimalType(9, scale)Да
Decimal64DecimalType(18, scale)Да
Decimal128DecimalType(38, scale)Да
Date, Date32DateTypeДа
DateTime, DateTime32, DateTime64TimestampTypeДа
ArrayArrayTypeНетТип элемента массива также преобразуется
MapMapTypeНетКлючи ограничены StringType
IntervalYearYearMonthIntervalType(Year)Да
IntervalMonthYearMonthIntervalType(Month)Да
IntervalDay, IntervalHour, IntervalMinute, IntervalSecondDayTimeIntervalTypeНетИспользуется специфический тип интервала
Object
Nested
Tuple
Point
Polygon
MultiPolygon
Ring
IntervalQuarter
IntervalWeek
Decimal256
AggregateFunction
SimpleAggregateFunction

Вставка данных из Spark в ClickHouse

Тип данных SparkТип данных ClickHouseПоддерживаемыйЯвляется примитивнымПримечания
BooleanTypeUInt8Да
ByteTypeInt8Да
ShortTypeInt16Да
IntegerTypeInt32Да
LongTypeInt64Да
FloatTypeFloat32Да
DoubleTypeFloat64Да
StringTypeStringДа
VarcharTypeStringДа
CharTypeStringДа
DecimalTypeDecimal(p, s)ДаТочность и масштаб до Decimal128
DateTypeDateДа
TimestampTypeDateTimeДа
ArrayType (список, кортеж или массив)ArrayНетТип элемента массива также преобразуется
MapTypeMapНетКлючи ограничены StringType

Участие и поддержка

Если вы хотите внести свой вклад в проект или сообщить о каких-либо проблемах, мы будем рады вашему участию! Посетите наш репозиторий на GitHub, чтобы открыть новую проблему, предложить улучшения или отправить запрос на внесение изменений. Мы приветствуем любой вклад! Пожалуйста, ознакомьтесь с рекомендациями по участию в репозитории перед тем, как начинать. Спасибо за помощь в улучшении нашего ClickHouse Spark connector!