Pular para o conteúdo principal
Se precisar de ajuda, abra uma issue no repositório ou faça uma pergunta no Slack público do ClickHouse.
ClickHouse Kafka Connect Sink é o conector do Kafka que envia dados de um tópico do Kafka para uma tabela do ClickHouse.

Licença

O Kafka Connector Sink é distribuído nos termos da Licença Apache 2.0

Requisitos do ambiente

O framework Kafka Connect v2.7 ou posterior deve estar instalado no ambiente.

Matriz de compatibilidade de versões

Versão do ClickHouse Kafka ConnectVersão do ClickHouseKafka ConnectConfluent Platform
1.0.0> 23.3> 2.7> 6.1

Principais recursos

  • Vem com semântica exactly-once pronta para uso. É baseado em um novo recurso central do ClickHouse chamado KeeperMap (usado como armazenamento de estado pelo conector) e permite uma arquitetura minimalista.
  • Suporte a armazenamentos de estado de terceiros: atualmente, o padrão é em memória, mas pode usar o KeeperMap (Redis será adicionado em breve).
  • Integração principal: desenvolvida, mantida e suportada pela ClickHouse.
  • Testado continuamente no ClickHouse Cloud.
  • Inserções de dados com schema declarado e sem schema.
  • Suporte a todos os tipos de dados do ClickHouse.

Instruções de instalação

Obtenha os detalhes da conexão

Para se conectar ao ClickHouse via HTTP(S), você precisa das seguintes informações:
Parâmetro(s)Descrição
HOST and PORTNormalmente, a porta é 8443 ao usar TLS ou 8123 quando não se usa TLS.
DATABASE NAMEPor padrão, há um banco de dados chamado default; use o nome do banco de dados ao qual você deseja se conectar.
USERNAME and PASSWORDPor padrão, o nome de usuário é default. Use o nome de usuário apropriado para o seu caso de uso.
Os detalhes do seu serviço do ClickHouse Cloud estão disponíveis no console do ClickHouse Cloud. Selecione um serviço e clique em Connect: Escolha HTTPS. Os detalhes de conexão são exibidos em um comando curl de exemplo. Se você estiver usando ClickHouse autogerenciado, os detalhes de conexão são definidos pelo administrador do seu ClickHouse.

Instruções gerais de instalação

O conector é distribuído como um único arquivo JAR contendo todos os arquivos de classe necessários para executar o plugin. Para instalar o plugin, siga estas etapas:
  • Baixe um arquivo ZIP contendo o arquivo JAR do conector na página de Releases do repositório ClickHouse Kafka Connect Sink.
  • Extraia o conteúdo do arquivo ZIP e copie-o para o local desejado.
  • Adicione à configuração plugin.path, no arquivo de propriedades do Connect, o caminho para o diretório do plugin, para que o Confluent Platform possa encontrá-lo.
  • Forneça um nome de tópico, o hostname da instância do ClickHouse e a senha na configuração.
connector.class=com.clickhouse.kafka.connect.ClickHouseSinkConnector
tasks.max=1
topics=<topic_name>
ssl=true
jdbcConnectionProperties=?sslmode=STRICT
security.protocol=SSL
hostname=<hostname>
database=<database_name>
password=<password>
ssl.truststore.location=/tmp/kafka.client.truststore.jks
port=8443
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
exactlyOnce=true
username=default
schemas.enable=false
  • Reinicie a Confluent Platform.
  • Se você usa a Confluent Platform, faça login na UI do Confluent Control Center para verificar se o ClickHouse Sink está disponível na lista de conectores.

Opções de configuração

Para conectar o ClickHouse Sink ao servidor ClickHouse, você precisa fornecer:
  • detalhes da conexão: hostname (obrigatório) e porta (opcional)
  • credenciais do usuário: senha (obrigatória) e nome de usuário (opcional)
  • classe do conector: com.clickhouse.kafka.connect.ClickHouseSinkConnector (obrigatória)
  • topics ou topics.regex: os tópicos do Kafka a serem consumidos — os nomes dos tópicos devem corresponder aos nomes das tabelas (obrigatório)
  • conversores de chave e valor: defina-os com base no tipo de dados do seu tópico. Obrigatórios se ainda não estiverem definidos na configuração do worker.
