Перейти к основному содержанию
Этот движок обеспечивает интеграцию с экосистемой Amazon S3 и позволяет выполнять потоковый импорт. Этот движок похож на движки Kafka, RabbitMQ, но предоставляет возможности, характерные именно для S3. Важно понимать это примечание из оригинального PR с реализацией S3Queue: когда MATERIALIZED VIEW подключается к движку, движок таблицы S3Queue начинает собирать данные в фоновом режиме.

CREATE TABLE

CREATE TABLE s3_queue_engine_table (name String, value UInt32)
    ENGINE = S3Queue(path, [NOSIGN, | aws_access_key_id, aws_secret_access_key,] format, [compression], [headers], [extra_credentials])
    [SETTINGS]
    [mode = '',]
    [after_processing = 'keep',]
    [keeper_path = '',]
    [loading_retries = 0,]
    [processing_threads_num = 16,]
    [parallel_inserts = false,]
    [enable_logging_to_queue_log = true,]
    [last_processed_path = "",]
    [tracked_files_limit = 1000,]
    [tracked_file_ttl_sec = 0,]
    [polling_min_timeout_ms = 1000,]
    [polling_max_timeout_ms = 10000,]
    [polling_backoff_ms = 0,]
    [cleanup_interval_min_ms = 10000,]
    [cleanup_interval_max_ms = 30000,]
    [buckets = 0,]
    [list_objects_batch_size = 1000,]
    [enable_hash_ring_filtering = 0,]
    [max_processed_files_before_commit = 100,]
    [max_processed_rows_before_commit = 0,]
    [max_processed_bytes_before_commit = 0,]
    [max_processing_time_sec_before_commit = 0,]
До версии 24.7 для всех настроек, кроме mode, after_processing и keeper_path, необходимо использовать префикс s3queue_.
Параметры движка Параметры S3Queue совпадают с параметрами, которые поддерживает движок таблицы S3. См. раздел параметров здесь. Пример
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
    mode = 'unordered';
Использование именованных коллекций:
<clickhouse>
    <named_collections>
        <s3queue_conf>
            <url>'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*</url>
            <access_key_id>test<access_key_id>
            <secret_access_key>test</secret_access_key>
        </s3queue_conf>
    </named_collections>
</clickhouse>
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue(s3queue_conf, format = 'CSV', compression_method = 'gzip')
SETTINGS
    mode = 'ordered';

Настройки

Чтобы получить список настроек, заданных для таблицы, используйте таблицу system.s3_queue_settings. Доступно с версии 24.10.
Имена настроек (24.7+)Начиная с версии 24.7, настройки S3Queue можно указывать как с префиксом s3queue_, так и без него:
  • Современный синтаксис (24.7+): processing_threads_num, tracked_file_ttl_sec и т. д.
  • Устаревший синтаксис (все версии): s3queue_processing_threads_num, s3queue_tracked_file_ttl_sec и т. д.
Обе формы поддерживаются в версии 24.7 и выше. В примерах на этой странице используется современный синтаксис без префикса.

Режим

Возможные значения:
  • unordered — В режиме unordered множество уже обработанных файлов отслеживается с помощью постоянных узлов в ZooKeeper.
  • ordered — В режиме ordered файлы обрабатываются в лексикографическом порядке. Это означает, что если файл с именем ‘BBB’ был обработан в какой-то момент, а позже в бакет будет добавлен файл с именем ‘AA’, он будет проигнорирован. В ZooKeeper сохраняются только максимальное имя (в лексикографическом смысле) успешно обработанного файла и имена файлов, для которых будут выполняться повторные попытки после неудачной загрузки.
Значение по умолчанию: ordered в версиях до 24.6. Начиная с 24.6 значение по умолчанию отсутствует, и этот параметр необходимо указывать вручную. Для таблиц, созданных в более ранних версиях, для совместимости значением по умолчанию останется Ordered.

after_processing

Что делать с файлом после успешной обработки. Возможные значения:
  • keep.
  • delete.
  • move.
  • tag.
