メインコンテンツへスキップ
このエンジンは Amazon S3 エコシステムとのインテグレーションを提供し、ストリーミングインポートを可能にします。このエンジンは KafkaRabbitMQ エンジンに似ていますが、S3 固有の機能を備えています。 S3Queue 実装の元の PR にある次の注意書きを理解しておくことが重要です。MATERIALIZED VIEW がこのエンジンに接続されると、S3Queue テーブルエンジンはバックグラウンドでデータ収集を開始します。

テーブルの作成

CREATE TABLE s3_queue_engine_table (name String, value UInt32)
    ENGINE = S3Queue(path, [NOSIGN, | aws_access_key_id, aws_secret_access_key,] format, [compression], [headers], [extra_credentials])
    [SETTINGS]
    [mode = '',]
    [after_processing = 'keep',]
    [keeper_path = '',]
    [loading_retries = 0,]
    [processing_threads_num = 16,]
    [parallel_inserts = false,]
    [enable_logging_to_queue_log = true,]
    [last_processed_path = "",]
    [tracked_files_limit = 1000,]
    [tracked_file_ttl_sec = 0,]
    [polling_min_timeout_ms = 1000,]
    [polling_max_timeout_ms = 10000,]
    [polling_backoff_ms = 0,]
    [cleanup_interval_min_ms = 10000,]
    [cleanup_interval_max_ms = 30000,]
    [buckets = 0,]
    [list_objects_batch_size = 1000,]
    [enable_hash_ring_filtering = 0,]
    [max_processed_files_before_commit = 100,]
    [max_processed_rows_before_commit = 0,]
    [max_processed_bytes_before_commit = 0,]
    [max_processing_time_sec_before_commit = 0,]
24.7 より前では、modeafter_processingkeeper_path を除くすべての設定で s3queue_ プレフィックスを使用する必要があります。
エンジンパラメータ S3Queue のパラメータは、S3 テーブルエンジンでサポートされているものと同じです。パラメータのセクションはこちらを参照してください。
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
    mode = 'unordered';
named collections を使用するには:
<clickhouse>
    <named_collections>
        <s3queue_conf>
            <url>'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*</url>
            <access_key_id>test<access_key_id>
            <secret_access_key>test</secret_access_key>
        </s3queue_conf>
    </named_collections>
</clickhouse>
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue(s3queue_conf, format = 'CSV', compression_method = 'gzip')
SETTINGS
    mode = 'ordered';

設定

テーブルに設定されている項目の一覧を取得するには、system.s3_queue_settings テーブルを使用します。24.10 から利用できます。
設定名 (24.7+)バージョン 24.7 以降では、S3Queue の設定は s3queue_ プレフィックスの有無にかかわらず指定できます。
  • 新しい構文 (24.7+): processing_threads_numtracked_file_ttl_sec など
  • 従来の構文 (全バージョン): s3queue_processing_threads_nums3queue_tracked_file_ttl_sec など
24.7 以降では、どちらの形式もサポートされています。このページの例では、プレフィックスなしの新しい構文を使用しています。

モード

設定可能な値:
  • unordered — 順不同モードでは、すでに処理されたすべてのファイルの集合が、ZooKeeper の永続ノードによって追跡されます。
  • ordered — ordered モードでは、ファイルは辞書順で処理されます。つまり、ある時点で ‘BBB’ という名前のファイルが処理されたあとに、‘AA’ という名前のファイルがバケットに追加されても、そのファイルは無視されます。ZooKeeper に保存されるのは、正常に取り込まれたファイルのうち辞書順で最大の名前と、読み込みに失敗して再試行対象となるファイルの名前だけです。
デフォルト値: 24.6 より前のバージョンでは ordered。24.6 以降はデフォルト値がなくなり、この設定は手動で指定する必要があります。以前のバージョンで作成されたテーブルでは、互換性のためデフォルト値は Ordered のままです。

after_processing

正常に処理された後のファイルの処理方法です。 設定可能な値:
  • keep。
  • delete。
  • move。
  • tag。