A tabela completa de opções de configuração:
Nome da propriedadeDescriçãoValor padrão
hostname (Obrigatório)O hostname ou endereço IP do servidorN/A
portA porta do ClickHouse — o padrão é 8443 (para HTTPS na Cloud), mas para HTTP (o padrão para self-hosted), deve ser 81238443
sslHabilita a conexão SSL com o ClickHousetrue
jdbcConnectionPropertiesPropriedades de conexão ao se conectar ao ClickHouse. Deve começar com ? e os parâmetros devem ser unidos por & no formato param=value""
usernameNome de usuário do banco de dados do ClickHousedefault
password (Obrigatório)Senha do banco de dados ClickHouseN/D
databaseNome do banco de dados ClickHousedefault
connector.class (Obrigatório)Classe do conector (defina explicitamente e mantenha o valor padrão)"com.clickhouse.kafka.connect.ClickHouseSinkConnector"
tasks.maxNúmero de tarefas do conector"1"
errors.retry.timeoutDuração máxima de nova tentativa do Kafka Connect em milissegundos. 0 para não fazer novas tentativas. -1 para tentativas infinitas. O valor recomendado é superior a “10000” ms (10 segundos) Tempo limite"0"
exactlyOnceExactly Once ativado"false"
topics (Obrigatório)Os tópicos do Kafka a serem consultados — os nomes dos tópicos devem corresponder aos nomes das tabelas""
key.converter (Obrigatório* - Consulte a descrição)Defina de acordo com os tipos das suas chaves. Obrigatório aqui se você estiver passando chaves (e elas não estiverem definidas na configuração do worker)."org.apache.kafka.connect.storage.StringConverter"
value.converter (Obrigatório* - Consulte a descrição)Defina com base no tipo de dados no seu tópico. Suporta os formatos JSON, String, Avro ou Protobuf. Obrigatório aqui se não estiver definido na configuração do worker."org.apache.kafka.connect.json.JsonConverter"
value.converter.schemas.enableSuporte a schema no conversor de valor do connector"false"
errors.toleranceTolerância a erro do connector. Suportado: none, all"none"
errors.deadletterqueue.topic.nameSe configurado (com errors.tolerance=all), uma DLQ será usada para lotes que falharem (consulte Solução de problemas)""
errors.deadletterqueue.context.headers.enableAdiciona headers extras para a DLQ""
clickhouseSettingsLista de configurações do ClickHouse separadas por vírgulas (por exemplo, “insert_quorum=2, etc…”)""
topic2TableMapLista separada por vírgulas que mapeia nomes de tópicos para nomes de tabelas (por exemplo, “topic1=table1, topic2=table2, etc…”)""
tableRefreshIntervalTempo (em segundos) para atualizar o cache da definição da tabela0
keeperOnClusterPermite configurar o parâmetro ON CLUSTER para instâncias auto-hospedadas (por exemplo, ON CLUSTER clusterNameInConfigFileDefinition) na tabela connect_state de exactly-once (consulte Consultas de DDL distribuído""
bypassRowBinaryPermite desativar o uso de RowBinary e RowBinaryWithDefaults para dados baseados em schema (Avro, Protobuf etc.) — só deve ser usado quando os dados tiverem colunas ausentes e Nullable/Default não forem aceitáveis"false"
dateTimeFormatsFormatos de data e hora para fazer o parsing de campos DateTime64 do schema, separados por ; (por exemplo, someDateField=yyyy-MM-dd HH:mm:ss.SSSSSSSSS;someOtherDateField=yyyy-MM-dd HH:mm:ss).""
tolerateStateMismatchPermite que o connector descarte registros “anteriores” ao offset atual armazenado em AFTER_PROCESSING (por exemplo, se o offset 5 for enviado e o offset 250 tiver sido o último offset registrado). Deve ser usado para corrigir a ingestão após uma falha e deve ser revertido para "false" depois disso."false"
ignorePartitionsWhenBatchingIgnora a partição ao coletar mensagens para inserção (mas apenas se exactlyOnce for false). Observação de desempenho: quanto mais tarefas do conector, menos partições do Kafka serão atribuídas por tarefa — isso pode significar ganhos decrescentes."false"
bufferCount (a partir da v1.3.6)Número de registros a manter em buffer na memória antes de gravar no ClickHouse. 0 desabilita o buffer interno. O uso de buffer não é compatível com exactlyOnce=true."0"
bufferFlushTime (a partir da v1.3.6)Tempo máximo, em milissegundos, para manter registros no buffer antes de fazer o flush quando exactlyOnce=false. 0 desabilita o flush baseado em tempo. O valor padrão é 0. Necessário apenas para limiar baseado em tempo. Só tem efeito quando bufferCount > 0."0"
reportInsertedOffsets (a partir da v1.3.6)Permite retornar do preCommit apenas os offsets inseridos com sucesso (em vez de currentOffsets) quando exactlyOnce=false. Isso não se aplica quando ignorePartitionsWhenBatching=true, caso em que currentOffsets ainda são retornados."false"

Tabelas de destino

O ClickHouse Connect Sink lê mensagens de tópicos do Kafka e as grava nas tabelas apropriadas. O ClickHouse Connect Sink grava dados em tabelas já existentes. Certifique-se de que uma tabela de destino com o schema adequado tenha sido criada no ClickHouse antes de começar a inserir dados nela. Cada tópico requer uma tabela de destino dedicada no ClickHouse. O nome da tabela de destino deve corresponder ao nome do tópico de origem.

Pré-processamento

Se você precisar transformar mensagens de saída antes de enviá-las para o ClickHouse Kafka Connect Sink, use Transformações do Kafka Connect.

Tipos de dados suportados

Com um schema declarado:
Tipo do Kafka ConnectTipo do ClickHouseSuportadoPrimitivo
STRINGStringSim
STRINGJSON. Veja abaixo (1)Sim
INT8Int8Sim
INT16Int16Sim
INT32Int32Sim
INT64Int64Sim
FLOAT32Float32Sim
FLOAT64Float64Sim
BOOLEANBooleanSim
ARRAYArray(T)Não
MAPMap(Primitive, T)Não
STRUCTVariant(T1, T2, …)Não
STRUCTTuple(a T1, b T2, …)Não
STRUCTNested(a T1, b T2, …)Não
STRUCTJSON. Veja abaixo (1), (2)Não
BYTESStringNão
org.apache.kafka.connect.data.TimeInt64 / DateTime64Não
org.apache.kafka.connect.data.TimestampInt32 / Date32Não
org.apache.kafka.connect.data.DecimalDecimalNão
  • (1) - JSON é suportado apenas quando as configurações do ClickHouse incluem input_format_binary_read_json_as_string=1. Isso funciona apenas para a família de formatos RowBinary, e a configuração afeta todas as colunas na requisição de insert, portanto todas elas devem ser do tipo string. Nesse caso, o conector converterá STRUCT em uma string JSON.
  • (2) - Quando struct tem unions como oneof, o converter deve ser configurado para NÃO adicionar prefixo/sufixo aos nomes de campo. Há a configuração generate.index.for.unions=false para o ProtobufConverter.
Sem um schema declarado: Um registro é convertido em JSON e enviado ao ClickHouse como um valor no formato JSONEachRow.

Receitas de configuração

Estas são algumas receitas de configuração comuns para você começar rapidamente.

Configuração básica

A configuração mais simples para começar pressupõe que você esteja executando o Kafka Connect no modo distribuído e tenha um servidor ClickHouse em execução em localhost:8443 com SSL habilitado; os dados estão em JSON sem schema.
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    "tasks.max": "1",
    "consumer.override.max.poll.records": "5000",
    "consumer.override.max.partition.fetch.bytes": "5242880",
    "database": "default",
    "errors.retry.timeout": "60000",
    "exactlyOnce": "false",
    "hostname": "localhost",
    "port": "8443",
    "ssl": "true",
    "jdbcConnectionProperties": "?ssl=true&sslmode=strict",
    "username": "default",
    "password": "<PASSWORD>",
    "topics": "<TOPIC_NAME>",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "clickhouseSettings": ""
  }
}
A configuração do conector acima exige que você habilite as substituições do cliente na configuração do worker por meio de connector.client.config.override.policy=All. Consulte a documentação do Kafka Connect para mais informações.

