基于 eBPF 的边覆盖度测试方法指导
本文由 AI 生成,内容为通用方法论指导,请结合实际场景评估适用性。
基于 eBPF 的边覆盖度测试方法指导
本文档是一份通用方法论,指导如何在任意 DBMS 或 C/C++ 应用项目中实现基于 eBPF 的运行时边覆盖度采集。可独立使用,也可与 gcov/lcov 行覆盖度配合形成双覆盖度系统。
目录
- 什么是边覆盖度
- 为什么需要边覆盖度
- 技术选型与前置条件
- 实施步骤总览
- Step 1:确定插桩目标函数
- Step 2:编写 bpftrace 探针脚本
- Step 3:实现探针挂载/卸载逻辑
- Step 4:实现日志去重与累积
- Step 5:构建采集守护进程
- Step 6:数据持久化与历史记录
- Step 7:可视化与报告
- Step 8:与行覆盖度集成
- 容器化部署注意事项
- 常见问题与排错
- 附录:可复用脚本模板
1. 什么是边覆盖度
1.1 定义
边覆盖度度量的是:在程序运行过程中,有多少种不同的执行路径被触发过。
具体定义为:唯一 (目标函数 + 完整调用栈) 组合的数量。
- 对同一目标函数
foo()的两次调用,如果调用栈不同(例如一次来自main→bar→foo,另一次来自main→baz→foo),计为 2 条不同的边 - 如果调用栈完全相同,即使时间不同,也只计为 1 条边
1.2 与传统覆盖度的区别
| 维度 | 行覆盖度 (gcov) | 分支覆盖度 (gcov) | 边覆盖度 (eBPF) |
|---|---|---|---|
| 度量对象 | 被执行的源码行 | 被执行的分支 | 唯一执行路径 |
| 插桩时机 | 编译时 | 编译时 | 运行时 |
| 粒度 | 源码行级 | 条件分支级 | 调用路径级 |
| 信息维度 | 二维(文件+行号) | 二维(文件+分支点) | 多维(函数+完整调用栈) |
| 典型问题 | “这行代码跑了没?” | “if/else 都走到了没?” | “这个函数通过哪些不同路径被调用?” |
1.3 适用场景
边覆盖度特别适合以下测试场景:
- 模糊测试 (Fuzzing):评估 fuzzer 是否探索到了足够多的代码路径
- 回归测试评估:检查回归测试用例是否覆盖了关键执行路径
- 并发/事务测试:区分同一函数在不同事务流程中的调用路径(如事务提交 vs 后台刷写)
- 性能测试路径分析:识别热路径和冷路径
2. 为什么需要边覆盖度
2.1 行覆盖度的局限
行覆盖度只能告诉你”哪些代码行被执行了”,但无法区分:
场景 A:SELECT → row_search_mvcc → buf_page_get_gen
场景 B:UPDATE → row_ins_clust_index_entry → buf_page_get_gen
两次调用 buf_page_get_gen 在行覆盖中只计一次,但它们代表完全不同的执行路径和业务语义。
2.2 边覆盖度的价值
- 路径多样性:量化测试用例触发了多少种不同的调用路径
- 深度覆盖:发现仅靠行覆盖无法暴露的深层路径(如异常处理路径、并发竞态路径)
- 零停机:运行时动态插桩,不需要重启被测进程
- 聚焦关键函数:只在关心的函数上插桩,避免全量插桩的性能开销
3. 技术选型与前置条件
3.1 核心技术栈
| 组件 | 用途 | 最低版本 |
|---|---|---|
| bpftrace | eBPF 探针脚本语言 | v0.16+ (推荐 v0.20+) |
| Linux Kernel | eBPF 运行时支持 | 5.4+ (推荐 5.10+) |
| Python 3 | 日志去重与数据处理 | 3.6+ |
| Bash | 编排脚本 | 4.0+ |
3.2 内核要求
# 检查内核版本
uname -r # 需要 5.4+
# 检查 eBPF 支持
ls /sys/kernel/debug/tracing/uprobes # 应存在
# 检查 bpftrace
bpftrace --version
# 检查权限(需要 root 或 CAP_BPF + CAP_TRACING)
sudo bpftrace -e 'BEGIN { printf("ok\n"); exit(); }'
3.3 被测程序要求
- 必须是未剥离符号的 ELF 二进制(不能
strip) - 如果运行在容器中,宿主机需要能访问容器内二进制文件
- 编译时不要
-fvisibility=hidden(否则 uprobe 无法匹配符号) - 推荐保留调试信息(
-g),但非必须
3.4 决策树:是否适合使用边覆盖度
你的测试是否涉及长时间运行的进程?
├── 否 → 考虑编译时插桩方案(AFL、gcov 等)
└── 是 → 进一步判断
├── 你是否关注"哪些路径被触发"而非仅"哪些行被执行"?
│ ├── 否 → gcov 行覆盖足够
│ └── 是 → 边覆盖度适合
└── 被测进程是否为 C/C++ 原生二进制?
├── 否(JVM/Go/解释型语言)→ 需要适配,见 14.5 节
└── 是 → 直接适用本方案
4. 实施步骤总览
┌──────────────────────────────────────────────────────────────────┐
│ 边覆盖度实施路线图 │
│ │
│ Step 1: 确定插桩目标函数 │
│ ↓ │
│ Step 2: 编写 bpftrace 探针脚本 │
│ ↓ │
│ Step 3: 实现探针挂载/卸载逻辑 │
│ ↓ │
│ Step 4: 实现日志去重与累积 │
│ ↓ │
│ Step 5: 构建采集守护进程 │
│ ↓ │
│ Step 6: 数据持久化与历史记录 │
│ ↓ │
│ Step 7: 可视化与报告 │
│ ↓ │
│ Step 8: (可选) 与行覆盖度集成 │
└──────────────────────────────────────────────────────────────────┘
5. Step 1:确定插桩目标函数
5.1 选择原则
不是所有函数都值得插桩。选择标准:
- 业务关键路径上的入口函数:如命令分发、请求处理、事务提交
- 被多种路径调用的热点函数:同一个函数被不同业务流程调用时调用栈不同
- 子系统边界函数:跨子系统的调用点,如 SQL 层到存储引擎层的入口
- 并发/事务相关函数:锁操作、缓冲区管理、日志写入
5.2 选择方法
# 方法 1:通过代码分析确定关键函数
# 查找函数被调用次数最多的函数(callers 多样性高)
objdump -d /path/to/binary | grep 'call.*func_name' | wc -l
# 方法 2:使用 perf 初步识别热点
perf record -g -p <pid> -- sleep 60
perf report # 查看 callers 分布
# 方法 3:通过 ctags/cscope 统计函数引用
grep -r "func_name" --include="*.c" --include="*.cpp" | wc -l
5.3 函数数量建议
| 场景 | 建议数量 | 说明 |
|---|---|---|
| 快速验证 | 3-5 个 | 核心入口函数 |
| 标准覆盖 | 8-15 个 | 覆盖主要子系统边界 |
| 深度分析 | 20-30 个 | 需评估 bpftrace 性能开销 |
5.4 实例:DBMS 项目的目标函数
以 MySQL/InnoDB 为例,分层选择:
| 层次 | 函数 | 为什么选它 |
|---|---|---|
| SQL 入口 | dispatch_command |
所有 SQL 的入口 |
| SQL 解析 | mysql_parse |
语法分析 |
| SQL 执行 | mysql_execute_command |
执行器入口 |
| 事务 | trx_commit |
事务提交路径 |
| 读操作 | row_search_mvcc |
MVCC 读 |
| 锁 | lock_rec_lock |
记录锁操作 |
| 缓冲池 | buf_page_get_gen |
页面获取 |
| 写操作 | row_ins_clust_index_entry, row_upd_clust_step |
聚簇索引写 |
| B-tree | btr_cur_search_to_nth_level |
B-tree 搜索 |
6. Step 2:编写 bpftrace 探针脚本
6.1 脚本模板
以下是一个通用模板,替换 TARGET_BINARY、TARGET_PID 和函数名即可:
#!/usr/bin/env bpftrace
BEGIN { printf("=== edge tracing started (PID=%d) ===\n", pid); }
// ---- 复制此 block 并替换函数名 ----
uprobe:TARGET_BINARY:*FUNC_NAME_1*
/pid == TARGET_PID/
{
printf("CALL FUNC_NAME_1\n");
printf("%s\n", ustack());
}
// ---- end block ----
// ---- 重复添加更多函数 ----
uprobe:TARGET_BINARY:*FUNC_NAME_2*
/pid == TARGET_PID/
{
printf("CALL FUNC_NAME_2\n");
printf("%s\n", ustack());
}
END { printf("=== edge tracing stopped ===\n"); }
6.2 关键语法说明
# uprobe 语法
uprobe:<binary_path>:<symbol_pattern>
# PID 过滤(只捕获目标进程)
/pid == TARGET_PID/
# 用户态调用栈
ustack()
# 输出标记(用于后续解析)
printf("CALL <function_name>\n")
6.3 C++ 符号匹配
C++ 函数经过 name mangling,直接使用源码函数名可能匹配不到。解决方案:
# 方法 1:使用 wildcard 匹配(推荐)
uprobe:/path/to/binary:*my_function* # 匹配 _Z11my_functionv 等
# 方法 2:查看 mangled 名称
nm /path/to/binary | grep my_function
# 方法 3:使用 demangled 名称(bpftrace 0.17+)
# bpftrace 自动 demangle,但 wildcard 更可靠
6.4 模板实例化脚本
生产中使用占位符 + sed 替换,避免硬编码:
# edge.bt 模板内容
TEMPLATE='#!/usr/bin/env bpftrace
BEGIN { printf("=== edge tracing started (PID=%d) ===\n", pid); }
uprobe:BIN_PATH:*dispatch_command*
/pid == TARGET_PID/
{
printf("CALL dispatch_command\n");
printf("%s\n", ustack());
}
END { printf("=== edge tracing stopped ===\n"); }'
# 运行时实例化
sed -e "s|BIN_PATH|/usr/bin/mysqld|g" \
-e "s/TARGET_PID/12345/g" \
edge.bt > edge_run.bt
sudo bpftrace edge_run.bt > edges.log 2>&1 &
6.5 验证探针是否正常工作
# 快速测试:手动触发一次
sudo bpftrace -e '
uprobe:/usr/bin/mysqld:*dispatch_command*
/pid == 12345/
{
printf("HIT dispatch_command\n");
printf("%s\n", ustack());
}
' -c 'sleep 1' # 1 秒超时
# 预期输出:如果在 1 秒内有 SQL 查询执行,会看到 HIT 行 + 调用栈
# 如果无输出:检查 PID 是否正确、符号是否匹配、进程是否在执行
7. Step 3:实现探针挂载/卸载逻辑
7.1 挂载流程
edge_trace_attach() {
local binary_path="$1"
local target_pid="$2"
local output_dir="$3"
local template="$4"
# 前置检查
if ! sudo -n bpftrace --version &>/dev/null; then
echo "ERROR: bpftrace not available or no sudo permission"
return 1
fi
if ! sudo test -f "$binary_path"; then
echo "ERROR: binary not found: $binary_path"
return 1
fi
# 生成实例化脚本
local ts=$(date +%Y%m%d_%H%M%S)
local run_bt="${output_dir}/edge_run_${ts}.bt"
sed -e "s|BIN_PATH|${binary_path}|g" \
-e "s/TARGET_PID/${target_pid}/g" \
"$template" > "$run_bt"
# 后台启动 bpftrace
local log_file="${output_dir}/edges_${ts}.log"
sudo bpftrace "$run_bt" >> "$log_file" 2>&1 &
BPF_PID=$!
# 验证启动成功
sleep 3
if ! kill -0 "$BPF_PID" 2>/dev/null; then
echo "ERROR: bpftrace failed to start, check $log_file"
BPF_PID=""
return 1
fi
echo "bpftrace attached (pid=$BPF_PID, target=$target_pid)"
LOG_FILE="$log_file"
LOG_TS="$ts"
}
7.2 卸载流程
edge_trace_detach() {
if [[ -z "${BPF_PID:-}" ]]; then
return 0
fi
echo "Detaching bpftrace (pid=$BPF_PID)"
# 优雅停止(SIGINT 让 bpftrace 输出 END 块)
sudo kill -INT "$BPF_PID" 2>/dev/null || true
# 等待退出,最多 10 秒
local waited=0
while [[ $waited -lt 10 ]]; do
if ! kill -0 "$BPF_PID" 2>/dev/null; then
echo "bpftrace stopped after ${waited}s"
break
fi
sleep 1
((waited++))
done
# 强制兜底
sudo kill -9 "$BPF_PID" 2>/dev/null || true
BPF_PID=""
}
7.3 容器内进程的 PID 解析
如果被测程序运行在 Docker 容器中:
get_container_pid() {
local container="$1"
docker inspect --format '' "$container"
}
get_target_pid_in_container() {
local container="$1"
local target_comm="$2" # 如 "mysqld"
local container_pid=$(get_container_pid "$container")
# 遍历容器 init 进程的子进程,精确匹配进程名
ps --ppid "$container_pid" -o pid=,comm= 2>/dev/null \
| awk -v target="$target_comm" '$2 == target {print $1; exit}'
}
注意:使用 comm 精确匹配而非进程名模糊匹配,避免误匹配辅助进程(如 mysqld_safe vs mysqld)。
7.4 容器内二进制路径解析
get_binary_path_in_container() {
local container="$1"
local relative_path="$2" # 如 "/usr/local/mysql/bin/mysqld"
# 方法 1:Docker OverlayFS 物理路径(推荐,无 I/O 竞争)
local merged_dir
merged_dir=$(docker inspect --format '' "$container" 2>/dev/null)
local physical_path="${merged_dir}${relative_path}"
if [[ -n "$merged_dir" ]] && sudo test -f "$physical_path"; then
echo "$physical_path"
return 0
fi
# 方法 2:通过 /proc/PID/root/ 访问(回退方案)
local container_pid=$(get_container_pid "$container")
local proc_path="/proc/${container_pid}/root${relative_path}"
if sudo test -f "$proc_path"; then
echo "$proc_path"
return 0
fi
echo "ERROR: cannot resolve binary path" >&2
return 1
}
8. Step 4:实现日志去重与累积
8.1 为什么需要去重
bpftrace 每次函数调用都输出一行 CALL + 完整调用栈。在高频调用场景下(如 buf_page_get_gen 每秒可能触发数千次),原始日志会非常大。去重可以:
- 将数千条重复日志压缩为少量唯一路径
- 实现跨时间窗口的累积统计
- 支持增量分析(每个 epoch 新增了多少条路径)
8.2 去重算法
核心思想:将每条调用路径视为一个有序字符串列表,用 Python set 的 tuple 去重。
#!/usr/bin/env python3
"""
edge_dedup.py - 边覆盖度日志去重工具
用法:
python3 edge_dedup.py <raw_log> [--cumulative <cumulative_file>]
输出:
<raw_log>.uniq - 单次去重结果
<cumulative_file> - 累积去重结果(可选)
stdout: EDGE_RAW=N EDGE_UNIQUE=M EDGE_CUMULATIVE=K
"""
import sys
import os
def parse_edges(filepath):
"""将原始日志解析为边的列表。每条边 = [CALL行, 栈帧1, 栈帧2, ...]"""
edges = []
current_edge = []
with open(filepath) as f:
for line in f:
# 过滤 bpftrace 元数据
if line.startswith('Attaching') or \
line.startswith('=== edge') or \
line.startswith('=== eBPF') or \
not line.strip():
continue
if line.startswith('CALL '):
if current_edge:
edges.append(current_edge)
current_edge = [line]
elif current_edge:
current_edge.append(line)
if current_edge:
edges.append(current_edge)
return edges
def dedup_edges(edges):
"""去重:将边列表转为 set of tuples"""
return set(tuple(e) for e in edges)
def write_edges(edges_set, filepath):
"""将去重后的边集合写入文件"""
with open(filepath, 'w') as f:
for edge in edges_set:
f.write(''.join(edge) + '\n')
def merge_cumulative(cumulative_file, new_edges):
"""合并新边到累积文件"""
existing = set()
if os.path.exists(cumulative_file):
existing = dedup_edges(parse_edges(cumulative_file))
merged = existing | new_edges
write_edges(merged, cumulative_file)
return merged
def main():
if len(sys.argv) < 2:
print("Usage: edge_dedup.py <raw_log> [--cumulative <file>]")
sys.exit(1)
raw_log = sys.argv[1]
cumulative_file = None
if '--cumulative' in sys.argv:
idx = sys.argv.index('--cumulative')
if idx + 1 < len(sys.argv):
cumulative_file = sys.argv[idx + 1]
# 解析原始日志
edges = parse_edges(raw_log)
raw_count = len(edges)
# 单次去重
unique = dedup_edges(edges)
unique_count = len(unique)
# 写入 .uniq 文件
uniq_file = raw_log + '.uniq'
write_edges(unique, uniq_file)
# 累积合并
cumulative_count = unique_count
if cumulative_file:
merged = merge_cumulative(cumulative_file, unique)
cumulative_count = len(merged)
# 输出统计
print(f"EDGE_RAW={raw_count} EDGE_UNIQUE={unique_count} EDGE_CUMULATIVE={cumulative_count}")
if __name__ == '__main__':
main()
8.3 去重效果示例
原始日志: 15,234 行 (1,203 条 CALL 记录)
↓ 去重
单 epoch: 487 条唯一边
↓ 累积合并 (与前 5 个 epoch 合并)
累积总计: 2,707 条唯一边
压缩比通常在 10:1 ~ 100:1 之间。
8.4 调用栈规范化(进阶)
不同次编译或不同容器可能导致栈帧地址微调。如果需要跨环境比较,可以规范化:
def normalize_stack_frame(frame):
"""移除地址偏移,只保留函数名"""
# 输入: " 0x7f3a2c1234 buf_page_get_gen+0x1a\n"
# 输出: "buf_page_get_gen"
parts = frame.strip().split()
if len(parts) >= 2:
func = parts[1].split('+')[0] # 去掉 +0x1a 偏移
return func
return frame.strip()
注意:规范化会降低粒度。同名函数在不同调用深度的调用栈可能被错误合并。建议仅在需要跨环境对比时使用。
9. Step 5:构建采集守护进程
9.1 Daemon 架构
┌─────────────────────┐
│ 测试编排脚本 │
│ (run_scenario.sh) │
└─────────┬───────────┘
│ 后台启动
▼
┌─────────────────────┐
│ coverage_daemon │
│ (后台进程) │
└─────────┬───────────┘
│
┌───────────────────┼───────────────────┐
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Epoch 1 │ │ Epoch 2 │ │ Epoch N │
│ │ │ │ │ │
│ attach() │ │ attach() │ │ attach() │
│ sleep(N) │ │ sleep(N) │ │ sleep(N) │
│ detach() │ │ detach() │ │ detach() │
│ dedup() │ │ dedup() │ │ dedup() │
│ persist() │ │ persist() │ │ persist() │
└──────────────┘ └──────────────┘ └──────────────┘
9.2 Daemon 实现
#!/usr/bin/env bash
# edge_coverage_daemon.sh - 边覆盖度采集守护进程
set -euo pipefail
# 配置参数(通过环境变量传入)
BINARY_PATH="${BINARY_PATH:?需要设置 BINARY_PATH}"
TARGET_PID="${TARGET_PID:?需要设置 TARGET_PID}"
OUTPUT_DIR="${OUTPUT_DIR:?需要设置 OUTPUT_DIR}"
TEMPLATE="${TEMPLATE:-./edge.bt}"
EPOCH_DURATION="${EPOCH_DURATION:-3600}" # 每 epoch 秒数,默认 1 小时
FIRST_DELAY="${FIRST_DELAY:-0}" # 首次采集前等待秒数
CUMULATIVE_FILE="${OUTPUT_DIR}/edges_cumulative.log"
mkdir -p "$OUTPUT_DIR"
# 全局变量
BPF_PID=""
LOG_FILE=""
LOG_TS=""
# ... (edge_trace_attach 和 edge_trace_detach 函数同上) ...
# 去重函数
dedup_current_log() {
if [[ -z "${LOG_FILE:-}" ]] || [[ ! -s "$LOG_FILE" ]]; then
EDGE_RAW=0; EDGE_UNIQUE=0; EDGE_CUMULATIVE=0
return 0
fi
local output
output=$(python3 edge_dedup.py "$LOG_FILE" --cumulative "$CUMULATIVE_FILE")
# 解析输出
EDGE_RAW=$(echo "$output" | grep -oP 'EDGE_RAW=\K\d+')
EDGE_UNIQUE=$(echo "$output" | grep -oP 'EDGE_UNIQUE=\K\d+')
EDGE_CUMULATIVE=$(echo "$output" | grep -oP 'EDGE_CUMULATIVE=\K\d+')
echo "Epoch stats: raw=$EDGE_RAW unique=$EDGE_UNIQUE cumulative=$EDGE_CUMULATIVE"
# 删除原始日志,保留 .uniq
rm -f "$LOG_FILE"
}
# 主循环
main() {
echo "Edge coverage daemon starting"
echo " binary=$BINARY_PATH"
echo " target_pid=$TARGET_PID"
echo " epoch_duration=${EPOCH_DURATION}s"
echo " first_delay=${FIRST_DELAY}s"
EDGE_CUMULATIVE=0
if [[ "$FIRST_DELAY" -gt 0 ]]; then
echo "Waiting ${FIRST_DELAY}s before first sample..."
sleep "$FIRST_DELAY"
fi
local epoch=0
while true; do
epoch=$((epoch + 1))
echo "=== Epoch $epoch start ==="
edge_trace_attach "$BINARY_PATH" "$TARGET_PID" "$OUTPUT_DIR" "$TEMPLATE"
sleep "$EPOCH_DURATION"
edge_trace_detach
dedup_current_log
# 写入历史记录
local ts=$(date +%s)
echo "${epoch},${ts},${EDGE_RAW},${EDGE_UNIQUE},${EDGE_CUMULATIVE}" \
>> "${OUTPUT_DIR}/edge_history.csv"
echo "=== Epoch $epoch complete ==="
done
}
# 信号处理:优雅退出
trap 'echo "Daemon stopping..."; edge_trace_detach; exit 0' SIGTERM SIGINT
main "$@"
9.3 调用方式
# 直接运行
BINARY_PATH=/usr/bin/mysqld \
TARGET_PID=12345 \
OUTPUT_DIR=./edges \
EPOCH_DURATION=1800 \
FIRST_DELAY=300 \
bash edge_coverage_daemon.sh &
# 与测试编排集成
run_test_with_coverage() {
local container="$1"
# 获取目标进程 PID
local target_pid=$(get_target_pid_in_container "$container" "mysqld")
local binary_path=$(get_binary_path_in_container "$container" "/usr/local/mysql/bin/mysqld")
# 启动 daemon
BINARY_PATH="$binary_path" \
TARGET_PID="$target_pid" \
OUTPUT_DIR="./scenarios/my_scenario/edges" \
bash edge_coverage_daemon.sh &
DAEMON_PID=$!
# 运行测试
run_fuzzer "$container"
# 停止 daemon
kill "$DAEMON_PID" 2>/dev/null
wait "$DAEMON_PID" 2>/dev/null
}
9.4 Epoch 时长选择建议
| 测试类型 | 建议 Epoch 时长 | 理由 |
|---|---|---|
| 短期冒烟测试 | 5-10 分钟 | 快速反馈 |
| 标准模糊测试 | 30-60 分钟 | 平衡精度与开销 |
| 长期压力测试 | 2-4 小时 | 减少 attach/detach 频率 |
| 持续集成 | 15-30 分钟 | 匹配 CI pipeline 时长 |
10. Step 6:数据持久化与历史记录
10.1 推荐的目录结构
output/
├── edges/
│ ├── edge.bt # 探针模板
│ ├── edge_run.bt # 实例化脚本(运行时生成)
│ ├── edges_20260511_100000.log # 原始日志(去重后删除)
│ ├── edges_20260511_100000.log.uniq # 单 epoch 去重结果
│ ├── edges_20260511_110000.log.uniq
│ ├── edges_cumulative.log # 跨 epoch 累积去重
│ └── edge_history.csv # 时序统计记录
└── coverage/
└── coverage_history.csv # 如果集成了行覆盖
10.2 edge_history.csv 格式
epoch,timestamp,raw_calls,unique_calls,cumulative_unique
1,1715300000,1523,487,487
2,1715303600,2891,1203,1456
3,1715307200,4102,892,2108
4,1715310800,3567,654,2507
raw_calls:本 epoch 内 bpftrace 捕获的总调用次数unique_calls:本 epoch 内去重后的唯一路径数cumulative_unique:跨所有 epoch 的累积唯一路径数
10.3 持久化脚本
#!/usr/bin/env python3
"""edge_history.py - 读取 edge_history.csv 并生成摘要"""
import csv
import sys
def summarize(csv_path):
with open(csv_path) as f:
reader = csv.DictReader(f)
rows = list(reader)
if not rows:
print("No data")
return
first = rows[0]
last = rows[-1]
total_epochs = len(rows)
print(f"Total epochs: {total_epochs}")
print(f"Duration: {int(last['timestamp']) - int(first['timestamp'])} seconds")
print(f"Raw calls (last epoch): {last['raw_calls']}")
print(f"Unique edges (last epoch): {last['unique_calls']}")
print(f"Cumulative unique edges: {last['cumulative_unique']}")
# 计算增长速率
if total_epochs >= 2:
edges_per_epoch = (int(last['cumulative_unique']) - int(first['cumulative_unique'])) / (total_epochs - 1)
print(f"Average new edges per epoch: {edges_per_epoch:.1f}")
if __name__ == '__main__':
summarize(sys.argv[1])
11. Step 7:可视化与报告
11.1 推荐图表类型
| 图表 | 用途 | X 轴 | Y 轴 |
|---|---|---|---|
| 累积边数 vs 时间 | 观察路径探索趋势 | 小时 | 累积唯一边数 |
| 新增边数 vs 时间 | 观察边际收益 | 小时 | 每 epoch 新增边数 |
| 边数 vs case 数 | 评估测试效率 | 累积 case 数 | 累积唯一边数 |
| 多场景对比 | 比较不同测试策略 | 小时 | 累积唯一边数 |
11.2 matplotlib 绘图模板
#!/usr/bin/env python3
"""plot_edge_coverage.py - 边覆盖度可视化"""
import csv
import sys
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
def load_edge_history(csv_path):
"""加载 edge_history.csv"""
data = []
with open(csv_path) as f:
reader = csv.DictReader(f)
for row in reader:
data.append({
'epoch': int(row['epoch']),
'timestamp': int(row['timestamp']),
'raw': int(row['raw_calls']),
'unique': int(row['unique_calls']),
'cumulative': int(row['cumulative_unique']),
})
return data
def plot_cumulative(data, output_path, label="scenario"):
"""绘制累积边数曲线"""
if not data:
return
t0 = data[0]['timestamp']
hours = [(d['timestamp'] - t0) / 3600.0 for d in data]
cumulative = [d['cumulative'] for d in data]
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(hours, cumulative, marker='.', markersize=4, linewidth=1.5, label=label)
ax.set_xlabel("Hours elapsed")
ax.set_ylabel("Cumulative unique edges")
ax.set_title("Edge Coverage Over Time")
ax.grid(True, alpha=0.3)
ax.legend()
ax.yaxis.set_major_formatter(
mticker.FuncFormatter(lambda x, _: f'{x/1000:.0f}k' if x >= 1000 else str(int(x)))
)
plt.tight_layout()
fig.savefig(output_path, dpi=150)
plt.close(fig)
print(f"Saved: {output_path}")
def plot_new_edges_per_epoch(data, output_path, label="scenario"):
"""绘制每 epoch 新增边数柱状图"""
if len(data) < 2:
return
t0 = data[0]['timestamp']
hours = [(d['timestamp'] - t0) / 3600.0 for d in data[1:]]
new_edges = [data[i]['cumulative'] - data[i-1]['cumulative'] for i in range(1, len(data))]
fig, ax = plt.subplots(figsize=(10, 5))
ax.bar(hours, new_edges, width=0.8, alpha=0.7, label=label)
ax.set_xlabel("Hours elapsed")
ax.set_ylabel("New edges in epoch")
ax.set_title("New Edge Discovery Rate")
ax.grid(True, alpha=0.3, axis='y')
ax.legend()
plt.tight_layout()
fig.savefig(output_path, dpi=150)
plt.close(fig)
print(f"Saved: {output_path}")
def plot_multi_scenario(scenario_dirs, output_path):
"""多场景对比图"""
COLORS = ["#e74c3c", "#2ecc71", "#3498db", "#9b59b6", "#f39c12", "#1abc9c"]
fig, ax = plt.subplots(figsize=(12, 6))
for i, (name, csv_path) in enumerate(scenario_dirs.items()):
data = load_edge_history(csv_path)
if not data:
continue
t0 = data[0]['timestamp']
hours = [(d['timestamp'] - t0) / 3600.0 for d in data]
cumulative = [d['cumulative'] for d in data]
color = COLORS[i % len(COLORS)]
ax.plot(hours, cumulative, marker='.', markersize=3,
linewidth=1.5, label=name, color=color)
ax.set_xlabel("Hours elapsed")
ax.set_ylabel("Cumulative unique edges")
ax.set_title("Edge Coverage Comparison")
ax.grid(True, alpha=0.3)
ax.legend()
ax.yaxis.set_major_formatter(
mticker.FuncFormatter(lambda x, _: f'{x/1000:.0f}k' if x >= 1000 else str(int(x)))
)
plt.tight_layout()
fig.savefig(output_path, dpi=150)
plt.close(fig)
print(f"Saved: {output_path}")
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("csv_path", help="edge_history.csv path")
parser.add_argument("-o", "--output", default="./charts")
parser.add_argument("--label", default="scenario")
args = parser.parse_args()
import os
os.makedirs(args.output, exist_ok=True)
data = load_edge_history(args.csv_path)
plot_cumulative(data, os.path.join(args.output, "edge_cumulative.png"), args.label)
plot_new_edges_per_epoch(data, os.path.join(args.output, "edge_new_per_epoch.png"), args.label)
12. Step 8:与行覆盖度集成
12.1 集成架构
边覆盖度和行覆盖度可以共享同一个 daemon 和同一个 history 文件:
# coverage_history.csv(统一格式)
epoch,timestamp,scope,value1,value2,value3
epoch_001,1715300000,overall,45.23,38.10,22.56
epoch_001,1715300000,innodb,52.10,45.30,28.90
epoch_001,1715300000,edge,1523,487,487
12.2 统一 daemon 模板
coverage_daemon() {
local container="$1"
local output_dir="$2"
local epoch_duration="${EPOCH_DURATION:-3600}"
sleep "${FIRST_DELAY:-0}"
local epoch=0
while true; do
epoch=$((epoch + 1))
local label="epoch_$(printf '%03d' $epoch)"
# 1. eBPF 边覆盖:attach
edge_trace_attach "$BINARY_PATH" "$TARGET_PID" "$output_dir/edges" "$TEMPLATE"
# 2. 等待一个 epoch
sleep "$epoch_duration"
# 3. eBPF 边覆盖:detach + 去重
edge_trace_detach
dedup_current_log
# 4. gcov 行覆盖:flush + capture(如果启用)
if [[ "${GCOV_ENABLED:-0}" == "1" ]]; then
flush_gcov "$container"
lcov_capture "$container" "$output_dir/coverage" "$label"
fi
# 5. 写入统一 history
local ts=$(date +%s)
echo "${label},${ts},edge,${EDGE_RAW},${EDGE_UNIQUE},${EDGE_CUMULATIVE}" \
>> "$output_dir/coverage_history.csv"
# ... 写入 overall/innodb/tx 行 ...
done
}
13. 容器化部署注意事项
13.1 权限要求
宿主机上运行 bpftrace 需要:
# 方法 1:以 root 运行
sudo bpftrace edge_run.bt
# 方法 2:授予必要 capabilities(更安全)
# 需要 CAP_BPF 和 CAP_TRACING(Linux 5.8+)
# 或者 CAP_SYS_ADMIN(旧内核)
13.2 Docker 特殊处理
# 容器需要 --privileged 或特定 capabilities
docker run --cap-add=SYS_PTRACE --cap-add=SYS_ADMIN ...
# 或者在宿主机上直接 attach 容器内进程(推荐)
# 不需要容器内安装 bpftrace
13.3 二进制路径访问
容器内二进制在宿主机上的访问方式:
# 方法 1:Docker OverlayFS 物理路径(推荐)
MergedDir=$(docker inspect --format '' <container>)
BINARY="${MergedDir}/usr/local/bin/myapp"
# 方法 2:/proc/PID/root/ 路径(可能有 I/O 竞争)
CONTAINER_PID=$(docker inspect --format '' <container>)
BINARY="/proc/${CONTAINER_PID}/root/usr/local/bin/myapp"
# 方法 3:bind mount(最稳定但需要改 docker run)
docker run -v /usr/local/bin/myapp:/host/myapp:ro ...
13.4 Kubernetes 环境
在 K8s 中,需要在 DaemonSet 中部署 eBPF 采集器:
apiVersion: apps/v1
kind: DaemonSet
spec:
template:
spec:
hostPID: true # 关键:共享宿主机 PID namespace
containers:
- name: edge-collector
securityContext:
privileged: true # 或使用 capabilities
volumeMounts:
- name: proc
mountPath: /host/proc
readOnly: true
volumes:
- name: proc
hostPath:
path: /proc
14. 常见问题与排错
14.1 bpftrace 启动失败
ERROR: failed to attach uprobe
排查步骤:
# 1. 检查符号是否存在
nm /path/to/binary | grep target_function
# 2. 检查二进制是否可读
sudo file /path/to/binary
sudo test -r /path/to/binary && echo "OK" || echo "NOT READABLE"
# 3. 检查 PID 是否存活
ps -p <target_pid>
# 4. 手动测试单个 uprobe
sudo bpftrace -e 'uprobe:/path/to/binary:*target_func* { printf("HIT\n"); }' -c 'sleep 1'
14.2 日志为空(无 CALL 输出)
可能原因:
- 目标进程在采样期间没有执行被插桩的函数
- PID 过滤器不匹配(进程重启后 PID 变化)
- uprobe 未成功 attach(bpftrace 启动时有 warning)
# 检查 bpftrace 是否成功 attach
# 看日志第一行是否有 "=== edge tracing started ==="
head -1 edges.log
# 检查 uprobe 是否注册成功
sudo cat /sys/kernel/debug/tracing/uprobes | grep target_function
14.3 调用栈符号显示为地址
CALL buf_page_get_gen
0x7f3a2c1234
0x7f3a2c5678
原因:二进制被 strip 或缺少符号表。
# 检查是否有符号
file /path/to/binary
# 应显示 "not stripped"
# 如果是 stripped,使用 debuginfo 包
# Debian/Ubuntu: apt install mysql-server-dbgsym
# RHEL/CentOS: debuginfo-install mysql-server
14.4 性能开销过大
bpftrace uprobe 在高频函数上可能有显著开销。
# 降低开销的方法:
# 1. 减少插桩函数数量(只保留最关键的)
# 2. 使用采样过滤(每 N 次调用才采样一次)
uprobe:/path/to/binary:*hot_func*
/pid == TARGET_PID && rand() % 100 == 0/
{
printf("CALL hot_func\n");
printf("%s\n", ustack());
}
# 3. 限制调用栈深度
ustake(8) # 只取前 8 帧
# 4. 缩短 epoch 时长,增加 attach/detach 间隔
14.5 非 C/C++ 语言的适配
| 语言 | 适配方式 |
|---|---|
| Go | 直接可用,Go 编译为原生二进制,符号名如 main.myFunc |
| Rust | 直接可用,使用 *func_name* 匹配 mangled 符号 |
| JVM (Java/Kotlin) | 使用 usym() + 需要 -XX:+PreserveFramePointer 或 Async-Profiler 方案 |
| Python/Node.js | 不适用 uprobe,考虑使用 USDT probes 或 perf |
| C# (.NET) | 需要 DOTNET_EnableDiagnostics=1,使用 USDT |
14.6 内核版本兼容性
| 功能 | 最低内核 | 说明 |
|---|---|---|
| 基本 uprobe | 3.5+ | 核心功能 |
ustack() |
4.6+ | 用户态栈回溯 |
usym() |
4.7+ | 用户态符号解析 |
| Container PID 过滤 | 4.9+ | pid namespace 感知 |
| bpftrace wildcard | 4.18+ | *func* 通配符匹配 |
15. 附录:可复用脚本模板
15.1 一键部署脚本
#!/usr/bin/env bash
# deploy_edge_coverage.sh - 一键部署边覆盖度采集
set -euo pipefail
# 配置
BINARY="${1:?用法: $0 <binary_path> <target_pid> [output_dir]}"
TARGET_PID="${2:?用法: $0 <binary_path> <target_pid> [output_dir]}"
OUTPUT_DIR="${3:-./edge_coverage_$(date +%Y%m%d_%H%M%S)}"
TEMPLATE_DIR="$(dirname "$0")/templates"
mkdir -p "$OUTPUT_DIR/edges"
# 检查依赖
command -v bpftrace >/dev/null 2>&1 || { echo "ERROR: bpftrace not found"; exit 1; }
command -v python3 >/dev/null 2>&1 || { echo "ERROR: python3 not found"; exit 1; }
# 生成探针模板(根据用户指定的函数列表)
cat > "$OUTPUT_DIR/edges/edge.bt" << 'BTPL'
#!/usr/bin/env bpftrace
BEGIN { printf("=== edge tracing started (PID=%d) ===\n", pid); }
# 在此添加 uprobe 块,每个目标函数一个
# uprobe:BIN_PATH:*your_function*
# /pid == TARGET_PID/
# {
# printf("CALL your_function\n");
# printf("%s\n", ustack());
# }
END { printf("=== edge tracing stopped ===\n"); }
BTPL
echo "Deployed to $OUTPUT_DIR"
echo "Edit $OUTPUT_DIR/edges/edge.bt to add target functions"
echo "Then run: bash edge_coverage_daemon.sh"
15.2 完整的 edge.bt 生成器
#!/usr/bin/env bash
# generate_edge_bt.sh - 从函数列表生成 bpftrace 脚本
# 用法: $0 <binary_path> <pid> <func1> [func2] [func3] ...
BINARY="$1"
PID="$2"
shift 2
cat << EOF
#!/usr/bin/env bpftrace
BEGIN { printf("=== edge tracing started (PID=%d) ===\n", pid); }
EOF
for func in "$@"; do
cat << EOF
uprobe:${BINARY}:*${func}*
/pid == ${PID}/
{
printf("CALL ${func}\\n");
printf("%s\\n", ustack());
}
EOF
done
cat << 'EOF'
END { printf("=== edge tracing stopped ===\n"); }
EOF
使用示例:
# 生成脚本
bash generate_edge_bt.sh /usr/bin/mysqld 12345 \
dispatch_command \
mysql_parse \
trx_commit \
buf_page_get_gen \
row_search_mvcc \
> edge.bt
# 直接使用
sudo bpftrace edge.bt > edges.log 2>&1 &
15.3 快速验证脚本
#!/usr/bin/env bash
# verify_edge_trace.sh - 快速验证边覆盖度采集是否正常工作
BINARY="${1:?用法: $0 <binary_path> <target_pid>}"
TARGET_PID="${2:?}"
echo "=== 快速验证 ==="
echo "目标二进制: $BINARY"
echo "目标 PID: $TARGET_PID"
# 1. 检查 bpftrace
echo -n "检查 bpftrace... "
if sudo -n bpftrace --version &>/dev/null; then
echo "OK ($(sudo bpftrace --version 2>&1 | head -1))"
else
echo "FAIL (需要 sudo 权限或 bpftrace 未安装)"
exit 1
fi
# 2. 检查目标进程
echo -n "检查目标进程... "
if ps -p "$TARGET_PID" >/dev/null 2>&1; then
echo "OK ($(ps -p "$TARGET_PID" -o comm=))"
else
echo "FAIL (PID $TARGET_PID 不存在)"
exit 1
fi
# 3. 检查二进制可访问性
echo -n "检查二进制... "
if sudo test -r "$BINARY"; then
echo "OK"
else
echo "FAIL ($BINARY 不可读)"
exit 1
fi
# 4. 测试 uprobe attach
echo -n "测试 uprobe... "
TEST_LOG=$(mktemp)
sudo timeout 5 bpftrace -e "
uprobe:${BINARY}:*dispatch_command*
/pid == ${TARGET_PID}/
{
printf("TEST_HIT\\n");
printf(\"%s\\n\", ustack());
}
" > "$TEST_LOG" 2>&1 &
TEST_PID=$!
sleep 3
sudo kill -INT "$TEST_PID" 2>/dev/null
wait "$TEST_PID" 2>/dev/null
if [[ -s "$TEST_LOG" ]]; then
echo "OK (收到 $(grep -c 'TEST_HIT' "$TEST_LOG") 次命中)"
else
echo "WARN (3 秒内未捕获到事件,可能目标函数未被调用)"
fi
rm -f "$TEST_LOG"
echo "=== 验证完成 ==="
快速参考卡
┌─────────────────────────────────────────────────────────────┐
│ 边覆盖度实施清单 │
│ │
│ □ 确认内核版本 >= 5.4,bpftrace 已安装 │
│ □ 确认被测二进制未 strip,符号可读 │
│ □ 选择 5-15 个关键插桩函数 │
│ □ 编写 edge.bt 探针模板(使用占位符) │
│ □ 实现 attach/detach 函数(PID 解析 + 路径解析) │
│ □ 实现日志去重脚本(Python tuple set) │
│ □ 构建 epoch 循环 daemon │
│ □ 设计 edge_history.csv 持久化格式 │
│ □ 实现可视化(累积曲线 + 新增速率) │
│ □ 验证:手动触发 → 检查日志 → 确认去重正确 │
│ □ 长期运行:监控磁盘空间、bpftrace 内存占用 │
└─────────────────────────────────────────────────────────────┘
- 本文作者: wengsy150943
-
版权声明: 本博客所有文章除特别声明外,均采用
BY-NC-SA
许可协议。转载请注明出处!