Перейти к основному содержимому
Перейти к основному содержимому

Создайте сводную таблицу с материализованными представлениями для быстрой аналитики временных рядов

Этот учебник показывает, как поддерживать предварительно агрегированные сводки из таблицы событий с высоким объемом с использованием материализованных представлений. Вы создадите три объекта: необработанную таблицу, сводную таблицу и материализованное представление, которое автоматически записывает данные в сводку.

Когда использовать эту схему

Используйте эту схему, когда:

  • У вас есть только добавляемый поток событий (клики, просмотры страниц, IoT, логи).
  • Большинство запросов - это агрегации по временным диапазонам (в минуту/час/день).
  • Вы хотите постоянные чтения менее чем за секунду без повторного сканирования всех необработанных строк.

Создание таблицы необработанных событий

CREATE TABLE events_raw
(
    event_time   DateTime,
    user_id      UInt64,
    country      LowCardinality(String),
    event_type   LowCardinality(String),
    value        Float64
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_time, user_id)
TTL event_time + INTERVAL 90 DAY DELETE

Примечания

  • PARTITION BY toYYYYMM(event_time) оставляет партиции маленькими и легкими для удаления.
  • ORDER BY (event_time, user_id) поддерживает временные запросы + вторичную фильтрацию.
  • LowCardinality(String) экономит память для категориальных измерений.
  • TTL очищает необработанные данные через 90 дней (настраивайте в соответствии с вашими требованиями к хранению).

Проектирование сводной (агрегированной) таблицы

Мы будем предварительно агрегировать с почасовой разрешающей способностью. Выберите зерно, соответствующее наиболее распространенному окну анализа.

