跳转到主要内容

使用 ClickHouse Connect 插入数据:高级用法

InsertContexts

ClickHouse Connect 的所有插入操作都在 InsertContext 中执行。InsertContext 包含传递给客户端 insert 方法的所有参数值。此外,在初次构造 InsertContext 时,ClickHouse Connect 还会获取插入列的数据类型,以便高效地进行 Native format 插入。复用同一个 InsertContext 执行多次插入时,就可以避免这类“预查询”,从而让插入更快、更高效。 可以使用客户端的 create_insert_context 方法获取 InsertContext。该方法接受的参数与 insert 函数相同。请注意,复用 InsertContext 时,只应修改其 data 属性。这也符合它的设计目的:为向同一张表重复插入新数据提供一个可复用的对象。
test_data = [[1, 'v1', 'v2'], [2, 'v3', 'v4']]
ic = test_client.create_insert_context(table='test_table', data='test_data')
client.insert(context=ic)
assert client.command('SELECT count() FROM test_table') == 2
new_data = [[3, 'v5', 'v6'], [4, 'v7', 'v8']]
ic.data = new_data
client.insert(context=ic)
qr = test_client.query('SELECT * FROM test_table ORDER BY key DESC')
assert qr.row_count == 4
assert qr[0][0] == 4
InsertContexts 包含会在 insert 过程中更新的可变状态,因此不具备线程安全性。

写入格式

目前,仅有少数类型实现了写入格式。在大多数情况下,ClickHouse Connect 会通过检查某列第一个 (非 NULL) 数据值的类型,尝试自动判断该列应使用的正确写入格式。例如,如果向 DateTime 列插入数据,而该列的第一个插入值是 Python 整数,ClickHouse Connect 会直接插入该整数值,并假定它实际上表示纪元秒。 大多数情况下,无需覆盖某种数据类型的写入格式;但如果需要,也可以使用 clickhouse_connect.datatypes.format 包中的相关方法在全局级别进行设置。

写入格式选项

ClickHouse 类型原生 Python 类型写入格式注释
Int[8-64], UInt[8-32]int-
UInt64int
[U]Int[128,256]int
BFloat16float
Float32float
Float64float
Decimaldecimal.Decimal
Stringstring
FixedStringbytesstring如果以字符串形式插入,额外的字节会被补零
Enum[8,16]string
Datedatetime.dateintClickHouse 将 Date 存储为自 01/01/1970 起的天数。int 类型将被视为该“纪元日期”值
Date32datetime.dateint与 Date 相同,但支持更宽的日期范围
DateTimedatetime.datetimeintClickHouse 将 DateTime 存储为纪元秒。int 类型将被视为该“纪元秒”值
DateTime64datetime.datetimeintPython datetime.datetime 的精度仅到微秒。可使用原始 64 位 int 值
Timedatetime.timedeltaint, string, timeClickHouse 将 DateTime 存储为纪元秒。int 类型将被视为该“纪元秒”值
Time64datetime.timedeltaint, string, timePython datetime.timedelta 的精度仅到微秒。可使用原始 64 位 int 值
IPv4ipaddress.IPv4Addressstring格式正确的字符串可作为 IPv4 地址插入
IPv6ipaddress.IPv6Addressstring格式正确的字符串可作为 IPv6 地址插入
Tupledict or tuple
Mapdict
NestedSequence[dict]
UUIDuuid.UUIDstring格式正确的字符串可作为 ClickHouse UUID 插入
JSON/Object(‘json’)dictstring字典或 JSON 字符串都可以插入 JSON 列中 (注意:Object('json') 已弃用)
Variantobject当前所有 Variant 都会以 String 形式插入,并由 ClickHouse server 解析
Dynamicobject警告 — 当前插入到 Dynamic 列中的任何内容都会以 ClickHouse String 的形式持久化

专用插入方法

