Перейти к основному содержимому
Перейти к основному содержимому

ClickHouse Kafka Connect Sink

примечание

Если вам нужна помощь, пожалуйста, сообщите о проблеме в репозитории или задайте вопрос в публичном Slack ClickHouse.

ClickHouse Kafka Connect Sink — это коннектор Kafka, который передает данные из топика Kafka в таблицу ClickHouse.

Лицензия

Коннектор Kafka Sink распространяется под лицензией Apache 2.0.

Требования к окружению

В окружении должна быть установлена версия фреймворка Kafka Connect v2.7 или более поздняя.

Матрица совместимости версий

Версия ClickHouse Kafka ConnectВерсия ClickHouseKafka ConnectConfluent platform
1.0.0> 23.3> 2.7> 6.1

Основные функции

  • Поставляется с готовой семантикой exactly-once. Она поддерживается новой основной функцией ClickHouse под названием KeeperMap (используется как хранилище состояния коннектора) и позволяет создать минималистичную архитектуру.
  • Поддержка сторонних хранилищ состояния: По умолчанию используется in-memory, но может использовать KeeperMap (Redis будет добавлен скоро).
  • Интеграция с основным продуктом: Построен, поддерживается и обслуживается ClickHouse.
  • Постоянное тестирование с ClickHouse Cloud.
  • Вставка данных с заданной схемой и без схемы.
  • Поддержка всех типов данных ClickHouse.

Инструкции по установке

Соберите свои данные для подключения

Чтобы подключиться к ClickHouse с помощью HTTP(S), вам необходима следующая информация:

  • ХОСТ и ПОРТ: как правило, порт 8443 при использовании TLS или 8123 при отсутствии TLS.

  • ИМЯ БАЗЫ ДАННЫХ: по умолчанию существует база данных с именем default, используйте имя базы данных, к которой вы хотите подключиться.

  • ИМЯ ПОЛЬЗОВАТЕЛЯ и ПАРОЛЬ: по умолчанию имя пользователя default. Используйте имя пользователя, подходящее для вашего случая.

Данные для вашего сервиса ClickHouse Cloud доступны в консоли ClickHouse Cloud. Выберите сервис, к которому вы хотите подключиться, и нажмите Подключиться:

Кнопка подключения к сервису ClickHouse Cloud

Выберите HTTPS, и данные будут доступны в примере команды curl.

Детали подключения ClickHouse Cloud по HTTPS

Если вы используете самоуправляемый ClickHouse, детали подключения устанавливаются вашим администратором ClickHouse.

Общие инструкции по установке

Коннектор распространяется в виде одного JAR файла, содержащего все классы, необходимые для работы плагина.

Чтобы установить плагин, выполните следующие шаги:

  • Скачайте ZIP-архив, содержащий JAR файл коннектора, на странице Releases репозитория ClickHouse Kafka Connect Sink.
  • Извлеките содержимое ZIP-файла и скопируйте его в нужное место.
  • Добавьте путь с директорией плагинов в конфигурацию plugin.path в вашем файле свойств Connect, чтобы платформа Confluent могла найти плагин.
  • Укажите имя топика, имя хоста экземпляра ClickHouse и пароль в конфигурации.
connector.class=com.clickhouse.kafka.connect.ClickHouseSinkConnector
tasks.max=1
topics=<topic_name>
ssl=true
jdbcConnectionProperties=?sslmode=STRICT
security.protocol=SSL
hostname=<hostname>
database=<database_name>
password=<password>
ssl.truststore.location=/tmp/kafka.client.truststore.jks
port=8443
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
exactlyOnce=true
username=default
schemas.enable=false
  • Перезапустите платформу Confluent.
  • Если вы используете платформу Confluent, войдите в пользовательский интерфейс Confluent Control Center, чтобы убедиться, что ClickHouse Sink доступен в списке доступных коннекторов.

Опции конфигурации

Чтобы подключить ClickHouse Sink к серверу ClickHouse, вам необходимо предоставить:

  • данные подключения: hostname (обязательно) и порт (опционально)
  • учетные данные пользователя: пароль (обязательно) и имя пользователя (опционально)
  • класс коннектора: com.clickhouse.kafka.connect.ClickHouseSinkConnector (обязательно)
  • topics или topics.regex: топики Kafka для опроса - имена топиков должны совпадать с именами таблиц (обязательно)
  • преобразователи ключей и значений: настройте в зависимости от типа данных в вашем топике. Обязательно, если не определено в конфигурации рабочего процесса.

Полная таблица опций конфигурации:

Имя свойстваОписаниеЗначение по умолчанию
hostname (Обязательно)Имя хоста или IP-адрес сервераN/A
portПорт ClickHouse - по умолчанию 8443 (для HTTPS в облаке), но для HTTP (по умолчанию для саморазмещенных) это должно быть 81238443
sslВключить SSL-соединение с ClickHousetrue
jdbcConnectionPropertiesСвойства соединения при подключении к Clickhouse. Должен начинаться с ? и объединяться с помощью & между param=value""
usernameИмя пользователя базы данных ClickHousedefault
password (Обязательно)Пароль базы данных ClickHouseN/A
databaseИмя базы данных ClickHousedefault
connector.class (Обязательно)Класс коннектора (явно установленный и сохраненный как значение по умолчанию)"com.clickhouse.kafka.connect.ClickHouseSinkConnector"
tasks.maxКоличество задач коннектора"1"
errors.retry.timeoutТаймаут повторной попытки ClickHouse JDBC"60"
exactlyOnceВключен режим Exactly Once"false"
topics (Обязательно)Топики Kafka для опроса - имена топиков должны совпадать с именами таблиц""
key.converter (Обязательно* - см. Описание)Установите в зависимости от типов ваших ключей. Обязательно, если вы передаете ключи (и не определены в конфигурации рабочего процесса)."org.apache.kafka.connect.storage.StringConverter"
value.converter (Обязательно* - см. Описание)Установите в зависимости от типа данных в вашем топике. Поддерживаемые: - JSON, String, Avro или Protobuf форматы. Обязательно, если не определено в конфигурации рабочего процесса."org.apache.kafka.connect.json.JsonConverter"
value.converter.schemas.enableПоддержка схемы преобразователя значений коннектора"false"
errors.toleranceОшибки в коннекторе. Поддерживаемые: none, all"none"
errors.deadletterqueue.topic.nameЕсли установлено (с errors.tolerance=all), будет использоваться DLQ для неудачных пакетных операций (см. Устранение неполадок)""
errors.deadletterqueue.context.headers.enableДобавляет дополнительные заголовки для DLQ""
clickhouseSettingsСписок настроек ClickHouse, разделенный запятыми (например, "insert_quorum=2 и т. д.")""
topic2TableMapСписок, разделенный запятыми, который сопоставляет имена топиков с именами таблиц (например, "topic1=table1, topic2=table2 и т. д.")""
tableRefreshIntervalВремя (в секундах) для обновления кэша определения таблицы0
keeperOnClusterПозволяет настроить параметр ON CLUSTER для саморазмещенных экземпляров (например, ON CLUSTER clusterNameInConfigFileDefinition) для таблицы connect_state exactly-once (см. Распределенные DDL Запросы)""
bypassRowBinaryПозволяет отключить использование RowBinary и RowBinaryWithDefaults для данных на основе схемы (Avro, Protobuf и т. д.) - следует использовать только тогда, когда данные будут иметь отсутствующие столбцы, и Nullable/Default недопустимы"false"
dateTimeFormatsФорматы даты и времени для разбора полей схемы DateTime64, разделенные ; (например, someDateField=yyyy-MM-dd HH:mm:ss.SSSSSSSSS;someOtherDateField=yyyy-MM-dd HH:mm:ss).""
tolerateStateMismatchПозволяет коннектору удалять записи "ранее", чем текущий сохраненный офсет AFTER_PROCESSING (например, если передан офсет 5, а последний записанный офсет - 250)"false"
ignorePartitionsWhenBatchingИгнорирует партицию при сборе сообщений для вставки (хотя только если exactlyOnce равно false). Примечание по производительности: больше задач коннектора, меньше партиций Kafka, назначенных на каждую задачу - это может означать снижающуюся отдачу."false"

Целевые таблицы

ClickHouse Connect Sink считывает сообщения из топиков Kafka и записывает их в соответствующие таблицы. ClickHouse Connect Sink записывает данные в существующие таблицы. Пожалуйста, убедитесь, что целевая таблица с подходящей схемой была создана в ClickHouse до начала вставки данных в нее.

Каждому топику требуется выделенная целевая таблица в ClickHouse. Имя целевой таблицы должно совпадать с именем исходного топика.

Предварительная обработка

Если вам нужно преобразовать исходящие сообщения перед их отправкой в ClickHouse Kafka Connect Sink, используйте Преобразования Kafka Connect.

Поддерживаемые типы данных

С объявленной схемой:

Тип Kafka ConnectТип ClickHouseПоддерживаетсяПримитив
STRINGStringДа
STRINGJSON. См. ниже (1)Да
INT8Int8Да
INT16Int16Да
INT32Int32Да
INT64Int64Да
FLOAT32Float32Да
FLOAT64Float64Да
BOOLEANBooleanДа
ARRAYArray(T)Нет
MAPMap(Primitive, T)Нет
STRUCTVariant(T1, T2, ...)Нет
STRUCTTuple(a T1, b T2, ...)Нет
STRUCTNested(a T1, b T2, ...)Нет
STRUCTJSON. См. ниже (1), (2)Нет
BYTESStringНет
org.apache.kafka.connect.data.TimeInt64 / DateTime64Нет
org.apache.kafka.connect.data.TimestampInt32 / Date32Нет
org.apache.kafka.connect.data.DecimalDecimalНет
  • (1) - JSON поддерживается только тогда, когда в настройках ClickHouse установлен input_format_binary_read_json_as_string=1. Это работает только для семейства формата RowBinary, и эта настройка влияет на все столбцы в запросе вставки, так что они все должны быть строками. В этом случае коннектор преобразует STRUCT в строку JSON.

  • (2) - Когда структура имеет объединения, такие как oneof, преобразователь должен быть настроен так, чтобы НЕ добавлять префикс/суффикс к именам полей. Существует настройка generate.index.for.unions=false для ProtobufConverter.

Без объявленной схемы:

Запись преобразуется в JSON и отправляется в ClickHouse как значение в формате JSONEachRow.

Рецепты конфигурации

Вот некоторые общие рецепты конфигурации, которые помогут вам быстро начать.

Основная конфигурация

Наиболее простая конфигурация, чтобы начать работу — предполагается, что вы запускаете Kafka Connect в распределенном режиме и у вас запущен сервер ClickHouse на localhost:8443 с включенным SSL, данные находятся в JSON без схемы.

{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    "tasks.max": "1",
    "consumer.override.max.poll.records": "5000",
    "consumer.override.max.partition.fetch.bytes": "5242880",
    "database": "default",
    "errors.retry.timeout": "60",
    "exactlyOnce": "false",
    "hostname": "localhost",
    "port": "8443",
    "ssl": "true",
    "jdbcConnectionProperties": "?ssl=true&sslmode=strict",
    "username": "default",
    "password": "<PASSWORD>",
    "topics": "<TOPIC_NAME>",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "clickhouseSettings": ""
  }
}

Основная конфигурация с несколькими топиками

Коннектор может потреблять данные из нескольких топиков.

{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "topics": "SAMPLE_TOPIC, ANOTHER_TOPIC, YET_ANOTHER_TOPIC",
    ...
  }
}

Основная конфигурация с DLQ

{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "<DLQ_TOPIC>",
    "errors.deadletterqueue.context.headers.enable": "true",
  }
}

Использование с разными форматами данных

Поддержка схемы Avro
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "<SCHEMA_REGISTRY_HOST>:<PORT>",
    "value.converter.schemas.enable": "true",
  }
}
Поддержка схемы Protobuf
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schema.registry.url": "<SCHEMA_REGISTRY_HOST>:<PORT>",
    "value.converter.schemas.enable": "true",
  }
}

Обратите внимание: если у вас возникают проблемы с отсутствующими классами, не в каждой среде есть преобразователь protobuf, и вам может потребоваться альтернативный выпуск jar с зависимостями.

Поддержка схемы JSON
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  }
}
Поддержка строки

Коннектор поддерживает преобразователь строки в разных форматах ClickHouse: JSON, CSV и TSV.

{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "customInsertFormat": "true",
    "insertFormat": "CSV"
  }
}

Логирование

Логирование автоматически обеспечивается платформой Kafka Connect. Место назначения и формат журналирования могут быть настроены через конфигурационный файл Kafka connect configuration file.

Если вы используете платформу Confluent, логи можно просмотреть, выполнив команду CLI:

confluent local services connect log

Для получения дополнительных деталей ознакомьтесь с официальным учебником.

Мониторинг

ClickHouse Kafka Connect сообщает метрики времени выполнения через Java Management Extensions (JMX). JMX включен в коннектор Kafka по умолчанию.

Имя MBeanName ClickHouse Connect:

com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask{id}

ClickHouse Kafka Connect сообщает следующие метрики:

ИмяТипОписание
receivedRecordslongОбщее количество полученных записей.
recordProcessingTimelongОбщее время в наносекундах, потраченное на группировку и преобразование записей в единообразную структуру.
taskProcessingTimelongОбщее время в наносекундах, потраченное на обработку и вставку данных в ClickHouse.

Ограничения

  • Удаления не поддерживаются.
  • Размер пакета унаследован от свойств потребителя Kafka.
  • При использовании KeeperMap для exactly-once и изменении или перематывании офсета необходимо удалить содержимое из KeeperMap для этого конкретного топика. (См. руководство по устранению неполадок ниже для получения более подробной информации).

Оптимизация производительности

Если вы когда-либо думали: "Я хотел бы отрегулировать размер пакета для коннектора sink", то этот раздел для вас.

Fetch Connect vs Poll Connector

