标准 API 的完整代码示例可在此处找到。
有关连接配置,请参见配置。
有关支持的数据类型及 Go 类型映射,请参见数据类型。
database/sql 或“标准”API 允许你在需要通过遵循标准接口来屏蔽底层数据库差异的场景中使用该客户端。这也会带来一定代价——会增加额外的抽象层和间接层,以及一些未必与 ClickHouse 完全契合的基本类型。不过,在工具需要连接多个数据库的场景中,这些成本通常是可以接受的。
此外,该客户端还支持使用 HTTP 作为传输层——数据仍会以原生格式编码,以获得最佳性能。
可以通过格式为 clickhouse://<host>:<port>?<query_option>=<value> 的 DSN 字符串结合 Open 方法建立连接,也可以使用 clickhouse.OpenDB 方法。后者虽然不属于 database/sql 规范的一部分,但会返回一个 sql.DB 实例。该方法还提供了 profiling 等功能,而这些功能显然无法通过 database/sql 规范对外暴露。
func Connect() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
})
return conn.Ping()
}
func ConnectDSN() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn, err := sql.Open("clickhouse", fmt.Sprintf("clickhouse://%s:%d?username=%s&password=%s", env.Host, env.Port, env.Username, env.Password))
if err != nil {
return err
}
return conn.Ping()
}
完整示例
在后续所有示例中,除非另有明确说明,否则均假定已创建并可使用 ClickHouse conn 变量。
大多数配置选项与 ClickHouse API 通用。有关这些通用设置,请参见配置。以下是 SQL 专用的 DSN 参数:
hosts - 用于负载均衡和故障转移的单地址主机列表,以逗号分隔。请参见连接到多个节点。
username/password - 身份验证凭据。请参见身份验证
database - 选择当前默认数据库
dial_timeout - 耗时字符串可以是一个可带符号的十进制数字序列,每个数字都可以带可选的小数部分和单位后缀,例如 300ms、1s。有效的时间单位包括 ms、s、m。
connection_open_strategy - random/in_order (默认值为 random) - 请参见连接到多个节点
round_robin - 从集合中按轮询方式选择服务器
in_order - 按指定顺序选择第一个可用服务器
debug - 启用调试输出 (布尔值)
compress - 指定压缩算法 - none (默认) 、zstd、lz4、gzip、deflate、br。如果设置为 true,将使用 lz4。原生通信仅支持 lz4 和 zstd。
compress_level - 压缩级别 (默认值为 0) 。请参见压缩。该参数因算法而异:
gzip - -2 (最快速度) 到 9 (最高压缩率)
deflate - -2 (最快速度) 到 9 (最高压缩率)
br - 0 (最快速度) 到 11 (最高压缩率)
zstd, lz4 - 忽略
secure - 建立安全的 SSL 连接 (默认值为 false)
skip_verify - 跳过证书验证 (默认值为 false)
block_buffer_size - 用于控制块缓冲区大小。请参见 BlockBufferSize。 (默认值为 2)
func ConnectSettings() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn, err := sql.Open("clickhouse", fmt.Sprintf("clickhouse://127.0.0.1:9001,127.0.0.1:9002,%s:%d/%s?username=%s&password=%s&dial_timeout=10s&connection_open_strategy=round_robin&debug=true&compress=lz4", env.Host, env.Port, env.Database, env.Username, env.Password))
if err != nil {
return err
}
return conn.Ping()
}
完整示例
默认情况下,连接是通过原生协议建立的。对于需要使用 HTTP 的用户,可以通过修改 DSN 以包含 HTTP 协议,或在连接选项中指定 Protocol 来启用 HTTP。
func ConnectHTTP() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.HttpPort)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
Protocol: clickhouse.HTTP,
})
return conn.Ping()
}
func ConnectDSNHTTP() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn, err := sql.Open("clickhouse", fmt.Sprintf("http://%s:%d?username=%s&password=%s", env.Host, env.HttpPort, env.Username, env.Password))
if err != nil {
return err
}
return conn.Ping()
}
完整示例
仅限 HTTP只有在使用 HTTP 传输时才需要会话。原生 TCP 连接会自动提供内置会话。
使用 HTTP 时,请通过设置传入 session_id,以启用与会话绑定的功能,例如临时表。
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.HttpPort)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
Protocol: clickhouse.HTTP,
Settings: clickhouse.Settings{
"session_id": uuid.NewString(),
},
})
if _, err := conn.Exec(`DROP TABLE IF EXISTS example`); err != nil {
return err
}
_, err = conn.Exec(`
CREATE TEMPORARY TABLE IF NOT EXISTS example (
Col1 UInt8
)
`)
if err != nil {
return err
}
scope, err := conn.Begin()
if err != nil {
return err
}
batch, err := scope.Prepare("INSERT INTO example")
if err != nil {
return err
}
for i := 0; i < 10; i++ {
_, err := batch.Exec(
uint8(i),
)
if err != nil {
return err
}
}
rows, err := conn.Query("SELECT * FROM example")
if err != nil {
return err
}
defer rows.Close()
var (
col1 uint8
)
for rows.Next() {
if err := rows.Scan(&col1); err != nil {
return err
}
fmt.Printf("row: col1=%d\n", col1)
}
// 注意:不要跳过 rows.Err() 检查
if err := rows.Err(); err != nil {
return err
}
完整示例
获取连接后,你可以通过 Exec 方法执行 sql 语句。
conn.Exec(`DROP TABLE IF EXISTS example`)
_, err = conn.Exec(`
CREATE TABLE IF NOT EXISTS example (
Col1 UInt8,
Col2 String
) engine=Memory
`)
if err != nil {
return err
}
_, err = conn.Exec("INSERT INTO example VALUES (1, 'test-1')")
完整示例
该方法不支持传入上下文,默认会在 background 上下文下执行。如有需要,可使用 ExecContext——请参见使用上下文。
可以通过 Being 方法创建一个 sql.Tx,以实现批次语义。基于此,可使用带有 INSERT 语句的 Prepare 方法获取一个批次。这会返回一个 sql.Stmt,可通过 Exec 方法向其中追加行。在原始 sql.Tx 上执行 Commit 之前,该批次会一直累积在内存中。
batch, err := scope.Prepare("INSERT INTO example")
if err != nil {
return err
}
for i := 0; i < 1000; i++ {
_, err := batch.Exec(
uint8(42),
"ClickHouse", "Inc",
uuid.New(),
map[string]uint8{"key": 1}, // Map(String, UInt8)
[]string{"Q", "W", "E", "R", "T", "Y"}, // Array(String)
[]interface{}{ // Tuple(String, UInt8, Array(Map(String, String)))
"String Value", uint8(5), []map[string]string{
map[string]string{"key": "value"},
map[string]string{"key": "value"},
map[string]string{"key": "value"},
},
},
time.Now(),
)
if err != nil {
return err
}
}
return scope.Commit()
完整示例
可以使用 QueryRow 方法查询单行数据。它会返回一个 *sql.Row,可在其上调用 Scan,并传入变量指针,以便将各列的数据写入这些变量中。QueryRowContext 变体允许传入除 background 之外的上下文 - 请参见 使用 Context。
row := conn.QueryRow("SELECT * FROM example")
var (
col1 uint8
col2, col3, col4 string
col5 map[string]uint8
col6 []string
col7 interface{}
col8 time.Time
)
if err := row.Scan(&col1, &col2, &col3, &col4, &col5, &col6, &col7, &col8); err != nil {
return err
}
完整示例
遍历多行需要使用 Query 方法。该方法会返回一个 *sql.Rows 结构体,可在其上调用 Next 来逐行遍历。与之对应的 QueryContext 则允许传入上下文。
rows, err := conn.Query("SELECT * FROM example")
if err != nil {
return err
}
defer rows.Close()
var (
col1 uint8
col2, col3, col4 string
col5 map[string]uint8
col6 []string
col7 interface{}
col8 time.Time
)
for rows.Next() {
if err := rows.Scan(&col1, &col2, &col3, &col4, &col5, &col6, &col7, &col8); err != nil {
return err
}
fmt.Printf("row: col1=%d, col2=%s, col3=%s, col4=%s, col5=%v, col6=%v, col7=%v, col8=%v\n", col1, col2, col3, col4, col5, col6, col7, col8)
}
// 注意:不要跳过 rows.Err() 的检查
if err := rows.Err(); err != nil {
return err
}
完整示例
可通过 ExecContext 方法执行插入来实现异步插入。如下面所示,需要向其传入启用了异步模式的上下文。这样,用户便可指定客户端是等待服务器完成插入,还是在接收到数据后就立即返回响应。这实际上控制的是参数 wait_for_async_insert。
const ddl = `
CREATE TABLE example (
Col1 UInt64
, Col2 String
, Col3 Array(UInt8)
, Col4 DateTime
) ENGINE = Memory
`
if _, err := conn.Exec(ddl); err != nil {
return err
}
ctx := clickhouse.Context(context.Background(), clickhouse.WithStdAsync(false))
{
for i := 0; i < 100; i++ {
_, err := conn.ExecContext(ctx, fmt.Sprintf(`INSERT INTO example VALUES (
%d, '%s', [1, 2, 3, 4, 5, 6, 7, 8, 9], now()
)`, i, "Golang SQL database driver"))
if err != nil {
return err
}
}
}
完整示例
标准 API 同样支持与 ClickHouse API 相同的参数绑定能力,可将参数传递给 Exec、Query 和 QueryRow 方法 (以及它们对应的 上下文 变体) 。支持位置参数、命名参数和编号参数。
var count uint64
// 位置绑定
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 >= ? AND Col3 < ?", 500, now.Add(time.Duration(750)*time.Second)).Scan(&count); err != nil {
return err
}
// 250
fmt.Printf("Positional bind count: %d\n", count)
// 数字绑定
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 <= $2 AND Col3 > $1", now.Add(time.Duration(150)*time.Second), 250).Scan(&count); err != nil {
return err
}
// 100
fmt.Printf("Numeric bind count: %d\n", count)
// 命名绑定
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 <= @col1 AND Col3 > @col3", clickhouse.Named("col1", 100), clickhouse.Named("col3", now.Add(time.Duration(50)*time.Second))).Scan(&count); err != nil {
return err
}
// 50
fmt.Printf("Named bind count: %d\n", count)
完整示例
请注意,特殊情况 仍适用。
标准 API 与 ClickHouse API 一样,支持通过上下文传递截止时间、取消信号以及其他请求作用域的值。与 ClickHouse API 不同,这一能力是通过使用方法的 Context 变体来实现的。也就是说,像 Exec 这类默认使用 background 上下文的方法,都有一个对应的变体 ExecContext,可将上下文作为第一个参数传入。这样一来,你就可以在应用流程的任何阶段传递上下文。例如,可以在通过 ConnContext 建立连接时传递上下文,或者在通过 QueryRowContext 请求查询行时传递上下文。下面给出了所有可用方法的示例。
有关如何使用上下文传递截止时间、取消信号、查询 id、配额键和连接设置的更多信息,请参阅 ClickHouse API 的 使用上下文。
ctx := clickhouse.Context(context.Background(), clickhouse.WithSettings(clickhouse.Settings{
"async_insert": "1",
}))
// queries can be cancelled using the context
ctx, cancel := context.WithCancel(context.Background())
go func() {
cancel()
}()
if err = conn.QueryRowContext(ctx, "SELECT sleep(3)").Scan(); err == nil {
return fmt.Errorf("expected cancel")
}
// 为查询设置截止时间——到达指定的绝对时间后将取消查询。同样只会终止连接,
// 查询将在 ClickHouse 中继续执行直至完成
ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(-time.Second))
defer cancel()
if err := conn.PingContext(ctx); err == nil {
return fmt.Errorf("expected deadline exceeeded")
}
// set a query id to assist tracing queries in logs e.g. see system.query_log
var one uint8
ctx = clickhouse.Context(context.Background(), clickhouse.WithQueryID(uuid.NewString()))
if err = conn.QueryRowContext(ctx, "SELECT 1").Scan(&one); err != nil {
return err
}
conn.ExecContext(context.Background(), "DROP QUOTA IF EXISTS foobar")
defer func() {
conn.ExecContext(context.Background(), "DROP QUOTA IF EXISTS foobar")
}()
ctx = clickhouse.Context(context.Background(), clickhouse.WithQuotaKey("abcde"))
// set a quota key - first create the quota
if _, err = conn.ExecContext(ctx, "CREATE QUOTA IF NOT EXISTS foobar KEYED BY client_key FOR INTERVAL 1 minute MAX queries = 5 TO default"); err != nil {
return err
}
// queries can be cancelled using the context
ctx, cancel = context.WithCancel(context.Background())
// we will get some results before cancel
ctx = clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{
"max_block_size": "1",
}))
rows, err := conn.QueryContext(ctx, "SELECT sleepEachRow(1), number FROM numbers(100);")
if err != nil {
return err
}
defer rows.Close()
var (
col1 uint8
col2 uint8
)
for rows.Next() {
if err := rows.Scan(&col1, &col2); err != nil {
if col2 > 3 {
fmt.Println("expected cancel")
return nil
}
return err
}
fmt.Printf("row: col2=%d\n", col2)
if col2 == 3 {
cancel()
}
}
// NOTE: Do not skip rows.Err() check
if err := rows.Err(); err != nil {
return err
}
完整示例
与 ClickHouse API 类似,这里也提供列类型信息,因此你可以在运行时创建类型正确的变量实例,并将其传递给 Scan。这样,即使事先不知道类型,也能读取列。
const query = `
SELECT
1 AS Col1
, 'Text' AS Col2
`
rows, err := conn.QueryContext(context.Background(), query)
if err != nil {
return err
}
defer rows.Close()
columnTypes, err := rows.ColumnTypes()
if err != nil {
return err
}
vars := make([]interface{}, len(columnTypes))
for i := range columnTypes {
vars[i] = reflect.New(columnTypes[i].ScanType()).Interface()
}
for rows.Next() {
if err := rows.Scan(vars...); err != nil {
return err
}
for _, v := range vars {
switch v := v.(type) {
case *string:
fmt.Println(*v)
case *uint8:
fmt.Println(*v)
}
}
}
// 注意:不要跳过对 rows.Err() 的检查
if err := rows.Err(); err != nil {
return err
}
完整示例
外部表允许客户端在发送 SELECT 查询时一并向 ClickHouse 传送数据。这些数据会被放入临时表中,并可在查询中用于求值。
要在发起查询时向 ClickHouse 发送外部数据,用户必须先通过 ext.NewTable 构建一个外部表,然后再通过上下文将其传入。
table1, err := ext.NewTable("external_table_1",
ext.Column("col1", "UInt8"),
ext.Column("col2", "String"),
ext.Column("col3", "DateTime"),
)
if err != nil {
return err
}
for i := 0; i < 10; i++ {
if err = table1.Append(uint8(i), fmt.Sprintf("value_%d", i), time.Now()); err != nil {
return err
}
}
table2, err := ext.NewTable("external_table_2",
ext.Column("col1", "UInt8"),
ext.Column("col2", "String"),
ext.Column("col3", "DateTime"),
)
for i := 0; i < 10; i++ {
table2.Append(uint8(i), fmt.Sprintf("value_%d", i), time.Now())
}
ctx := clickhouse.Context(context.Background(),
clickhouse.WithExternalTable(table1, table2),
)
rows, err := conn.QueryContext(ctx, "SELECT * FROM external_table_1")
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var (
col1 uint8
col2 string
col3 time.Time
)
rows.Scan(&col1, &col2, &col3)
fmt.Printf("col1=%d, col2=%s, col3=%v\n", col1, col2, col3)
}
// 注意:不要跳过 rows.Err() 检查
if err := rows.Err(); err != nil {
return err
}
var count uint64
if err := conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM external_table_1").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_1: %d\n", count)
if err := conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM external_table_2").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_2: %d\n", count)
if err := conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM (SELECT * FROM external_table_1 UNION ALL SELECT * FROM external_table_2)").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_1 UNION external_table_2: %d\n", count)
完整示例
ClickHouse 在 TCP 和 HTTP 传输中都支持 trace 上下文传播。使用 clickhouse.WithSpan 可通过上下文将 span 附加到查询上。
HTTP 传输限制虽然 ClickHouse server 接受标准的 traceparent / tracestate HTTP 请求头,但 clickhouse-go 的 HTTP 传输目前不会发送这些请求头,因此 WithSpan 在 HTTP 下不起作用。临时的解决办法是通过连接选项中的 HttpHeaders 手动设置请求头。
var count uint64
rows := conn.QueryRowContext(clickhouse.Context(context.Background(), clickhouse.WithSpan(
trace.NewSpanContext(trace.SpanContextConfig{
SpanID: trace.SpanID{1, 2, 3, 4, 5},
TraceID: trace.TraceID{5, 4, 3, 2, 1},
}),
)), "SELECT COUNT() FROM (SELECT number FROM system.numbers LIMIT 5)")
if err := rows.Scan(&count); err != nil {
return err
}
// 注意:不要跳过 rows.Err() 的检查
if err := rows.Err(); err != nil {
return err
}
fmt.Printf("count: %d\n", count)
完整示例
标准 API 支持与原生 ClickHouse API 相同的压缩算法,即在块级别支持 lz4 和 zstd 压缩。此外,HTTP 连接还支持 gzip、deflate 和 br 压缩。如果启用了其中任意一种,插入时以及查询响应中的块都会被压缩。其他请求 (例如 ping 或查询请求) 则保持未压缩状态。这与 lz4 和 zstd 选项的行为一致。
如果使用 OpenDB 方法建立连接,可以传入 Compression 配置。其中还可以指定压缩级别 (见下文) 。如果通过带 DSN 的 sql.Open 进行连接,请使用参数 compress。该参数既可以指定具体的压缩算法,即 gzip、deflate、br、zstd 或 lz4,也可以是布尔值。如果将其设为 true,则会使用 lz4。默认值为 none,即禁用压缩。
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.HttpPort)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionBrotli,
Level: 5,
},
Protocol: clickhouse.HTTP,
})
完整示例
conn, err := sql.Open("clickhouse", fmt.Sprintf("http://%s:%d?username=%s&password=%s&compress=gzip&compress_level=5", env.Host, env.HttpPort, env.Username, env.Password))
完整示例
可通过 DSN 参数 compress_level 或 Compression 选项中的 Level 字段控制所应用的压缩级别。默认值为 0,但具体含义因算法而异:
gzip - -2 (最快速度) 到 9 (最佳压缩)
deflate - -2 (最快速度) 到 9 (最佳压缩)
br - 0 (最快速度) 到 11 (最佳压缩)
zstd, lz4 - 会被忽略