Configuração básica com vários tópicos

O conector pode consumir dados de vários tópicos
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "topics": "SAMPLE_TOPIC, ANOTHER_TOPIC, YET_ANOTHER_TOPIC",
    ...
  }
}

Configuração básica com DLQ

{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "<DLQ_TOPIC>",
    "errors.deadletterqueue.context.headers.enable": "true",
  }
}

Uso com diferentes formatos de dados

Suporte a schema do Avro
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "<SCHEMA_REGISTRY_HOST>:<PORT>",
    "value.converter.schemas.enable": "true",
  }
}
Mapeamento de tipos do Avro
O mapeamento de tipos abaixo é definido por io.confluent.connect.avro.AvroConverter, a implementação oficial do serializador/desserializador Avro no Kafka Connect. Consulte a documentação do Kafka Connect para informações avançadas sobre a lógica de conversão. ✅: Compatível ❌: Não compatível ️⚠️: Parcialmente compatível
Tipo AvroTipo do Kafka ConnectCompatívelObservações
nullN/ANão compatível como tipo independente, mas pode ser usado em uniões
booleanBOOLEAN
intINT8/INT16/INT32Por padrão, é INT32. É resolvido como INT8 se o schema tiver a propriedade connect.type=int8 (de forma análoga para INT16 se connect.type=int16)
longINT64
floatFLOAT32
doubleFLOAT64
bytesBYTES
stringSTRING
recordSTRUCT
enumSTRING
arrayARRAY/MAPPor padrão, é ARRAY. É resolvido como MAP se o campo tiver sido originalmente construído por meio de AvroData.fromConnectSchema (fonte)
mapMAP
unionSTRUCT/<T>⚠️Por padrão, é STRUCT. É resolvido como o tipo único T na definição da união se flatten.singleton.unions=true (consulte a documentação)
fixedBYTES⚠️O tipo lógico decimal de tamanho fixo não é compatível (veja abaixo)
Consulte Tipos de dados suportados para ver o mapeamento entre os tipos do Kafka Connect e os tipos do ClickHouse.
Schemas Avro sem suporte
Os seguintes schemas Avro não têm suporte no conector:
  • tipo lógico decimal em fixed
{"name": "decimal_18_4", "type": "fixed", "size": 8, "logicalType": "decimal", "precision": 18, "scale": 4}
  • uniões Nullable
{"name": "mixed_union", "type": ["null", "string", "int"], "default": null}
  • unions em registros
{
  "name": "record_union",
  "type": [
    {
      "type": "record",
      "name": "TypeA",
      "fields": [
        {
          "name": "label",
          "type": "string"
        }
      ]
    },
    {
      "type": "record",
      "name": "TypeB",
      "fields": [
        {
          "name": "count",
          "type": "int"
        }
      ]
    }
  ]
}
Suporte a schema do Protobuf
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schema.registry.url": "<SCHEMA_REGISTRY_HOST>:<PORT>",
    "value.converter.schemas.enable": "true",
  }
}
Observação: se você encontrar problemas com classes ausentes, nem todo ambiente inclui o conversor protobuf, e talvez seja necessário usar uma versão alternativa do JAR com as dependências incluídas.
Mapeamento de tipos do Protobuf
O mapeamento de tipos abaixo é definido por io.confluent.connect.protobuf.ProtobufConverter, a implementação oficial de serialização/desserialização de Protobuf no Kafka Connect. Consulte a documentação do Kafka Connect para obter informações avançadas sobre a lógica de conversão. ✅: Suportado ❌: Não suportado ️⚠️: Parcialmente suportado
Tipo do ProtobufTipo do Kafka ConnectSuportadoObservações
doubleFLOAT64
floatFLOAT32
int32INT8/INT16/INT32O padrão é INT32. Passa a ser INT8 se o schema tiver a opção connect.type=int8 (de forma análoga para INT16 se connect.type=int16)
sint32INT8/INT16/INT32O padrão é INT32. Passa a ser INT8 se o schema tiver a opção connect.type=int8 (de forma análoga para INT16 se connect.type=int16)
sfixed32INT8/INT16/INT32O padrão é INT32. Passa a ser INT8 se o schema tiver a opção connect.type=int8 (de forma análoga para INT16 se connect.type=int16)
uint32INT64
fixed32INT64
int64INT64
uint64INT64
sint64INT64
fixed64INT64
sfixed64INT64
boolBOOLEAN
stringSTRING
bytesBYTES
enumINT32/STRINGO padrão é STRING. Passa a ser INT32 se int.for.enums=true (consulte a documentação do Schema Registry)
messageSTRUCT⚠️Consulte a seção Schemas não suportados abaixo
repeated T (where T is not a map entry)ARRAY
map<K, V>MAP
oneofSTRUCT⚠️Consulte a seção abaixo sobre a tradução de oneof para schema do ClickHouse
google.protobuf.DoubleValueFLOAT64
google.protobuf.FloatValueFLOAT32
google.protobuf.Int64ValueINT64
google.protobuf.UInt64ValueINT64
google.protobuf.UInt32ValueINT64
google.protobuf.Int32ValueINT32
google.protobuf.BoolValueBOOLEAN
google.protobuf.StringValueSTRING
google.protobuf.BytesValueBYTES
google.protobuf.Timestamporg.apache.kafka.connect.data.Timestamp
google.type.Dateorg.apache.kafka.connect.data.Date
google.type.TimeOfDayorg.apache.kafka.connect.data.Time
Consulte Tipos de dados suportados para ver o mapeamento entre os tipos do Kafka Connect e os tipos do ClickHouse.
Observação sobre a tradução de campos oneof para colunas do ClickHouse
O conector não oferece suporte à tradução de unions (oneof) do Protobuf para o tipo Variant do ClickHouse. Em vez disso, liste os campos oneof como campos Nullable individuais no schema da sua tabela do ClickHouse. Por exemplo:
syntax = "proto3";

