ClickHouse Kafka Connect Sink é o conector do Kafka que envia dados de um tópico do Kafka para uma tabela do ClickHouse.
O Kafka Connector Sink é distribuído nos termos da Licença Apache 2.0
O framework Kafka Connect v2.7 ou posterior deve estar instalado no ambiente.
Matriz de compatibilidade de versões
| Versão do ClickHouse Kafka Connect | Versão do ClickHouse | Kafka Connect | Confluent Platform |
|---|
| 1.0.0 | > 23.3 | > 2.7 | > 6.1 |
- Vem com semântica exactly-once pronta para uso. É baseado em um novo recurso central do ClickHouse chamado KeeperMap (usado como armazenamento de estado pelo conector) e permite uma arquitetura minimalista.
- Suporte a armazenamentos de estado de terceiros: atualmente, o padrão é em memória, mas pode usar o KeeperMap (Redis será adicionado em breve).
- Integração principal: desenvolvida, mantida e suportada pela ClickHouse.
- Testado continuamente no ClickHouse Cloud.
- Inserções de dados com schema declarado e sem schema.
- Suporte a todos os tipos de dados do ClickHouse.
Obtenha os detalhes da conexão
Para se conectar ao ClickHouse via HTTP(S), você precisa das seguintes informações:
| Parâmetro(s) | Descrição |
|---|
HOST and PORT | Normalmente, a porta é 8443 ao usar TLS ou 8123 quando não se usa TLS. |
DATABASE NAME | Por padrão, há um banco de dados chamado default; use o nome do banco de dados ao qual você deseja se conectar. |
USERNAME and PASSWORD | Por padrão, o nome de usuário é default. Use o nome de usuário apropriado para o seu caso de uso. |
Os detalhes do seu serviço do ClickHouse Cloud estão disponíveis no console do ClickHouse Cloud.
Selecione um serviço e clique em Connect:
Escolha HTTPS. Os detalhes de conexão são exibidos em um comando curl de exemplo.
Se você estiver usando ClickHouse autogerenciado, os detalhes de conexão são definidos pelo administrador do seu ClickHouse.
Instruções gerais de instalação
O conector é distribuído como um único arquivo JAR contendo todos os arquivos de classe necessários para executar o plugin.
Para instalar o plugin, siga estas etapas:
- Baixe um arquivo ZIP contendo o arquivo JAR do conector na página de Releases do repositório ClickHouse Kafka Connect Sink.
- Extraia o conteúdo do arquivo ZIP e copie-o para o local desejado.
- Adicione à configuração plugin.path, no arquivo de propriedades do Connect, o caminho para o diretório do plugin, para que o Confluent Platform possa encontrá-lo.
- Forneça um nome de tópico, o hostname da instância do ClickHouse e a senha na configuração.
connector.class=com.clickhouse.kafka.connect.ClickHouseSinkConnector
tasks.max=1
topics=<topic_name>
ssl=true
jdbcConnectionProperties=?sslmode=STRICT
security.protocol=SSL
hostname=<hostname>
database=<database_name>
password=<password>
ssl.truststore.location=/tmp/kafka.client.truststore.jks
port=8443
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
exactlyOnce=true
username=default
schemas.enable=false
- Reinicie a Confluent Platform.
- Se você usa a Confluent Platform, faça login na UI do Confluent Control Center para verificar se o ClickHouse Sink está disponível na lista de conectores.
Para conectar o ClickHouse Sink ao servidor ClickHouse, você precisa fornecer:
- detalhes da conexão: hostname (obrigatório) e porta (opcional)
- credenciais do usuário: senha (obrigatória) e nome de usuário (opcional)
- classe do conector:
com.clickhouse.kafka.connect.ClickHouseSinkConnector (obrigatória)
- topics ou topics.regex: os tópicos do Kafka a serem consumidos — os nomes dos tópicos devem corresponder aos nomes das tabelas (obrigatório)
- conversores de chave e valor: defina-os com base no tipo de dados do seu tópico. Obrigatórios se ainda não estiverem definidos na configuração do worker.
A tabela completa de opções de configuração:
| Nome da propriedade | Descrição | Valor padrão |
|---|
hostname (Obrigatório) | O hostname ou endereço IP do servidor | N/A |
port | A porta do ClickHouse — o padrão é 8443 (para HTTPS na Cloud), mas para HTTP (o padrão para self-hosted), deve ser 8123 | 8443 |
ssl | Habilita a conexão SSL com o ClickHouse | true |
jdbcConnectionProperties | Propriedades de conexão ao se conectar ao ClickHouse. Deve começar com ? e os parâmetros devem ser unidos por & no formato param=value | "" |
username | Nome de usuário do banco de dados do ClickHouse | default |
password (Obrigatório) | Senha do banco de dados ClickHouse | N/D |
database | Nome do banco de dados ClickHouse | default |
connector.class (Obrigatório) | Classe do conector (defina explicitamente e mantenha o valor padrão) | "com.clickhouse.kafka.connect.ClickHouseSinkConnector" |
tasks.max | Número de tarefas do conector | "1" |
errors.retry.timeout | Duração máxima de nova tentativa do Kafka Connect em milissegundos. 0 para não fazer novas tentativas. -1 para tentativas infinitas. O valor recomendado é superior a “10000” ms (10 segundos) Tempo limite | "0" |
exactlyOnce | Exactly Once ativado | "false" |
topics (Obrigatório) | Os tópicos do Kafka a serem consultados — os nomes dos tópicos devem corresponder aos nomes das tabelas | "" |
key.converter (Obrigatório* - Consulte a descrição) | Defina de acordo com os tipos das suas chaves. Obrigatório aqui se você estiver passando chaves (e elas não estiverem definidas na configuração do worker). | "org.apache.kafka.connect.storage.StringConverter" |
value.converter (Obrigatório* - Consulte a descrição) | Defina com base no tipo de dados no seu tópico. Suporta os formatos JSON, String, Avro ou Protobuf. Obrigatório aqui se não estiver definido na configuração do worker. | "org.apache.kafka.connect.json.JsonConverter" |
value.converter.schemas.enable | Suporte a schema no conversor de valor do connector | "false" |
errors.tolerance | Tolerância a erro do connector. Suportado: none, all | "none" |
errors.deadletterqueue.topic.name | Se configurado (com errors.tolerance=all), uma DLQ será usada para lotes que falharem (consulte Solução de problemas) | "" |
errors.deadletterqueue.context.headers.enable | Adiciona headers extras para a DLQ | "" |
clickhouseSettings | Lista de configurações do ClickHouse separadas por vírgulas (por exemplo, “insert_quorum=2, etc…”) | "" |
topic2TableMap | Lista separada por vírgulas que mapeia nomes de tópicos para nomes de tabelas (por exemplo, “topic1=table1, topic2=table2, etc…”) | "" |
tableRefreshInterval | Tempo (em segundos) para atualizar o cache da definição da tabela | 0 |
keeperOnCluster | Permite configurar o parâmetro ON CLUSTER para instâncias auto-hospedadas (por exemplo, ON CLUSTER clusterNameInConfigFileDefinition) na tabela connect_state de exactly-once (consulte Consultas de DDL distribuído | "" |
bypassRowBinary | Permite desativar o uso de RowBinary e RowBinaryWithDefaults para dados baseados em schema (Avro, Protobuf etc.) — só deve ser usado quando os dados tiverem colunas ausentes e Nullable/Default não forem aceitáveis | "false" |
dateTimeFormats | Formatos de data e hora para fazer o parsing de campos DateTime64 do schema, separados por ; (por exemplo, someDateField=yyyy-MM-dd HH:mm:ss.SSSSSSSSS;someOtherDateField=yyyy-MM-dd HH:mm:ss). | "" |
tolerateStateMismatch | Permite que o connector descarte registros “anteriores” ao offset atual armazenado em AFTER_PROCESSING (por exemplo, se o offset 5 for enviado e o offset 250 tiver sido o último offset registrado). Deve ser usado para corrigir a ingestão após uma falha e deve ser revertido para "false" depois disso. | "false" |
ignorePartitionsWhenBatching | Ignora a partição ao coletar mensagens para inserção (mas apenas se exactlyOnce for false). Observação de desempenho: quanto mais tarefas do conector, menos partições do Kafka serão atribuídas por tarefa — isso pode significar ganhos decrescentes. | "false" |
bufferCount (a partir da v1.3.6) | Número de registros a manter em buffer na memória antes de gravar no ClickHouse. 0 desabilita o buffer interno. O uso de buffer não é compatível com exactlyOnce=true. | "0" |
bufferFlushTime (a partir da v1.3.6) | Tempo máximo, em milissegundos, para manter registros no buffer antes de fazer o flush quando exactlyOnce=false. 0 desabilita o flush baseado em tempo. O valor padrão é 0. Necessário apenas para limiar baseado em tempo. Só tem efeito quando bufferCount > 0. | "0" |
reportInsertedOffsets (a partir da v1.3.6) | Permite retornar do preCommit apenas os offsets inseridos com sucesso (em vez de currentOffsets) quando exactlyOnce=false. Isso não se aplica quando ignorePartitionsWhenBatching=true, caso em que currentOffsets ainda são retornados. | "false" |
O ClickHouse Connect Sink lê mensagens de tópicos do Kafka e as grava nas tabelas apropriadas. O ClickHouse Connect Sink grava dados em tabelas já existentes. Certifique-se de que uma tabela de destino com o schema adequado tenha sido criada no ClickHouse antes de começar a inserir dados nela.
Cada tópico requer uma tabela de destino dedicada no ClickHouse. O nome da tabela de destino deve corresponder ao nome do tópico de origem.
Se você precisar transformar mensagens de saída antes de enviá-las para o ClickHouse Kafka Connect
Sink, use Transformações do Kafka Connect.
Tipos de dados suportados
Com um schema declarado:
| Tipo do Kafka Connect | Tipo do ClickHouse | Suportado | Primitivo |
|---|
| STRING | String | ✅ | Sim |
| STRING | JSON. Veja abaixo (1) | ✅ | Sim |
| INT8 | Int8 | ✅ | Sim |
| INT16 | Int16 | ✅ | Sim |
| INT32 | Int32 | ✅ | Sim |
| INT64 | Int64 | ✅ | Sim |
| FLOAT32 | Float32 | ✅ | Sim |
| FLOAT64 | Float64 | ✅ | Sim |
| BOOLEAN | Boolean | ✅ | Sim |
| ARRAY | Array(T) | ✅ | Não |
| MAP | Map(Primitive, T) | ✅ | Não |
| STRUCT | Variant(T1, T2, …) | ✅ | Não |
| STRUCT | Tuple(a T1, b T2, …) | ✅ | Não |
| STRUCT | Nested(a T1, b T2, …) | ✅ | Não |
| STRUCT | JSON. Veja abaixo (1), (2) | ✅ | Não |
| BYTES | String | ✅ | Não |
| org.apache.kafka.connect.data.Time | Int64 / DateTime64 | ✅ | Não |
| org.apache.kafka.connect.data.Timestamp | Int32 / Date32 | ✅ | Não |
| org.apache.kafka.connect.data.Decimal | Decimal | ✅ | Não |
-
(1) - JSON é suportado apenas quando as configurações do ClickHouse incluem
input_format_binary_read_json_as_string=1. Isso funciona apenas para a família de formatos RowBinary, e a configuração afeta todas as colunas na requisição de insert, portanto todas elas devem ser do tipo string. Nesse caso, o conector converterá STRUCT em uma string JSON.
-
(2) - Quando struct tem unions como
oneof, o converter deve ser configurado para NÃO adicionar prefixo/sufixo aos nomes de campo. Há a configuração generate.index.for.unions=false para o ProtobufConverter.
Sem um schema declarado:
Um registro é convertido em JSON e enviado ao ClickHouse como um valor no formato JSONEachRow.
Estas são algumas receitas de configuração comuns para você começar rapidamente.
A configuração mais simples para começar pressupõe que você esteja executando o Kafka Connect no modo distribuído e tenha um servidor ClickHouse em execução em localhost:8443 com SSL habilitado; os dados estão em JSON sem schema.
{
"name": "clickhouse-connect",
"config": {
"connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
"tasks.max": "1",
"consumer.override.max.poll.records": "5000",
"consumer.override.max.partition.fetch.bytes": "5242880",
"database": "default",
"errors.retry.timeout": "60000",
"exactlyOnce": "false",
"hostname": "localhost",
"port": "8443",
"ssl": "true",
"jdbcConnectionProperties": "?ssl=true&sslmode=strict",
"username": "default",
"password": "<PASSWORD>",
"topics": "<TOPIC_NAME>",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"clickhouseSettings": ""
}
}
A configuração do conector acima exige que você habilite as substituições do cliente na configuração do worker por meio de connector.client.config.override.policy=All. Consulte a documentação do Kafka Connect para mais informações.
O conector pode consumir dados de vários tópicos
{
"name": "clickhouse-connect",
"config": {
"connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
...
"topics": "SAMPLE_TOPIC, ANOTHER_TOPIC, YET_ANOTHER_TOPIC",
...
}
}
{
"name": "clickhouse-connect",
"config": {
"connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
...
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "<DLQ_TOPIC>",
"errors.deadletterqueue.context.headers.enable": "true",
}
}
Suporte a schema do Avro
{
"name": "clickhouse-connect",
"config": {
"connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
...
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "<SCHEMA_REGISTRY_HOST>:<PORT>",
"value.converter.schemas.enable": "true",
}
}
Mapeamento de tipos do Avro
O mapeamento de tipos abaixo é definido por io.confluent.connect.avro.AvroConverter, a implementação oficial do serializador/desserializador Avro no Kafka Connect. Consulte a documentação do Kafka Connect para informações avançadas sobre a lógica de conversão.
✅: Compatível
❌: Não compatível
️⚠️: Parcialmente compatível
| Tipo Avro | Tipo do Kafka Connect | Compatível | Observações |
|---|
| null | N/A | ❌ | Não compatível como tipo independente, mas pode ser usado em uniões |
| boolean | BOOLEAN | ✅ | |
| int | INT8/INT16/INT32 | ✅ | Por padrão, é INT32. É resolvido como INT8 se o schema tiver a propriedade connect.type=int8 (de forma análoga para INT16 se connect.type=int16) |
| long | INT64 | ✅ | |
| float | FLOAT32 | ✅ | |
| double | FLOAT64 | ✅ | |
| bytes | BYTES | ✅ | |
| string | STRING | ✅ | |
| record | STRUCT | ✅ | |
| enum | STRING | ✅ | |
| array | ARRAY/MAP | ✅ | Por padrão, é ARRAY. É resolvido como MAP se o campo tiver sido originalmente construído por meio de AvroData.fromConnectSchema (fonte) |
| map | MAP | ✅ | |
| union | STRUCT/<T> | ⚠️ | Por padrão, é STRUCT. É resolvido como o tipo único T na definição da união se flatten.singleton.unions=true (consulte a documentação) |
| fixed | BYTES | ⚠️ | O tipo lógico decimal de tamanho fixo não é compatível (veja abaixo) |
Consulte Tipos de dados suportados para ver o mapeamento entre os tipos do Kafka Connect e os tipos do ClickHouse.
Schemas Avro sem suporte
Os seguintes schemas Avro não têm suporte no conector:
- tipo lógico
decimal em fixed
{"name": "decimal_18_4", "type": "fixed", "size": 8, "logicalType": "decimal", "precision": 18, "scale": 4}
{"name": "mixed_union", "type": ["null", "string", "int"], "default": null}
{
"name": "record_union",
"type": [
{
"type": "record",
"name": "TypeA",
"fields": [
{
"name": "label",
"type": "string"
}
]
},
{
"type": "record",
"name": "TypeB",
"fields": [
{
"name": "count",
"type": "int"
}
]
}
]
}
Suporte a schema do Protobuf
{
"name": "clickhouse-connect",
"config": {
"connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
...
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"value.converter.schema.registry.url": "<SCHEMA_REGISTRY_HOST>:<PORT>",
"value.converter.schemas.enable": "true",
}
}
Observação: se você encontrar problemas com classes ausentes, nem todo ambiente inclui o conversor protobuf, e talvez seja necessário usar uma versão alternativa do JAR com as dependências incluídas.
Mapeamento de tipos do Protobuf
O mapeamento de tipos abaixo é definido por io.confluent.connect.protobuf.ProtobufConverter, a implementação oficial de serialização/desserialização de Protobuf no Kafka Connect. Consulte a documentação do Kafka Connect para obter informações avançadas sobre a lógica de conversão.
✅: Suportado
❌: Não suportado
️⚠️: Parcialmente suportado
| Tipo do Protobuf | Tipo do Kafka Connect | Suportado | Observações |
|---|
| double | FLOAT64 | ✅ | |
| float | FLOAT32 | ✅ | |
| int32 | INT8/INT16/INT32 | ✅ | O padrão é INT32. Passa a ser INT8 se o schema tiver a opção connect.type=int8 (de forma análoga para INT16 se connect.type=int16) |
| sint32 | INT8/INT16/INT32 | ✅ | O padrão é INT32. Passa a ser INT8 se o schema tiver a opção connect.type=int8 (de forma análoga para INT16 se connect.type=int16) |
| sfixed32 | INT8/INT16/INT32 | ✅ | O padrão é INT32. Passa a ser INT8 se o schema tiver a opção connect.type=int8 (de forma análoga para INT16 se connect.type=int16) |
| uint32 | INT64 | ✅ | |
| fixed32 | INT64 | ✅ | |
| int64 | INT64 | ✅ | |
| uint64 | INT64 | ✅ | |
| sint64 | INT64 | ✅ | |
| fixed64 | INT64 | ✅ | |
| sfixed64 | INT64 | ✅ | |
| bool | BOOLEAN | ✅ | |
| string | STRING | ✅ | |
| bytes | BYTES | ✅ | |
| enum | INT32/STRING | ✅ | O padrão é STRING. Passa a ser INT32 se int.for.enums=true (consulte a documentação do Schema Registry) |
| message | STRUCT | ⚠️ | Consulte a seção Schemas não suportados abaixo |
| repeated T (where T is not a map entry) | ARRAY | ✅ | |
map<K, V> | MAP | ✅ | |
| oneof | STRUCT | ⚠️ | Consulte a seção abaixo sobre a tradução de oneof para schema do ClickHouse |
| google.protobuf.DoubleValue | FLOAT64 | ✅ | |
| google.protobuf.FloatValue | FLOAT32 | ✅ | |
| google.protobuf.Int64Value | INT64 | ✅ | |
| google.protobuf.UInt64Value | INT64 | ✅ | |
| google.protobuf.UInt32Value | INT64 | ✅ | |
| google.protobuf.Int32Value | INT32 | ✅ | |
| google.protobuf.BoolValue | BOOLEAN | ✅ | |
| google.protobuf.StringValue | STRING | ✅ | |
| google.protobuf.BytesValue | BYTES | ✅ | |
| google.protobuf.Timestamp | org.apache.kafka.connect.data.Timestamp | ✅ | |
| google.type.Date | org.apache.kafka.connect.data.Date | ✅ | |
| google.type.TimeOfDay | org.apache.kafka.connect.data.Time | ✅ | |
Consulte Tipos de dados suportados para ver o mapeamento entre os tipos do Kafka Connect e os tipos do ClickHouse.
Observação sobre a tradução de campos oneof para colunas do ClickHouse
O conector não oferece suporte à tradução de unions (oneof) do Protobuf para o tipo Variant do ClickHouse. Em vez disso, liste os campos oneof como campos Nullable individuais no schema da sua tabela do ClickHouse.
Por exemplo:
syntax = "proto3";
package com.clickhouse.kafka.connect.proto.test;
message StringIntUnion {
oneof mixed {
string mixed_string = 2;
int32 mixed_int = 3;
}
}
é traduzido na seguinte definição de tabela do ClickHouse:
CREATE TABLE IF NOT EXISTS `StringIntUnion`
(
mixed_string Nullable(String),
mixed_int Nullable(Int32)
) ENGINE = ...;
Esquemas Protobuf sem suporte
Os seguintes esquemas Protobuf não são suportados pelo conector:
- uniões com várias mensagens (antes da versão 26.1 do CH)
syntax = "proto3";
package com.clickhouse.kafka.connect.proto.test;
message TwoRecords {
oneof payload {
TypeA type_a = 2;
TypeB type_b = 3;
}
// traduz para Nullable(Tuple(label String)) no ClickHouse, o que não é suportado
message TypeA {
string label = 1;
}
// traduz para Nullable(Tuple(count Int32)) no ClickHouse, o que não é suportado
message TypeB {
int32 count = 1;
}
}
A partir da versão 26.1 do CH, esse schema é suportado quando allow_experimental_nullable_tuple_type=1 (consulte esta página da documentação).
Suporte a schema JSON
{
"name": "clickhouse-connect",
"config": {
"connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
...
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
}
}
Suporte a String
O conector oferece suporte ao String Converter em diferentes formatos do ClickHouse: JSON, CSV e TSV.
{
"name": "clickhouse-connect",
"config": {
"connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
...
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"customInsertFormat": "true",
"insertFormat": "CSV"
}
}
O buffering interno permite que a tarefa do sink acumule registros de várias chamadas poll() e os envie ao ClickHouse em batches maiores. Isso pode melhorar a vazão em workloads em que cada poll() produz muitos batches pequenos por partição.
Comportamento principal:
bufferCount controla quantos registros são mantidos em buffer antes do flush.
bufferFlushTime define um tempo máximo de espera (em milissegundos) antes de fazer o flush dos registros em buffer.
bufferFlushTime só tem efeito quando bufferCount > 0.
bufferCount=0 e bufferFlushTime=0 mantêm o buffering desabilitado (comportamento padrão).
- O buffering não é compatível quando
exactlyOnce=true.
Por que o buffering é incompatível com o modo exactly-once:
O buffering altera os limites dos batches, o que quebra a desduplicação de blocos do ClickHouse e a máquina de estados de offset do conector.
Para resolver isso, desative o modo exactly-once com exactlyOnce=false na configuração do conector ou desative o buffering com bufferCount=0.
Exemplo:
{
"name": "clickhouse-connect",
"config": {
"connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
...
"exactlyOnce": "false",
"bufferCount": "5000",
"bufferFlushTime": "2000"
}
}
O logging é fornecido automaticamente pela plataforma Kafka Connect.
O destino e o formato dos logs podem ser configurados por meio do arquivo de configuração do Kafka Connect.
Se estiver usando o Confluent Platform, os logs poderão ser visualizados executando um comando de CLI:
confluent local services connect log
Para mais detalhes, confira o tutorial oficial.
O ClickHouse Kafka Connect reporta métricas de runtime por meio do Java Management Extensions (JMX). O JMX vem habilitado no Kafka Connector por padrão.
Métricas específicas do ClickHouse
O conector expõe métricas personalizadas por meio do seguinte nome de MBean:
com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask{id}
| Nome da métrica | Tipo | Descrição |
|---|
receivedRecords | long | Número total de registros recebidos. |
recordProcessingTime | long | Tempo total, em nanossegundos, gasto no agrupamento e na conversão de registros para uma estrutura unificada. |
taskProcessingTime | long | Tempo total, em nanossegundos, gasto no processamento e na inserção de dados no ClickHouse. |
Métricas de Produtor/Consumidor do Kafka
O conector expõe métricas padrão de produtor e consumidor do Kafka que fornecem informações sobre o fluxo de dados, a taxa de transferência e o desempenho.
Métricas no Nível do Tópico:
records-sent-total: Número total de registros enviados para o tópico
bytes-sent-total: Total de bytes enviados para o tópico
record-send-rate: Taxa média de registros enviados por segundo
byte-rate: Taxa média de bytes enviados por segundo
compression-rate: Taxa de compressão obtida
Métricas no Nível da Partição:
records-sent-total: Total de registros enviados para a partição
bytes-sent-total: Total de bytes enviados para a partição
records-lag: Lag atual na partição
records-lead: Lead atual na partição
replica-fetch-lag: Informações de lag das réplicas
Métricas de Conexão no Nível do Nó:
connection-creation-total: Total de conexões criadas com o nó do Kafka
connection-close-total: Total de conexões encerradas
request-total: Total de solicitações enviadas ao nó
response-total: Total de respostas recebidas do nó
request-rate: Taxa média de solicitações por segundo
response-rate: Taxa média de respostas por segundo
Essas métricas ajudam a monitorar:
- Taxa de transferência: Acompanhar as taxas de ingestão de dados
- Lag: Identificar gargalos e atrasos no processamento
- Compressão: Medir a eficiência da compressão de dados
- Saúde da conexão: Monitorar a conectividade e a estabilidade da rede
Métricas do Kafka Connect Framework
O conector se integra ao framework do Kafka Connect e expõe métricas sobre o ciclo de vida das tarefas e o rastreamento de erros.
Métricas de status das tarefas:
task-count: Número total de tarefas no conector
running-task-count: Número de tarefas em execução no momento
paused-task-count: Número de tarefas pausadas no momento
failed-task-count: Número de tarefas que falharam
destroyed-task-count: Número de tarefas destruídas
unassigned-task-count: Número de tarefas não atribuídas
Os valores de status das tarefas incluem: running, paused, failed, destroyed, unassigned
Métricas de erro:
deadletterqueue-produce-failures: Número de gravações na DLQ que falharam
deadletterqueue-produce-requests: Total de tentativas de gravação na DLQ
last-error-timestamp: Timestamp do último erro
records-skip-total: Número total de registros ignorados devido a erros
records-retry-total: Número total de registros que passaram por nova tentativa
errors-total: Número total de erros encontrados
Métricas de desempenho:
offset-commit-failures: Número de commits de offset que falharam
offset-commit-avg-time-ms: Tempo médio dos commits de offset
offset-commit-max-time-ms: Tempo máximo dos commits de offset
put-batch-avg-time-ms: Tempo médio para processar um lote
put-batch-max-time-ms: Tempo máximo para processar um lote
source-record-poll-total: Total de registros coletados
Boas práticas de monitoramento
- Monitore o lag do consumidor: Acompanhe
records-lag por partição para identificar gargalos de processamento
- Acompanhe as taxas de erro: Observe
errors-total e records-skip-total para detectar problemas de qualidade dos dados
- Observe a integridade das tarefas: Monitore as métricas de status das tarefas para garantir que estejam em execução corretamente
- Meça a vazão: Use
records-send-rate e byte-rate para acompanhar o desempenho da ingestão
- Monitore a integridade da conexão: Verifique as métricas de conexão no nível do nó para identificar problemas de rede
- Acompanhe a eficiência da compressão: Use
compression-rate para otimizar a transferência de dados
Para ver definições detalhadas das métricas JMX e a integração com o Prometheus, consulte o arquivo de configuração jmx-export-connector.yml.
- Não há suporte a exclusões.
- O tamanho do batch é herdado das propriedades do consumer do Kafka.
- Ao usar o KeeperMap para exactly-once, se o offset for alterado ou recuado, será necessário excluir o conteúdo do KeeperMap para esse tópico específico. (Consulte o guia de solução de problemas abaixo para mais detalhes)
Esta seção aborda estratégias de ajuste de desempenho para o ClickHouse Kafka Connect Sink. O ajuste de desempenho é essencial ao lidar com casos de uso de alta vazão ou quando é necessário otimizar o uso de recursos e minimizar o lag.
O ajuste de desempenho normalmente é necessário nos seguintes cenários:
- Cargas de trabalho de alta vazão: ao processar milhões de eventos por segundo de tópicos do Kafka
- Consumer lag: quando seu conector não consegue acompanhar a taxa de produção de dados, causando um atraso cada vez maior
- Restrições de recursos: quando você precisa otimizar o uso de CPU, memória ou rede
- Múltiplos tópicos: ao consumir simultaneamente vários tópicos de alto volume
- Mensagens pequenas: ao lidar com muitas mensagens pequenas que se beneficiariam do agrupamento em lotes no lado do servidor
O ajuste de desempenho normalmente NÃO é necessário quando:
- Você está processando volumes baixos a moderados (< 10.000 mensagens/segundo)
- O consumer lag é estável e aceitável para o seu caso de uso
- As configurações padrão do conector já atendem aos seus requisitos de vazão
- Seu cluster ClickHouse consegue lidar facilmente com a carga de entrada
Entendendo o fluxo de dados
Antes de ajustar, é importante entender como os dados fluem pelo conector:
- Kafka Connect Framework busca mensagens dos tópicos do Kafka em segundo plano
- O conector faz polling de mensagens no buffer interno do framework
- O conector agrupa as mensagens em lotes com base no tamanho do polling
- O ClickHouse recebe a inserção em lote via HTTP/S
- O ClickHouse processa a inserção (de forma síncrona ou assíncrona)
O desempenho pode ser otimizado em cada uma dessas etapas.
Ajuste do tamanho do lote no Kafka Connect
O primeiro nível de otimização é controlar a quantidade de dados que o conector recebe por lote do Kafka.
Configurações de fetch
O Kafka Connect (o framework) busca mensagens de tópicos do Kafka em segundo plano, independentemente do conector:
fetch.min.bytes: Quantidade mínima de dados antes de o framework repassar os dados ao conector (padrão: 1 byte)
fetch.max.bytes: Quantidade máxima de dados a buscar em uma única solicitação (padrão: 52428800 / 50 MB)
fetch.max.wait.ms: Tempo máximo de espera antes de retornar os dados se fetch.min.bytes não for atingido (padrão: 500 ms)
No Confluent Cloud, para ajustar essas configurações, é necessário abrir um chamado de suporte pelo Confluent Cloud.
Configurações de polling
O conector busca mensagens no buffer do framework por polling:
max.poll.records: Número máximo de registros retornados em uma única consulta de polling (padrão: 500)
max.partition.fetch.bytes: Quantidade máxima de dados por partição (padrão: 1048576 / 1 MB)
No Confluent Cloud, para ajustar essas configurações, é necessário abrir um chamado de suporte pelo Confluent Cloud.
Configurações recomendadas para alta vazão
Para obter o melhor desempenho com ClickHouse, procure usar lotes maiores:
# Aumentar o número de registros por poll
consumer.override.max.poll.records=5000
# Aumentar o tamanho de fetch da partição (5 MB)
consumer.override.max.partition.fetch.bytes=5242880
# Opcional: Aumentar o tamanho mínimo de fetch para aguardar mais dados (1 MB)
consumer.override.fetch.min.bytes=1048576
# Opcional: Reduzir o tempo de espera se a latência for crítica
consumer.override.fetch.max.wait.ms=300
As propriedades acima exigem que você habilite overrides de cliente na configuração do worker por meio de connector.client.config.override.policy=All. Consulte a documentação do Kafka Connect para mais informações.
Importante: As configurações de fetch do Kafka Connect representam dados compactados, enquanto o ClickHouse recebe dados não compactados. Ajuste essas configurações com base na sua taxa de compressão.
Trade-offs:
- Lotes maiores = Melhor desempenho de ingestão no ClickHouse, menos partes, menor sobrecarga
- Lotes maiores = Maior uso de memória, com possível aumento da latência de ponta a ponta
- Lotes grandes demais = Risco de timeouts, erros de OutOfMemory ou de exceder
max.poll.interval.ms
Mais detalhes: documentação da Confluent | documentação do Kafka
As inserções assíncronas são um recurso poderoso quando o conector envia lotes relativamente pequenos ou quando você quer otimizar ainda mais a ingestão, transferindo para o ClickHouse a responsabilidade pelo agrupamento em lotes.
Quando usar inserções assíncronas
Considere ativar inserções assíncronas quando:
- Muitos lotes pequenos: Seu conector envia pequenos lotes com frequência (< 1000 linhas por lote)
- Alta concorrência: Várias tarefas do conector estão gravando na mesma tabela
- Implantação distribuída: Você executa muitas instâncias do conector em hosts diferentes
- Sobrecarga na criação de partes: Você está enfrentando erros de “too many partes”
- Carga de trabalho mista: Combinação de ingestão em tempo real com cargas de trabalho de consulta
NÃO use inserções assíncronas quando:
- Você já estiver enviando lotes grandes (> 10.000 linhas por lote) com frequência controlada
- Você precisar de visibilidade imediata dos dados (as consultas precisam ver os dados instantaneamente)
- A semântica exactly-once com
wait_for_async_insert=0 entrar em conflito com seus requisitos
- Seu caso de uso puder se beneficiar, em vez disso, de melhorias no batching no lado do cliente
Como as inserções assíncronas funcionam
Com as inserções assíncronas ativadas, o ClickHouse:
- Recebe a consulta INSERT do conector
- Grava os dados em um buffer na memória (em vez de gravá-los imediatamente no disco)
- Retorna sucesso ao conector (se
wait_for_async_insert=0)
- Grava o buffer no disco quando uma destas condições é atendida:
- O buffer atinge
async_insert_max_data_size (padrão: 100 MB)
async_insert_busy_timeout_ms milissegundos se passaram desde a primeira inserção (padrão: 1000 ms)
- Número máximo de consultas acumuladas (
async_insert_max_query_number, padrão: 100)
Isso reduz significativamente o número de partes criadas e melhora a vazão geral.
Habilitando inserções assíncronas
Adicione as configurações de async insert ao parâmetro de configuração clickhouseSettings:
{
"name": "clickhouse-connect",
"config": {
"connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
...
"clickhouseSettings": "async_insert=1,wait_for_async_insert=1"
}
}
Principais configurações:
async_insert=1: Habilita inserções assíncronas
wait_for_async_insert=1 (recomendado): O conector espera os dados serem gravados no armazenamento do ClickHouse antes de confirmar o recebimento. Isso oferece garantias de entrega.
wait_for_async_insert=0: O conector confirma o recebimento imediatamente após armazenar os dados em buffer. Melhor desempenho, mas os dados podem ser perdidos se o servidor falhar antes da gravação.
Ajuste do comportamento de inserção assíncrona
Você pode ajustar o comportamento de flush do async insert:
"clickhouseSettings": "async_insert=1,wait_for_async_insert=1,async_insert_max_data_size=104857600,async_insert_busy_timeout_ms=1000"
Parâmetros de ajuste comuns:
async_insert_max_data_size (padrão: 104857600 / 100 MB): Tamanho máximo do buffer antes da gravação
async_insert_busy_timeout_ms (padrão: 1000): Tempo máximo (ms) antes da gravação
async_insert_stale_timeout_ms (padrão: 0): Tempo (ms) desde o último insert antes da gravaação
async_insert_max_query_number (padrão: 100): Número máximo de consultas antes da gravação
Trade-offs:
- Benefícios: Menos partes, melhor desempenho de merge, menor sobrecarga de CPU, maior throughput em cenários de alta concorrência
- Considerações: Os dados não ficam imediatamente disponíveis para consulta, latência de ponta a ponta um pouco maior
- Riscos: Perda de dados em caso de falha do servidor se
wait_for_async_insert=0, possível pressão de memória com buffers grandes
Inserções assíncronas com semântica de exactly-once
Ao usar exactlyOnce=true com inserções assíncronas:
{
"config": {
"exactlyOnce": "true",
"clickhouseSettings": "async_insert=1,wait_for_async_insert=1"
}
}
Importante: Sempre use wait_for_async_insert=1 com exactly-once para garantir que os commits de offset ocorram somente depois que os dados forem persistidos.
Para mais informações sobre async inserts, consulte a documentação de async inserts do ClickHouse.
Aumente o paralelismo para melhorar a vazão:
Tarefas por conector
Cada tarefa processa um subconjunto das partições do tópico. Mais tarefas = mais paralelismo, mas:
- O número máximo efetivo de tarefas = número de partições do tópico
- Cada tarefa mantém sua própria conexão com o ClickHouse
- Mais tarefas = maior sobrecarga e possível contenção de recursos
Recomendação: Comece com tasks.max igual ao número de partições do tópico e depois ajuste com base nas métricas de CPU e vazão.
Ignorando partições no agrupamento em lotes
Por padrão, o conector agrupa as mensagens em lotes por partição. Para obter maior vazão, você pode agrupar em lotes entre partições:
"ignorePartitionsWhenBatching": "true"
** Aviso**: Use apenas quando exactlyOnce=false. Essa configuração pode melhorar a vazão ao criar batches maiores, mas elimina as garantias de ordenação em cada partição.
Múltiplos tópicos de alta vazão
Se o conector estiver configurado para assinar vários tópicos, você estiver usando topic2TableMap para mapear tópicos para tabelas e estiver enfrentando um gargalo na inserção, o que resulta em consumer lag, considere criar um conector por tópico.
O principal motivo para isso acontecer é que, atualmente, os lotes são inseridos em cada tabela em série.
Recomendação: para vários tópicos de alto volume, implante uma instância de conector por tópico para maximizar a vazão de inserção em paralelo.
Considerações sobre o engine de tabela do ClickHouse
Escolha o engine de tabela do ClickHouse adequado para seu caso de uso:
MergeTree: Melhor para a maioria dos casos de uso; equilibra o desempenho de consulta e inserção
ReplicatedMergeTree: Necessário para alta disponibilidade; adiciona sobrecarga de replicação
*MergeTree com ORDER BY adequado: Otimize para seus padrões de consulta
Configurações a considerar:
CREATE TABLE my_table (...)
ENGINE = MergeTree()
ORDER BY (timestamp, id)
SETTINGS
-- Aumentar o número máximo de insert threads para escrita paralela de partes
max_insert_threads = 4,
-- Permitir inserts com quorum para maior confiabilidade (ReplicatedMergeTree)
insert_quorum = 2
Para as configurações de insert em nível de conector:
"clickhouseSettings": "insert_quorum=2,insert_quorum_timeout=60000"
Pool de conexões e timeouts
O conector mantém conexões HTTP com o ClickHouse. Ajuste os timeouts para redes com alta latência:
"clickhouseSettings": "socket_timeout=300000,connection_timeout=30000"
socket_timeout (padrão: 30000 ms): Tempo máximo para operações de leitura
connection_timeout (padrão: 10000 ms): Tempo máximo para estabelecer a conexão
Aumente esses valores se ocorrerem erros de timeout com lotes grandes.
Monitore estas métricas principais:
- Consumer lag: Use ferramentas de monitoramento do Kafka para acompanhar o lag por partição
- Métricas do conector: Monitore
receivedRecords, recordProcessingTime, taskProcessingTime via JMX (consulte Monitoring)
- Métricas do ClickHouse:
system.asynchronous_inserts: Monitore o uso do buffer de async insert
system.parts: Monitore o número de partes para detectar problemas de merge
system.merges: Monitore merges ativos
system.events: Acompanhe InsertedRows, InsertedBytes, FailedInsertQuery
Problemas comuns de desempenho:
| Sintoma | Possível causa | Solução |
|---|
| Consumer lag alto | Lotes pequenos demais | Aumente max.poll.records, habilite inserções assíncronas |
| Erros de “Too many partes” | Inserções pequenas e frequentes | Habilite inserções assíncronas, aumente o tamanho do lote |
| Erros de timeout | Lote grande, rede lenta | Reduza o tamanho do lote, aumente socket_timeout, verifique a rede |
| Alto uso de CPU | Muitas partes pequenas | Habilite inserções assíncronas, aumente as configurações de merge |
| Erros de OutOfMemory | Tamanho do lote grande demais | Reduza max.poll.records, max.partition.fetch.bytes |
| Carga desigual entre tarefas | Distribuição desigual de partições | Rebalanceie as partições ou ajuste tasks.max |
- Comece com os padrões, depois meça e ajuste com base no desempenho real
- Prefira lotes maiores: Busque 10.000-100.000 linhas por insert, quando possível
- Use inserções assíncronas ao enviar muitos lotes pequenos ou em cenários de alta concorrência
- Sempre use
wait_for_async_insert=1 com semântica de exactly-once
- Escale horizontalmente: Aumente
tasks.max até o número de partições
- Um conector por tópico de alto volume para obter vazão máxima
- Monitore continuamente: Acompanhe o consumer lag, a contagem de partes e a atividade de merge
- Teste minuciosamente: Sempre teste alterações de configuração sob carga realista antes da implantação em produção
Exemplo: Configuração de alta vazão
Aqui está um exemplo completo otimizado para alta vazão:
{
"name": "clickhouse-high-throughput",
"config": {
"connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
"tasks.max": "8",
"topics": "high_volume_topic",
"hostname": "my-clickhouse-host.cloud",
"port": "8443",
"database": "default",
"username": "default",
"password": "<PASSWORD>",
"ssl": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"exactlyOnce": "false",
"ignorePartitionsWhenBatching": "true",
"consumer.override.max.poll.records": "10000",
"consumer.override.max.partition.fetch.bytes": "5242880",
"consumer.override.fetch.min.bytes": "1048576",
"consumer.override.fetch.max.wait.ms": "500",
"clickhouseSettings": "async_insert=1,wait_for_async_insert=1,async_insert_max_data_size=16777216,async_insert_busy_timeout_ms=1000,socket_timeout=300000"
}
}
A configuração do conector acima exige que você habilite sobrescritas de cliente na configuração do seu worker por meio de connector.client.config.override.policy=All. Consulte a documentação do Kafka Connect para mais informações.
Esta configuração:
- Processa até 10.000 registros por poll
- Agrupa partições em lotes para inserts maiores
- Usa inserções assíncronas com buffer de 16 MB
- Executa 8 tarefas em paralelo (corresponda à sua quantidade de partições)
- Otimizada para vazão em vez de ordenação estrita
”Inconsistência de estado para o tópico [someTopic] partição [0]”
Isso acontece quando o offset armazenado no KeeperMap é diferente do offset armazenado no Kafka, geralmente quando um tópico foi excluído
ou quando o offset foi ajustado manualmente.
Para corrigir isso, você precisará excluir os valores antigos armazenados para esse tópico + partição:
-- Primeiro, identifique o banco de dados usado para armazenar os dados.
SELECT * FROM [database].connect_state
-- Identifique a chave que corresponde ao tópico e à partição.
ALTER TABLE [database].connect_state DELETE WHERE key = [keyname]
Esse ajuste pode ter implicações para a semântica exactly-once.
”Quais erros o conector tentará novamente?”
No momento, o foco está em identificar erros transitórios para os quais é possível fazer nova tentativa, incluindo:
ClickHouseException - Esta é uma exceção genérica que pode ser lançada pelo ClickHouse.
Em geral, ela é lançada quando o servidor está sobrecarregado, e os seguintes códigos de erro são considerados especialmente transitórios:
- 3 - UNEXPECTED_END_OF_FILE
- 107 - FILE_DOESNT_EXIST
- 159 - TIMEOUT_EXCEEDED
- 164 - READONLY
- 202 - TOO_MANY_SIMULTANEOUS_QUERIES
- 203 - NO_FREE_CONNECTION
- 209 - SOCKET_TIMEOUT
- 210 - NETWORK_ERROR
- 241 - MEMORY_LIMIT_EXCEEDED
- 242 - TABLE_IS_READ_ONLY
- 252 - TOO_MANY_PARTS
- 285 - TOO_FEW_LIVE_REPLICAS
- 319 - UNKNOWN_STATUS_OF_INSERT
- 425 - SYSTEM_ERROR
- 999 - KEEPER_EXCEPTION
SocketTimeoutException - Esta é lançada quando ocorre timeout no socket.
UnknownHostException - Esta é lançada quando não é possível resolver o host.
IOException - Esta é lançada quando há um problema de rede.
”Todos os meus dados estão em branco/zerados”
Provavelmente, os campos nos seus dados não correspondem aos campos da tabela — isso é especialmente comum com CDC (e o formato Debezium).
Uma solução comum é adicionar a transformação flatten à configuração do seu conector:
transforms=flatten
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flatten.delimiter=_
Isso transformará seus dados de um JSON aninhado em um JSON plano (usando _ como delimitador). Os campos na tabela passarão então a seguir o formato “campo1_campo2_campo3” (ou seja, “before_id”, “after_id”, etc.).
”Quero usar minhas chaves do Kafka no ClickHouse”
As chaves do Kafka não são armazenadas no campo value por padrão, mas você pode usar a transformação KeyToValue para mover a chave para o campo value (com o novo nome de campo _key):
transforms=keyToValue
transforms.keyToValue.type=com.clickhouse.kafka.connect.transforms.KeyToValue
transforms.keyToValue.field=_key