Kafka Connect (фреймворк, на котором построен наш коннектор sink) будет фетчить сообщения из топиков Kafka в фоновом режиме (независимо от коннектора).

Вы можете контролировать этот процесс, используя fetch.min.bytes и fetch.max.bytes — в то время как fetch.min.bytes устанавливает минимальное количество, необходимое перед тем, как фреймворк передаст значения коннектору (в пределах временного лимита, установленного fetch.max.wait.ms), fetch.max.bytes устанавливает верхний предел размера. Если вы хотите передать более крупные пакеты в коннектор, одним из вариантов может быть увеличение минимального фетча или максимального ожидания для формирования больших наборов данных.

Эти извлеченные данные затем потребляются клиентом коннектора, опрашивающим сообщения, количество для каждого опроса контролируется max.poll.records — обратите внимание, что fetch независим от poll!

Когда вы настраиваете эти параметры, пользователи должны стремиться к тому, чтобы их размер извлечения производил несколько пакетов max.poll.records (и не забывайте, что настройки fetch.min.bytes и fetch.max.bytes представляют собой сжатые данные) — таким образом, каждая задача коннектора вставляет как можно больше пакетов.

ClickHouse оптимизирован для больших пакетов, даже с небольшими задержками, а не для частых, но маленьких пакетов — чем больше пакет, тем лучше.

consumer.max.poll.records=5000
consumer.max.partition.fetch.bytes=5242880

Более подробную информацию можно найти в документации Confluent или в документации Kafka.

Несколько топиков с высокой пропускной способностью

Если ваш коннектор настроен на подписку на несколько топиков, вы используете topic2TableMap для сопоставления топиков с таблицами, и вы experiencing a bottleneck при вставке, что приводит к задержкам потребителя, рассмотрите возможность создания одного коннектора на топик вместо этого. Основная причина, по которой это происходит, заключается в том, что в настоящее время пакеты вставляются в каждую таблицу поочередно.

Создание одного коннектора на топик является обходным решением, которое гарантирует, что вы получите максимально возможную скорость вставки.

Устранение неполадок

"Несоответствие состояния для топика [someTopic] партиция [0]"

Это происходит, когда офсет, хранящийся в KeeperMap, отличается от офсета, хранящегося в Kafka, обычно когда топик был удален или офсет был вручную изменен. Чтобы исправить это, вам нужно удалить старые значения, хранящиеся для данного топика + партиции.

ПРИМЕЧАНИЕ: Это изменение может иметь последствия для exactly-once.

"Какие ошибки будет пытаться повторить коннектор?"

На данный момент внимание уделяется выявлению ошибок, которые являются временными и могут быть повторены, включая:

  • ClickHouseException — это общее исключение, которое может быть выброшено ClickHouse. Обычно оно выбрасывается, когда сервер перегружен, и следующие коды ошибок считаются особенно временными:
    • 3 - UNEXPECTED_END_OF_FILE
    • 159 - TIMEOUT_EXCEEDED
    • 164 - READONLY
    • 202 - TOO_MANY_SIMULTANEOUS_QUERIES
    • 203 - NO_FREE_CONNECTION
    • 209 - SOCKET_TIMEOUT
    • 210 - NETWORK_ERROR
    • 242 - TABLE_IS_READ_ONLY
    • 252 - TOO_MANY_PARTS
    • 285 - TOO_FEW_LIVE_REPLICAS
    • 319 - UNKNOWN_STATUS_OF_INSERT
    • 425 - SYSTEM_ERROR
    • 999 - KEEPER_EXCEPTION
    • 1002 - UNKNOWN_EXCEPTION
  • SocketTimeoutException — Это выбрасывается, когда сокет истекает по времени.
  • UnknownHostException — Это выбрасывается, когда хост не может быть разрешен.
  • IOException — Это выбрасывается, когда есть проблема с сетью.

"Все мои данные пустые/нули"

Вероятно, поля в ваших данных не соответствуют полям в таблице — это особенно распространено в случае CDC (и формата Debezium). Одним из распространенных решений является добавление трансформации flatten в конфигурацию вашего коннектора:

transforms=flatten
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flatten.delimiter=_

Это преобразует ваши данные из вложенного JSON в упрощенный JSON (используя _ в качестве разделителя). Поля в таблице тогда будут иметь формат "field1_field2_field3" (т.е. "before_id", "after_id" и т.д.).

"Я хочу использовать свои ключи Kafka в ClickHouse"

Ключи Kafka по умолчанию не хранятся в поле значений, но вы можете использовать трансформацию KeyToValue, чтобы переместить ключ в поле значений (под новым именем поля _key):

transforms=keyToValue
transforms.keyToValue.type=com.clickhouse.kafka.connect.transforms.KeyToValue
transforms.keyToValue.field=_key