跳转到主要内容
如果你需要任何帮助,请在 repository 中提交 issue,或在 ClickHouse public Slack 中提出问题。
ClickHouse Kafka Connect Sink 是一个 Kafka 连接器,用于将数据从 Kafka topic 传输到 ClickHouse 表。

许可证

Kafka Connector Sink 根据 Apache 2.0 许可证 进行分发

环境要求

环境中应已安装 v2.7 或更高版本的 Kafka Connect 框架。

版本兼容性矩阵

ClickHouse Kafka Connect 版本ClickHouse 版本Kafka ConnectConfluent Platform
1.0.0> 23.3> 2.7> 6.1

主要特性

  • 开箱即用,具备精确一次语义。其底层依托 ClickHouse 一项名为 KeeperMap 的全新核心特性 (由连接器用作状态存储) ,从而实现极简架构。
  • 支持 3rd-party 状态存储:当前默认使用内存,也可使用 KeeperMap (即将支持 Redis) 。
  • 核心集成:由 ClickHouse 构建、维护并提供支持。
  • 持续针对 ClickHouse Cloud 进行测试。
  • 支持按声明的 schema 和无 schema 方式插入数据。
  • 支持 ClickHouse 的所有数据类型。

安装说明

准备连接信息

要通过 HTTP(S) 连接到 ClickHouse,你需要以下信息:
Parameter(s)Description
HOST and PORT通常,使用 TLS 时端口为 8443;不使用 TLS 时端口为 8123。
DATABASE NAME默认情况下,存在一个名为 default 的数据库。请使用你要连接的数据库名称。
USERNAME and PASSWORD默认情况下,用户名为 default。请根据你的使用场景使用相应的用户名。
你的 ClickHouse Cloud 服务的连接信息可在 ClickHouse Cloud 控制台中查看。 选择一个服务,然后点击 Connect 选择 HTTPS。连接信息会显示在示例 curl 命令中。 如果你使用的是自管理 ClickHouse,则连接信息由你的 ClickHouse 管理员配置。

常规安装说明

该 连接器以单个 JAR 文件形式分发,其中包含运行该插件所需的所有类文件。 要安装该插件,请按以下步骤操作:
  • 从 ClickHouse Kafka Connect Sink repository 的 Releases 页面下载包含连接器 JAR 文件的 ZIP 压缩包。
  • 解压 ZIP 文件,并将其内容复制到所需位置。
  • 在 Connect 属性文件的 plugin.path 配置中添加插件目录路径,以便 Confluent Platform 能够找到该插件。
  • 在配置中提供 topic 名称、ClickHouse 实例的 hostname 和密码。
connector.class=com.clickhouse.kafka.connect.ClickHouseSinkConnector
tasks.max=1
topics=<topic_name>
ssl=true
jdbcConnectionProperties=?sslmode=STRICT
security.protocol=SSL
hostname=<hostname>
database=<database_name>
password=<password>
ssl.truststore.location=/tmp/kafka.client.truststore.jks
port=8443
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
exactlyOnce=true
username=default
schemas.enable=false
  • 重启 Confluent Platform。
  • 如果您使用 Confluent Platform,请登录 Confluent Control Center UI,确认 ClickHouse Sink 是否出现在可用连接器列表中。

配置选项

要将 ClickHouse Sink 连接到 ClickHouse server,您需要提供:
  • 连接信息:hostname (必填) 和端口 (可选)
  • 用户凭据:密码 (必填) 和用户名 (可选)
  • 连接器类:com.clickhouse.kafka.connect.ClickHouseSinkConnector (必填)
  • topics 或 topics.regex:要轮询的 Kafka topic——topic 名称必须与表名一致 (必填)
  • key 和 value 转换器:根据 topic 中的数据类型进行设置。如果尚未在工作线程配置中定义,则为必填。