Значение по умолчанию: keep. Для move требуются дополнительные настройки. Если перемещение выполняется в пределах одного бакета, нужно указать новый префикс пути в after_processing_move_prefix. Для перемещения в другой S3 бакет требуется URI целевого бакета в after_processing_move_uri, а также учетные данные S3 в after_processing_move_access_key_id и after_processing_move_secret_access_key. Пример:
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
    mode = 'unordered',
    after_processing = 'move',
    after_processing_retries = 20,
    after_processing_move_prefix = 'dst_prefix',
    after_processing_move_uri = 'https://clickhouse-public-datasets.s3.amazonaws.com/dst-bucket',
    after_processing_move_access_key_id = 'test',
    after_processing_move_secret_access_key = 'test';
Для перемещения из одного контейнера Azure в другой требуются строка подключения Blob Storage в параметре after_processing_move_connection_string и имя контейнера в параметре after_processing_move_container. См. настройки AzureQueue. Для тегирования требуются ключ тега и значение тега, задаваемые параметрами after_processing_tag_key и after_processing_tag_value.

after_processing_retries

Количество повторных попыток для запрошенного действия после обработки, прежде чем отказаться от дальнейших попыток. Возможные значения:
  • Неотрицательное целое число.
Значение по умолчанию: 10.

after_processing_move_access_key_id

Идентификатор ключа доступа для S3 бакета, в который перемещаются успешно обработанные файлы, если пункт назначения — другой S3 бакет. Возможные значения:
  • String.
Значение по умолчанию: пустая строка.

after_processing_move_prefix

Префикс пути, в который будут перемещаться успешно обработанные файлы. Применяется в обоих случаях: при перемещении в пределах того же бакета и в другой бакет. Возможные значения:
  • String.
Значение по умолчанию: пустая строка.

after_processing_move_secret_access_key

Секретный ключ доступа для S3 бакета, в который нужно перемещать успешно обработанные файлы, если в качестве пункта назначения используется другой S3 бакет. Возможные значения:
  • String.
Значение по умолчанию: пустая строка.

after_processing_move_uri

URI S3 бакета, в который перемещаются успешно обработанные файлы, если пунктом назначения является другой S3 бакет. Возможные значения:
  • String.
Значение по умолчанию: пустая строка.

after_processing_tag_key

Ключ тега, который будет присвоен успешно обработанным файлам, если after_processing='tag'. Возможные значения:
  • String.
Значение по умолчанию: пустая строка.

after_processing_tag_value

Значение тега, которое присваивается успешно обработанным файлам, если after_processing='tag'. Возможные значения:
  • String.
Значение по умолчанию: пустая строка.

keeper_path

Путь к метаданным очереди в ZooKeeper. Если он не задан явно, ClickHouse формирует путь из s3queue_default_zookeeper_path, UUID базы данных и UUID таблицы. Абсолютные значения (начинающиеся с /) используются как есть, а относительные добавляются к настроенному префиксу. Макросы, такие как {database} или {uuid}, разворачиваются до подключения движка к ZooKeeper. Чтобы указать дополнительный кластер ZooKeeper, добавьте к значению в качестве префикса настроенное имя, например analytics_keeper:/clickhouse/queue/orders. Это имя должно существовать в <auxiliary_zookeepers>; в противном случае движок сообщит об ошибке Unknown auxiliary ZooKeeper name .... Полная строка (включая префикс) сохраняется в SHOW CREATE TABLE, чтобы оператор можно было дословно воспроизвести. Возможные значения:
  • String.
Значение по умолчанию: /.

loading_retries

Повторять попытку загрузки файла указанное количество раз. По умолчанию повторные попытки не выполняются. Возможные значения:
  • Положительное целое число.
Значение по умолчанию: 0.

processing_threads_num

Количество потоков для обработки. Применяется только в режиме Unordered. Значение по умолчанию: количество процессоров или 16.

parallel_inserts

По умолчанию processing_threads_num будет создавать один INSERT, поэтому в нескольких потоках будут только скачиваться файлы и разбираться их содержимое. Но это ограничивает параллелизм, поэтому для более высокой пропускной способности используйте parallel_inserts=true: это позволит выполнять вставку данных параллельно (но имейте в виду, что это приведёт к увеличению числа создаваемых частей данных в семействе MergeTree).
INSERT будут создаваться с учётом настроек max_process*_before_commit.
Значение по умолчанию: false.

enable_logging_to_s3queue_log

Включает запись в журнал system.s3queue_log. Значение по умолчанию: 0.

polling_min_timeout_ms