package com.clickhouse.kafka.connect.proto.test;

message StringIntUnion {
  oneof mixed {
    string mixed_string = 2;
    int32 mixed_int = 3;
  }
}

é traduzido na seguinte definição de tabela do ClickHouse:
CREATE TABLE IF NOT EXISTS `StringIntUnion`
(
    mixed_string Nullable(String),
    mixed_int Nullable(Int32)
) ENGINE = ...;
Esquemas Protobuf sem suporte
Os seguintes esquemas Protobuf não são suportados pelo conector:
  • uniões com várias mensagens (antes da versão 26.1 do CH)
syntax = "proto3";

package com.clickhouse.kafka.connect.proto.test;

message TwoRecords {
  oneof payload {
    TypeA type_a = 2;
    TypeB type_b = 3;
  }

  // traduz para Nullable(Tuple(label String)) no ClickHouse, o que não é suportado
  message TypeA {
    string label = 1;
  }

  // traduz para Nullable(Tuple(count Int32)) no ClickHouse, o que não é suportado
  message TypeB {
    int32 count = 1;
  }
}
A partir da versão 26.1 do CH, esse schema é suportado quando allow_experimental_nullable_tuple_type=1 (consulte esta página da documentação).
Suporte a schema JSON
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  }
}
Suporte a String
O conector oferece suporte ao String Converter em diferentes formatos do ClickHouse: JSON, CSV e TSV.
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "customInsertFormat": "true",
    "insertFormat": "CSV"
  }
}

Buffering interno

O buffering interno permite que a tarefa do sink acumule registros de várias chamadas poll() e os envie ao ClickHouse em batches maiores. Isso pode melhorar a vazão em workloads em que cada poll() produz muitos batches pequenos por partição. Comportamento principal:
  • bufferCount controla quantos registros são mantidos em buffer antes do flush.
  • bufferFlushTime define um tempo máximo de espera (em milissegundos) antes de fazer o flush dos registros em buffer.
  • bufferFlushTime só tem efeito quando bufferCount > 0.
  • bufferCount=0 e bufferFlushTime=0 mantêm o buffering desabilitado (comportamento padrão).
  • O buffering não é compatível quando exactlyOnce=true.
Por que o buffering é incompatível com o modo exactly-once: O buffering altera os limites dos batches, o que quebra a desduplicação de blocos do ClickHouse e a máquina de estados de offset do conector. Para resolver isso, desative o modo exactly-once com exactlyOnce=false na configuração do conector ou desative o buffering com bufferCount=0. Exemplo:
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "exactlyOnce": "false",
    "bufferCount": "5000",
    "bufferFlushTime": "2000"
  }
}

Logging

O logging é fornecido automaticamente pela plataforma Kafka Connect. O destino e o formato dos logs podem ser configurados por meio do arquivo de configuração do Kafka Connect. Se estiver usando o Confluent Platform, os logs poderão ser visualizados executando um comando de CLI:
confluent local services connect log
Para mais detalhes, confira o tutorial oficial.

Monitoramento

O ClickHouse Kafka Connect reporta métricas de runtime por meio do Java Management Extensions (JMX). O JMX vem habilitado no Kafka Connector por padrão.

Métricas específicas do ClickHouse

O conector expõe métricas personalizadas por meio do seguinte nome de MBean:
com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask{id}
Nome da métricaTipoDescrição
receivedRecordslongNúmero total de registros recebidos.
recordProcessingTimelongTempo total, em nanossegundos, gasto no agrupamento e na conversão de registros para uma estrutura unificada.
taskProcessingTimelongTempo total, em nanossegundos, gasto no processamento e na inserção de dados no ClickHouse.

Métricas de Produtor/Consumidor do Kafka

