跳转到主要内容
DataStore 性能分析器可帮助您测量执行时间并找出性能瓶颈。

快速入门

from chdb import datastore as pd
from chdb.datastore.config import config, get_profiler

# 启用性能分析
config.enable_profiling()

# 执行操作
ds = pd.read_csv("large_data.csv")
result = (ds
    .filter(ds['amount'] > 100)
    .groupby('category')
    .agg({'amount': 'sum'})
    .sort('sum', ascending=False)
    .head(10)
    .to_df()
)

# 查看报告
profiler = get_profiler()
print(profiler.report())

启用性能分析

from chdb.datastore.config import config

# 启用性能分析
config.enable_profiling()

# 禁用性能分析
config.disable_profiling()

# 检查性能分析是否已启用
print(config.profiling_enabled)  # True 或 False

性能分析器 API

获取 性能分析器

from chdb.datastore.config import get_profiler

profiler = get_profiler()

report()

显示性能分析报告。
profiler.report(min_duration_ms=0.1)
参数:
参数类型默认值描述
min_duration_msfloat0.1仅显示耗时 >= 该值的步骤
示例输出:
======================================================================
EXECUTION PROFILE
======================================================================
   45.79ms (100.0%) Total Execution
     23.25ms ( 50.8%) Query Planning [ops_count=2]
     22.29ms ( 48.7%) SQL Segment 1 [ops=2]
       20.48ms ( 91.9%) SQL Execution
        1.74ms (  7.8%) Result to DataFrame
----------------------------------------------------------------------
      TOTAL:    45.79ms
======================================================================
报告显示:
  • 每个步骤的耗时 (毫秒)
  • 父级时间/总时间的百分比
  • 操作的层级嵌套关系
  • 每个步骤的元数据 (例如 ops_countops)

step()

手动统计代码块的耗时。
with profiler.step("custom_operation"):
    # 在此处编写你的代码
    expensive_operation()

clear()

清除所有性能分析数据。
profiler.clear()

summary()

获取步骤名称到耗时 (毫秒) 的字典。
summary = profiler.summary()
for name, duration in summary.items():
    print(f"{name}: {duration:.2f}ms")
示例输出:
Total Execution: 45.79ms
Total Execution.Cache Check: 0.00ms
Total Execution.Query Planning: 23.25ms
Total Execution.SQL Segment 1: 22.29ms
Total Execution.SQL Segment 1.SQL Execution: 20.48ms
Total Execution.SQL Segment 1.Result to DataFrame: 1.74ms

了解报告

步骤名称

步骤名称描述
Total Execution总执行时间
Query Planning查询计划耗时
SQL Segment N执行 SQL 片段 N
SQL Execution实际 SQL 查询执行
Result to DataFrame将结果转换为 pandas DataFrame
Cache Check检查查询缓存
Cache Write将结果写入缓存

耗时

  • 规划步骤 (查询规划) :通常很快
  • 执行步骤 (SQL 执行) :实际执行工作的环节
  • 传输步骤 (结果转为 DataFrame) :将数据转换为 pandas

识别性能瓶颈

======================================================================
EXECUTION PROFILE
======================================================================
  200.50ms (100.0%) Total Execution
    10.25ms (  5.1%) Query Planning [ops_count=4]
   190.00ms ( 94.8%) SQL Segment 1 [ops=4]
     185.00ms ( 97.4%) SQL Execution    <- Main bottleneck
       5.00ms (  2.6%) Result to DataFrame
----------------------------------------------------------------------
      TOTAL:   200.50ms
======================================================================

性能分析方式

分析单个查询

config.enable_profiling()
profiler = get_profiler()
profiler.clear()  # 清除之前的数据

# 执行查询
result = ds.filter(...).groupby(...).agg(...).to_df()

# 查看此查询的 profile
print(profiler.report())

分析多个查询的性能

config.enable_profiling()
profiler = get_profiler()
profiler.clear()

# 查询 1
with profiler.step("Query 1"):
    result1 = query1.to_df()