Указывает минимальное время в миллисекундах, которое ClickHouse ожидает перед следующей попыткой опроса. Возможные значения:
  • Положительное целое число.
Значение по умолчанию: 1000.

polling_max_timeout_ms

Определяет максимальное время в миллисекундах, по истечении которого ClickHouse приступает к следующей попытке опроса. Возможные значения:
  • Положительное целое число.
Значение по умолчанию: 10000.

polling_backoff_ms

Определяет дополнительное время ожидания, добавляемое к предыдущему интервалу опроса, если новых файлов не найдено. Следующий опрос выполняется через сумму предыдущего интервала и этого значения задержки либо через максимальный интервал — в зависимости от того, что меньше. Возможные значения:
  • Положительное целое число.
Значение по умолчанию: 0.

tracked_files_limit

Позволяет ограничить количество узлов ZooKeeper, если используется режим ‘unordered’; для режима ‘ordered’ не имеет эффекта. При достижении лимита самые старые обработанные файлы будут удалены из узла ZooKeeper и обработаны повторно. Возможные значения:
  • Положительное целое число.
Значение по умолчанию: 1000.

tracked_file_ttl_sec

Максимальное количество секунд, в течение которых обработанные файлы хранятся в узле ZooKeeper (по умолчанию — бессрочно) для режима ‘unordered’; для режима ‘ordered’ не применяется. По истечении указанного количества секунд файл будет повторно импортирован. Возможные значения:
  • Положительное целое число.
Значение по умолчанию: 0.

cleanup_interval_min_ms

Для режима ‘Ordered’. Определяет минимальное значение интервала повторного планирования для фоновой задачи, отвечающей за поддержание TTL отслеживаемых файлов и максимального количества отслеживаемых файлов. Значение по умолчанию: 10000.

cleanup_interval_max_ms

Для режима Ordered. Определяет максимальный интервал перепланирования фоновой задачи, отвечающей за поддержание TTL отслеживаемых файлов и максимального количества отслеживаемых файлов. Значение по умолчанию: 30000.

buckets

Для режима ‘Ordered’. Доступно начиная с версии 24.6. Если у таблицы S3Queue несколько реплик и каждая из них работает с одним и тем же каталогом метаданных в Keeper, значение buckets должно быть как минимум равно числу реплик. Если также используется настройка processing_threads, имеет смысл ещё увеличить значение buckets, так как именно она определяет фактический параллелизм обработки S3Queue.

use_persistent_processing_nodes

По умолчанию таблица S3Queue всегда использовала эфемерные узлы обработки, из-за чего могли возникать дубликаты данных, если сеанс ZooKeeper истекал до того, как S3Queue выполняла коммит обработанных файлов в ZooKeeper, но уже после начала их обработки. Эта настройка заставляет сервер исключить возможность появления дубликатов при истечении сеанса Keeper.

persistent_processing_nodes_ttl_seconds

В случае нештатного завершения работы сервера, если включён параметр use_persistent_processing_nodes, может оказаться, что узлы обработки не были удалены. Этот параметр задаёт период времени, по истечении которого такие узлы обработки можно безопасно удалить. Значение по умолчанию: 3600 (1 час).

Настройки, связанные с S3

Этот движок поддерживает все настройки, связанные с S3. Дополнительные сведения о настройках S3 см. здесь.

Ролевой доступ на основе ролей к S3

Движок таблицы S3Queue поддерживает доступ на основе ролей. Инструкции по настройке роли для доступа к вашему бакету см. в документации здесь. После настройки роли roleARN можно передать через параметр extra_credentials, как показано ниже:
CREATE TABLE s3_table
(
    ts DateTime,
    value UInt64
)
ENGINE = S3Queue(
                'https://<your_bucket>/*.csv',
                extra_credentials(role_arn = 'arn:aws:iam::111111111111:role/<your_role>')
                ,'CSV')
SETTINGS
    ...

Упорядоченный режим S3Queue