O conector expõe métricas padrão de produtor e consumidor do Kafka que fornecem informações sobre o fluxo de dados, a taxa de transferência e o desempenho. Métricas no Nível do Tópico:
  • records-sent-total: Número total de registros enviados para o tópico
  • bytes-sent-total: Total de bytes enviados para o tópico
  • record-send-rate: Taxa média de registros enviados por segundo
  • byte-rate: Taxa média de bytes enviados por segundo
  • compression-rate: Taxa de compressão obtida
Métricas no Nível da Partição:
  • records-sent-total: Total de registros enviados para a partição
  • bytes-sent-total: Total de bytes enviados para a partição
  • records-lag: Lag atual na partição
  • records-lead: Lead atual na partição
  • replica-fetch-lag: Informações de lag das réplicas
Métricas de Conexão no Nível do Nó:
  • connection-creation-total: Total de conexões criadas com o nó do Kafka
  • connection-close-total: Total de conexões encerradas
  • request-total: Total de solicitações enviadas ao nó
  • response-total: Total de respostas recebidas do nó
  • request-rate: Taxa média de solicitações por segundo
  • response-rate: Taxa média de respostas por segundo
Essas métricas ajudam a monitorar:
  • Taxa de transferência: Acompanhar as taxas de ingestão de dados
  • Lag: Identificar gargalos e atrasos no processamento
  • Compressão: Medir a eficiência da compressão de dados
  • Saúde da conexão: Monitorar a conectividade e a estabilidade da rede

Métricas do Kafka Connect Framework

O conector se integra ao framework do Kafka Connect e expõe métricas sobre o ciclo de vida das tarefas e o rastreamento de erros. Métricas de status das tarefas:
  • task-count: Número total de tarefas no conector
  • running-task-count: Número de tarefas em execução no momento
  • paused-task-count: Número de tarefas pausadas no momento
  • failed-task-count: Número de tarefas que falharam
  • destroyed-task-count: Número de tarefas destruídas
  • unassigned-task-count: Número de tarefas não atribuídas
Os valores de status das tarefas incluem: running, paused, failed, destroyed, unassigned Métricas de erro:
  • deadletterqueue-produce-failures: Número de gravações na DLQ que falharam
  • deadletterqueue-produce-requests: Total de tentativas de gravação na DLQ
  • last-error-timestamp: Timestamp do último erro
  • records-skip-total: Número total de registros ignorados devido a erros
  • records-retry-total: Número total de registros que passaram por nova tentativa
  • errors-total: Número total de erros encontrados
Métricas de desempenho:
  • offset-commit-failures: Número de commits de offset que falharam
  • offset-commit-avg-time-ms: Tempo médio dos commits de offset
  • offset-commit-max-time-ms: Tempo máximo dos commits de offset
  • put-batch-avg-time-ms: Tempo médio para processar um lote
  • put-batch-max-time-ms: Tempo máximo para processar um lote
  • source-record-poll-total: Total de registros coletados

Boas práticas de monitoramento

  1. Monitore o lag do consumidor: Acompanhe records-lag por partição para identificar gargalos de processamento
  2. Acompanhe as taxas de erro: Observe errors-total e records-skip-total para detectar problemas de qualidade dos dados
  3. Observe a integridade das tarefas: Monitore as métricas de status das tarefas para garantir que estejam em execução corretamente
  4. Meça a vazão: Use records-send-rate e byte-rate para acompanhar o desempenho da ingestão
  5. Monitore a integridade da conexão: Verifique as métricas de conexão no nível do nó para identificar problemas de rede
  6. Acompanhe a eficiência da compressão: Use compression-rate para otimizar a transferência de dados
Para ver definições detalhadas das métricas JMX e a integração com o Prometheus, consulte o arquivo de configuração jmx-export-connector.yml.

Limitações

  • Não há suporte a exclusões.
  • O tamanho do batch é herdado das propriedades do consumer do Kafka.
  • Ao usar o KeeperMap para exactly-once, se o offset for alterado ou recuado, será necessário excluir o conteúdo do KeeperMap para esse tópico específico. (Consulte o guia de solução de problemas abaixo para mais detalhes)

Ajuste de desempenho e otimização da vazão

Esta seção aborda estratégias de ajuste de desempenho para o ClickHouse Kafka Connect Sink. O ajuste de desempenho é essencial ao lidar com casos de uso de alta vazão ou quando é necessário otimizar o uso de recursos e minimizar o lag.

Quando o ajuste de desempenho é necessário?

O ajuste de desempenho normalmente é necessário nos seguintes cenários:
  • Cargas de trabalho de alta vazão: ao processar milhões de eventos por segundo de tópicos do Kafka
  • Consumer lag: quando seu conector não consegue acompanhar a taxa de produção de dados, causando um atraso cada vez maior
  • Restrições de recursos: quando você precisa otimizar o uso de CPU, memória ou rede
  • Múltiplos tópicos: ao consumir simultaneamente vários tópicos de alto volume
  • Mensagens pequenas: ao lidar com muitas mensagens pequenas que se beneficiariam do agrupamento em lotes no lado do servidor
O ajuste de desempenho normalmente NÃO é necessário quando:
  • Você está processando volumes baixos a moderados (< 10.000 mensagens/segundo)
  • O consumer lag é estável e aceitável para o seu caso de uso
  • As configurações padrão do conector já atendem aos seus requisitos de vazão
  • Seu cluster ClickHouse consegue lidar facilmente com a carga de entrada

Entendendo o fluxo de dados