ClickHouse Connect 为常见数据格式提供了专用的插入方法:
  • insert_df — 插入 Pandas DataFrame。此方法的第二个参数需要传入 df 参数,且该参数必须是 Pandas DataFrame 实例,而不是 Python 的 Sequence of Sequences 类型 data 参数。ClickHouse Connect 会自动将 DataFrame 作为列式数据源处理,因此不需要 column_oriented 参数,也不提供该参数。
  • insert_arrow — 插入 PyArrow Table。ClickHouse Connect 会将 Arrow 表原样传递给 ClickHouse server 处理,因此除了 tablearrow_table 之外,只有 databasesettings 参数可用。
  • insert_df_arrow — 插入基于 Arrow 的 Pandas DataFrame 或 Polars DataFrame。ClickHouse Connect 会自动判断该 DataFrame 是 Pandas 类型还是 Polars 类型。如果是 Pandas,则会进行校验,以确保每一列的 dtype backend 都基于 Arrow;如果有任意一列不满足条件,则会引发错误。
NumPy array 是合法的 Sequence of Sequences,因此可作为主 insert 方法的 data 参数使用,无需专用方法。

插入 Pandas DataFrame

import clickhouse_connect
import pandas as pd

client = clickhouse_connect.get_client()

df = pd.DataFrame({
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Joe"],
    "age": [25, 30, 28],
})

client.insert_df("users", df)

PyArrow Table 插入操作

import clickhouse_connect
import pyarrow as pa

client = clickhouse_connect.get_client()

arrow_table = pa.table({
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Joe"],
    "age": [25, 30, 28],
})

client.insert_arrow("users", arrow_table)

基于 Arrow 的 DataFrame 插入 (pandas 2.x)

import clickhouse_connect
import pandas as pd

client = clickhouse_connect.get_client()

# 转换为 Arrow 支持的数据类型以提升性能
df = pd.DataFrame({
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Joe"],
    "age": [25, 30, 28],
}).convert_dtypes(dtype_backend="pyarrow")

client.insert_df_arrow("users", df)

时区

将 Python datetime.datetime 对象插入 ClickHouse 的 DateTimeDateTime64 列时,ClickHouse Connect 会自动处理时区信息。由于 ClickHouse 在内部将所有 DateTime 值存储为不带时区信息的 Unix 时间戳 (即自纪元以来的秒或小数秒) ,因此插入时的时区转换会由客户端自动完成。

带时区信息的 datetime 对象

如果插入带时区信息的 Python datetime.datetime 对象,ClickHouse Connect 会自动调用 .timestamp() 将其转换为 Unix 时间戳,从而正确处理时区偏移。这意味着你可以插入来自任何时区的 datetime 对象,并且它们都会以对应的 UTC 时间戳被正确存储。
import clickhouse_connect
from datetime import datetime
import pytz

client = clickhouse_connect.get_client()
client.command("CREATE TABLE events (event_time DateTime) ENGINE Memory")

# 插入带时区的 datetime 对象
denver_tz = pytz.timezone('America/Denver')
tokyo_tz = pytz.timezone('Asia/Tokyo')

data = [
    [datetime(2023, 6, 15, 10, 30, 0, tzinfo=pytz.UTC)],
    [denver_tz.localize(datetime(2023, 6, 15, 10, 30, 0))],
    [tokyo_tz.localize(datetime(2023, 6, 15, 10, 30, 0))]
]

client.insert('events', data, column_names=['event_time'])
results = client.query("SELECT * from events")
print(*results.result_rows, sep="\n")
# 输出:
# (datetime.datetime(2023, 6, 15, 10, 30),)
# (datetime.datetime(2023, 6, 15, 16, 30),)
# (datetime.datetime(2023, 6, 15, 1, 30),)
在此示例中,这三个 datetime 对象由于时区不同,表示的是不同的时间点。每个对象都会被正确转换为对应的 Unix timestamp 并存储到 ClickHouse 中。
使用 pytz 时,必须使用 localize() 方法为 naive datetime 添加时区信息。直接将 tzinfo= 传给 datetime 构造函数会使用错误的历史偏移量。对于 UTC,tzinfo=pytz.UTC 可以正常工作。更多信息请参见 pytz 文档

不含时区信息的 datetime 对象

