Введение
ClickHouse обрабатывает запросы крайне быстро, но как эти запросы распределяются и параллелизуются между несколькими серверами?
В этом руководстве мы сначала обсудим, как ClickHouse распределяет запрос по нескольким шартам через распределенные таблицы, а затем как запрос может использовать несколько реплик для его выполнения.
Шардированная архитектура
В архитектуре с отсутствием общего хранилища кластеры обычно делятся на несколько шардов, каждый из которых содержит подмножество общих данных. Распределенная таблица находится сверху этих шардов и предоставляет единый взгляд на полные данные.
Чтения могут быть отправлены в локальную таблицу. Выполнение запроса будет происходить только на указанном шарде, или его можно отправить в распределенную таблицу, и в этом случае каждый шард выполнит указанные запросы. Сервер, на котором был выполнен запрос к распределенной таблице, агрегирует данные и отвечает клиенту:
На рисунке выше показано, что происходит, когда клиент запрашивает распределенную таблицу:
SELECT-запрос отправляется в распределенную таблицу на узел произвольно
(через стратегию round-robin или после маршрутизации на конкретный сервер
с помощью балансировщика нагрузки). Этот узел теперь будет выступать в качестве координатора.
Узел находит каждый шард, который должен выполнить запрос,
используя информацию, указанную в распределенной таблице, и запрос отправляется каждому шару.
Каждый шард читает, фильтрует и агрегирует данные локально, а затем
отправляет обратно слияющийся статус координатору.
Координирующий узел сливает данные и затем отправляет ответ обратно
клиенту.
Когда мы добавляем реплики в процесс, он довольно похож, с единственным
отличием, что только одна реплика из каждого шарда будет выполнять запрос.
Это означает, что больше запросов может быть обработано параллельно.
Не-шардированная архитектура
ClickHouse Cloud имеет совершенно другую архитектуру по сравнению с вышеописанной.
(См. "Архитектура ClickHouse Cloud"
для получения дополнительной информации). С разделением вычислений и хранения, а также с практически бесконечным объемом хранения, необходимость в шардировании становится менее важной.
На рисунке ниже показана архитектура ClickHouse Cloud:
Эта архитектура позволяет почти мгновенно добавлять и удалять реплики, обеспечивая очень высокую масштабируемость кластера. Кластер ClickHouse Keeper (показан справа) обеспечивает единую упоминание
для метаданных. Реплики могут получать метаданные из кластера ClickHouse Keeper
и все поддерживать одни и те же данные. Сами данные хранятся в
объектном хранилище, а кэш SSD позволяет ускорить запросы.
Но как мы теперь можем распределить выполнение запросов между несколькими серверами? В шардированной архитектуре это было довольно очевидно, поскольку каждый шард мог фактически выполнять запрос по подмножеству данных. Как это работает, когда нет шардирования?
Введение параллельных реплик
Чтобы параллелизовать выполнение запросов через несколько серверов, сначала нужно назначить один из наших серверов координатором. Координатор — это тот, кто создает список задач, которые необходимо выполнить, следит за их выполнением, агрегирует их и возвращает результат клиенту. Как и в большинстве распределённых систем, это будет роль узла, который получает начальный запрос. Нам также нужно определить единицу работы. В шардированной архитектуре единицей работы является шард, подмножество данных. С параллельными репликами мы будем использовать небольшую порцию таблицы, называемую гранулами,
в качестве единицы работы.
Теперь давайте посмотрим, как это работает на практике с помощью рисунка ниже:
С параллельными репликами:
Запрос от клиента отправляется на один узел после прохождения через балансировщик нагрузки. Этот узел становится координатором для этого запроса.
Узел анализирует индекс каждой части и выбирает подходящие части и
гранулы для обработки.
Координатор разбивает рабочую нагрузку на набор гранул, которые могут быть
назначены различным репликам.
Каждый набор гранул обрабатывается соответствующими репликами, и слияющийся статус отправляется координатору, когда они заканчивают.
Наконец, координатор сливает все результаты из реплик и
затем возвращает ответ клиенту.
Шаги выше описывают, как параллельные реплики работают теоретически.
Однако на практике существует много факторов, которые могут помешать такой логике
работать идеально:
Некоторые реплики могут быть недоступны.
Репликация в ClickHouse является асинхронной, и некоторые реплики могут не
иметь одних и тех же частей в определенный момент времени.
Необходимо каким-то образом справляться с задержкой между репликами.
Кэш файловой системы варьируется от реплики к реплике в зависимости от
активности каждой реплики, что означает, что случайное распределение задач может
привести к менее оптимальному выполнению с учетом локальности кэша.
Мы исследуем, как эти факторы преодолеваются в следующих разделах.
Объявления
Чтобы решить (1) и (2) из списка выше, мы ввели концепцию объявления. Давайте визуализируем, как это работает, используя рисунок ниже:

Запрос от клиента отправляется на один узел после прохождения через балансировщик нагрузки. Узел становится координатором для этого запроса.
Координирующий узел отправляет запрос на получение объявлений от
всех реплик в кластере. Реплики могут иметь немного разные
представления о текущем наборе частей для таблицы. В результате нам нужно собрать эту информацию, чтобы избежать неправильных решений о планировании.
Координирующий узел затем использует объявления для определения набора
гранул, которые могут быть назначены различным репликам. Здесь, например,
мы можем увидеть, что никакие гранулы из части 3 не были назначены реплике 2,
потому что эта реплика не предоставила эту часть в своем объявлении.
Также обратите внимание, что никакие задачи не были назначены реплике 3, потому что
реплика не предоставила объявление.
После того как каждая реплика обработала запрос на своем подмножестве гранул
и слияющийся статус был отправлен обратно координатору, координатор сливает результаты и ответ отправляется клиенту.
Динамическая координация
Чтобы решить проблему задержки, мы добавили динамическую координацию. Это означает,
что все гранулы не отправляются на реплику за один запрос, а каждая реплика
может запрашивать новую задачу (набор гранул для обработки) у
координатора. Координатор предоставит реплике набор гранул на основе
полученного объявления.
Предположим, что мы находимся на этапе процесса, когда все реплики отправили
объявление со всеми частями.
На рисунке ниже визуализируется, как работает динамическая координация:
Реплики сообщают координатору, что они могут обрабатывать
задачи, они также могут указать, сколько работы они могут обрабатывать.
Координатор назначает задачи репликам.
Реплики 1 и 2 могут очень быстро завершить свою задачу. Они
запрашивают другую задачу у координатора.
Координатор назначает новые задачи реплике 1 и 2.
Все реплики теперь завершили обработку своей задачи. Они
запрашивают больше задач.
Координатор, используя объявления, проверяет, какие задачи остаются для
обработки, но больше задач нет.
Координатор сообщает репликам, что все было обработано. Теперь он
объединит все слияющиеся состояния и ответит на запрос.
Управление локальностью кэша
Последней оставшейся потенциальной проблемой является то, как мы обрабатываем локальность кэша. Если запрос
выполняется несколько раз, как мы можем убедиться, что одна и та же задача будет направлена на
ту же реплику? В предыдущем примере у нас были назначены следующие задачи:
| Реплика 1 | Реплика 2 | Реплика 3 |
---|
Часть 1 | g1, g6, g7 | g2, g4, g5 | g3 |
Часть 2 | g1 | g2, g4, g5 | g3 |
Часть 3 | g1, g6 | g2, g4, g5 | g3 |
Чтобы обеспечить назначение одних и тех же задач тем же репликам и чтобы они могли
воспользоваться кэшем, происходит две вещи. Вычисляется хеш части + набор
гранул (задача). Применяется модуль количества реплик для назначения задач.
На бумаге это выглядит хорошо, но на практике внезапная нагрузка на одну реплику или
ухудшение сети могут ввести задержку, если одна и та же реплика
постоянно используется для выполнения определенных задач. Если max_parallel_replicas
меньше
количества реплик, то для выполнения запроса выбираются случайные реплики.
Кража задач
Если какая-то реплика обрабатывает задачи медленнее, чем другие, другие реплики попытаются
«украсть» задачи, которые в принципе принадлежат этой реплике, по хешу, чтобы сократить
задержку.
Ограничения
У этой функции есть известные ограничения, из которых основные задокументированы в
этой секции.
примечание
Если вы обнаружите проблему, которая не является одним из перечисленных ниже ограничений, и
подозреваете, что параллельная реплика является причиной, пожалуйста, откройте проблему на GitHub, используя
метку comp-parallel-replicas
.
Ограничение | Описание |
---|
Сложные запросы | В настоящее время параллельные реплики работают довольно хорошо для простых запросов. Сложные составные конструкции, такие как CTE, подзапросы, JOIN, не плоские запросы и т. д., могут негативно повлиять на производительность запроса. |
Малые запросы | Если вы выполняете запрос, который не обрабатывает много строк, его выполнение на нескольких репликах может не дать лучшего времени выполнения, учитывая, что время сети для координации между репликами может привести к дополнительным циклам в выполнении запроса. Вы можете ограничить эти проблемы, используя настройку: parallel_replicas_min_number_of_rows_per_replica . |
Параллельные реплики отключены с FINAL | |
Проекции не используются вместе с параллельными репликами | |
Данные с высокой кардинальностью и сложная агрегация | Данные с высокой кардинальностью, которые необходимо отправить, могут значительно замедлить ваши запросы. |
Совместимость с новым анализатором | Новый анализатор может значительно замедлить или ускорить выполнение запросов в специфических сценариях. |
Настройка | Описание |
---|
enable_parallel_replicas | 0 : отключено 1 : включено
2 : Принудительное использование параллельных реплик, вызовет исключение, если не используется. |
cluster_for_parallel_replicas | Имя кластера для использования для параллельной репликации; если вы используете ClickHouse Cloud, используйте default . |
max_parallel_replicas | Максимальное количество реплик, которое следует использовать для выполнения запроса на нескольких репликах; если указано число меньше чем количество реплик в кластере, узлы будут выбираться случайным образом. Это значение также может быть превышено для учета горизонтального масштабирования. |
parallel_replicas_min_number_of_rows_per_replica | Помогает ограничить количество реплик на основе количества строк, которые необходимо обработать; количество используемых реплик определяется: estimated rows to read / min_number_of_rows_per_replica . |
allow_experimental_analyzer | 0 : использовать старый анализатор 1 : использовать новый анализатор.
Поведение параллельных реплик может изменяться в зависимости от используемого анализатора. |
Исследование проблем с параллельными репликами
Вы можете проверить, какие настройки используются для каждого запроса в таблице
system.query_log
. Вы также можете
посмотреть таблицу system.events
для отображения всех событий, которые произошли на сервере, и вы можете использовать
табличную функцию clusterAllReplicas
, чтобы увидеть таблицы на всех репликах
(если вы пользователь облака, используйте default
).
SELECT
hostname(),
*
FROM clusterAllReplicas('default', system.events)
WHERE event ILIKE '%ParallelReplicas%'
Ответ
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasHandleRequestMicroseconds │ 438 │ Time spent processing requests for marks from replicas │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasHandleAnnouncementMicroseconds │ 558 │ Time spent processing replicas announcements │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasReadUnassignedMarks │ 240 │ Sum across all replicas of how many unassigned marks were scheduled │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasReadAssignedForStealingMarks │ 4 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasStealingByHashMicroseconds │ 5 │ Time spent collecting segments meant for stealing by hash │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasProcessingPartsMicroseconds │ 5 │ Time spent processing data parts │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasStealingLeftoversMicroseconds │ 3 │ Time spent collecting orphaned segments │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasUsedCount │ 2 │ Number of replicas used to execute a query with task-based parallel replicas │
│ c-crimson-vd-86-server-rdhnsx3-0 │ ParallelReplicasAvailableCount │ 6 │ Number of replicas available to execute a query with task-based parallel replicas │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasHandleRequestMicroseconds │ 698 │ Time spent processing requests for marks from replicas │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasHandleAnnouncementMicroseconds │ 644 │ Time spent processing replicas announcements │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasReadUnassignedMarks │ 190 │ Sum across all replicas of how many unassigned marks were scheduled │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasReadAssignedForStealingMarks │ 54 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasStealingByHashMicroseconds │ 8 │ Time spent collecting segments meant for stealing by hash │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasProcessingPartsMicroseconds │ 4 │ Time spent processing data parts │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasStealingLeftoversMicroseconds │ 2 │ Time spent collecting orphaned segments │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasUsedCount │ 2 │ Number of replicas used to execute a query with task-based parallel replicas │
│ c-crimson-vd-86-server-e9kp5f0-0 │ ParallelReplicasAvailableCount │ 6 │ Number of replicas available to execute a query with task-based parallel replicas │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasHandleRequestMicroseconds │ 620 │ Time spent processing requests for marks from replicas │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasHandleAnnouncementMicroseconds │ 656 │ Time spent processing replicas announcements │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasReadUnassignedMarks │ 1 │ Sum across all replicas of how many unassigned marks were scheduled │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasReadAssignedForStealingMarks │ 1 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasStealingByHashMicroseconds │ 4 │ Time spent collecting segments meant for stealing by hash │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasProcessingPartsMicroseconds │ 3 │ Time spent processing data parts │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasStealingLeftoversMicroseconds │ 1 │ Time spent collecting orphaned segments │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasUsedCount │ 2 │ Number of replicas used to execute a query with task-based parallel replicas │
│ c-crimson-vd-86-server-ybtm18n-0 │ ParallelReplicasAvailableCount │ 12 │ Number of replicas available to execute a query with task-based parallel replicas │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─hostname()───────────────────────┬─event──────────────────────────────────────────┬─value─┬─description──────────────────────────────────────────────────────────────────────────────────────────┐
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasHandleRequestMicroseconds │ 696 │ Time spent processing requests for marks from replicas │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasHandleAnnouncementMicroseconds │ 717 │ Time spent processing replicas announcements │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasReadUnassignedMarks │ 2 │ Sum across all replicas of how many unassigned marks were scheduled │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasReadAssignedForStealingMarks │ 2 │ Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasStealingByHashMicroseconds │ 10 │ Time spent collecting segments meant for stealing by hash │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasProcessingPartsMicroseconds │ 6 │ Time spent processing data parts │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasStealingLeftoversMicroseconds │ 2 │ Time spent collecting orphaned segments │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasUsedCount │ 2 │ Number of replicas used to execute a query with task-based parallel replicas │
│ c-crimson-vd-86-server-16j1ncj-0 │ ParallelReplicasAvailableCount │ 12 │ Number of replicas available to execute a query with task-based parallel replicas │
└──────────────────────────────────┴────────────────────────────────────────────────┴───────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
Таблица system.text_log
также
содержит информацию о выполнении запросов с использованием параллельных реплик:
SELECT message
FROM clusterAllReplicas('default', system.text_log)
WHERE query_id = 'ad40c712-d25d-45c4-b1a1-a28ba8d4019c'
ORDER BY event_time_microseconds ASC
Ответ
┌─message────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ (from 54.218.178.249:59198) SELECT * FROM session_events WHERE type='type2' LIMIT 10 SETTINGS allow_experimental_parallel_reading_from_replicas=2; (stage: Complete) │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 to stage Complete │
│ Access granted: SELECT(clientId, sessionId, pageId, timestamp, type) ON default.session_events │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') to stage WithMergeableState only analyze │
│ Access granted: SELECT(clientId, sessionId, pageId, timestamp, type) ON default.session_events │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') from stage FetchColumns to stage WithMergeableState only analyze │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 to stage WithMergeableState only analyze │
│ Access granted: SELECT(clientId, sessionId, pageId, timestamp, type) ON default.session_events │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 from stage FetchColumns to stage WithMergeableState only analyze │
│ Query SELECT __table1.clientId AS clientId, __table1.sessionId AS sessionId, __table1.pageId AS pageId, __table1.timestamp AS timestamp, __table1.type AS type FROM default.session_events AS __table1 WHERE __table1.type = 'type2' LIMIT _CAST(10, 'UInt64') SETTINGS allow_experimental_parallel_reading_from_replicas = 2 from stage WithMergeableState to stage Complete │
│ The number of replicas requested (100) is bigger than the real number available in the cluster (6). Will use the latter number to execute the query. │
│ Initial request from replica 4: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 4 replica
│
│ Reading state is fully initialized: part all_0_2_1 with ranges [(0, 182)] in replicas [4]; part all_3_3_0 with ranges [(0, 62)] in replicas [4] │
│ Sent initial requests: 1 Replicas count: 6 │
│ Initial request from replica 2: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 2 replica
│
│ Sent initial requests: 2 Replicas count: 6 │
│ Handling request from replica 4, minimal marks size is 240 │
│ Going to respond to replica 4 with 1 parts: [part all_0_2_1 with ranges [(128, 182)]]. Finish: false; mine_marks=0, stolen_by_hash=54, stolen_rest=0 │
│ Initial request from replica 1: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 1 replica
│
│ Sent initial requests: 3 Replicas count: 6 │
│ Handling request from replica 4, minimal marks size is 240 │
│ Going to respond to replica 4 with 2 parts: [part all_0_2_1 with ranges [(0, 128)], part all_3_3_0 with ranges [(0, 62)]]. Finish: false; mine_marks=0, stolen_by_hash=0, stolen_rest=190 │
│ Initial request from replica 0: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 0 replica
│
│ Sent initial requests: 4 Replicas count: 6 │
│ Initial request from replica 5: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 5 replica
│
│ Sent initial requests: 5 Replicas count: 6 │
│ Handling request from replica 2, minimal marks size is 240 │
│ Going to respond to replica 2 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0 │
│ Initial request from replica 3: 2 parts: [part all_0_2_1 with ranges [(0, 182)], part all_3_3_0 with ranges [(0, 62)]]----------
Received from 3 replica
│
│ Sent initial requests: 6 Replicas count: 6 │
│ Total rows to read: 2000000 │
│ Handling request from replica 5, minimal marks size is 240 │
│ Going to respond to replica 5 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0 │
│ Handling request from replica 0, minimal marks size is 240 │
│ Going to respond to replica 0 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0 │
│ Handling request from replica 1, minimal marks size is 240 │
│ Going to respond to replica 1 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0 │
│ Handling request from replica 3, minimal marks size is 240 │
│ Going to respond to replica 3 with 0 parts: []. Finish: true; mine_marks=0, stolen_by_hash=0, stolen_rest=0 │
│ (c-crimson-vd-86-server-rdhnsx3-0.c-crimson-vd-86-server-headless.ns-crimson-vd-86.svc.cluster.local:9000) Cancelling query because enough data has been read │
│ Read 81920 rows, 5.16 MiB in 0.013166 sec., 6222087.194288318 rows/sec., 391.63 MiB/sec. │
│ Coordination done: Statistics: replica 0 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 1 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 2 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 3 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0}; replica 4 - {requests: 3 marks: 244 assigned_to_me: 0 stolen_by_hash: 54 stolen_unassigned: 190}; replica 5 - {requests: 2 marks: 0 assigned_to_me: 0 stolen_by_hash: 0 stolen_unassigned: 0} │
│ Peak memory usage (for query): 1.81 MiB. │
│ Processed in 0.024095586 sec. │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
Наконец, вы также можете использовать EXPLAIN PIPELINE
. Он подчеркивает, как ClickHouse
будет выполнять запрос и какие ресурсы будут использоваться для
выполнения запроса. Давайте возьмем следующий запрос в качестве примера:
SELECT count(), uniq(pageId) , min(timestamp), max(timestamp)
FROM session_events
WHERE type='type3'
GROUP BY toYear(timestamp) LIMIT 10
Давайте взглянем на конвейер запроса без параллельной реплики:
EXPLAIN PIPELINE graph = 1, compact = 0
SELECT count(), uniq(pageId) , min(timestamp), max(timestamp)
FROM session_events
WHERE type='type3'
GROUP BY toYear(timestamp)
LIMIT 10
SETTINGS allow_experimental_parallel_reading_from_replicas=0
FORMAT TSV;
А теперь с параллельной репликой:
EXPLAIN PIPELINE graph = 1, compact = 0
SELECT count(), uniq(pageId) , min(timestamp), max(timestamp)
FROM session_events
WHERE type='type3'
GROUP BY toYear(timestamp)
LIMIT 10
SETTINGS allow_experimental_parallel_reading_from_replicas=2
FORMAT TSV;