完整的配置选项表如下:
属性名称说明默认值
hostname (必填)服务器的主机名或 IP 地址不适用
portClickHouse 端口:默认值为 8443 (用于 Cloud 中的 HTTPS) ;但对于 HTTP (self-hosted 的默认方式) ,应为 81238443
ssl启用与 ClickHouse 的 SSL 连接true
jdbcConnectionProperties连接到 ClickHouse 时使用的连接属性。必须以 ? 开头,并用 & 连接各个 param=value""
usernameClickHouse 数据库用户名default
password (必填)ClickHouse 数据库密码N/A
databaseClickHouse 数据库名称default
connector.class (必填)Connector 类 (需显式设置并保持默认值)"com.clickhouse.kafka.connect.ClickHouseSinkConnector"
tasks.maxConnector 任务数量"1"
errors.retry.timeoutKafka Connect 最大重试时长 (毫秒) 。0 表示不重试。-1 表示无限重试。建议值大于 “10000” ms (10 秒) 超时"0"
exactlyOnce启用 Exactly Once"false"
topics (必填)要轮询的 Kafka topic——topic 名称必须与表名一致""
key.converter (必填* - 见说明)根据键的类型进行设置。如果要传递键 (且未在工作线程配置中定义) ,则此项为必填。"org.apache.kafka.connect.storage.StringConverter"
value.converter (必填* - 参见说明)根据 topic 中的数据类型进行设置。支持:JSON、String、Avro 或 Protobuf 格式。如果未在工作线程配置中定义,则此项为必填。"org.apache.kafka.connect.json.JsonConverter"
value.converter.schemas.enable连接器值转换器的 schema 支持"false"
errors.tolerance连接器错误容忍度。支持:none、all"none"
errors.deadletterqueue.topic.name如果已设置 (且 errors.tolerance=all) ,失败的批次将发送到 DLQ (参见 故障排查)""
errors.deadletterqueue.context.headers.enable为 DLQ 添加额外请求头""
clickhouseSettings以逗号分隔的 ClickHouse 设置列表 (例如 “insert_quorum=2 等”)""
topic2TableMap以逗号分隔的列表,用于将 topic 名称映射到表名 (例如 “topic1=table1, topic2=table2 等”)""
tableRefreshInterval刷新表定义缓存的时间 (单位:秒)0
keeperOnCluster允许为自托管实例配置 ON CLUSTER 参数 (例如 ON CLUSTER clusterNameInConfigFileDefinition) ,以用于恰好一次 connect_state 表 (参见 分布式 DDL 查询""
bypassRowBinary允许对基于 schema 的数据 (Avro、Protobuf 等) 禁用 RowBinary 和 RowBinaryWithDefaults;仅应在数据存在缺失列,且无法接受 Nullable/Default 时使用"false"
dateTimeFormats用于解析 DateTime64 schema 字段的日期时间格式,多个格式之间以 ; 分隔 (例如 someDateField=yyyy-MM-dd HH:mm:ss.SSSSSSSSS;someOtherDateField=yyyy-MM-dd HH:mm:ss) 。""
tolerateStateMismatch允许 connector 丢弃比 AFTER_PROCESSING 中存储的当前 offset 更“早”的记录 (例如,发送的是 offset 5,而上次记录的 offset 是 250) 。此选项应用于修复故障后的摄取问题,完成后应恢复为 "false""false"
ignorePartitionsWhenBatching收集待插入的消息时将忽略分区 (但仅在 exactlyOncefalse 时) 。性能说明:connector task 越多,分配给每个 task 的 Kafka partition 就越少——这可能会导致收益递减。"false"
bufferCount (自 v1.3.6 起)在刷新到 ClickHouse 之前,先在内存中缓冲的记录数。0 表示禁用内部缓冲。exactlyOnce=true 时不支持缓冲。"0"
bufferFlushTime (自 v1.3.6 版本起)exactlyOnce=false 时,执行 flush 前缓冲记录的最长时间 (毫秒) 。0 表示禁用基于时间的 flush。默认值为 0。仅在使用基于时间阈值时才需要。仅当 bufferCount > 0 时才生效。"0"
reportInsertedOffsets (自 v1.3.6 起)启用后,当 exactlyOnce=false 时,preCommit 将仅返回成功插入的偏移量,而非 currentOffsets。但在 ignorePartitionsWhenBatching=true 时,此设置不适用,仍会返回 currentOffsets"false"

目标表

ClickHouse Connect Sink 会从 Kafka topic 中读取消息,并将其写入相应的表。ClickHouse Connect Sink 仅会将数据写入现有表。请确保在开始向 ClickHouse 中插入数据之前,已在其中创建好具有适当 schema 的目标表。 每个 topic 都需要在 ClickHouse 中对应一个专用的目标表。目标表名称必须与源 topic 名称一致。

预处理

如果您需要在将出站消息发送到 ClickHouse Kafka Connect Sink 之前先进行转换,请使用 Kafka Connect Transformations

支持的数据类型

已声明 schema 时:
Kafka Connect TypeClickHouse 类型支持基本类型
STRINGString
STRINGJSON。见下文 (1)
INT8Int8
INT16Int16
INT32Int32
INT64Int64
FLOAT32Float32
FLOAT64Float64
BOOLEANBoolean
ARRAYArray(T)
MAPMap(Primitive, T)
STRUCTVariant(T1, T2, …)
STRUCTTuple(a T1, b T2, …)
STRUCTNested(a T1, b T2, …)
STRUCTJSON。见下文 (1), (2)
BYTESString
org.apache.kafka.connect.data.TimeInt64 / DateTime64
org.apache.kafka.connect.data.TimestampInt32 / Date32
org.apache.kafka.connect.data.DecimalDecimal
  • (1) - 仅当 ClickHouse 设置中启用了 input_format_binary_read_json_as_string=1 时,才支持 JSON。这仅适用于 RowBinary 格式家族,并且该设置会影响 insert 请求中的所有列,因此这些列都应为字符串。在这种情况下,连接器 会将 STRUCT 转换为 JSON 字符串。
  • (2) - 当 struct 包含 oneof 这类 union 时,应将 converter 配置为不要为 field 名称添加 prefix/suffix。ProtobufConverter 提供了 generate.index.for.unions=false 设置
未声明 schema 时: 记录会被转换为 JSON,并作为 JSONEachRow 格式中的值发送到 ClickHouse。

配置范例

以下是一些常见的配置范例,帮助你快速上手。

基础版配置

这是帮助你快速上手的最基础配置——假设你以分布式模式运行 Kafka Connect,且在 localhost:8443 上运行了启用 SSL 的 ClickHouse server,数据采用无 schema 的 JSON 格式。
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    "tasks.max": "1",
    "consumer.override.max.poll.records": "5000",
    "consumer.override.max.partition.fetch.bytes": "5242880",
    "database": "default",
    "errors.retry.timeout": "60000",
    "exactlyOnce": "false",
    "hostname": "localhost",
    "port": "8443",
    "ssl": "true",
    "jdbcConnectionProperties": "?ssl=true&sslmode=strict",
    "username": "default",
    "password": "<PASSWORD>",
    "topics": "<TOPIC_NAME>",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "clickhouseSettings": ""
  }
}
上述连接器配置要求你在工作线程配置中通过 connector.client.config.override.policy=All 启用客户端覆盖。更多信息请参阅 Kafka Connect 文档

