メインコンテンツへスキップ
Executable および ExecutablePool テーブルエンジンでは、ユーザーが定義したスクリプト (stdout に行を書き出す) によって行が生成されるテーブルを定義できます。実行可能スクリプトは users_scripts ディレクトリに保存され、任意のデータソースからデータを読み取ることができます。
  • Executable テーブル: スクリプトはクエリごとに実行されます
  • ExecutablePool テーブル: 永続的なプロセスのプールを維持し、読み取り時にそのプールからプロセスを取得します
必要に応じて、1 つ以上の入力クエリを含めることもできます。これらの結果は、スクリプトが読み取れるよう stdin にストリーミングされます。

Executable テーブルの作成

Executable テーブルエンジンには、スクリプト名と入力データのフォーマットという2つのパラメータが必要です。必要に応じて、1つ以上の入力クエリを渡すこともできます。
Executable(script_name, format, [input_query...])
以下は、Executable テーブルに関連する設定です。
  • send_chunk_header
    • 説明: 処理用の chunk を送信する前に、各 chunk の行数を送信します。この設定を使うと、一部のリソースを事前に確保できるため、スクリプトをより効率的に作成できます
    • デフォルト値: false
  • command_termination_timeout
    • 説明: コマンド終了のタイムアウト (秒)
    • デフォルト値: 10
  • command_read_timeout
    • 説明: コマンドの stdout からデータを読み込む際のタイムアウト (ミリ秒)
    • デフォルト値: 10000
  • command_write_timeout
    • 説明: コマンドの stdin にデータを書き込む際のタイムアウト (ミリ秒)
    • デフォルト値: 10000
例を見てみましょう。以下の Python スクリプトは my_script.py という名前で、user_scripts フォルダーに保存されています。このスクリプトは数値 i を読み取り、ランダムな文字列を i 個出力します。各文字列の前には、タブで区切られた数値が付きます。
#!/usr/bin/python3

import sys
import string
import random

def main():

    # 入力値を読み込む
    for number in sys.stdin:
        i = int(number)

        # ランダムな行をいくつか生成する
        for id in range(0, i):
            letters = string.ascii_letters
            random_string =  ''.join(random.choices(letters ,k=10))
            print(str(id) + '\t' + random_string + '\n', end='')

        # 結果を標準出力にフラッシュする
        sys.stdout.flush()

if __name__ == "__main__":
    main()
以下の my_executable_tablemy_script.py の出力に基づいて構築されており、my_executable_table に対して SELECT を実行するたびに、ランダムな文字列を 10 個生成します。
CREATE TABLE my_executable_table (
   x UInt32,
   y String
)
ENGINE = Executable('my_script.py', TabSeparated, (SELECT 10))
テーブルの作成は即座に完了し、この時点ではスクリプトは呼び出されません。my_executable_table に対してクエリを実行すると、スクリプトが呼び出されます。
SELECT * FROM my_executable_table
┌─x─┬─y──────────┐
│ 0 │ BsnKBsNGNH │
│ 1 │ mgHfBCUrWM │
│ 2 │ iDQAVhlygr │
│ 3 │ uNGwDuXyCk │
│ 4 │ GcFdQWvoLB │
│ 5 │ UkciuuOTVO │
│ 6 │ HoKeCdHkbs │
│ 7 │ xRvySxqAcR │
│ 8 │ LKbXPHpyDI │
│ 9 │ zxogHTzEVV │
└───┴────────────┘

クエリ結果をスクリプトに渡す

Hacker News のユーザーはコメントを投稿します。Python には自然言語処理ツールキット (nltk) があり、コメントがポジティブ・ネガティブ・ニュートラルのいずれかを判定する SentimentIntensityAnalyzer が含まれています。さらに、-1 (非常にネガティブなコメント) から 1 (非常にポジティブなコメント) までの値を割り当てることもできます。nltk を使って Hacker News のコメントのセンチメントを計算する Executable テーブルを作成してみましょう。 この例では、こちらで説明している hackernews テーブルを使用します。hackernews テーブルには、UInt64 型の id カラムと、comment という名前の String カラムが含まれています。まずは Executable テーブルを定義します。
CREATE TABLE sentiment (
   id UInt64,
   sentiment Float32
)
ENGINE = Executable(
    'sentiment.py',
    TabSeparated,
    (SELECT id, comment FROM hackernews WHERE id > 0 AND comment != '' LIMIT 20)
);
sentiment テーブルについて、いくつか補足します。
  • sentiment.py ファイルは user_scripts フォルダ (user_scripts_path 設定のデフォルトフォルダ) に保存されます
  • TabSeparated フォーマットは、Python スクリプトがタブ区切りの値を含む生データの行を生成する必要があることを意味します
  • このクエリでは hackernews から 2 つのカラムを選択します。Python スクリプトでは、入力される行からそれらのカラムの値をパースする必要があります
