跳转到主要内容
Cloud 中的 Distributed 引擎要在 ClickHouse Cloud 中创建 Distributed 表引擎,可以使用 remoteremoteSecure 表函数。 在 ClickHouse Cloud 中不能使用 Distributed(...) 语法。
使用 Distributed 引擎的表本身不存储任何数据,而是支持在多台服务器上进行分布式查询处理。 读取会自动并行执行。读取时,如果远程服务器上存在表索引,则会使用这些索引。

创建表

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]])
[SETTINGS name=value, ...]

从表复制

Distributed 表指向当前服务器上的某个表时,你可以沿用该表的 schema:
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] AS [db2.]name2 ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]]) [SETTINGS name=value, ...]

Distributed 参数

参数描述
clusterserver 配置文件中的 cluster 名称
database远程数据库的名称
table远程表的名称
sharding_key (可选)分片键。
在以下情况下,必须指定 sharding_key
  • 向分布式表执行 INSERTs 时 (因为表引擎需要 sharding_key 来确定如何拆分数据) 。不过,如果启用了 insert_distributed_one_random_shard 设置,则 INSERTs 不需要分片键。
  • optimize_skip_unused_shards 一起使用时,因为需要 sharding_key 来确定应查询哪些分片
policy_name (可选)策略名称,将用于存储后台发送时的临时 File
另请参阅

Distributed 设置

设置描述默认值
fsync_after_insert在向 Distributed 执行后台 INSERT 后,对文件数据执行 fsync。保证操作系统会将发起节点磁盘上插入的全部数据刷写到文件中。false
fsync_directories对目录执行 fsync。保证操作系统在完成与 Distributed 表后台 INSERT 相关的操作后 (如插入后、将数据发送到分片后等) 刷新目录元数据。false
skip_unavailable_shards如果为 true,ClickHouse 会静默跳过不可用的分片。以下情况下,分片会被标记为不可用:1) 由于连接失败而无法访问该分片。2) 无法通过 DNS 解析该分片。3) 该分片上不存在该表。false
bytes_to_throw_insert如果后台 INSERT 的待处理压缩字节数超过此值,将抛出异常。0 - 不抛出。0
bytes_to_delay_insert如果后台 INSERT 的待处理压缩字节数超过此值,查询将被延迟。0 - 不延迟。0
max_delay_to_insert当后台发送存在大量待处理字节时,向分布式表插入数据的最大延迟时间 (秒) 。60
background_insert_batchdistributed_background_insert_batch 相同0
background_insert_split_batch_on_failuredistributed_background_insert_split_batch_on_failure 相同0
background_insert_sleep_time_msdistributed_background_insert_sleep_time_ms 相同0
background_insert_max_sleep_time_msdistributed_background_insert_max_sleep_time_ms 相同0
flush_on_detachDETACH/DROP/服务器关闭时,将数据刷写到远程节点。true
持久性设置 (fsync_...):
  • 仅影响后台 INSERT (即 distributed_foreground_insert=false) :数据首先存储在发起节点的磁盘上,之后再在后台发送到各分片。
  • 可能会显著降低 INSERT 性能
  • 影响的是将分布式表目录中的数据写入接受插入请求的节点。如果你需要保证数据写入到底层 MergeTree 表,请参见 system.merge_tree_settings 中的持久性设置 (...fsync...)
对于插入限制设置 (..._insert) ,另请参见:
示例
CREATE TABLE hits_all AS hits
ENGINE = Distributed(logs, default, hits[, sharding_key[, policy_name]])
SETTINGS
    fsync_after_insert=0,
    fsync_directories=0;
将从 logs 集群中的所有 server 读取数据,数据来自集群中每台 server 上的 default.hits 表。数据不仅会被读取,还会尽可能在远程 server 上进行部分处理。例如,对于带有 GROUP BY 的 查询,数据会先在远程 server 上聚合,并将 aggregate function 的中间状态发送到发起请求的 server,随后再继续聚合。 你也可以用返回字符串的常量 expression 来代替 database 名称。例如:currentDatabase()

集群