デフォルト値: keep move には追加の設定が必要です。同じバケット内で move する場合は、新しいパスプレフィックスを after_processing_move_prefix として指定する必要があります。 別の S3 バケットに move する場合は、対象バケットの URI を after_processing_move_uri として、S3 の認証情報を after_processing_move_access_key_id および after_processing_move_secret_access_key として指定する必要があります。 例:
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
    mode = 'unordered',
    after_processing = 'move',
    after_processing_retries = 20,
    after_processing_move_prefix = 'dst_prefix',
    after_processing_move_uri = 'https://clickhouse-public-datasets.s3.amazonaws.com/dst-bucket',
    after_processing_move_access_key_id = 'test',
    after_processing_move_secret_access_key = 'test';
ある Azure コンテナーから別の Azure コンテナーへ移動するには、after_processing_move_connection_string に Blob Storage の接続文字列、after_processing_move_container にコンテナー名を指定する必要があります。AzureQueue の設定を参照してください。 タグ付けを行うには、after_processing_tag_keyafter_processing_tag_value にタグのキーと値を指定する必要があります。

after_processing_retries

指定された処理後アクションを断念するまでの再試行回数です。 設定可能な値:
  • 0以上の整数。
デフォルト値: 10.

after_processing_move_access_key_id

宛先が別の S3 バケット の場合に、正常に処理されたファイルの移動先となる S3 バケット に使用する Access Key ID。 設定可能な値:
  • String.
デフォルト値: 空文字列。

after_processing_move_prefix

正常に処理されたファイルの移動先となるパスプレフィックスです。同一 bucket 内で移動する場合と、別の bucket に移動する場合のいずれにも適用されます。 設定可能な値:
  • String.
デフォルト値: 空文字列。

after_processing_move_secret_access_key

正常に処理されたファイルの移動先が別の S3 バケットである場合の、その移動先 S3 バケットの Secret Access Key。 設定可能な値:
  • String.
デフォルト値: 空文字列。

after_processing_move_uri

宛先が別の S3 バケットである場合に、正常に処理されたファイルの移動先となる S3 バケットの URI。 設定可能な値:
  • String.
デフォルト値: 空文字列。

after_processing_tag_key

after_processing='tag' の場合に、正常に処理されたファイルに付与するタグのキーです。 設定可能な値:
  • String.
デフォルト値: 空文字列。

after_processing_tag_value

after_processing='tag' の場合に、正常に処理されたファイルに付与するタグの値です。 設定可能な値:
  • String。
デフォルト値: 空文字列。

keeper_path

ZooKeeper 内のキューのメタデータへのパスです。明示的に指定しない場合、ClickHouse は s3queue_default_zookeeper_path、データベース UUID、およびテーブル UUID からこのパスを構築します。絶対パス (/ で始まる値) は指定どおりに使用され、相対パスは設定されたプレフィックスに追加されます。{database}{uuid} などのマクロは、エンジンが ZooKeeper に接続する前に展開されます。 補助的な ZooKeeper クラスターを対象にするには、値の先頭に設定済みの名前を付けます。たとえば analytics_keeper:/clickhouse/queue/orders のように指定します。この名前は <auxiliary_zookeepers> に存在している必要があり、存在しない場合、エンジンは Unknown auxiliary ZooKeeper name ... を報告します。完全な文字列 (プレフィックスを含む) は SHOW CREATE TABLE に保持されるため、ステートメントをそのまま複製できます。 設定可能な値:
  • String。
デフォルト値: /

loading_retries

ファイルの読み込みは、指定した回数まで再試行されます。デフォルトでは、再試行は行われません。 設定可能な値:
  • 正の整数。
デフォルト値: 0

processing_threads_num

処理に使用するスレッド数。Unordered モードでのみ適用されます。 デフォルト値: CPU 数または 16。

parallel_inserts

デフォルトでは、processing_threads_num は 1 つの INSERT しか生成しないため、複数スレッドで実行されるのはファイルのダウンロードと解析だけです。 ただし、これでは並列度が制限されるため、より高いスループットを得るには parallel_inserts=true を使用してください。これにより、データを並列に挿入できるようになります (ただし、その結果として MergeTree family で生成されるデータパーツの数が増える点に注意してください) 。
INSERTmax_process*_before_commit 設定に従って生成されます。
デフォルト値: false

enable_logging_to_s3queue_log

system.s3queue_log へのログ出力を有効にします。 デフォルト値: 0

polling_min_timeout_ms

ClickHouse が次にポーリングを試行するまでに待機する最小時間を、ミリ秒単位で指定します。 設定可能な値:
  • 正の整数。
デフォルト値: 1000.