Antes de ajustar, é importante entender como os dados fluem pelo conector:
  1. Kafka Connect Framework busca mensagens dos tópicos do Kafka em segundo plano
  2. O conector faz polling de mensagens no buffer interno do framework
  3. O conector agrupa as mensagens em lotes com base no tamanho do polling
  4. O ClickHouse recebe a inserção em lote via HTTP/S
  5. O ClickHouse processa a inserção (de forma síncrona ou assíncrona)
O desempenho pode ser otimizado em cada uma dessas etapas.

Ajuste do tamanho do lote no Kafka Connect

O primeiro nível de otimização é controlar a quantidade de dados que o conector recebe por lote do Kafka.
Configurações de fetch
O Kafka Connect (o framework) busca mensagens de tópicos do Kafka em segundo plano, independentemente do conector:
  • fetch.min.bytes: Quantidade mínima de dados antes de o framework repassar os dados ao conector (padrão: 1 byte)
  • fetch.max.bytes: Quantidade máxima de dados a buscar em uma única solicitação (padrão: 52428800 / 50 MB)
  • fetch.max.wait.ms: Tempo máximo de espera antes de retornar os dados se fetch.min.bytes não for atingido (padrão: 500 ms)

No Confluent Cloud, para ajustar essas configurações, é necessário abrir um chamado de suporte pelo Confluent Cloud.
Configurações de polling
O conector busca mensagens no buffer do framework por polling:
  • max.poll.records: Número máximo de registros retornados em uma única consulta de polling (padrão: 500)
  • max.partition.fetch.bytes: Quantidade máxima de dados por partição (padrão: 1048576 / 1 MB)

No Confluent Cloud, para ajustar essas configurações, é necessário abrir um chamado de suporte pelo Confluent Cloud.
Para obter o melhor desempenho com ClickHouse, procure usar lotes maiores:
# Aumentar o número de registros por poll
consumer.override.max.poll.records=5000

# Aumentar o tamanho de fetch da partição (5 MB)
consumer.override.max.partition.fetch.bytes=5242880

# Opcional: Aumentar o tamanho mínimo de fetch para aguardar mais dados (1 MB)
consumer.override.fetch.min.bytes=1048576

# Opcional: Reduzir o tempo de espera se a latência for crítica
consumer.override.fetch.max.wait.ms=300
As propriedades acima exigem que você habilite overrides de cliente na configuração do worker por meio de connector.client.config.override.policy=All. Consulte a documentação do Kafka Connect para mais informações.
Importante: As configurações de fetch do Kafka Connect representam dados compactados, enquanto o ClickHouse recebe dados não compactados. Ajuste essas configurações com base na sua taxa de compressão. Trade-offs:
  • Lotes maiores = Melhor desempenho de ingestão no ClickHouse, menos partes, menor sobrecarga
  • Lotes maiores = Maior uso de memória, com possível aumento da latência de ponta a ponta
  • Lotes grandes demais = Risco de timeouts, erros de OutOfMemory ou de exceder max.poll.interval.ms
Mais detalhes: documentação da Confluent | documentação do Kafka

Inserções assíncronas

As inserções assíncronas são um recurso poderoso quando o conector envia lotes relativamente pequenos ou quando você quer otimizar ainda mais a ingestão, transferindo para o ClickHouse a responsabilidade pelo agrupamento em lotes.
Quando usar inserções assíncronas
Considere ativar inserções assíncronas quando:
  • Muitos lotes pequenos: Seu conector envia pequenos lotes com frequência (< 1000 linhas por lote)
  • Alta concorrência: Várias tarefas do conector estão gravando na mesma tabela
  • Implantação distribuída: Você executa muitas instâncias do conector em hosts diferentes
  • Sobrecarga na criação de partes: Você está enfrentando erros de “too many partes”
  • Carga de trabalho mista: Combinação de ingestão em tempo real com cargas de trabalho de consulta
NÃO use inserções assíncronas quando:
  • Você já estiver enviando lotes grandes (> 10.000 linhas por lote) com frequência controlada
  • Você precisar de visibilidade imediata dos dados (as consultas precisam ver os dados instantaneamente)
  • A semântica exactly-once com wait_for_async_insert=0 entrar em conflito com seus requisitos
  • Seu caso de uso puder se beneficiar, em vez disso, de melhorias no batching no lado do cliente
Como as inserções assíncronas funcionam
Com as inserções assíncronas ativadas, o ClickHouse:
  1. Recebe a consulta INSERT do conector
  2. Grava os dados em um buffer na memória (em vez de gravá-los imediatamente no disco)
  3. Retorna sucesso ao conector (se wait_for_async_insert=0)
  4. Grava o buffer no disco quando uma destas condições é atendida:
    • O buffer atinge async_insert_max_data_size (padrão: 100 MB)
    • async_insert_busy_timeout_ms milissegundos se passaram desde a primeira inserção (padrão: 1000 ms)
    • Número máximo de consultas acumuladas (async_insert_max_query_number, padrão: 100)
Isso reduz significativamente o número de partes criadas e melhora a vazão geral.
Habilitando inserções assíncronas
Adicione as configurações de async insert ao parâmetro de configuração clickhouseSettings:
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    ...
    "clickhouseSettings": "async_insert=1,wait_for_async_insert=1"
  }
}
Principais configurações:
  • async_insert=1: Habilita inserções assíncronas
  • wait_for_async_insert=1 (recomendado): O conector espera os dados serem gravados no armazenamento do ClickHouse antes de confirmar o recebimento. Isso oferece garantias de entrega.
  • wait_for_async_insert=0: O conector confirma o recebimento imediatamente após armazenar os dados em buffer. Melhor desempenho, mas os dados podem ser perdidos se o servidor falhar antes da gravação.