集群是在服务器配置文件中配置的:
<remote_servers>
    <logs>
        <!-- 用于 Distributed 查询的服务器间集群级 secret
             默认值:无 secret(不执行身份验证)

             若已设置,则 Distributed 查询将在分片上进行验证,至少需满足:
             - 该集群须存在于分片上,
             - 该集群须具有相同的 secret。

             此外(更为重要的是),initial_user 将
             被用作该查询的当前用户。
        -->
        <!-- <secret></secret> -->
        
        <!-- 可选。是否允许对该集群执行分布式 DDL 查询(ON CLUSTER 子句)。默认值:true(允许)。-->        
        <!-- <allow_distributed_ddl_queries>true</allow_distributed_ddl_queries> -->
        
        <shard>
            <!-- 可选。写入数据时的分片权重。默认值:1。-->
            <weight>1</weight>
            <!-- 可选。分片名称。在集群内各分片中必须非空且唯一。若未指定,则为空。-->
            <name>shard_01</name>
            <!-- 可选。是否仅向其中一个副本写入数据。默认值:false(向所有副本写入数据)。-->
            <internal_replication>false</internal_replication>
            <replica>
                <!-- 可选。副本的负载均衡优先级(另请参阅 load_balancing 设置)。默认值:1(值越小,优先级越高)。-->
                <priority>1</priority>
                <host>example01-01-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-01-2</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <weight>2</weight>
            <name>shard_02</name>
            <internal_replication>false</internal_replication>
            <replica>
                <host>example01-02-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-02-2</host>
                <secure>1</secure>
                <port>9440</port>
            </replica>
        </shard>
    </logs>
</remote_servers>
这里定义了一个名为 logs 的集群,它由两个分片组成,每个分片包含两个副本。分片指的是存放不同数据部分的服务器 (要读取全部数据,必须访问所有分片) 。副本则是彼此冗余的服务器 (要读取全部数据,访问任意一个副本即可) 。 集群名称中不能包含点号。 需要为每台服务器指定 hostport 参数,以及可选的 userpasswordsecurecompressionbind_host 参数:
ParameterDescriptionDefault Value
host远程服务器的地址。可以使用域名、IPv4 地址或 IPv6 地址。如果指定的是域名,服务器会在启动时发起 DNS 请求,并在运行期间缓存结果。如果 DNS 请求失败,服务器将无法启动。如果修改了 DNS 记录,请重启服务器。-
port用于 TCP 通信的端口 (即 config 中的 tcp_port,通常设置为 9000) 。不要与 http_port 混淆。-
user用于连接远程服务器的用户名。该用户必须具有连接到指定服务器的权限。访问权限在 users.xml 文件中配置。更多信息,请参见访问权限章节。default
password用于连接远程服务器的密码 (不会被掩码处理) 。
secure是否使用安全的 SSL/TLS 连接。通常还需要同时指定端口 (默认安全端口为 9440) 。服务器应监听 <tcp_port_secure>9440</tcp_port_secure>,并配置正确的证书。false
compression使用压缩。true
bind_host从当前节点连接到远程服务器时使用的源地址。仅支持 IPv4 地址。适用于高级部署场景,即需要设置 ClickHouse 分布式查询所使用的源 IP 地址。-
指定副本时,读取每个分片的数据都会从可用副本中选取一个。你可以配置负载均衡算法 (即优先访问哪个副本) ——请参见 load_balancing 设置。如果无法与服务器建立连接,则会以较短的超时时间尝试连接。如果连接失败,就会选择下一个副本,并依次尝试所有副本。如果所有副本的连接尝试都失败了,则会按照同样的方式重复尝试若干次。这样有利于提高系统弹性,但并不能提供完整的容错能力:远程服务器可能会接受连接,但实际上无法正常工作,或者工作状态不佳。 你可以只指定一个分片 (在这种情况下,应称为 remote 查询处理,而不是 distributed) ,也可以指定任意多个分片。在每个分片中,你可以指定一个或多个副本。你也可以为每个分片指定不同数量的副本。 你可以在配置中按需指定任意多个集群。 要查看你的集群,请使用 system.clusters 表。 Distributed 引擎允许你像使用本地服务器一样使用集群。不过,集群配置不能动态指定,必须在服务器配置文件中进行配置。通常,集群中的所有服务器都会使用相同的集群配置 (虽然这不是必需的) 。配置文件中的集群会动态更新,无需重启服务器。 如果你需要每次都向一组未知的分片和副本发送查询,则无需创建 Distributed 表,而应改用 remote 表函数。请参见 表函数 部分。

写入数据