polling_max_timeout_ms

ClickHouse が次のポーリング試行を開始するまでに待機する最大時間を、ミリ秒単位で定義します。 設定可能な値:
  • 正の整数。
デフォルト値: 10000

polling_backoff_ms

新しいファイルが見つからなかった場合に、直前のポーリング間隔に追加される待機時間を指定します。次回のポーリングは、直前の間隔とこのバックオフ値の合計、または最大間隔のいずれか小さいほうの時間が経過した後に実行されます。 設定可能な値:
  • 正の整数。
デフォルト値: 0.

tracked_files_limit

unordered モード使用時に、ZooKeeper ノード数を制限できます。ordered モードでは効果はありません。 制限に達すると、最も古い処理済みファイルが ZooKeeper ノードから削除され、再度処理されます。 設定可能な値:
  • 正の整数。
デフォルト値: 1000.

tracked_file_ttl_sec

unordered モードで、処理済みファイルを ZooKeeper ノードに保持する最大秒数です (デフォルトでは無期限に保持されます) 。ordered モードでは効果はありません。 指定した秒数が経過すると、そのファイルは再インポートされます。 設定可能な値:
  • 正の整数。
デフォルト値: 0

cleanup_interval_min_ms

Ordered モード用です。追跡対象ファイルの有効期限 (TTL) と追跡対象ファイル数の上限を管理するバックグラウンドタスクの再スケジュール間隔の最小値を定義します。 デフォルト値: 10000

cleanup_interval_max_ms

Ordered モード用です。追跡対象ファイルの有効期限 (TTL) と追跡対象ファイル集合の最大数を管理するバックグラウンドタスクの、再スケジュール間隔の上限を定義します。 デフォルト値: 30000

buckets

Ordered モード向けです。24.6 から利用できます。S3Queue テーブルに複数のレプリカがあり、それぞれが keeper 内の同じメタデータディレクトリを使って動作している場合、buckets の値は少なくともレプリカ数以上にする必要があります。processing_threads 設定も使用している場合は、S3Queue 処理の実際の並列度を決めるため、buckets 設定の値をさらに増やすのが適切です。

use_persistent_processing_nodes

デフォルトでは、S3Queue テーブルでは常に一時的な 処理ノード が使用されてきました。そのため、S3Queue が ZooKeeper に処理済みファイルをコミットする前、かつ処理開始後に ZooKeeper セッションが期限切れになると、データが重複する可能性がありました。この設定を有効にすると、Keeper セッションの期限切れ時に重複が発生する可能性をサーバー側で排除できます。

persistent_processing_nodes_ttl_seconds

サーバーが正常な手順を経ずに終了した場合、use_persistent_processing_nodes が有効になっていると、処理ノードが削除されずに残ることがあります。この設定は、それらの処理ノードを安全にクリーンアップできる期間を定義します。 デフォルト値: 3600 (1 時間) 。

S3関連の設定

このエンジンは、S3 関連の設定をすべてサポートします。S3 の設定の詳細については、こちらを参照してください。

S3 のロールベースアクセス

S3Queue テーブルエンジンは、ロールベースアクセスに対応しています。 バケットにアクセスするためのロールを設定する手順については、こちらのドキュメントを参照してください。 ロールの設定後、以下のように extra_credentials パラメータ経由で roleARN を渡せます。
CREATE TABLE s3_table
(
    ts DateTime,
    value UInt64
)
ENGINE = S3Queue(
                'https://<your_bucket>/*.csv',
                extra_credentials(role_arn = 'arn:aws:iam::111111111111:role/<your_role>')
                ,'CSV')
SETTINGS
    ...

S3Queue の ordered モード

