Коннектор Spark
Этот коннектор использует специфические оптимизации ClickHouse, такие как продвинутое разделение и предикатный спуск, чтобы улучшить производительность запросов и обработку данных. Коннектор основан на официальном JDBC коннекторе ClickHouse и управляет своим собственным каталогом.
Перед Spark 3.0 в Spark отсутствовала концепция встроенного каталога, поэтому пользователи обычно полагались на внешние каталоги, такие как Hive Metastore или AWS Glue. С этими внешними решениями пользователям приходилось вручную регистрировать свои таблицы источника данных перед доступом к ним в Spark. Однако с момента введения концепции каталога в Spark 3.0 Spark теперь может автоматически обнаруживать таблицы, регистрируя плагины каталога.
По умолчанию каталог Spark называется spark_catalog
, а таблицы идентифицируются по формату {catalog name}.{database}.{table}
. С новой
функцией каталога теперь возможно добавлять и работать с несколькими каталогами в одном приложении Spark.
Требования
- Java 8 или 17
- Scala 2.12 или 2.13
- Apache Spark 3.3 или 3.4 или 3.5
Матрица совместимости
Версия | Совместимые версии Spark | Версия ClickHouse JDBC |
---|---|---|
main | Spark 3.3, 3.4, 3.5 | 0.6.3 |
0.8.1 | Spark 3.3, 3.4, 3.5 | 0.6.3 |
0.8.0 | Spark 3.3, 3.4, 3.5 | 0.6.3 |
0.7.3 | Spark 3.3, 3.4 | 0.4.6 |
0.6.0 | Spark 3.3 | 0.3.2-patch11 |
0.5.0 | Spark 3.2, 3.3 | 0.3.2-patch11 |
0.4.0 | Spark 3.2, 3.3 | Не зависит от |
0.3.0 | Spark 3.2, 3.3 | Не зависит от |
0.2.1 | Spark 3.2 | Не зависит от |
0.1.2 | Spark 3.2 | Не зависит от |
Установка и настройка
Для интеграции ClickHouse с Spark есть несколько вариантов установки, которые подходят для различных настроек проектов.
Вы можете добавить ClickHouse Spark коннектор в качестве зависимости прямо в файл сборки вашего проекта (например, в pom.xml
для Maven или build.sbt
для SBT).
Кроме того, вы можете положить необходимые JAR-файлы в папку $SPARK_HOME/jars/
, или передать их непосредственно как опцию Spark,
используя флаг --jars
в команде spark-submit
.
Оба подхода гарантируют, что коннектор ClickHouse доступен в вашей среде Spark.
Импорт как зависимость
- Maven
- Gradle
- SBT
- Spark SQL/Shell CLI
Добавьте следующий репозиторий, если хотите использовать SNAPSHOT-версию.
Добавьте следующий репозиторий, если хотите использовать SNAPSHOT-версию:
При работе с опциями оболочки Spark (Spark SQL CLI, Spark Shell CLI и команда Spark Submit) зависимости можно зарегистрировать, передав необходимые JAR:
Если вы хотите избежать копирования JAR-файлов на узел клиента Spark, вы можете использовать следующее:
Примечание: Для случаев использования только SQL рекомендуется использовать Apache Kyuubi в производственных условиях.
Загрузка библиотеки
Шаблон имени бинарного JAR:
Вы можете найти все доступные выпущенные JAR-файлы в Maven Central Repository и все ежедневные сборки SNAPSHOT JAR файлов в Sonatype OSS Snapshots Repository.
Важно включить clickhouse-jdbc JAR с классификатором "all", так как коннектор зависит от clickhouse-http и clickhouse-client — оба из которых собраны в clickhouse-jdbc:all. В качестве альтернативы, вы можете добавить clickhouse-client JAR и clickhouse-http отдельно, если вы предпочитаете не использовать полный пакет JDBC.
В любом случае, убедитесь, что версии пакетов совместимы в соответствии с Матрицей совместимости.
Регистрация каталога (обязательная)
Для доступа к вашим таблицам ClickHouse необходимо настроить новый каталог Spark с помощью следующих параметров:
Параметр | Значение | Значение по умолчанию | Обязательно |
---|---|---|---|
spark.sql.catalog.<catalog_name> | com.clickhouse.spark.ClickHouseCatalog | N/A | Да |
spark.sql.catalog.<catalog_name>.host | <clickhouse_host> | localhost | Нет |
spark.sql.catalog.<catalog_name>.protocol | http | http | Нет |
spark.sql.catalog.<catalog_name>.http_port | <clickhouse_port> | 8123 | Нет |
spark.sql.catalog.<catalog_name>.user | <clickhouse_username> | default | Нет |
spark.sql.catalog.<catalog_name>.password | <clickhouse_password> | (пустая строка) | Нет |
spark.sql.catalog.<catalog_name>.database | <database> | default | Нет |
spark.<catalog_name>.write.format | json | arrow | Нет |
Эти настройки могут быть заданы с помощью одного из следующих способов:
- Редактировать/Создать
spark-defaults.conf
. - Передать конфигурацию вашей команде
spark-submit
(или вашим командамspark-shell
/spark-sql
). - Добавить конфигурацию при инициализации вашего контекста.
При работе с кластером ClickHouse вам необходимо задать уникальное имя каталога для каждого экземпляра. Например:
Таким образом, вы сможете получить доступ к таблице clickhouse1 <ck_db>.<ck_table>
из Spark SQL по
clickhouse1.<ck_db>.<ck_table>
, и получить доступ к таблице clickhouse2 <ck_db>.<ck_table>
по clickhouse2.<ck_db>.<ck_table>
.
Настройки ClickHouse Cloud
При подключении к ClickHouse Cloud убедитесь, что включен SSL и установлен соответствующий режим SSL. Например:
Чтение данных
- Java
- Scala
- Python
- Spark SQL
Запись данных
- Java
- Scala
- Python
- Spark SQL
DDL операции
Вы можете выполнять DDL операции на вашем экземпляре ClickHouse, используя Spark SQL, при этом все изменения немедленно сохраняются в ClickHouse. Spark SQL позволяет писать запросы точно так же, как вы бы сделали это в ClickHouse, поэтому вы можете напрямую выполнять команды, такие как CREATE TABLE, TRUNCATE и другие — без модификации, например:
При использовании Spark SQL можно выполнять только одно выражение за раз.
Приведенные выше примеры демонстрируют запросы Spark SQL, которые вы можете выполнять в своем приложении, используя любой API — Java, Scala, PySpark или оболочку.
Конфигурации
Следующие параметры конфигурации доступны в коннекторе:
Ключ | По умолчанию | Описание | С момента |
---|---|---|---|
spark.clickhouse.ignoreUnsupportedTransform | false | ClickHouse поддерживает использование сложных выражений в качестве ключей шардирования или значений партиций, например, cityHash64(col_1, col_2) , которые в настоящее время не поддерживаются Spark. Если true , игнорировать неподдерживаемые выражения, в противном случае быстро выходить с исключением. Обратите внимание, что при включении spark.clickhouse.write.distributed.convertLocal игнорирование неподдерживаемых ключей шардирования может привести к повреждению данных. | 0.4.0 |
spark.clickhouse.read.compression.codec | lz4 | Кодек, используемый для декомпрессии данных при чтении. Поддерживаемые кодеки: none, lz4. | 0.5.0 |
spark.clickhouse.read.distributed.convertLocal | true | При чтении распределенной таблицы считывать локальную таблицу вместо самой себя. Если true , игнорировать spark.clickhouse.read.distributed.useClusterNodes . | 0.1.0 |
spark.clickhouse.read.fixedStringAs | binary | Чтение типа FixedString ClickHouse как указанный тип данных Spark. Поддерживаемые типы: binary, string | 0.8.0 |
spark.clickhouse.read.format | json | Формат сериализации для чтения. Поддерживаемые форматы: json, binary | 0.6.0 |
spark.clickhouse.read.runtimeFilter.enabled | false | Включить фильтр времени выполнения для чтения. | 0.8.0 |
spark.clickhouse.read.splitByPartitionId | true | Если true , конструировать фильтр входной партиции по виртуальной колонке _partition_id , вместо значения партиции. Известны проблемы с составлением SQL предикатов по значению партиции. Эта функция требует ClickHouse Server v21.6+ | 0.4.0 |
spark.clickhouse.useNullableQuerySchema | false | Если true , пометить все поля схемы запроса как допускающие null значения при выполнении CREATE/REPLACE TABLE ... AS SELECT ... для создания таблицы. Обратите внимание, что эта конфигурация требует SPARK-43390 (доступен в Spark 3.5), без этой патча она всегда ведет себя как true . | 0.8.0 |
spark.clickhouse.write.batchSize | 10000 | Количество записей на партию при записи в ClickHouse. | 0.1.0 |
spark.clickhouse.write.compression.codec | lz4 | Кодек, используемый для сжатия данных при записи. Поддерживаемые кодеки: none, lz4. | 0.3.0 |
spark.clickhouse.write.distributed.convertLocal | false | При записи в распределенную таблицу записывать локальную таблицу вместо самой себя. Если true , игнорировать spark.clickhouse.write.distributed.useClusterNodes . | 0.1.0 |
spark.clickhouse.write.distributed.useClusterNodes | true | Записывать на все узлы кластера при записи в распределенную таблицу. | 0.1.0 |
spark.clickhouse.write.format | arrow | Формат сериализации для записи. Поддерживаемые форматы: json, arrow | 0.4.0 |
spark.clickhouse.write.localSortByKey | true | Если true , выполнять локальную сортировку по ключам сортировки перед записью. | 0.3.0 |
spark.clickhouse.write.localSortByPartition | значение spark.clickhouse.write.repartitionByPartition | Если true , выполнять локальную сортировку по партиции перед записью. Если не задано, то равно spark.clickhouse.write.repartitionByPartition . | 0.3.0 |
spark.clickhouse.write.maxRetry | 3 | Максимальное количество попыток записи, которые мы будем повторять для одной неудачной записи партии с кодами, допускающими повтор. | 0.1.0 |
spark.clickhouse.write.repartitionByPartition | true | Необходимость повторной партиции данных по ключам партиционной ClickHouse, чтобы соответствовать распределению таблицы ClickHouse перед записью. | 0.3.0 |
spark.clickhouse.write.repartitionNum | 0 | Повторная партиция данных для соответствия распределению таблицы ClickHouse необходима перед записью, используйте эту конфигурацию для указания числа повторной партиции, значение меньше 1 означает отсутствие требования. | 0.1.0 |
spark.clickhouse.write.repartitionStrictly | false | Если true , Spark строго распределит входящие записи по партициям, чтобы удовлетворить требуемое распределение перед передачей записей в таблицу источника данных на запись. В противном случае Spark может применить определенные оптимизации для ускорения запроса, но нарушить требование по распределению. Обратите внимание, что эта конфигурация требует SPARK-37523 (доступен в Spark 3.4), без этой патча она всегда ведет себя как true . | 0.3.0 |
spark.clickhouse.write.retryInterval | 10s | Интервал в секундах между попытками повторной записи. | 0.1.0 |
spark.clickhouse.write.retryableErrorCodes | 241 | Код ошибки, допускаемые для повторной записи, возвращаемые сервером ClickHouse при неудаче записи. | 0.1.0 |
Поддерживаемые типы данных
В этом разделе описывается сопоставление типов данных между Spark и ClickHouse. Таблицы ниже предоставляют быстрые ссылки для преобразования типов данных при чтении из ClickHouse в Spark и при вставке данных из Spark в ClickHouse.
Чтение данных из ClickHouse в Spark
Тип данных ClickHouse | Тип данных Spark | Поддерживаемый | Является примитивным | Примечания |
---|---|---|---|---|
Nothing | NullType | ✅ | Да | |
Bool | BooleanType | ✅ | Да | |
UInt8 , Int16 | ShortType | ✅ | Да | |
Int8 | ByteType | ✅ | Да | |
UInt16 ,Int32 | IntegerType | ✅ | Да | |
UInt32 ,Int64 , UInt64 | LongType | ✅ | Да | |
Int128 ,UInt128 , Int256 , UInt256 | DecimalType(38, 0) | ✅ | Да | |
Float32 | FloatType | ✅ | Да | |
Float64 | DoubleType | ✅ | Да | |
String , JSON , UUID , Enum8 , Enum16 , IPv4 , IPv6 | StringType | ✅ | Да | |
FixedString | BinaryType , StringType | ✅ | Да | Контролируется конфигурацией READ_FIXED_STRING_AS |
Decimal | DecimalType | ✅ | Да | Точность и масштаб до Decimal128 |
Decimal32 | DecimalType(9, scale) | ✅ | Да | |
Decimal64 | DecimalType(18, scale) | ✅ | Да | |
Decimal128 | DecimalType(38, scale) | ✅ | Да | |
Date , Date32 | DateType | ✅ | Да | |
DateTime , DateTime32 , DateTime64 | TimestampType | ✅ | Да | |
Array | ArrayType | ✅ | Нет | Тип элемента массива также преобразуется |
Map | MapType | ✅ | Нет | Ключи ограничены StringType |
IntervalYear | YearMonthIntervalType(Year) | ✅ | Да | |
IntervalMonth | YearMonthIntervalType(Month) | ✅ | Да | |
IntervalDay , IntervalHour , IntervalMinute , IntervalSecond | DayTimeIntervalType | ✅ | Нет | Используется специфический тип интервала |
Object | ❌ | |||
Nested | ❌ | |||
Tuple | ❌ | |||
Point | ❌ | |||
Polygon | ❌ | |||
MultiPolygon | ❌ | |||
Ring | ❌ | |||
IntervalQuarter | ❌ | |||
IntervalWeek | ❌ | |||
Decimal256 | ❌ | |||
AggregateFunction | ❌ | |||
SimpleAggregateFunction | ❌ |
Вставка данных из Spark в ClickHouse
Тип данных Spark | Тип данных ClickHouse | Поддерживаемый | Является примитивным | Примечания |
---|---|---|---|---|
BooleanType | UInt8 | ✅ | Да | |
ByteType | Int8 | ✅ | Да | |
ShortType | Int16 | ✅ | Да | |
IntegerType | Int32 | ✅ | Да | |
LongType | Int64 | ✅ | Да | |
FloatType | Float32 | ✅ | Да | |
DoubleType | Float64 | ✅ | Да | |
StringType | String | ✅ | Да | |
VarcharType | String | ✅ | Да | |
CharType | String | ✅ | Да | |
DecimalType | Decimal(p, s) | ✅ | Да | Точность и масштаб до Decimal128 |
DateType | Date | ✅ | Да | |
TimestampType | DateTime | ✅ | Да | |
ArrayType (список, кортеж или массив) | Array | ✅ | Нет | Тип элемента массива также преобразуется |
MapType | Map | ✅ | Нет | Ключи ограничены StringType |
Участие и поддержка
Если вы хотите внести свой вклад в проект или сообщить о каких-либо проблемах, мы будем рады вашему участию! Посетите наш репозиторий на GitHub, чтобы открыть новую проблему, предложить улучшения или отправить запрос на внесение изменений. Мы приветствуем любой вклад! Пожалуйста, ознакомьтесь с рекомендациями по участию в репозитории перед тем, как начинать. Спасибо за помощь в улучшении нашего ClickHouse Spark connector!