向集群写入数据有两种方式: 第一种,你可以自行指定将哪些数据写入哪些服务器,并直接在各个分片上执行写入。换句话说,就是直接对集群中 Distributed 表所指向的远程表执行 INSERT 语句。这是最灵活的方案,因为你可以采用任意分片方案,甚至可以根据具体业务需求使用较复杂的分片策略。这也是最优的方案,因为数据可以完全独立地写入不同分片。 第二种,你可以对 Distributed 表执行 INSERT 语句。此时,表会自行将插入的数据分发到各台服务器。要向 Distributed 表写入数据,必须配置 sharding_key 参数 (只有一个分片时除外) 。 每个分片都可以在配置文件中定义一个 <weight>。默认值为 1。数据会按照分片权重的比例分布到各个分片。系统会先将所有分片的权重求和,再用每个分片的权重除以总权重,以确定该分片所占的比例。例如,如果有两个分片,第一个权重为 1,第二个权重为 2,那么第一个将接收三分之一 (1 / 3) 的插入行,第二个将接收三分之二 (2 / 3) 。 每个分片都可以在配置文件中定义 internal_replication 参数。如果该参数设置为 true,写操作会选择第一个健康的副本并向其写入数据。如果 Distributed 表底层的表是复制表 (例如任意一种 Replicated*MergeTree 表引擎) ,请使用此设置。某个表副本会接收写入,然后数据会自动复制到其他副本。 如果 internal_replication 设置为 false (默认值) ,数据会写入所有副本。在这种情况下,由 Distributed 表自行复制数据。这比使用复制表更差,因为系统不会检查副本之间的一致性,随着时间推移,它们会包含略有差异的数据。 要确定一行数据会被发送到哪个分片,系统会分析分片表达式,并将其结果对所有分片的总权重取余。然后,该行会被发送到与余数所在半开区间对应的分片,该区间范围为 prev_weightsprev_weights + weight,其中 prev_weights 是编号更小的分片的总权重,weight 是当前分片的权重。例如,如果有两个分片,第一个权重为 9,第二个权重为 10,那么余数落在区间 [0, 9) 时,该行会被发送到第一个分片;余数落在区间 [9, 19) 时,则会被发送到第二个分片。 分片表达式可以是任何由常量和表列组成且返回整数的表达式。例如,你可以使用表达式 rand() 来随机分布数据,或者使用 UserID 按用户 ID 取余后的结果进行分布 (这样单个用户的数据会位于同一个分片上,从而简化按用户执行 INJOIN) 。如果某一列的分布不够均匀,可以再套一层哈希函数,例如 intHash64(UserID) 简单的除法取余只是一种有限的分片方案,并不总是合适。它适用于中等和较大规模的数据量 (几十台服务器) ,但不适用于超大规模的数据量 (数百台或更多服务器) 。对于后一种情况,应采用业务领域所需的分片方案,而不是依赖 Distributed 表中的条目。 在以下情况下,你应当重点考虑分片方案:
  • 使用了需要按特定键对数据进行连接的查询 (INJOIN) 。如果数据是按该键分片的,就可以使用本地 INJOIN,而不是 GLOBAL INGLOBAL JOIN,效率会高得多。
  • 使用了大量服务器 (数百台或更多) ,并且存在大量小查询,例如针对单个客户数据的查询 (如网站、广告主或合作伙伴) 。为了避免这些小查询影响整个集群,将单个客户的数据放在单个分片上会更合理。或者,也可以采用两级分片:将整个集群划分为多个“层”,每一层可以包含多个分片。单个客户的数据位于某一层内,但可以根据需要向该层添加分片,数据会在这些分片中随机分布。每一层都创建各自的 Distributed 表,同时再创建一个共享的分布式表用于全局查询。
数据在后台写入。插入到表中时,数据块只是写入本地文件系统。随后,数据会尽快在后台发送到远程服务器。数据发送的周期由 distributed_background_insert_sleep_time_msdistributed_background_insert_max_sleep_time_ms 设置控制。Distributed 引擎会分别发送每个包含已插入数据的文件,但你也可以通过 distributed_background_insert_batch 设置启用文件批次发送。该设置可以更充分地利用本地服务器和网络资源,从而提升集群性能。你应通过检查表目录中等待发送的数据文件列表,来确认数据是否已成功发送:/var/lib/clickhouse/data/database/table/。执行后台任务的线程数可通过 background_distributed_schedule_pool_size 设置。 如果在向 Distributed 表执行 INSERT 后,服务器宕机或发生异常重启 (例如由于硬件故障) ,则已插入的数据可能会丢失。如果在表目录中检测到损坏的数据分区片段,它会被移至 broken 子目录,并且不再使用。

读取数据

查询 Distributed 表时,SELECT 查询会发送到所有分片,因此无论数据在各分片之间如何分布 (甚至可以是完全随机分布) ,都能正常工作。添加新分片时,无需将旧数据迁移到新分片中。相反,可以通过为其设置更高权重来写入新数据——这样数据分布会略有不均,但查询仍能正确且高效地执行。 启用 max_parallel_replicas 选项后,查询处理会在单个分片内的所有副本上并行进行。更多信息,请参见 max_parallel_replicas 一节。 要进一步了解分布式 inglobal in 查询的处理方式,请参阅此文档

虚拟列

_Shard_num

_shard_num — 包含表 system.clusters 中的 shard_num 值。类型:UInt32
由于 remote 和 [cluster](../../../sql-reference/table-functions/cluster.md) 表函数在内部会创建临时 Distributed 表,因此 _shard_num` 在这些函数中也可用。
另请参见
最后修改于 2026年6月10日