多个 topic 的基本配置

该连接器可以从多个 topic 中消费数据
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "topics": "SAMPLE_TOPIC, ANOTHER_TOPIC, YET_ANOTHER_TOPIC",
    ...
  }
}

含 DLQ 的基本配置

{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "<DLQ_TOPIC>",
    "errors.deadletterqueue.context.headers.enable": "true",
  }
}

配合不同数据格式使用

支持 Avro schema
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "<SCHEMA_REGISTRY_HOST>:<PORT>",
    "value.converter.schemas.enable": "true",
  }
}
Avro 类型映射
下面的类型映射由 io.confluent.connect.avro.AvroConverter 定义,它是 Kafka Connect 官方的 Avro 序列化器/反序列化器实现。有关转换逻辑的更多高级信息,请参阅 Kafka Connect 文档 ✅:支持 ❌:不支持 ️⚠️:部分支持
Avro 类型Kafka Connect 类型支持情况备注
nullN/A不支持作为独立类型,但可用于联合类型
booleanBOOLEAN
intINT8/INT16/INT32默认为 INT32。如果 schema 具有属性 connect.type=int8,则解析为 INT8 (同理,如果 connect.type=int16,则解析为 INT16)
longINT64
floatFLOAT32
doubleFLOAT64
bytesBYTES
stringSTRING
recordSTRUCT
enumSTRING
arrayARRAY/MAP默认为 ARRAY。如果该字段最初是通过 AvroData.fromConnectSchema 构造的,则解析为 MAP (source)
mapMAP
unionSTRUCT/<T>⚠️默认为 STRUCT。如果 flatten.singleton.unions=true,则解析为联合类型定义中的单例类型 T (参见 文档)
fixedBYTES⚠️不支持 fixed decimal 逻辑类型 (见下文)
有关 Kafka Connect 类型与 ClickHouse 类型之间的映射,请参阅支持的数据类型
不支持的 Avro schema
以下 Avro schema 不受该连接器支持:
  • 固定长度的 decimal 逻辑类型
{"name": "decimal_18_4", "type": "fixed", "size": 8, "logicalType": "decimal", "precision": 18, "scale": 4}
  • Nullable 联合类型
{"name": "mixed_union", "type": ["null", "string", "int"], "default": null}
  • 记录中的联合类型
{
  "name": "record_union",
  "type": [
    {
      "type": "record",
      "name": "TypeA",
      "fields": [
        {
          "name": "label",
          "type": "string"
        }
      ]
    },
    {
      "type": "record",
      "name": "TypeB",
      "fields": [
        {
          "name": "count",
          "type": "int"
        }
      ]
    }
  ]
}
支持 Protobuf schema
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schema.registry.url": "<SCHEMA_REGISTRY_HOST>:<PORT>",
    "value.converter.schemas.enable": "true",
  }
}
请注意:如果你遇到类缺失的问题,并非所有环境都自带 protobuf 转换器,你可能需要使用包含依赖项的其他 jar 发布版本。
Protobuf 类型映射
下方的类型映射由 io.confluent.connect.protobuf.ProtobufConverter 定义,它是 Kafka Connect 官方的 Protobuf 序列化/反序列化实现。有关转换逻辑的更多高级信息,请参阅 Kafka Connect 文档 ✅:支持 ❌:不支持 ️⚠️:部分支持
Protobuf 类型Kafka Connect 类型支持情况说明
doubleFLOAT64
floatFLOAT32
int32INT8/INT16/INT32默认为 INT32。如果 schema 中设置了选项 connect.type=int8,则解析为 INT8 (INT16 同理,对应 connect.type=int16)
sint32INT8/INT16/INT32默认为 INT32。如果 schema 中设置了选项 connect.type=int8,则解析为 INT8 (INT16 同理,对应 connect.type=int16)
sfixed32INT8/INT16/INT32默认为 INT32。如果 schema 中设置了选项 connect.type=int8,则解析为 INT8 (INT16 同理,对应 connect.type=int16)
uint32INT64
fixed32INT64
int64INT64
uint64INT64
sint64INT64
fixed64INT64
sfixed64INT64
boolBOOLEAN
stringSTRING
bytesBYTES
enumINT32/STRING默认为 STRING。如果 int.for.enums=true,则解析为 INT32 (参见 Schema Registry 文档)
messageSTRUCT⚠️请参见下方“不支持的 schema”部分
repeated T (where T is not a map entry)ARRAY
map<K, V>MAP
oneofSTRUCT⚠️请参见下方关于将 oneof 转换为 ClickHouse schema 的部分
google.protobuf.DoubleValueFLOAT64
google.protobuf.FloatValueFLOAT32
google.protobuf.Int64ValueINT64
google.protobuf.UInt64ValueINT64
google.protobuf.UInt32ValueINT64
google.protobuf.Int32ValueINT32
google.protobuf.BoolValueBOOLEAN
google.protobuf.StringValueSTRING
google.protobuf.BytesValueBYTES
google.protobuf.Timestamporg.apache.kafka.connect.data.Timestamp
google.type.Dateorg.apache.kafka.connect.data.Date
google.type.TimeOfDayorg.apache.kafka.connect.data.Time
有关 Kafka Connect 类型与 ClickHouse 类型之间的映射,请参阅支持的数据类型
关于将 oneof 字段映射为 ClickHouse 列的说明
该连接器不支持将 Protobuf 联合类型 (oneof) 映射为 ClickHouse 的 Variant 类型。请改为在 ClickHouse 表 schema 中,将 oneof 字段列为单独的可空字段。 例如:
syntax = "proto3";

