Перейти к основному содержанию
Этот движок интегрируется с экосистемой Azure Blob Storage, обеспечивая потоковый импорт данных.

CREATE TABLE

CREATE TABLE test (name String, value UInt32)
    ENGINE = AzureQueue(...)
    [SETTINGS]
    [mode = '',]
    [after_processing = 'keep',]
    [keeper_path = '',]
    ...
Параметры движка Параметры AzureQueue совпадают с параметрами, поддерживаемыми движком таблицы AzureBlobStorage. См. раздел с параметрами здесь. Как и в случае с движком таблицы AzureBlobStorage, для локальной разработки Azure Storage можно использовать эмулятор Azurite. Подробнее здесь. Пример
CREATE TABLE azure_queue_engine_table
(
    `key` UInt64,
    `data` String
)
ENGINE = AzureQueue('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;', 'testcontainer', '*', 'CSV')
SETTINGS mode = 'unordered'

Настройки

Набор поддерживаемых настроек в целом такой же, как у движка таблицы S3Queue, но без префикса s3queue_. См. полный список настроек. Чтобы получить список настроек, заданных для таблицы, используйте таблицу system.azure_queue_settings. Доступно начиная с версии 24.10. Ниже приведены настройки, которые поддерживаются только в AzureQueue и не применяются к S3Queue.

after_processing_move_connection_string

Строка подключения к Azure Blob Storage для перемещения успешно обработанных файлов, если пункт назначения — другой контейнер Azure. Возможные значения:
  • String.
Значение по умолчанию: пустая строка.

after_processing_move_container

Имя контейнера, в который перемещаются успешно обработанные файлы, если пункт назначения — другой контейнер Azure. Возможные значения:
  • String.
Значение по умолчанию: пустая строка. Пример:
CREATE TABLE azure_queue_engine_table
(
    `key` UInt64,
    `data` String
)
ENGINE = AzureQueue('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;', 'testcontainer', '*', 'CSV')
SETTINGS
    mode = 'unordered',
    after_processing = 'move',
    after_processing_move_connection_string = 'DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;',
    after_processing_move_container = 'dst-container';

SELECT из таблиц на движке таблицы AzureQueue

Запросы SELECT для таблиц AzureQueue по умолчанию запрещены. Это соответствует распространённому паттерну очереди, при котором данные считываются один раз, а затем удаляются из очереди. SELECT запрещён, чтобы предотвратить случайную потерю данных. Однако в некоторых случаях он может быть полезен. Для этого нужно установить значение настройки stream_like_engine_allow_direct_select в True. У движка AzureQueue есть специальная настройка для запросов SELECT: commit_on_select. Установите для неё значение False, чтобы сохранить данные в очереди после чтения, или True, чтобы удалить их.

Описание

SELECT не особенно полезен для потокового импорта (кроме отладки), потому что каждый файл можно импортировать только один раз. Гораздо практичнее организовать обработку в реальном времени с помощью materialized views. Для этого:
  1. С помощью движка создайте таблицу для чтения из указанного пути в S3 и рассматривайте её как поток данных.
  2. Создайте таблицу с нужной структурой.
  3. Создайте materialized view, которое преобразует данные из движка и помещает их в ранее созданную таблицу.
После подключения MATERIALIZED VIEW к движку начинается фоновый сбор данных. Пример:
CREATE TABLE azure_queue_engine_table (key UInt64, data String)
  ENGINE=AzureQueue('<endpoint>', 'CSV', 'gzip')
  SETTINGS
      mode = 'unordered';

CREATE TABLE stats (key UInt64, data String)
  ENGINE = MergeTree() ORDER BY key;

CREATE MATERIALIZED VIEW consumer TO stats
  AS SELECT key, data FROM azure_queue_engine_table;

SELECT * FROM stats ORDER BY key;

Виртуальные столбцы

  • _path — путь к файлу.
  • _file — имя файла.
Подробнее о виртуальных столбцах см. здесь.

Интроспекция

Включите логирование для таблицы с помощью настройки таблицы enable_logging_to_queue_log=1. Возможности интроспекции такие же, как у движка таблицы S3Queue, однако есть несколько важных отличий:
  1. Используйте system.azure_queue_metadata_cache для состояния очереди в памяти в версиях сервера >= 25.1. Для более старых версий используйте system.s3queue_metadata_cache (он также содержит информацию о таблицах azure).
  2. Включите system.azure_queue_log через основную конфигурацию ClickHouse, например:
  <azure_queue_log>
    <database>system</database>
    <table>azure_queue_log</table>
  </azure_queue_log>
Эта постоянная таблица содержит ту же информацию, что и system.s3queue_metadata_cache, но для обработанных и файлов, обработка которых завершилась ошибкой. Таблица имеет следующую структуру:

CREATE TABLE system.azure_queue_log
(
    `hostname` LowCardinality(String) COMMENT 'Имя хоста',
    `event_date` Date COMMENT 'Дата события записи данной строки журнала',
    `event_time` DateTime COMMENT 'Время события записи данной строки журнала',
    `database` String COMMENT 'Имя базы данных, в которой находится текущая таблица S3Queue.',
    `table` String COMMENT 'Имя таблицы S3Queue.',
    `uuid` String COMMENT 'UUID таблицы S3Queue',
    `file_name` String COMMENT 'Имя обрабатываемого файла',
    `rows_processed` UInt64 COMMENT 'Количество обработанных строк',
    `status` Enum8('Processed' = 0, 'Failed' = 1) COMMENT 'Статус обрабатываемого файла',
    `processing_start_time` Nullable(DateTime) COMMENT 'Время начала обработки файла',
    `processing_end_time` Nullable(DateTime) COMMENT 'Время окончания обработки файла',
    `exception` String COMMENT 'Сообщение об исключении, если оно возникло'
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, event_time)
COMMENT 'Содержит записи журнала с информацией о файлах, обработанных движком S3Queue.'

Пример:
SELECT *
FROM system.azure_queue_log
LIMIT 1
FORMAT Vertical

Row 1:
──────
hostname:              clickhouse
event_date:            2024-12-16
event_time:            2024-12-16 13:42:47
database:              default
table:                 azure_queue_engine_table
uuid:                  1bc52858-00c0-420d-8d03-ac3f189f27c8
file_name:             test_1.csv
rows_processed:        3
status:                Processed
processing_start_time: 2024-12-16 13:42:47
processing_end_time:   2024-12-16 13:42:47
exception:

1 row in set. Elapsed: 0.002 sec.

Последнее изменение 10 июня 2026 г.