Режим обработки S3Queue позволяет хранить меньше метаданных в ZooKeeper, но имеет ограничение: файлы, добавленные позже, должны иметь алфавитно-цифровые имена, которые больше по порядку. Режим S3Queue ordered, как и unordered, поддерживает настройку (s3queue_)processing_threads_num (префикс s3queue_ необязателен), которая позволяет управлять количеством потоков, выполняющих локальную обработку файлов S3 на сервере. Для режима ordered без партиционирования ClickHouse может возобновлять перечисление объектов S3 с последнего обработанного ключа, чтобы не выполнять повторный обход всей истории префикса. В бакетизированном упорядоченном режиме точка возобновления консервативно выбирается как наименьший обработанный ключ среди всех бакетов, чтобы избежать пропуска необработанных файлов. Эта оптимизация возобновления перечисления используется только для очередей на базе S3 в режиме ordered без партиционирования (не для AzureQueue и не когда задан partitioning_mode). Кроме того, режим ordered также вводит ещё одну настройку — (s3queue_)buckets, которая означает «логические потоки». В распределённом сценарии, когда есть несколько серверов с репликами таблицы S3Queue, эта настройка определяет количество единиц обработки. Например, каждый поток обработки на каждой реплике S3Queue будет пытаться заблокировать определённый bucket для обработки, при этом каждый bucket назначается определённым файлам по хэшу имени файла. Поэтому в распределённом сценарии настоятельно рекомендуется, чтобы значение настройки (s3queue_)buckets было как минимум равно числу реплик или больше. Количество бакетов может быть больше числа реплик. Оптимальный вариант — когда значение настройки (s3queue_)buckets равно произведению number_of_replicas и (s3queue_)processing_threads_num. Использовать настройку (s3queue_)processing_threads_num не рекомендуется до версии 24.6. Настройка (s3queue_)buckets доступна начиная с версии 24.6.

SELECT из движка таблицы S3Queue

Запросы SELECT для таблиц S3Queue по умолчанию запрещены. Это соответствует распространённому паттерну очереди, при котором данные считываются один раз, а затем удаляются из очереди. SELECT запрещён, чтобы предотвратить случайную потерю данных. Однако в некоторых случаях это может быть полезно. Для этого нужно установить параметр stream_like_engine_allow_direct_select в True. У движка S3Queue есть специальный параметр для запросов SELECT: commit_on_select. Установите его в False, чтобы сохранить данные в очереди после чтения, или в True, чтобы удалить их.

Описание

SELECT не особенно полезен для потокового импорта (кроме отладки), поскольку каждый файл можно импортировать только один раз. Гораздо практичнее создавать потоки в реальном времени с помощью materialized view. Для этого:
  1. С помощью движка создайте таблицу для чтения из указанного пути в S3 и рассматривайте её как поток данных.
  2. Создайте таблицу с нужной структурой.
  3. Создайте materialized view, который преобразует данные из движка и помещает их в ранее созданную таблицу.
Когда MATERIALIZED VIEW подключается к движку, он начинает собирать данные в фоновом режиме. Пример:
  CREATE TABLE s3queue_engine_table (name String, value UInt32)
    ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
    SETTINGS
        mode = 'unordered';

  CREATE TABLE stats (name String, value UInt32)
    ENGINE = MergeTree() ORDER BY name;

  CREATE MATERIALIZED VIEW consumer TO stats
    AS SELECT name, value FROM s3queue_engine_table;

  SELECT * FROM stats ORDER BY name;

Виртуальные столбцы

  • _path — Путь к файлу.
  • _file — Имя файла.
  • _size — Размер файла.
  • _time — Время создания файла.
Подробнее о виртуальных столбцах см. здесь.

Подстановочные шаблоны в path

Аргумент path может задавать несколько файлов с помощью bash-подобных подстановочных шаблонов. Чтобы файл был обработан, он должен существовать и соответствовать шаблону пути целиком. Список файлов определяется во время SELECT (а не в момент CREATE).
  • * — Заменяет любое количество любых символов, кроме /, включая пустую строку.
  • ** — Заменяет любое количество любых символов, включая /, включая пустую строку.
  • ? — Заменяет любой отдельный символ.
  • {some_string,another_string,yet_another_one} — Заменяет любую из строк: 'some_string', 'another_string', 'yet_another_one'.
  • {N..M} — Заменяет любое число в диапазоне от N до M включительно. N и M могут содержать ведущие нули, например 000..078.
Конструкции с {} аналогичны конструкции в табличной функции remote.