package com.clickhouse.kafka.connect.proto.test;

message StringIntUnion {
  oneof mixed {
    string mixed_string = 2;
    int32 mixed_int = 3;
  }
}

会被转换为以下 ClickHouse 表定义:
CREATE TABLE IF NOT EXISTS `StringIntUnion`
(
    mixed_string Nullable(String),
    mixed_int Nullable(Int32)
) ENGINE = ...;
不支持的 Protobuf schema
以下 Protobuf schema 不受连接器支持:
  • 多消息联合类型 (在 CH 版本 26.1 之前)
syntax = "proto3";

package com.clickhouse.kafka.connect.proto.test;

message TwoRecords {
  oneof payload {
    TypeA type_a = 2;
    TypeB type_b = 3;
  }

  // 在 ClickHouse 中转换为 Nullable(Tuple(label String)),不受支持
  message TypeA {
    string label = 1;
  }

  // 在 ClickHouse 中转换为 Nullable(Tuple(count Int32)),不受支持
  message TypeB {
    int32 count = 1;
  }
}
从 CH 26.1 版本起,在 allow_experimental_nullable_tuple_type=1 时支持此 schema (参见此文档页面) 。
支持 JSON schema
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  }
}
支持 String
该连接器支持在不同的 ClickHouse 格式中使用 String 转换器:JSONCSVTSV
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "customInsertFormat": "true",
    "insertFormat": "CSV"
  }
}

内部缓冲

内部缓冲允许 sink 任务累计多个 poll() 调用返回的记录,并将其作为更大的批次刷新到 ClickHouse。在每次轮询都会为各个分区产生大量小批次的工作负载中,这可以提升吞吐量。 关键行为:
  • bufferCount 控制刷新前缓冲的记录数。
  • bufferFlushTime 设置刷新缓冲记录前的最长等待时间 (以毫秒为单位) 。
  • bufferFlushTime 仅在 bufferCount > 0 时生效。
  • bufferCount=0bufferFlushTime=0 会使缓冲保持禁用状态 (默认行为) 。
  • exactlyOnce=true 时,不支持缓冲。
为什么缓冲与 exactly-once 模式不兼容: 缓冲会改变批次边界,从而破坏 ClickHouse 的块去重机制以及连接器 的 offset 状态机。 要解决此问题,可以在连接器 配置中使用 exactlyOnce=false 禁用 exactly-once 模式,或者使用 bufferCount=0 禁用缓冲。 示例:
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "exactlyOnce": "false",
    "bufferCount": "5000",
    "bufferFlushTime": "2000"
  }
}

日志

Kafka Connect Platform 会自动提供日志功能。 日志目标端和格式可通过 Kafka Connect 配置文件进行配置。 如果使用 Confluent Platform,可以通过运行 CLI 命令来查看日志:
confluent local services connect log
更多详情,请参阅官方教程

监控

ClickHouse Kafka Connect 通过 Java Management Extensions (JMX) 上报运行时指标。Kafka Connector 默认启用 JMX。

ClickHouse 特有指标

