Exemplos completos de código para a API padrão podem ser encontrados aqui.
Para a configuração da conexão, consulte Configuração.
Para os tipos de dados suportados e os mapeamentos de tipos do Go, consulte Tipos de dados.
A API database/sql, ou API “padrão”, permite usar o cliente em cenários nos quais o código da aplicação deve ser agnóstico em relação aos bancos de dados subjacentes, seguindo uma interface padrão. Isso tem um custo: camadas adicionais de abstração e indireção, além de primitivos que nem sempre estão alinhados ao ClickHouse. Ainda assim, esses custos normalmente são aceitáveis em cenários nos quais as ferramentas precisam se conectar a vários bancos de dados.
Além disso, este cliente oferece suporte ao uso de HTTP como camada de transporte — os dados ainda serão codificados no formato Native para desempenho ideal.
A conexão pode ser feita por meio de uma string DSN no formato clickhouse://<host>:<port>?<query_option>=<value> com o método Open, ou por meio do método clickhouse.OpenDB. Este último não faz parte da especificação database/sql, mas retorna uma instância de sql.DB. Esse método oferece funcionalidades como profiling, para as quais não há uma forma óbvia de exposição pela especificação database/sql.
func Connect() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
})
return conn.Ping()
}
func ConnectDSN() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn, err := sql.Open("clickhouse", fmt.Sprintf("clickhouse://%s:%d?username=%s&password=%s", env.Host, env.Port, env.Username, env.Password))
if err != nil {
return err
}
return conn.Ping()
}
Exemplo completo
Em todos os exemplos a seguir, salvo indicação explícita em contrário, assumimos que a variável conn do ClickHouse já foi criada e está disponível.
A maioria das opções de configuração é compartilhada com a ClickHouse API. Consulte Configuração para ver as configurações compartilhadas. Os seguintes parâmetros de DSN específicos de SQL estão disponíveis:
hosts - lista separada por vírgulas de hosts com um único endereço para balanceamento de carga e failover - consulte Conexão com vários nós.
username/password - credenciais de autenticação - consulte Autenticação
database - seleciona o banco de dados padrão atual
dial_timeout - uma string de duração é uma sequência de números decimais, possivelmente com sinal, cada um com fração opcional e um sufixo de unidade, como 300ms, 1s. As unidades de tempo válidas são ms, s, m.
connection_open_strategy - random/in_order (o padrão é random) - consulte Conexão com vários nós
round_robin - seleciona um servidor do conjunto em round-robin
in_order - o primeiro servidor ativo é escolhido na ordem especificada
debug - habilita a saída de depuração (valor booleano)
compress - especifica o algoritmo de compressão - none (padrão), zstd, lz4, gzip, deflate, br. Se definido como true, lz4 será usado. Somente lz4 e zstd são compatíveis com a comunicação nativa.
compress_level - nível de compressão (o padrão é 0). Consulte compressão. Isso é específico do algoritmo:
gzip - -2 (Melhor velocidade) a 9 (Melhor compressão)
deflate - -2 (Melhor velocidade) a 9 (Melhor compressão)
br - 0 (Melhor velocidade) a 11 (Melhor compressão)
zstd, lz4 - ignorado
secure - estabelece uma conexão SSL segura (o padrão é false)
skip_verify - ignora a verificação do certificado (o padrão é false)
block_buffer_size - permite controlar o tamanho do buffer de bloco. Consulte BlockBufferSize. (o padrão é 2)
func ConnectSettings() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn, err := sql.Open("clickhouse", fmt.Sprintf("clickhouse://127.0.0.1:9001,127.0.0.1:9002,%s:%d/%s?username=%s&password=%s&dial_timeout=10s&connection_open_strategy=round_robin&debug=true&compress=lz4", env.Host, env.Port, env.Database, env.Username, env.Password))
if err != nil {
return err
}
return conn.Ping()
}
Exemplo completo
Por padrão, as conexões são estabelecidas pelo protocolo nativo. Para usuários que precisem de HTTP, isso pode ser habilitado modificando o DSN para incluir o protocolo HTTP ou especificando o protocolo nas opções de conexão.
func ConnectHTTP() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.HttpPort)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
Protocol: clickhouse.HTTP,
})
return conn.Ping()
}
func ConnectDSNHTTP() error {
env, err := GetStdTestEnvironment()
if err != nil {
return err
}
conn, err := sql.Open("clickhouse", fmt.Sprintf("http://%s:%d?username=%s&password=%s", env.Host, env.HttpPort, env.Username, env.Password))
if err != nil {
return err
}
return conn.Ping()
}
Exemplo completo
Somente HTTPAs sessões só são necessárias ao usar o transporte HTTP. As conexões TCP nativas já têm uma sessão integrada automaticamente.
Ao usar HTTP, passe um session_id como configuração para habilitar recursos vinculados à sessão, como tabelas temporárias.
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.HttpPort)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
Protocol: clickhouse.HTTP,
Settings: clickhouse.Settings{
"session_id": uuid.NewString(),
},
})
if _, err := conn.Exec(`DROP TABLE IF EXISTS example`); err != nil {
return err
}
_, err = conn.Exec(`
CREATE TEMPORARY TABLE IF NOT EXISTS example (
Col1 UInt8
)
`)
if err != nil {
return err
}
scope, err := conn.Begin()
if err != nil {
return err
}
batch, err := scope.Prepare("INSERT INTO example")
if err != nil {
return err
}
for i := 0; i < 10; i++ {
_, err := batch.Exec(
uint8(i),
)
if err != nil {
return err
}
}
rows, err := conn.Query("SELECT * FROM example")
if err != nil {
return err
}
defer rows.Close()
var (
col1 uint8
)
for rows.Next() {
if err := rows.Scan(&col1); err != nil {
return err
}
fmt.Printf("row: col1=%d\n", col1)
}
// NOTA: Não ignore a verificação de rows.Err()
if err := rows.Err(); err != nil {
return err
}
Exemplo completo
Após obter uma conexão, você pode executar instruções sql por meio do método Exec.
conn.Exec(`DROP TABLE IF EXISTS example`)
_, err = conn.Exec(`
CREATE TABLE IF NOT EXISTS example (
Col1 UInt8,
Col2 String
) engine=Memory
`)
if err != nil {
return err
}
_, err = conn.Exec("INSERT INTO example VALUES (1, 'test-1')")
Exemplo completo
Este método não aceita um contexto — por padrão, ele é executado com o contexto em segundo plano. Se necessário, você pode usar ExecContext — consulte Usando contexto.
É possível obter a semântica de lote criando um sql.Tx por meio do método Being. A partir dele, é possível obter um lote usando o método Prepare com a instrução INSERT. Isso retorna um sql.Stmt, ao qual é possível adicionar linhas usando o método Exec. O lote será acumulado na memória até que Commit seja executado no sql.Tx original.
batch, err := scope.Prepare("INSERT INTO example")
if err != nil {
return err
}
for i := 0; i < 1000; i++ {
_, err := batch.Exec(
uint8(42),
"ClickHouse", "Inc",
uuid.New(),
map[string]uint8{"key": 1}, // Map(String, UInt8)
[]string{"Q", "W", "E", "R", "T", "Y"}, // Array(String)
[]interface{}{ // Tuple(String, UInt8, Array(Map(String, String)))
"String Value", uint8(5), []map[string]string{
map[string]string{"key": "value"},
map[string]string{"key": "value"},
map[string]string{"key": "value"},
},
},
time.Now(),
)
if err != nil {
return err
}
}
return scope.Commit()
Exemplo completo
É possível consultar uma única linha usando o método QueryRow. Isso retorna um *sql.Row, no qual é possível invocar Scan com ponteiros para variáveis nas quais as colunas devem ser atribuídas. Uma variante QueryRowContext permite passar um contexto diferente de background - veja Usando Context.
row := conn.QueryRow("SELECT * FROM example")
var (
col1 uint8
col2, col3, col4 string
col5 map[string]uint8
col6 []string
col7 interface{}
col8 time.Time
)
if err := row.Scan(&col1, &col2, &col3, &col4, &col5, &col6, &col7, &col8); err != nil {
return err
}
Exemplo completo
Para iterar por várias linhas, é necessário usar o método Query. Ele retorna uma struct *sql.Rows, na qual o método Next pode ser chamado para percorrer as linhas. O equivalente QueryContext permite passar um contexto.
rows, err := conn.Query("SELECT * FROM example")
if err != nil {
return err
}
defer rows.Close()
var (
col1 uint8
col2, col3, col4 string
col5 map[string]uint8
col6 []string
col7 interface{}
col8 time.Time
)
for rows.Next() {
if err := rows.Scan(&col1, &col2, &col3, &col4, &col5, &col6, &col7, &col8); err != nil {
return err
}
fmt.Printf("row: col1=%d, col2=%s, col3=%s, col4=%s, col5=%v, col6=%v, col7=%v, col8=%v\n", col1, col2, col3, col4, col5, col6, col7, col8)
}
// NOTA: Não omita a verificação de rows.Err()
if err := rows.Err(); err != nil {
return err
}
Exemplo completo
As inserções assíncronas podem ser feitas executando uma inserção por meio do método ExecContext. Para isso, passe a ele um contexto com o modo assíncrono habilitado, como mostrado abaixo. Isso permite que o usuário especifique se o cliente deve aguardar o servidor concluir a inserção ou responder assim que os dados forem recebidos. Isso controla efetivamente o parâmetro wait_for_async_insert.
const ddl = `
CREATE TABLE example (
Col1 UInt64
, Col2 String
, Col3 Array(UInt8)
, Col4 DateTime
) ENGINE = Memory
`
if _, err := conn.Exec(ddl); err != nil {
return err
}
ctx := clickhouse.Context(context.Background(), clickhouse.WithStdAsync(false))
{
for i := 0; i < 100; i++ {
_, err := conn.ExecContext(ctx, fmt.Sprintf(`INSERT INTO example VALUES (
%d, '%s', [1, 2, 3, 4, 5, 6, 7, 8, 9], now()
)`, i, "Golang SQL database driver"))
if err != nil {
return err
}
}
}
Exemplo completo
A API padrão oferece os mesmos recursos de vinculação de parâmetros da ClickHouse API, permitindo passar parâmetros aos métodos Exec, Query e QueryRow (e às variantes equivalentes de Context). Há suporte a parâmetros posicionais, nomeados e numerados.
var count uint64
// bind posicional
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 >= ? AND Col3 < ?", 500, now.Add(time.Duration(750)*time.Second)).Scan(&count); err != nil {
return err
}
// 250
fmt.Printf("Positional bind count: %d\n", count)
// bind numérico
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 <= $2 AND Col3 > $1", now.Add(time.Duration(150)*time.Second), 250).Scan(&count); err != nil {
return err
}
// 100
fmt.Printf("Numeric bind count: %d\n", count)
// bind nomeado
if err = conn.QueryRow(ctx, "SELECT count() FROM example WHERE Col1 <= @col1 AND Col3 > @col3", clickhouse.Named("col1", 100), clickhouse.Named("col3", now.Add(time.Duration(50)*time.Second))).Scan(&count); err != nil {
return err
}
// 50
fmt.Printf("Named bind count: %d\n", count)
Exemplo completo
Observe que os casos especiais ainda se aplicam.
A API padrão oferece a mesma capacidade de transmitir prazos, sinais de cancelamento e outros valores com escopo de requisição por meio do contexto que a ClickHouse API. Diferentemente da ClickHouse API, isso é feito usando variantes Context dos métodos; ou seja, métodos como Exec, que usam o contexto de background por padrão, têm uma variante ExecContext, à qual um contexto pode ser passado como primeiro parâmetro. Isso permite passar um contexto em qualquer etapa do fluxo de uma aplicação. Por exemplo, você pode passar um contexto ao estabelecer uma conexão via ConnContext ou ao solicitar uma linha de consulta via QueryRowContext. Exemplos de todos os métodos disponíveis são mostrados abaixo.
Para mais detalhes sobre como usar o contexto para transmitir prazos, sinais de cancelamento, IDs de consulta, chaves de quota e configurações de conexão, consulte Usando contexto da ClickHouse API.
ctx := clickhouse.Context(context.Background(), clickhouse.WithSettings(clickhouse.Settings{
"async_insert": "1",
}))
// consultas podem ser canceladas usando o contexto
ctx, cancel := context.WithCancel(context.Background())
go func() {
cancel()
}()
if err = conn.QueryRowContext(ctx, "SELECT sleep(3)").Scan(); err == nil {
return fmt.Errorf("expected cancel")
}
// define um prazo limite para uma consulta - isso cancelará a consulta após o tempo absoluto ser atingido. Encerra apenas a conexão,
// as consultas continuarão até a conclusão no ClickHouse
ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(-time.Second))
defer cancel()
if err := conn.PingContext(ctx); err == nil {
return fmt.Errorf("expected deadline exceeeded")
}
// define um id de consulta para auxiliar no rastreamento de consultas nos logs, ex.: veja system.query_log
var one uint8
ctx = clickhouse.Context(context.Background(), clickhouse.WithQueryID(uuid.NewString()))
if err = conn.QueryRowContext(ctx, "SELECT 1").Scan(&one); err != nil {
return err
}
conn.ExecContext(context.Background(), "DROP QUOTA IF EXISTS foobar")
defer func() {
conn.ExecContext(context.Background(), "DROP QUOTA IF EXISTS foobar")
}()
ctx = clickhouse.Context(context.Background(), clickhouse.WithQuotaKey("abcde"))
// define uma chave de quota - primeiro crie a quota
if _, err = conn.ExecContext(ctx, "CREATE QUOTA IF NOT EXISTS foobar KEYED BY client_key FOR INTERVAL 1 minute MAX queries = 5 TO default"); err != nil {
return err
}
// consultas podem ser canceladas usando o contexto
ctx, cancel = context.WithCancel(context.Background())
// obteremos alguns resultados antes do cancelamento
ctx = clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{
"max_block_size": "1",
}))
rows, err := conn.QueryContext(ctx, "SELECT sleepEachRow(1), number FROM numbers(100);")
if err != nil {
return err
}
defer rows.Close()
var (
col1 uint8
col2 uint8
)
for rows.Next() {
if err := rows.Scan(&col1, &col2); err != nil {
if col2 > 3 {
fmt.Println("expected cancel")
return nil
}
return err
}
fmt.Printf("row: col2=%d\n", col2)
if col2 == 3 {
cancel()
}
}
// NOTA: Não ignore a verificação de rows.Err()
if err := rows.Err(); err != nil {
return err
}
Exemplo completo
Assim como na ClickHouse API, as informações sobre o tipo da coluna ficam disponíveis para que você possa criar, em tempo de execução, instâncias de variáveis com a tipagem correta que podem ser passadas para o Scan. Isso permite ler colunas mesmo quando o tipo não é conhecido.
const query = `
SELECT
1 AS Col1
, 'Text' AS Col2
`
rows, err := conn.QueryContext(context.Background(), query)
if err != nil {
return err
}
defer rows.Close()
columnTypes, err := rows.ColumnTypes()
if err != nil {
return err
}
vars := make([]interface{}, len(columnTypes))
for i := range columnTypes {
vars[i] = reflect.New(columnTypes[i].ScanType()).Interface()
}
for rows.Next() {
if err := rows.Scan(vars...); err != nil {
return err
}
for _, v := range vars {
switch v := v.(type) {
case *string:
fmt.Println(*v)
case *uint8:
fmt.Println(*v)
}
}
}
// NOTA: Não omita a verificação de rows.Err()
if err := rows.Err(); err != nil {
return err
}
Exemplo completo
Tabelas externas permitem que o cliente envie dados ao ClickHouse com uma consulta SELECT. Esses dados são colocados em uma tabela temporária e podem ser usados na própria consulta para avaliação.
Para enviar dados externos junto com uma consulta, o usuário deve criar uma tabela externa por meio de ext.NewTable antes de passá-la pelo contexto.
table1, err := ext.NewTable("external_table_1",
ext.Column("col1", "UInt8"),
ext.Column("col2", "String"),
ext.Column("col3", "DateTime"),
)
if err != nil {
return err
}
for i := 0; i < 10; i++ {
if err = table1.Append(uint8(i), fmt.Sprintf("value_%d", i), time.Now()); err != nil {
return err
}
}
table2, err := ext.NewTable("external_table_2",
ext.Column("col1", "UInt8"),
ext.Column("col2", "String"),
ext.Column("col3", "DateTime"),
)
for i := 0; i < 10; i++ {
table2.Append(uint8(i), fmt.Sprintf("value_%d", i), time.Now())
}
ctx := clickhouse.Context(context.Background(),
clickhouse.WithExternalTable(table1, table2),
)
rows, err := conn.QueryContext(ctx, "SELECT * FROM external_table_1")
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var (
col1 uint8
col2 string
col3 time.Time
)
rows.Scan(&col1, &col2, &col3)
fmt.Printf("col1=%d, col2=%s, col3=%v\n", col1, col2, col3)
}
// NOTA: Não ignore a verificação de rows.Err()
if err := rows.Err(); err != nil {
return err
}
var count uint64
if err := conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM external_table_1").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_1: %d\n", count)
if err := conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM external_table_2").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_2: %d\n", count)
if err := conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM (SELECT * FROM external_table_1 UNION ALL SELECT * FROM external_table_2)").Scan(&count); err != nil {
return err
}
fmt.Printf("external_table_1 UNION external_table_2: %d\n", count)
Exemplo completo
O ClickHouse oferece suporte à propagação de contexto de rastreamento nos transportes TCP e HTTP. Use clickhouse.WithSpan para associar um span a uma consulta por meio do contexto.
Limitação do transporte HTTPEmbora o servidor ClickHouse aceite os cabeçalhos HTTP padrão traceparent / tracestate, o transporte HTTP do clickhouse-go ainda não os envia — WithSpan não tem efeito via HTTP. Como alternativa, você pode definir o cabeçalho manualmente por meio de HttpHeaders nas opções de conexão.
var count uint64
rows := conn.QueryRowContext(clickhouse.Context(context.Background(), clickhouse.WithSpan(
trace.NewSpanContext(trace.SpanContextConfig{
SpanID: trace.SpanID{1, 2, 3, 4, 5},
TraceID: trace.TraceID{5, 4, 3, 2, 1},
}),
)), "SELECT COUNT() FROM (SELECT number FROM system.numbers LIMIT 5)")
if err := rows.Scan(&count); err != nil {
return err
}
// NOTA: Não omita a verificação de rows.Err()
if err := rows.Err(); err != nil {
return err
}
fmt.Printf("count: %d\n", count)
Exemplo completo
A API padrão oferece suporte aos mesmos algoritmos de compressão da ClickHouse API, ou seja, compressão lz4 e zstd no nível de bloco. Além disso, gzip, deflate e br também são compatíveis com conexões HTTP. Se qualquer um deles estiver habilitado, a compressão será aplicada aos blocos durante a inserção e às respostas de consulta. Outras requisições, como pings ou requisições de consulta, permanecerão sem compressão. Isso é consistente com as opções lz4 e zstd.
Se você usar o método OpenDB para estabelecer uma conexão, poderá passar uma configuração de compressão. Isso inclui a possibilidade de especificar o nível de compressão (veja abaixo). Se a conexão for feita via sql.Open com DSN, use o parâmetro compress. Ele pode ser um algoritmo de compressão específico, ou seja, gzip, deflate, br, zstd ou lz4, ou uma flag booleana. Se definido como true, lz4 será usado. O padrão é none, ou seja, compressão desabilitada.
conn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", env.Host, env.HttpPort)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionBrotli,
Level: 5,
},
Protocol: clickhouse.HTTP,
})
Exemplo completo
conn, err := sql.Open("clickhouse", fmt.Sprintf("http://%s:%d?username=%s&password=%s&compress=gzip&compress_level=5", env.Host, env.HttpPort, env.Username, env.Password))
Exemplo completo
O nível de compressão aplicado pode ser controlado pelo parâmetro compress_level da DSN ou pelo campo Level da opção Compression. O padrão é 0, mas varia conforme o algoritmo:
gzip - -2 (Maior velocidade) a 9 (Melhor compressão)
deflate - -2 (Maior velocidade) a 9 (Melhor compressão)
br - 0 (Maior velocidade) a 11 (Melhor compressão)
zstd, lz4 - ignorado