S3Queue の処理モードでは、ZooKeeper に保存するメタデータを少なくできますが、あとから追加されるファイルは、名前が英数字順でより後になる必要があるという制約があります。 S3Queueordered モードは、unordered と同様に (s3queue_)processing_threads_num 設定 (s3queue_ プレフィックスは省略可能) をサポートしており、サーバー上で S3 ファイルをローカル処理するスレッド数を制御できます。 パーティション化なしの ordered モードでは、プレフィックス全体の履歴を再度一覧化するのを避けるため、ClickHouse は最後に処理したキーから S3 のリスト取得を再開することがあります。bucket 化された ordered モードでは、未処理ファイルの取りこぼしを防ぐため、再開位置はすべての buckets の中で最小の処理済みキーとして保守的に選ばれます。 このリスト取得再開の最適化は、パーティション化なしの ordered モードで S3 をバックエンドとする queue に対してのみ使用されます (AzureQueue や partitioning_mode が設定されている場合は対象外です) 。 さらに、ordered モードでは (s3queue_)buckets という別の設定も導入されており、これは「論理スレッド」を意味します。つまり、複数のサーバーに S3Queue テーブルのレプリカがある分散環境では、この設定が処理ユニット数を定義します。たとえば、各 S3Queue レプリカ上の各処理スレッドは、処理対象として特定の bucket のロック取得を試みます。各 bucket には、ファイル名の hash に基づいて特定のファイルが割り当てられます。したがって、分散環境では (s3queue_)buckets 設定を少なくともレプリカ数以上、できればそれより大きくすることを強く推奨します。bucket 数がレプリカ数を上回っていても問題ありません。最も理想的なのは、(s3queue_)buckets 設定を number_of_replicas(s3queue_)processing_threads_num の積に等しくすることです。 設定 (s3queue_)processing_threads_num は、バージョン 24.6 より前では使用を推奨しません。 設定 (s3queue_)buckets は、バージョン 24.6 以降で利用できます。

S3Queue テーブルエンジンからのSELECT

S3Queue テーブルでは、デフォルトで SELECT クエリが禁止されています。これは、データを一度読み取ったらキューから削除するという一般的なキューのパターンに従っているためです。SELECT が禁止されているのは、意図しないデータ損失を防ぐためです。 ただし、場合によっては有用です。これを行うには、設定 stream_like_engine_allow_direct_selectTrue に設定する必要があります。 S3Queue エンジンには、SELECT クエリ用の特別な設定 commit_on_select があります。読み取り後もキュー内のデータを保持するには False に、削除するには True に設定します。

説明

SELECT は、各ファイルをインポートできるのが一度限りであるため、ストリーミングインポートには (デバッグ用途を除き) あまり適していません。代わりに、materialized view を使ってリアルタイム処理のスレッドを作成するほうが実用的です。手順は次のとおりです。
  1. エンジンを使用して、S3 の指定したパスからデータを取り込むためのテーブルを作成し、それをデータストリームとして扱います。
  2. 必要な構造を持つテーブルを作成します。
  3. エンジンからのデータを変換し、あらかじめ作成したテーブルに格納する materialized view を作成します。
MATERIALIZED VIEW をエンジンに対して作成すると、バックグラウンドでデータの収集が開始されます。 例:
  CREATE TABLE s3queue_engine_table (name String, value UInt32)
    ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
    SETTINGS
        mode = 'unordered';

  CREATE TABLE stats (name String, value UInt32)
    ENGINE = MergeTree() ORDER BY name;

  CREATE MATERIALIZED VIEW consumer TO stats
    AS SELECT name, value FROM s3queue_engine_table;

  SELECT * FROM stats ORDER BY name;

仮想カラム

  • _path — ファイルへのパス。
  • _file — ファイル名。
  • _size — ファイルのサイズ。
  • _time — ファイルの作成時刻。
仮想カラムについて詳しくは、こちらを参照してください。

path 内のワイルドカード

path 引数では、bash 風のワイルドカードを使って複数のファイルを指定できます。処理対象のファイルは存在し、パスパターン全体に一致している必要があります。ファイルの一覧は SELECT の実行時に決定されます (CREATE 時ではありません) 。
  • * — 空文字列を含む、/ を除く任意の文字列に置き換えられます。
  • ** — 空文字列を含む、/ を含む任意の文字列に置き換えられます。
  • ? — 任意の 1 文字に置き換えられます。
  • {some_string,another_string,yet_another_one}'some_string''another_string''yet_another_one' のいずれかの文字列に置き換えられます。
  • {N..M} — N から M までの範囲内の任意の数値 (両端を含む) に置き換えられます。N と M には先頭にゼロを付けることもできます (例: 000..078) 。
{} を使う構文は、remote テーブル関数に似ています。