该连接器通过以下 MBean 名称公开自定义指标:
com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask{id}
指标名称类型描述
receivedRecordslong接收到的记录总数。
recordProcessingTimelong对记录进行分组并将其转换为统一结构所花费的总时间 (以纳秒为单位) 。
taskProcessingTimelong处理数据并将其插入 ClickHouse 所花费的总时间 (以纳秒为单位) 。

Kafka 生产者/消费者指标

该连接器会暴露标准的 Kafka 生产者和消费者指标,用于洞察数据流、吞吐量和性能。 Topic 级指标:
  • records-sent-total:发送到 topic 的记录总数
  • bytes-sent-total:发送到 topic 的总字节数
  • record-send-rate:每秒发送记录的平均速率
  • byte-rate:每秒发送字节的平均速率
  • compression-rate:实现的压缩率
分区级指标:
  • records-sent-total:发送到分区的记录总数
  • bytes-sent-total:发送到分区的总字节数
  • records-lag:分区当前的滞后量
  • records-lead:分区当前的超前量
  • replica-fetch-lag:副本的滞后信息
节点级连接指标:
  • connection-creation-total:与 Kafka 节点建立的连接总数
  • connection-close-total:关闭的连接总数
  • request-total:发送到节点的请求总数
  • response-total:从节点接收的响应总数
  • request-rate:每秒请求的平均速率
  • response-rate:每秒响应的平均速率
这些指标有助于监控:
  • 吞吐量:跟踪数据摄取速率
  • 滞后:识别瓶颈和处理延迟
  • 压缩:衡量数据压缩效率
  • 连接健康状况:监控网络连通性和稳定性

Kafka Connect Framework 指标

该连接器与 Kafka Connect Framework 集成,并公开用于跟踪任务生命周期和错误的指标。 任务状态指标:
  • task-count:连接器中的任务总数
  • running-task-count:当前正在运行的任务数量
  • paused-task-count:当前已暂停的任务数量
  • failed-task-count:已失败的任务数量
  • destroyed-task-count:已销毁的任务数量
  • unassigned-task-count:未分配的任务数量
任务状态值包括:runningpausedfaileddestroyedunassigned 错误指标:
  • deadletterqueue-produce-failures:DLQ 写入失败次数
  • deadletterqueue-produce-requests:DLQ 写入尝试总次数
  • last-error-timestamp:最近一次错误的时间戳
  • records-skip-total:因错误而跳过的记录总数
  • records-retry-total:重试的记录总数
  • errors-total:发生的错误总数
性能指标:
  • offset-commit-failures:偏移量 提交失败次数
  • offset-commit-avg-time-ms:偏移量 提交的平均耗时
  • offset-commit-max-time-ms:偏移量 提交的最长耗时
  • put-batch-avg-time-ms:处理一个批次的平均耗时
  • put-batch-max-time-ms:处理一个批次的最长耗时
  • source-record-poll-total:轮询到的记录总数

监控最佳实践

  1. 监控消费者滞后:按分区跟踪 records-lag,以识别处理瓶颈
  2. 跟踪错误率:关注 errors-totalrecords-skip-total,以发现数据质量问题
  3. 关注任务健康状态:监控任务状态指标,确保任务正常运行
  4. 衡量吞吐量:使用 records-send-ratebyte-rate 跟踪摄取性能
  5. 监控连接状态:检查节点级连接指标,以发现网络问题
  6. 跟踪压缩效率:使用 compression-rate 优化数据传输
有关详细的 JMX 指标定义和 Prometheus 集成,请参阅配置文件 jmx-export-connector.yml

限制

  • 不支持删除。
  • 批次大小继承自 Kafka 消费者属性。
  • 使用 KeeperMap 实现 exactly-once 时,如果偏移量发生变化或被回退,需要删除 KeeperMap 中该特定 topic 的内容。 (更多详情请参阅下方的故障排查指南)

性能调优与吞吐量优化

本节介绍 ClickHouse Kafka Connect Sink 的性能调优策略。在高吞吐量场景下,或需要优化资源利用率并尽量减少滞后时,性能调优至关重要。

何时需要进行性能调优?

通常在以下场景中需要进行性能调优:
  • 高吞吐量工作负载:当需要从 Kafka topic 中以每秒数百万条的速度处理事件时
  • 滞后:当连接器无法跟上数据生成速度,导致滞后持续增加时
  • 资源受限:当你需要优化 CPU、内存或网络资源的使用时
  • 多个 topic:当需要同时消费多个高流量 topic 时
  • 消息较小:当需要处理大量小消息,而这些消息适合通过服务端批处理来提升效率时
在以下情况下,通常不需要进行性能调优:
  • 处理的数据量较低或中等 (< 10,000 条消息/秒)
  • 对你的使用场景来说,消费者滞后稳定且可接受
  • 默认连接器设置已经满足你的吞吐量要求
  • 你的 ClickHouse 集群可以轻松处理传入负载

理解数据流