Ограничения

  1. Дублирующиеся строки могут возникать в результате:
  • исключения во время парсинга в середине обработки файла, если повторные попытки включены через s3queue_loading_retries;
  • S3Queue настроен на нескольких серверах, указывающих на один и тот же path в ZooKeeper, и сеанс Keeper истекает до того, как один из серверов успевает закоммитить обработанный файл. Это может привести к тому, что другой сервер начнёт обрабатывать файл, который уже был частично или полностью обработан первым сервером. Однако начиная с версии 25.8 это больше неактуально, если use_persistent_processing_nodes = 1.
  • аварийного завершения работы сервера.
  1. Если S3Queue настроен на нескольких серверах, указывающих на один и тот же path в ZooKeeper, и используется режим Ordered, то s3queue_loading_retries не будет работать. Это скоро исправят.

Интроспекция

Для интроспекции используйте stateless-таблицу system.s3queue_metadata_cache и постоянную таблицу system.s3queue_log.
  1. system.s3queue_metadata_cache. Эта таблица не является постоянной и показывает текущее состояние S3Queue в памяти: какие файлы обрабатываются в данный момент, какие уже обработаны, а какие завершились ошибкой.
┌─statement──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
CREATE TABLE system.s3queue_metadata_cache
(
    `database` String,
    `table` String,
    `file_name` String,
    `rows_processed` UInt64,
    `status` String,
    `processing_start_time` Nullable(DateTime),
    `processing_end_time` Nullable(DateTime),
    `ProfileEvents` Map(String, UInt64)
    `exception` String
)
ENGINE = SystemS3Queue
COMMENT 'Contains in-memory state of S3Queue metadata and currently processed rows per file.'
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
Пример:

SELECT *
FROM system.s3queue_metadata_cache

Row 1:
──────
zookeeper_path:        /clickhouse/s3queue/25ea5621-ae8c-40c7-96d0-cec959c5ab88/3b3f66a1-9866-4c2e-ba78-b6bfa154207e
file_name:             wikistat/original/pageviews-20150501-030000.gz
rows_processed:        5068534
status:                Processed
processing_start_time: 2023-10-13 13:09:48
processing_end_time:   2023-10-13 13:10:31
ProfileEvents:         {'ZooKeeperTransactions':3,'ZooKeeperGet':2,'ZooKeeperMulti':1,'SelectedRows':5068534,'SelectedBytes':198132283,'ContextLock':1,'S3QueueSetFileProcessingMicroseconds':2480,'S3QueueSetFileProcessedMicroseconds':9985,'S3QueuePullMicroseconds':273776,'LogTest':17}
exception:
  1. system.s3queue_log. Постоянная таблица. Содержит ту же информацию, что и system.s3queue_metadata_cache, но для файлов со статусами processed и failed.
Таблица имеет следующую структуру:
SHOW CREATE TABLE system.s3queue_log

Query id: 0ad619c3-0f2a-4ee4-8b40-c73d86e04314

┌─statement──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
CREATE TABLE system.s3queue_log
(
    `event_date` Date,
    `event_time` DateTime,
    `table_uuid` String,
    `file_name` String,
    `rows_processed` UInt64,
    `status` Enum8('Processed' = 0, 'Failed' = 1),
    `processing_start_time` Nullable(DateTime),
    `processing_end_time` Nullable(DateTime),
    `ProfileEvents` Map(String, UInt64),
    `exception` String
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, event_time) │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
Чтобы использовать system.s3queue_log, задайте его конфигурацию в конфигурационном файле сервера:
    <s3queue_log>
        <database>system</database>
        <table>s3queue_log</table>
    </s3queue_log>
Пример:
SELECT *
FROM system.s3queue_log

Row 1:
──────
event_date:            2023-10-13
event_time:            2023-10-13 13:10:12
table_uuid:
file_name:             wikistat/original/pageviews-20150501-020000.gz
rows_processed:        5112621
status:                Processed
processing_start_time: 2023-10-13 13:09:48
processing_end_time:   2023-10-13 13:10:12
ProfileEvents:         {'ZooKeeperTransactions':3,'ZooKeeperGet':2,'ZooKeeperMulti':1,'SelectedRows':5112621,'SelectedBytes':198577687,'ContextLock':1,'S3QueueSetFileProcessingMicroseconds':1934,'S3QueueSetFileProcessedMicroseconds':17063,'S3QueuePullMicroseconds':5841972,'LogTest':17}
exception:
Последнее изменение 10 июня 2026 г.