# 查询 2
with profiler.step("Query 2"):
    result2 = query2.to_df()

print(profiler.report())

方案对比

profiler = get_profiler()

# 方法 1:先过滤再分组
profiler.clear()
with profiler.step("filter_then_groupby"):
    result1 = ds.filter(ds['x'] > 10).groupby('y').sum().to_df()
summary1 = profiler.summary()
time1 = summary1.get('filter_then_groupby', 0)

# 方法 2:先分组再过滤
profiler.clear()
with profiler.step("groupby_then_filter"):
    result2 = ds.groupby('y').sum().filter(ds['x'] > 10).to_df()
summary2 = profiler.summary()
time2 = summary2.get('groupby_then_filter', 0)

print(f"Approach 1: {time1:.2f}ms")
print(f"Approach 2: {time2:.2f}ms")
print(f"Winner: {'Approach 1' if time1 < time2 else 'Approach 2'}")

优化建议

1. 检查 SQL 执行时间

如果 SQL execution 是瓶颈:
  • 添加更多过滤条件以减少数据量
  • 使用 Parquet 而不是 CSV
  • 检查是否使用了合适的索引 (针对数据库来源)

2. 检查 I/O 时间

如果 read_csvread_parquet 是瓶颈:
  • 使用 Parquet (列式、已压缩)
  • 只读取所需的列
  • 如果可能,尽量在源头过滤

3. 检查数据传输

如果 to_df 很慢:
  • 结果集可能过大
  • 增加更多过滤条件或设置限制
  • 使用 head() 预览

4. 引擎对比

from chdb.datastore.config import config

# 使用 chdb 进行性能分析
config.use_chdb()
profiler.clear()
result_chdb = query.to_df()
time_chdb = profiler.total_duration_ms

# 使用 pandas 进行性能分析
config.use_pandas()
profiler.clear()
result_pandas = query.to_df()
time_pandas = profiler.total_duration_ms

print(f"chdb: {time_chdb:.2f}ms")
print(f"pandas: {time_pandas:.2f}ms")

最佳实践

1. 先进行性能分析,再优化

# 不要猜测——要测量!
config.enable_profiling()
result = your_query.to_df()
print(get_profiler().report())

2. 每次测试之间清理

profiler.clear()  # 清除之前的数据
# 运行测试
print(profiler.report())

3. 使用 min_duration_ms 聚焦重点

# 仅显示耗时 >= 100ms 的操作
profiler.report(min_duration_ms=100)

4. 分析有代表性的数据

# 使用真实数据规模进行性能分析
# 小型测试数据可能无法暴露真实瓶颈

5. 在生产环境中禁用

# 开发环境
config.enable_profiling()

# 生产环境
config.set_profiling_enabled(False)  # 避免额外开销

示例:完整性能分析会话

from chdb import datastore as pd
from chdb.datastore.config import config, get_profiler

# 设置
config.enable_profiling()
config.enable_debug()  # 同时查看正在发生的情况
profiler = get_profiler()

# 加载数据
profiler.clear()
print("=== Loading Data ===")
ds = pd.read_csv("sales_2024.csv")  # 1000万行
print(profiler.report())

# 查询 1:简单过滤器
profiler.clear()
print("\n=== Query 1: Simple Filter ===")
result1 = ds.filter(ds['amount'] > 1000).to_df()
print(profiler.report())

# 查询 2:复杂聚合
profiler.clear()
print("\n=== Query 2: Complex Aggregation ===")
result2 = (ds
    .filter(ds['amount'] > 100)
    .groupby('region', 'category')
    .agg({
        'amount': ['sum', 'mean', 'count'],
        'quantity': 'sum'
    })
    .sort('sum', ascending=False)
    .head(20)
    .to_df()
)
print(profiler.report())

# 摘要
print("\n=== Summary ===")
print(f"Query 1: {len(result1)} rows")
print(f"Query 2: {len(result2)} rows")
最后修改于 2026年6月10日