在开始调优之前,先了解数据如何流经连接器非常重要:
  1. Kafka Connect Framework 在后台从 Kafka topic 拉取消息
  2. 连接器轮询 框架内部 buffer 中的消息
  3. 连接器分批 根据轮询大小将消息组成批次
  4. ClickHouse 接收 通过 HTTP/S 发送的批次 insert
  5. ClickHouse 处理 该 insert (同步或异步)
这些阶段中的每一个环节都可以进行性能优化。

Kafka Connect 批次大小调优

第一步优化是控制连接器每个批次从 Kafka 接收的数据量。
拉取设置
Kafka Connect (即该框架) 会在后台从 Kafka topic 拉取消息,这一过程独立于 连接器:
  • fetch.min.bytes:框架将值传给 连接器 之前所需的最小数据量 (默认值:1 字节)
  • fetch.max.bytes:单个请求可拉取的最大数据量 (默认值:52428800 / 50 MB)
  • fetch.max.wait.ms:如果未达到 fetch.min.bytes,返回数据前最长等待的时间 (默认值:500 毫秒)

在 Confluent Cloud 上,如需调整这些设置,需要通过 Confluent Cloud 提交支持工单。
轮询设置
连接器 会从框架的 buffer 中轮询消息:
  • max.poll.records:单次轮询返回的最大记录数 (默认值:500)
  • max.partition.fetch.bytes:每个分区可拉取的最大数据量 (默认值:1048576 / 1 MB)

在 Confluent Cloud 上,调整这些设置需要通过 Confluent Cloud 提交支持工单。
为获得 ClickHouse 的最佳性能,建议尽量使用更大的批次:
# 增加每次轮询的记录数
consumer.override.max.poll.records=5000

# 增加分区拉取大小(5 MB)
consumer.override.max.partition.fetch.bytes=5242880

# 可选:增加最小拉取大小以等待更多数据(1 MB)
consumer.override.fetch.min.bytes=1048576

# 可选:若对延迟敏感,可减少等待时间
consumer.override.fetch.max.wait.ms=300
上述属性要求你在 worker 配置中通过 connector.client.config.override.policy=All 启用客户端配置覆盖。更多信息请参阅 Kafka Connect 文档
重要:Kafka Connect 的拉取设置对应的是压缩后的数据,而 ClickHouse 接收的是未压缩数据。请根据压缩率权衡这些设置。 权衡取舍
  • 更大的批次 = 更好的 ClickHouse 摄取性能、更少的 parts、更低的开销
  • 更大的批次 = 更高的内存使用量,以及端到端延迟可能增加
  • 批次过大 = 可能会导致 timeout、OutOfMemory 错误,或超出 max.poll.interval.ms
更多详情:Confluent 文档 | Kafka 文档

异步插入

当连接器发送的批次较小时,或者你希望将批处理职责交给 ClickHouse 以进一步优化摄取时,异步插入是一项非常实用的功能。
何时使用异步插入
在以下情况下,可考虑启用异步插入:
  • 大量小批次:你的连接器会频繁发送小批次 (每批少于 1000 行)
  • 高并发:多个连接器任务同时向同一张表写入数据
  • 分布式部署:在不同主机上运行多个连接器实例
  • parts 创建开销:你遇到了“parts 过多”错误
  • 混合工作负载:同时处理实时摄取和查询工作负载
在以下情况下不要使用异步插入:
  • 你已经以可控频率发送大批次 (每批超过 10,000 行)
  • 你需要数据立即可见 (查询必须立刻看到数据)
  • 使用 wait_for_async_insert=0 时,精确一次语义与你的需求冲突
  • 你的使用场景更适合通过客户端侧批处理优化来改进
异步插入的工作原理
启用异步插入后,ClickHouse 会:
  1. 从连接器接收插入查询
  2. 将数据写入内存缓冲区 (而不是立即写入磁盘)
  3. 向连接器返回成功 (如果 wait_for_async_insert=0)
  4. 在满足以下任一条件时,将缓冲区刷写到磁盘:
    • 缓冲区达到 async_insert_max_data_size (默认值:100 MB)
    • 自首次插入起已过去 async_insert_busy_timeout_ms 毫秒 (默认值:1000 ms)
    • 累积查询数达到上限 (async_insert_max_query_number,默认值:100)
这会显著减少创建的 parts 数量,并提高整体吞吐量。
启用异步插入
将异步插入相关设置添加到 clickhouseSettings 配置参数中:
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "clickhouseSettings": "async_insert=1,wait_for_async_insert=1"
  }
}
关键设置
  • async_insert=1:启用异步插入
  • wait_for_async_insert=1 (推荐) :连接器会等数据刷写到 ClickHouse 存储后再确认。可提供交付保障。
  • wait_for_async_insert=0:连接器在数据缓冲后立即确认。性能更好,但如果服务器在刷写前崩溃,数据可能会丢失。