CREATE TABLE events_rollup_1h
(
    bucket_start  DateTime,            -- start of the hour
    country       LowCardinality(String),
    event_type    LowCardinality(String),
    users_uniq    AggregateFunction(uniqExact, UInt64),
    value_sum     AggregateFunction(sum, Float64),
    value_avg     AggregateFunction(avg, Float64),
    events_count  AggregateFunction(count)
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(bucket_start)
ORDER BY (bucket_start, country, event_type)

Мы храним состояния агрегатов (например, AggregateFunction(sum, ...)), которые компактно представляют частичные агрегаты и могут быть объединены или завершены позже.

Создание материализованного представления, которое заполняет сводку

Это материализованное представление срабатывает автоматически при вставках в events_raw и записывает состояния агрегатов в сводку.

CREATE MATERIALIZED VIEW mv_events_rollup_1h
TO events_rollup_1h
AS
SELECT
    toStartOfHour(event_time) AS bucket_start,
    country,
    event_type,
    uniqExactState(user_id)   AS users_uniq,
    sumState(value)           AS value_sum,
    avgState(value)           AS value_avg,
    countState()              AS events_count
FROM events_raw
GROUP BY bucket_start, country, event_type;

Вставка тестовых данных

Вставьте тестовые данные:

INSERT INTO events_raw VALUES
    (now() - INTERVAL 4 SECOND, 101, 'US', 'view', 1),
    (now() - INTERVAL 3 SECOND, 101, 'US', 'click', 1),
    (now() - INTERVAL 2 SECOND, 202, 'DE', 'view', 1),
    (now() - INTERVAL 1 SECOND, 101, 'US', 'view', 1);

Запрос сводки

Вы можете либо объединить состояния во время чтения, либо завершить их:

SELECT
    bucket_start,
    country,
    event_type,
    uniqExactMerge(users_uniq) AS users,
    sumMerge(value_sum)        AS value_sum,
    avgMerge(value_avg)        AS value_avg,
    countMerge(events_count)   AS events
FROM events_rollup_1h
WHERE bucket_start >= now() - INTERVAL 1 DAY
GROUP BY ALL
ORDER BY bucket_start, country, event_type;

подсказка

Если вы ожидаете, что чтения всегда будут обращаться к сводке, вы можете создать второе материализованное представление, которое записывает завершенные числа в «обычную» таблицу MergeTree с тем же зерном 1ч. Состояния предоставляют большую гибкость, в то время как завершенные числа дают немного более простые чтения.

Фильтрация по полям в первичном ключе для лучшей производительности

Вы можете использовать команду EXPLAIN, чтобы увидеть, как индекс используется для отсечения данных:

EXPLAIN indexes=1
SELECT *
FROM events_rollup_1h
WHERE bucket_start BETWEEN now() - INTERVAL 3 DAY AND now()
  AND country = 'US';
    ┌─explain────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
1.  │ Expression ((Project names + Projection))                                                                                          │
2.  │   Expression                                                                                                                       │
3.  │     ReadFromMergeTree (default.events_rollup_1h)                                                                                   │
4.  │     Indexes:                                                                                                                       │
5.  │       MinMax                                                                                                                       │
6.  │         Keys:                                                                                                                      │
7.  │           bucket_start                                                                                                             │
8.  │         Condition: and((bucket_start in (-Inf, 1758550242]), (bucket_start in [1758291042, +Inf)))                                 │
9.  │         Parts: 1/1                                                                                                                 │
10. │         Granules: 1/1                                                                                                              │
11. │       Partition                                                                                                                    │
12. │         Keys:                                                                                                                      │
13. │           toYYYYMM(bucket_start)                                                                                                   │
14. │         Condition: and((toYYYYMM(bucket_start) in (-Inf, 202509]), (toYYYYMM(bucket_start) in [202509, +Inf)))                     │
15. │         Parts: 1/1                                                                                                                 │
16. │         Granules: 1/1                                                                                                              │
17. │       PrimaryKey                                                                                                                   │
18. │         Keys:                                                                                                                      │
19. │           bucket_start                                                                                                             │
20. │           country                                                                                                                  │
21. │         Condition: and((country in ['US', 'US']), and((bucket_start in (-Inf, 1758550242]), (bucket_start in [1758291042, +Inf)))) │
22. │         Parts: 1/1                                                                                                                 │
23. │         Granules: 1/1                                                                                                              │
    └────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

План выполнения запроса выше показывает три типа индексов, которые используются: минмакс индекс, индекс партиции и первичный ключ. Каждый индекс использует поля, указанные в нашем первичном ключе: (bucket_start, country, event_type). Для наилучшей производительности фильтрации вам следует убедиться, что ваши запросы используют поля первичного ключа для отсечения данных.

Обычные изменения

  • Разные зерна: добавьте дневную сводку:
CREATE TABLE events_rollup_1d
(
    bucket_start Date,
    country      LowCardinality(String),
    event_type   LowCardinality(String),
    users_uniq   AggregateFunction(uniqExact, UInt64),
    value_sum    AggregateFunction(sum, Float64),
    value_avg    AggregateFunction(avg, Float64),
    events_count AggregateFunction(count)
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(bucket_start)
ORDER BY (bucket_start, country, event_type);

Затем второе материализованное представление:

CREATE MATERIALIZED VIEW mv_events_rollup_1d
TO events_rollup_1d
AS
SELECT
    toDate(event_time) AS bucket_start,
    country,
    event_type,
    uniqExactState(user_id),
    sumState(value),
    avgState(value),
    countState()
FROM events_raw
GROUP BY ALL;
  • Сжатие: применяйте кодеки к большим колонкам (например: Codec(ZSTD(3))) в необработанной таблице.
  • Контроль затрат: перемещайте тяжелое хранение в необработанную таблицу и сохраняйте долгоживущие сводки.
  • Заполнение: при загрузке исторических данных вставляйте в events_raw и позволяйте материализованному представлению автоматически создавать сводки. Для существующих строк используйте POPULATE при создании материализованного представления, если это подходит, или INSERT SELECT.

Очистка и удержание

  • Увеличьте TTL необработанных данных (например, 30/90 дней), но сохраняйте сводки дольше (например, 1 год).
  • Вы также можете использовать TTL для перемещения старых частей в более дешевое хранилище, если включена многоуровневая система.

Устранение неполадок

  • Материализованное представление не обновляется? Проверьте, что вставки идут в events_raw (а не в сводную таблицу), и что цель материализованного представления правильная (TO events_rollup_1h).
  • Медленные запросы? Убедитесь, что они проходят через сводку (запросите таблицу сводки напрямую) и что временные фильтры соответствуют зерну сводки.
  • Неполадки с заполнением? Используйте SYSTEM FLUSH LOGS и проверяйте system.query_log / system.parts, чтобы подтвердить вставки и объединения.