Ajuste do comportamento de inserção assíncrona
Você pode ajustar o comportamento de flush do async insert:
"clickhouseSettings": "async_insert=1,wait_for_async_insert=1,async_insert_max_data_size=104857600,async_insert_busy_timeout_ms=1000"
Parâmetros de ajuste comuns:
  • async_insert_max_data_size (padrão: 104857600 / 100 MB): Tamanho máximo do buffer antes da gravação
  • async_insert_busy_timeout_ms (padrão: 1000): Tempo máximo (ms) antes da gravação
  • async_insert_stale_timeout_ms (padrão: 0): Tempo (ms) desde o último insert antes da gravaação
  • async_insert_max_query_number (padrão: 100): Número máximo de consultas antes da gravação
Trade-offs:
  • Benefícios: Menos partes, melhor desempenho de merge, menor sobrecarga de CPU, maior throughput em cenários de alta concorrência
  • Considerações: Os dados não ficam imediatamente disponíveis para consulta, latência de ponta a ponta um pouco maior
  • Riscos: Perda de dados em caso de falha do servidor se wait_for_async_insert=0, possível pressão de memória com buffers grandes
Inserções assíncronas com semântica de exactly-once
Ao usar exactlyOnce=true com inserções assíncronas:
{
  "config": {
    "exactlyOnce": "true",
    "clickhouseSettings": "async_insert=1,wait_for_async_insert=1"
  }
}
Importante: Sempre use wait_for_async_insert=1 com exactly-once para garantir que os commits de offset ocorram somente depois que os dados forem persistidos. Para mais informações sobre async inserts, consulte a documentação de async inserts do ClickHouse.

Paralelismo do conector

Aumente o paralelismo para melhorar a vazão:
Tarefas por conector
"tasks.max": "4"
Cada tarefa processa um subconjunto das partições do tópico. Mais tarefas = mais paralelismo, mas:
  • O número máximo efetivo de tarefas = número de partições do tópico
  • Cada tarefa mantém sua própria conexão com o ClickHouse
  • Mais tarefas = maior sobrecarga e possível contenção de recursos
Recomendação: Comece com tasks.max igual ao número de partições do tópico e depois ajuste com base nas métricas de CPU e vazão.
Ignorando partições no agrupamento em lotes
Por padrão, o conector agrupa as mensagens em lotes por partição. Para obter maior vazão, você pode agrupar em lotes entre partições:
"ignorePartitionsWhenBatching": "true"
** Aviso**: Use apenas quando exactlyOnce=false. Essa configuração pode melhorar a vazão ao criar batches maiores, mas elimina as garantias de ordenação em cada partição.

Múltiplos tópicos de alta vazão

Se o conector estiver configurado para assinar vários tópicos, você estiver usando topic2TableMap para mapear tópicos para tabelas e estiver enfrentando um gargalo na inserção, o que resulta em consumer lag, considere criar um conector por tópico. O principal motivo para isso acontecer é que, atualmente, os lotes são inseridos em cada tabela em série. Recomendação: para vários tópicos de alto volume, implante uma instância de conector por tópico para maximizar a vazão de inserção em paralelo.

Considerações sobre o engine de tabela do ClickHouse

Escolha o engine de tabela do ClickHouse adequado para seu caso de uso:
  • MergeTree: Melhor para a maioria dos casos de uso; equilibra o desempenho de consulta e inserção
  • ReplicatedMergeTree: Necessário para alta disponibilidade; adiciona sobrecarga de replicação
  • *MergeTree com ORDER BY adequado: Otimize para seus padrões de consulta
Configurações a considerar:
CREATE TABLE my_table (...)
ENGINE = MergeTree()
ORDER BY (timestamp, id)
SETTINGS 
    -- Aumentar o número máximo de insert threads para escrita paralela de partes
    max_insert_threads = 4,
    -- Permitir inserts com quorum para maior confiabilidade (ReplicatedMergeTree)
    insert_quorum = 2
Para as configurações de insert em nível de conector:
"clickhouseSettings": "insert_quorum=2,insert_quorum_timeout=60000"

Pool de conexões e timeouts

O conector mantém conexões HTTP com o ClickHouse. Ajuste os timeouts para redes com alta latência:
"clickhouseSettings": "socket_timeout=300000,connection_timeout=30000"
  • socket_timeout (padrão: 30000 ms): Tempo máximo para operações de leitura
  • connection_timeout (padrão: 10000 ms): Tempo máximo para estabelecer a conexão
Aumente esses valores se ocorrerem erros de timeout com lotes grandes.

Monitoramento e solução de problemas de desempenho

Monitore estas métricas principais:
  1. Consumer lag: Use ferramentas de monitoramento do Kafka para acompanhar o lag por partição
  2. Métricas do conector: Monitore receivedRecords, recordProcessingTime, taskProcessingTime via JMX (consulte Monitoring)
  3. Métricas do ClickHouse:
    • system.asynchronous_inserts: Monitore o uso do buffer de async insert
    • system.parts: Monitore o número de partes para detectar problemas de merge
    • system.merges: Monitore merges ativos
    • system.events: Acompanhe InsertedRows, InsertedBytes, FailedInsertQuery
Problemas comuns de desempenho:
SintomaPossível causaSolução
Consumer lag altoLotes pequenos demaisAumente max.poll.records, habilite inserções assíncronas
Erros de “Too many partes”Inserções pequenas e frequentesHabilite inserções assíncronas, aumente o tamanho do lote
Erros de timeoutLote grande, rede lentaReduza o tamanho do lote, aumente socket_timeout, verifique a rede
Alto uso de CPUMuitas partes pequenasHabilite inserções assíncronas, aumente as configurações de merge
Erros de OutOfMemoryTamanho do lote grande demaisReduza max.poll.records, max.partition.fetch.bytes
Carga desigual entre tarefasDistribuição desigual de partiçõesRebalanceie as partições ou ajuste tasks.max