调优 异步插入 行为
您可以微调 异步插入 的刷新行为:
"clickhouseSettings": "async_insert=1,wait_for_async_insert=1,async_insert_max_data_size=104857600,async_insert_busy_timeout_ms=1000"
常见调优参数:
  • async_insert_max_data_size (默认值:104857600 / 100 MB) :触发 flush 前的最大缓冲区大小
  • async_insert_busy_timeout_ms (默认值:1000) :触发 flush 前的最长等待时间 (毫秒)
  • async_insert_stale_timeout_ms (默认值:0) :距离上次 insert 后触发 flush 的时间 (毫秒)
  • async_insert_max_query_number (默认值:100) :触发 flush 前的最大查询数
权衡
  • 优点:parts 更少、merge 性能更好、CPU 开销更低,并且在高并发下吞吐量更高
  • 注意事项:数据无法立即被查询到,端到端延迟会略有增加
  • 风险:如果 wait_for_async_insert=0,server 崩溃时可能导致数据丢失;缓冲区过大时可能带来内存压力
具有精确一次语义的异步插入
当在异步插入中使用 exactlyOnce=true 时:
{
  "config": {
    "exactlyOnce": "true",
    "clickhouseSettings": "async_insert=1,wait_for_async_insert=1"
  }
}
重要:务必将 wait_for_async_insert=1 与 exactly-once 配合使用,以确保只有在数据持久化后才会提交偏移量。 有关异步插入的更多信息,请参阅 ClickHouse 异步插入文档

连接器并行度

提高并行度以提升吞吐量:
每个连接器的任务数
"tasks.max": "4"
每个任务处理一部分 topic 分区。任务越多,并行度越高,但:
  • 有效任务数的上限 = topic 分区数
  • 每个任务都会维护自己与 ClickHouse 的连接
  • 任务越多,开销越高,还可能导致资源争用
建议:先将 tasks.max 设为与 topic 分区数相同,然后再根据 CPU 和吞吐量指标进行调整。
批处理时忽略分区
默认情况下,连接器会按分区对消息进行批处理。若要提高吞吐量,可以跨分区进行批处理:
"ignorePartitionsWhenBatching": "true"
** 警告**:仅在 exactlyOnce=false 时使用。此设置可通过创建更大的批次来提高吞吐量,但会失去分区内的顺序保证。

多个高吞吐量 topic

如果你的 连接器 配置为订阅多个 topic,使用 topic2TableMap 将 topic 映射到表,并且在插入时遇到瓶颈,导致消费滞后,则建议改为每个 topic 单独创建一个 连接器。 出现这种情况的主要原因是,目前批次会串行插入到各个表中。 建议:对于多个高流量 topic,建议为每个 topic 部署一个独立的 连接器 实例,以最大化并行插入吞吐量。

ClickHouse 表引擎注意事项

根据你的使用场景选择合适的 ClickHouse 表引擎:
  • MergeTree:最适合大多数场景,兼顾查询和 insert 性能
  • ReplicatedMergeTree:高可用性所必需,但会增加复制开销
  • *MergeTree 配合适当的 ORDER BY:针对你的查询模式进行优化
需要考虑的设置
CREATE TABLE my_table (...)
ENGINE = MergeTree()
ORDER BY (timestamp, id)
SETTINGS 
    -- 增加最大插入线程数以实现并行分片写入
    max_insert_threads = 4,
    -- 允许使用 quorum 写入以提高可靠性(ReplicatedMergeTree)
    insert_quorum = 2
对于 connector 级别的 insert 设置:
"clickhouseSettings": "insert_quorum=2,insert_quorum_timeout=60000"

连接池与超时

连接器 会维护与 ClickHouse 之间的 HTTP 连接。对于高延迟网络,请调整超时设置:
"clickhouseSettings": "socket_timeout=300000,connection_timeout=30000"
  • socket_timeout (默认值:30000 毫秒) :读取操作的最长等待时间
  • connection_timeout (默认值:10000 毫秒) :建立连接的最长等待时间
如果处理大批次时遇到超时错误,请适当增大这些值。

监控与性能问题排查

监控以下关键指标:
  1. 滞后:使用 Kafka 监控工具跟踪各分区的滞后
  2. 连接器指标:通过 JMX 监控 receivedRecordsrecordProcessingTimetaskProcessingTime (参见 Monitoring)
  3. ClickHouse 指标
    • system.asynchronous_inserts:监控异步插入缓冲区的使用情况
    • system.parts:监控 parts 数量,以发现合并问题
    • system.merges:监控正在进行的合并
    • system.events:跟踪 InsertedRowsInsertedBytesFailedInsertQuery
常见性能问题
症状可能原因解决方案
消费者滞后较高批次过小增加 max.poll.records,启用异步插入
“parts 过多”错误小批量频繁插入启用异步插入,增大批次大小
超时错误批次过大、网络较慢减小批次大小,增加 socket_timeout,检查网络
CPU 使用率高小 parts 过多启用异步插入,调大合并相关设置
OutOfMemory 错误批次过大减小 max.poll.recordsmax.partition.fetch.bytes
任务负载不均分区分布不均重新均衡分区或调整 tasks.max