如果你插入的是不含时区信息的 Python datetime.datetime 对象 (即没有 tzinfo 的对象) ,.timestamp() 方法会将其视为系统本地时区的时间。为避免歧义,建议:
  1. 插入时始终使用带时区信息的 datetime 对象,或
  2. 确保系统时区设置为 UTC,或
  3. 在插入前手动转换为纪元时间戳
import clickhouse_connect
from datetime import datetime
import pytz

client = clickhouse_connect.get_client()

# 推荐:始终使用带时区信息的日期时间对象
utc_time = datetime(2023, 6, 15, 10, 30, 0, tzinfo=pytz.UTC)
client.insert('events', [[utc_time]], column_names=['event_time'])

# 替代方案:手动转换为纪元时间戳
naive_time = datetime(2023, 6, 15, 10, 30, 0)
epoch_timestamp = int(naive_time.replace(tzinfo=pytz.UTC).timestamp())
client.insert('events', [[epoch_timestamp]], column_names=['event_time'])

带有时区元数据的 DateTime 列

ClickHouse 列可以定义时区元数据 (例如 DateTime('America/Denver')DateTime64(3, 'Asia/Tokyo')) 。这些元数据不会影响数据的存储方式 (仍以 UTC 时间戳存储) ,但会决定从 ClickHouse 查询数据时使用的时区。 向此类列插入数据时,ClickHouse Connect 会将你的 Python datetime 转换为 Unix timestamp (如果包含时区信息,也会一并考虑) 。查询数据时,无论插入时使用的是哪个时区,ClickHouse Connect 返回的 datetime 都会转换为该列的时区。
import clickhouse_connect
from datetime import datetime
import pytz

client = clickhouse_connect.get_client()

# 创建带有洛杉矶时区元数据的表
client.command("CREATE TABLE events (event_time DateTime('America/Los_Angeles')) ENGINE Memory")

# 插入纽约时间(EDT 上午 10:30,即 UTC 14:30)
ny_tz = pytz.timezone("America/New_York")
data = ny_tz.localize(datetime(2023, 6, 15, 10, 30, 0))
client.insert("events", [[data]], column_names=["event_time"])

# 查询返回时,时间会自动转换为洛杉矶时区
# 纽约上午 10:30(UTC-4)= UTC 14:30 = 洛杉矶上午 7:30(UTC-7)
results = client.query("select * from events")
print(*results.result_rows, sep="\n")
# 输出:
# (datetime.datetime(2023, 6, 15, 7, 30, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>),)

文件插入

clickhouse_connect.driver.tools 包提供了 insert_file 方法,可将数据直接从文件系统插入现有的 ClickHouse 表中。解析工作由 ClickHouse server 负责。insert_file 接受以下参数:
ParameterTypeDefaultDescription
clientClient必填用于执行插入操作的 driver.Client
tablestr必填要插入数据的 ClickHouse 表。允许使用完整表名 (包括 database) 。
file_pathstr必填数据文件在本机文件系统中的路径
fmtstrCSV, CSVWithNames文件的 ClickHouse 输入格式。如果未提供 column_names,则默认使用 CSVWithNames
column_namesSequence of strNone数据文件中的列名列表。对于包含列名的格式,无需提供
databasestrNone表所在的 database。如果表名是完全限定名,则会忽略该值。如果未指定,插入操作将使用客户端的 database
settingsdictNone参见 settings 说明
compressionstrNone用于 Content-Encoding HTTP 请求头的受支持 ClickHouse 压缩类型 (zstd、lz4、gzip)
对于数据不一致或日期/时间值格式不常见的文件,此方法也支持适用于数据导入的 settings (例如 input_format_allow_errors_numinput_format_allow_errors_num) 。
import clickhouse_connect
from clickhouse_connect.driver.tools import insert_file

client = clickhouse_connect.get_client()
insert_file(client, 'example_table', 'my_data.csv',
            settings={'input_format_allow_errors_ratio': .2,
                      'input_format_allow_errors_num': 5})
最后修改于 2026年6月10日