Resumo das boas práticas

  1. Comece com os padrões, depois meça e ajuste com base no desempenho real
  2. Prefira lotes maiores: Busque 10.000-100.000 linhas por insert, quando possível
  3. Use inserções assíncronas ao enviar muitos lotes pequenos ou em cenários de alta concorrência
  4. Sempre use wait_for_async_insert=1 com semântica de exactly-once
  5. Escale horizontalmente: Aumente tasks.max até o número de partições
  6. Um conector por tópico de alto volume para obter vazão máxima
  7. Monitore continuamente: Acompanhe o consumer lag, a contagem de partes e a atividade de merge
  8. Teste minuciosamente: Sempre teste alterações de configuração sob carga realista antes da implantação em produção

Exemplo: Configuração de alta vazão

Aqui está um exemplo completo otimizado para alta vazão:
{
  "name": "clickhouse-high-throughput",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    "tasks.max": "8",
    
    "topics": "high_volume_topic",
    "hostname": "my-clickhouse-host.cloud",
    "port": "8443",
    "database": "default",
    "username": "default",
    "password": "<PASSWORD>",
    "ssl": "true",
    
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    
    "exactlyOnce": "false",
    "ignorePartitionsWhenBatching": "true",
    
    "consumer.override.max.poll.records": "10000",
    "consumer.override.max.partition.fetch.bytes": "5242880",
    "consumer.override.fetch.min.bytes": "1048576",
    "consumer.override.fetch.max.wait.ms": "500",
    
    "clickhouseSettings": "async_insert=1,wait_for_async_insert=1,async_insert_max_data_size=16777216,async_insert_busy_timeout_ms=1000,socket_timeout=300000"
  }
}
A configuração do conector acima exige que você habilite sobrescritas de cliente na configuração do seu worker por meio de connector.client.config.override.policy=All. Consulte a documentação do Kafka Connect para mais informações.
Esta configuração:
  • Processa até 10.000 registros por poll
  • Agrupa partições em lotes para inserts maiores
  • Usa inserções assíncronas com buffer de 16 MB
  • Executa 8 tarefas em paralelo (corresponda à sua quantidade de partições)
  • Otimizada para vazão em vez de ordenação estrita

Solução de problemas

”Inconsistência de estado para o tópico [someTopic] partição [0]

Isso acontece quando o offset armazenado no KeeperMap é diferente do offset armazenado no Kafka, geralmente quando um tópico foi excluído ou quando o offset foi ajustado manualmente. Para corrigir isso, você precisará excluir os valores antigos armazenados para esse tópico + partição:
-- Primeiro, identifique o banco de dados usado para armazenar os dados.
SELECT * FROM [database].connect_state

-- Identifique a chave que corresponde ao tópico e à partição.
ALTER TABLE [database].connect_state DELETE WHERE key = [keyname]
Esse ajuste pode ter implicações para a semântica exactly-once.

”Quais erros o conector tentará novamente?”

No momento, o foco está em identificar erros transitórios para os quais é possível fazer nova tentativa, incluindo:
  • ClickHouseException - Esta é uma exceção genérica que pode ser lançada pelo ClickHouse. Em geral, ela é lançada quando o servidor está sobrecarregado, e os seguintes códigos de erro são considerados especialmente transitórios:
    • 3 - UNEXPECTED_END_OF_FILE
    • 107 - FILE_DOESNT_EXIST
    • 159 - TIMEOUT_EXCEEDED
    • 164 - READONLY
    • 202 - TOO_MANY_SIMULTANEOUS_QUERIES
    • 203 - NO_FREE_CONNECTION
    • 209 - SOCKET_TIMEOUT
    • 210 - NETWORK_ERROR
    • 241 - MEMORY_LIMIT_EXCEEDED
    • 242 - TABLE_IS_READ_ONLY
    • 252 - TOO_MANY_PARTS
    • 285 - TOO_FEW_LIVE_REPLICAS
    • 319 - UNKNOWN_STATUS_OF_INSERT
    • 425 - SYSTEM_ERROR
    • 999 - KEEPER_EXCEPTION
  • SocketTimeoutException - Esta é lançada quando ocorre timeout no socket.
  • UnknownHostException - Esta é lançada quando não é possível resolver o host.
  • IOException - Esta é lançada quando há um problema de rede.

”Todos os meus dados estão em branco/zerados”

Provavelmente, os campos nos seus dados não correspondem aos campos da tabela — isso é especialmente comum com CDC (e o formato Debezium). Uma solução comum é adicionar a transformação flatten à configuração do seu conector:
transforms=flatten
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flatten.delimiter=_
Isso transformará seus dados de um JSON aninhado em um JSON plano (usando _ como delimitador). Os campos na tabela passarão então a seguir o formato “campo1_campo2_campo3” (ou seja, “before_id”, “after_id”, etc.).

”Quero usar minhas chaves do Kafka no ClickHouse”

As chaves do Kafka não são armazenadas no campo value por padrão, mas você pode usar a transformação KeyToValue para mover a chave para o campo value (com o novo nome de campo _key):
transforms=keyToValue
transforms.keyToValue.type=com.clickhouse.kafka.connect.transforms.KeyToValue
transforms.keyToValue.field=_key
Última modificação em 10 de junho de 2026