制限事項

  1. 重複した行は、次のような理由で発生する可能性があります。
  • ファイル処理の途中でパース中に例外が発生し、s3queue_loading_retries によって再試行が有効になっている場合。
  • S3Queue が複数のサーバーで設定されており、ZooKeeper 内の同じパスを参照している場合に、あるサーバーが処理済みファイルをコミットする前に Keeper セッションの有効期限が切れると、別のサーバーがそのファイルの処理を引き継ぐ可能性があります。その結果、そのファイルは最初のサーバーによって部分的または完全に処理済みである可能性があります。ただし、use_persistent_processing_nodes = 1 の場合、バージョン 25.8 以降ではこれは当てはまりません。
  • サーバーが異常終了した場合。
  1. S3Queue が複数のサーバーで設定され、ZooKeeper 内の同じパスを参照しており、Ordered モードを使用している場合、s3queue_loading_retries は機能しません。これはまもなく修正される予定です。

イントロスペクション

内部状態の確認には、ステートレスな system.s3queue_metadata_cache テーブルと、永続的な system.s3queue_log テーブルを使用します。
  1. system.s3queue_metadata_cache。このテーブルは永続化されず、S3Queue のインメモリ状態、つまり現在処理中のファイル、処理済みのファイル、または処理に失敗したファイルを表示します。
┌─statement──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
CREATE TABLE system.s3queue_metadata_cache
(
    `database` String,
    `table` String,
    `file_name` String,
    `rows_processed` UInt64,
    `status` String,
    `processing_start_time` Nullable(DateTime),
    `processing_end_time` Nullable(DateTime),
    `ProfileEvents` Map(String, UInt64)
    `exception` String
)
ENGINE = SystemS3Queue
COMMENT 'Contains in-memory state of S3Queue metadata and currently processed rows per file.'
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
例:

SELECT *
FROM system.s3queue_metadata_cache

Row 1:
──────
zookeeper_path:        /clickhouse/s3queue/25ea5621-ae8c-40c7-96d0-cec959c5ab88/3b3f66a1-9866-4c2e-ba78-b6bfa154207e
file_name:             wikistat/original/pageviews-20150501-030000.gz
rows_processed:        5068534
status:                Processed
processing_start_time: 2023-10-13 13:09:48
processing_end_time:   2023-10-13 13:10:31
ProfileEvents:         {'ZooKeeperTransactions':3,'ZooKeeperGet':2,'ZooKeeperMulti':1,'SelectedRows':5068534,'SelectedBytes':198132283,'ContextLock':1,'S3QueueSetFileProcessingMicroseconds':2480,'S3QueueSetFileProcessedMicroseconds':9985,'S3QueuePullMicroseconds':273776,'LogTest':17}
exception:
  1. system.s3queue_log。永続テーブルです。system.s3queue_metadata_cache と同じ情報を持ちますが、processed および failed ファイルに関するものです。
このテーブルは次の構造を持ちます。
SHOW CREATE TABLE system.s3queue_log

Query id: 0ad619c3-0f2a-4ee4-8b40-c73d86e04314

┌─statement──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
CREATE TABLE system.s3queue_log
(
    `event_date` Date,
    `event_time` DateTime,
    `table_uuid` String,
    `file_name` String,
    `rows_processed` UInt64,
    `status` Enum8('Processed' = 0, 'Failed' = 1),
    `processing_start_time` Nullable(DateTime),
    `processing_end_time` Nullable(DateTime),
    `ProfileEvents` Map(String, UInt64),
    `exception` String
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, event_time) │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
system.s3queue_logを使用するには、サーバー設定ファイルでその設定を定義してください。
    <s3queue_log>
        <database>system</database>
        <table>s3queue_log</table>
    </s3queue_log>
例:
SELECT *
FROM system.s3queue_log

Row 1:
──────
event_date:            2023-10-13
event_time:            2023-10-13 13:10:12
table_uuid:
file_name:             wikistat/original/pageviews-20150501-020000.gz
rows_processed:        5112621
status:                Processed
processing_start_time: 2023-10-13 13:09:48
processing_end_time:   2023-10-13 13:10:12
ProfileEvents:         {'ZooKeeperTransactions':3,'ZooKeeperGet':2,'ZooKeeperMulti':1,'SelectedRows':5112621,'SelectedBytes':198577687,'ContextLock':1,'S3QueueSetFileProcessingMicroseconds':1934,'S3QueueSetFileProcessedMicroseconds':17063,'S3QueuePullMicroseconds':5841972,'LogTest':17}
exception:
最終更新日 2026年6月10日