以下は sentiment.py の定義です。
#!/usr/local/bin/python3.9

import sys
import nltk
from nltk.sentiment import SentimentIntensityAnalyzer

def main():
    sentiment_analyzer = SentimentIntensityAnalyzer()

    while True:
        try:
            row = sys.stdin.readline()
            if row == '':
                break

            split_line = row.split("\t")

            id = str(split_line[0])
            comment = split_line[1]

            score = sentiment_analyzer.polarity_scores(comment)['compound']
            print(id + '\t' + str(score) + '\n', end='')
            sys.stdout.flush()
        except BaseException as x:
            break

if __name__ == "__main__":
    main()
Python スクリプトについて、いくつか補足します。
  • これを動作させるには、nltk.downloader.download('vader_lexicon') を実行する必要があります。これをスクリプト内に書くこともできますが、そうすると sentiment テーブルに対してクエリが実行されるたびにダウンロードされてしまい、非効率です
  • row の各値は、SELECT id, comment FROM hackernews WHERE id > 0 AND comment != '' LIMIT 20 の結果セット内の1行に対応します
  • 入力される行はタブ区切りなので、Python の split 関数を使って idcomment を取り出します
  • polarity_scores の結果は、いくつかの値を含む JSON オブジェクトです。ここでは、この JSON オブジェクトから compound の値だけを取り出すことにしました
  • ClickHouse の sentiment テーブルは TabSeparated フォーマットを使用し、2つのカラムを含んでいることを思い出してください。そのため、print 関数ではそれらのカラムをタブで区切って出力しています
sentiment テーブルから行を選択するクエリを記述するたびに、SELECT id, comment FROM hackernews WHERE id > 0 AND comment != '' LIMIT 20 クエリが実行され、その結果が sentiment.py に渡されます。では、試してみましょう。
SELECT *
FROM sentiment
レスポンスは次のようになります。
┌───────id─┬─sentiment─┐
│  7398199 │    0.4404 │
│ 21640317 │    0.1779 │
│ 21462000 │         0 │
│ 25168863 │         0 │
│ 25168978 │   -0.1531 │
│ 25169359 │         0 │
│ 25169394 │   -0.9231 │
│ 25169766 │    0.4137 │
│ 25172570 │    0.7469 │
│ 25173687 │    0.6249 │
│ 28291534 │         0 │
│ 28291669 │   -0.4767 │
│ 28291731 │         0 │
│ 28291949 │   -0.4767 │
│ 28292004 │    0.3612 │
│ 28292050 │    -0.296 │
│ 28292322 │         0 │
│ 28295172 │    0.7717 │
│ 28295288 │    0.4404 │
│ 21465723 │   -0.6956 │
└──────────┴───────────┘

ExecutablePool テーブルの作成

ExecutablePool の構文は Executable と似ていますが、ExecutablePool テーブルに固有の設定がいくつかあります。
  • pool_size
    • 説明: プロセスプールのサイズ。サイズが 0 の場合、サイズの制限はありません
    • デフォルト値: 16
  • max_command_execution_time
    • 説明: コマンドの最大実行時間 (秒)
    • デフォルト値: 10
上記の sentiment テーブルは、Executable の代わりに ExecutablePool を使うよう簡単に変換できます。
CREATE TABLE sentiment_pooled (
   id UInt64,
   sentiment Float32
)
ENGINE = ExecutablePool(
    'sentiment.py',
    TabSeparated,
    (SELECT id, comment FROM hackernews WHERE id > 0 AND comment != '' LIMIT 20000)
)
SETTINGS
    pool_size = 4;
クライアントがsentiment_pooledテーブルにクエリを実行すると、ClickHouse は必要に応じて 4 つのプロセスを維持します。
最終更新日 2026年6月10日