最佳实践总结

  1. 先使用默认配置,然后根据实际性能进行测量和调优
  2. 尽量使用更大的批次:如有可能,目标是每次插入 10,000-100,000 行
  3. 在发送大量小批次或高并发场景下,使用异步插入
  4. 使用精确一次语义时,始终启用 wait_for_async_insert=1
  5. 水平扩展:将 tasks.max 提高到最多与分区数相同
  6. 每个高流量 topic 使用一个连接器,以获得最大吞吐量
  7. 持续监控:跟踪滞后、part 数量和合并活动
  8. 充分测试:在生产部署前,务必在接近真实的负载下测试配置更改

示例:高吞吐量配置

下面是一个针对高吞吐量场景优化的完整示例:
{
  "name": "clickhouse-high-throughput",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    "tasks.max": "8",
    
    "topics": "high_volume_topic",
    "hostname": "my-clickhouse-host.cloud",
    "port": "8443",
    "database": "default",
    "username": "default",
    "password": "<PASSWORD>",
    "ssl": "true",
    
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    
    "exactlyOnce": "false",
    "ignorePartitionsWhenBatching": "true",
    
    "consumer.override.max.poll.records": "10000",
    "consumer.override.max.partition.fetch.bytes": "5242880",
    "consumer.override.fetch.min.bytes": "1048576",
    "consumer.override.fetch.max.wait.ms": "500",
    
    "clickhouseSettings": "async_insert=1,wait_for_async_insert=1,async_insert_max_data_size=16777216,async_insert_busy_timeout_ms=1000,socket_timeout=300000"
  }
}
上述连接器配置要求你在 worker 配置中通过 connector.client.config.override.policy=All 启用客户端覆盖。更多信息请参阅 Kafka Connect 文档
此配置
  • 每次轮询最多处理 10,000 条记录
  • 跨分区合并批次,以实现更大规模的插入
  • 使用 16 MB 缓冲区的异步插入
  • 运行 8 个并行任务 (与分区数保持一致)
  • 针对吞吐量进行了优化,而非严格顺序

故障排查

”topic [someTopic] 的分区 [0] 状态不一致”

当 KeeperMap 中存储的 偏移量 与 Kafka 中存储的 偏移量 不一致时,就会出现这种情况,通常是因为某个 topic 已被删除, 或者 偏移量 被手动调整了。 要解决此问题,你需要删除为该 topic + 分区存储的旧值:
-- 首先,确认用于存储数据的数据库。
SELECT * FROM [database].connect_state

-- 找到与该 topic 和分区匹配的键。
ALTER TABLE [database].connect_state DELETE WHERE key = [keyname]
此调整可能会对 exactly-once 语义产生影响。

“连接器会重试哪些错误?”

目前重点是识别那些具有暂时性、可进行重试的错误,包括:
  • ClickHouseException - 这是 ClickHouse 可能抛出的通用异常。 它通常会在 server 过载时抛出,以下错误代码尤其被视为暂时性错误:
    • 3 - UNEXPECTED_END_OF_FILE
    • 107 - FILE_DOESNT_EXIST
    • 159 - TIMEOUT_EXCEEDED
    • 164 - READONLY
    • 202 - TOO_MANY_SIMULTANEOUS_QUERIES
    • 203 - NO_FREE_CONNECTION
    • 209 - SOCKET_TIMEOUT
    • 210 - NETWORK_ERROR
    • 241 - MEMORY_LIMIT_EXCEEDED
    • 242 - TABLE_IS_READ_ONLY
    • 252 - TOO_MANY_PARTS
    • 285 - TOO_FEW_LIVE_REPLICAS
    • 319 - UNKNOWN_STATUS_OF_INSERT
    • 425 - SYSTEM_ERROR
    • 999 - KEEPER_EXCEPTION
  • SocketTimeoutException - 当套接字超时时会抛出此异常。
  • UnknownHostException - 当主机无法解析时会抛出此异常。
  • IOException - 当网络出现问题时会抛出此异常。

“我的所有数据都是空的/全是 0”

很可能是因为数据中的字段与表中的字段不匹配——这种情况在 CDC (变更数据捕获) 和 Debezium 格式中尤其常见。 一种常见的解决方法是,在连接器配置中添加 flatten 转换:
transforms=flatten
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flatten.delimiter=_
这会将数据从嵌套 JSON 转换为扁平 JSON (使用 _ 作为分隔符) 。这样一来,表中的字段就会采用 “field1_field2_field3” 这种格式 (即 “before_id”、“after_id” 等) 。

“我想在 ClickHouse 中使用 Kafka 键”

默认情况下,Kafka 键不会存储在 value 字段中,但你可以使用 KeyToValue 转换将键移到 value 字段中 (作为新的 _key 字段) :
transforms=keyToValue
transforms.keyToValue.type=com.clickhouse.kafka.connect.transforms.KeyToValue
transforms.keyToValue.field=_key
最后修改于 2026年6月10日