15 minute read

本文由 AI 生成,内容为通用方法论指导,请结合实际场景评估适用性。

基于 eBPF 的边覆盖度测试方法指导

本文档是一份通用方法论,指导如何在任意 DBMS 或 C/C++ 应用项目中实现基于 eBPF 的运行时边覆盖度采集。可独立使用,也可与 gcov/lcov 行覆盖度配合形成双覆盖度系统。


目录

  1. 什么是边覆盖度
  2. 为什么需要边覆盖度
  3. 技术选型与前置条件
  4. 实施步骤总览
  5. Step 1:确定插桩目标函数
  6. Step 2:编写 bpftrace 探针脚本
  7. Step 3:实现探针挂载/卸载逻辑
  8. Step 4:实现日志去重与累积
  9. Step 5:构建采集守护进程
  10. Step 6:数据持久化与历史记录
  11. Step 7:可视化与报告
  12. Step 8:与行覆盖度集成
  13. 容器化部署注意事项
  14. 常见问题与排错
  15. 附录:可复用脚本模板

1. 什么是边覆盖度

1.1 定义

边覆盖度度量的是:在程序运行过程中,有多少种不同的执行路径被触发过。

具体定义为:唯一 (目标函数 + 完整调用栈) 组合的数量

  • 对同一目标函数 foo() 的两次调用,如果调用栈不同(例如一次来自 main→bar→foo,另一次来自 main→baz→foo),计为 2 条不同的边
  • 如果调用栈完全相同,即使时间不同,也只计为 1 条边

1.2 与传统覆盖度的区别

维度 行覆盖度 (gcov) 分支覆盖度 (gcov) 边覆盖度 (eBPF)
度量对象 被执行的源码行 被执行的分支 唯一执行路径
插桩时机 编译时 编译时 运行时
粒度 源码行级 条件分支级 调用路径级
信息维度 二维(文件+行号) 二维(文件+分支点) 多维(函数+完整调用栈)
典型问题 “这行代码跑了没?” “if/else 都走到了没?” “这个函数通过哪些不同路径被调用?”

1.3 适用场景

边覆盖度特别适合以下测试场景:

  • 模糊测试 (Fuzzing):评估 fuzzer 是否探索到了足够多的代码路径
  • 回归测试评估:检查回归测试用例是否覆盖了关键执行路径
  • 并发/事务测试:区分同一函数在不同事务流程中的调用路径(如事务提交 vs 后台刷写)
  • 性能测试路径分析:识别热路径和冷路径

2. 为什么需要边覆盖度

2.1 行覆盖度的局限

行覆盖度只能告诉你”哪些代码行被执行了”,但无法区分:

场景 A:SELECT → row_search_mvcc → buf_page_get_gen
场景 B:UPDATE → row_ins_clust_index_entry → buf_page_get_gen

两次调用 buf_page_get_gen 在行覆盖中只计一次,但它们代表完全不同的执行路径和业务语义。

2.2 边覆盖度的价值

  • 路径多样性:量化测试用例触发了多少种不同的调用路径
  • 深度覆盖:发现仅靠行覆盖无法暴露的深层路径(如异常处理路径、并发竞态路径)
  • 零停机:运行时动态插桩,不需要重启被测进程
  • 聚焦关键函数:只在关心的函数上插桩,避免全量插桩的性能开销

3. 技术选型与前置条件

3.1 核心技术栈

组件 用途 最低版本
bpftrace eBPF 探针脚本语言 v0.16+ (推荐 v0.20+)
Linux Kernel eBPF 运行时支持 5.4+ (推荐 5.10+)
Python 3 日志去重与数据处理 3.6+
Bash 编排脚本 4.0+

3.2 内核要求

# 检查内核版本
uname -r  # 需要 5.4+

# 检查 eBPF 支持
ls /sys/kernel/debug/tracing/uprobes  # 应存在

# 检查 bpftrace
bpftrace --version

# 检查权限(需要 root 或 CAP_BPF + CAP_TRACING)
sudo bpftrace -e 'BEGIN { printf("ok\n"); exit(); }'

3.3 被测程序要求

  • 必须是未剥离符号的 ELF 二进制(不能 strip
  • 如果运行在容器中,宿主机需要能访问容器内二进制文件
  • 编译时不要 -fvisibility=hidden(否则 uprobe 无法匹配符号)
  • 推荐保留调试信息(-g),但非必须

3.4 决策树:是否适合使用边覆盖度

你的测试是否涉及长时间运行的进程?
├── 否 → 考虑编译时插桩方案(AFL、gcov 等)
└── 是 → 进一步判断
    ├── 你是否关注"哪些路径被触发"而非仅"哪些行被执行"?
    │   ├── 否 → gcov 行覆盖足够
    │   └── 是 → 边覆盖度适合
    └── 被测进程是否为 C/C++ 原生二进制?
        ├── 否(JVM/Go/解释型语言)→ 需要适配,见 14.5 节
        └── 是 → 直接适用本方案

4. 实施步骤总览

┌──────────────────────────────────────────────────────────────────┐
│                     边覆盖度实施路线图                              │
│                                                                  │
│  Step 1: 确定插桩目标函数                                         │
│     ↓                                                            │
│  Step 2: 编写 bpftrace 探针脚本                                   │
│     ↓                                                            │
│  Step 3: 实现探针挂载/卸载逻辑                                     │
│     ↓                                                            │
│  Step 4: 实现日志去重与累积                                        │
│     ↓                                                            │
│  Step 5: 构建采集守护进程                                          │
│     ↓                                                            │
│  Step 6: 数据持久化与历史记录                                      │
│     ↓                                                            │
│  Step 7: 可视化与报告                                              │
│     ↓                                                            │
│  Step 8: (可选) 与行覆盖度集成                                     │
└──────────────────────────────────────────────────────────────────┘

5. Step 1:确定插桩目标函数

5.1 选择原则

不是所有函数都值得插桩。选择标准:

  1. 业务关键路径上的入口函数:如命令分发、请求处理、事务提交
  2. 被多种路径调用的热点函数:同一个函数被不同业务流程调用时调用栈不同
  3. 子系统边界函数:跨子系统的调用点,如 SQL 层到存储引擎层的入口
  4. 并发/事务相关函数:锁操作、缓冲区管理、日志写入

5.2 选择方法

# 方法 1:通过代码分析确定关键函数
# 查找函数被调用次数最多的函数(callers 多样性高)
objdump -d /path/to/binary | grep 'call.*func_name' | wc -l

# 方法 2:使用 perf 初步识别热点
perf record -g -p <pid> -- sleep 60
perf report  # 查看 callers 分布

# 方法 3:通过 ctags/cscope 统计函数引用
grep -r "func_name" --include="*.c" --include="*.cpp" | wc -l

5.3 函数数量建议

场景 建议数量 说明
快速验证 3-5 个 核心入口函数
标准覆盖 8-15 个 覆盖主要子系统边界
深度分析 20-30 个 需评估 bpftrace 性能开销

5.4 实例:DBMS 项目的目标函数

以 MySQL/InnoDB 为例,分层选择:

层次 函数 为什么选它
SQL 入口 dispatch_command 所有 SQL 的入口
SQL 解析 mysql_parse 语法分析
SQL 执行 mysql_execute_command 执行器入口
事务 trx_commit 事务提交路径
读操作 row_search_mvcc MVCC 读
lock_rec_lock 记录锁操作
缓冲池 buf_page_get_gen 页面获取
写操作 row_ins_clust_index_entry, row_upd_clust_step 聚簇索引写
B-tree btr_cur_search_to_nth_level B-tree 搜索

6. Step 2:编写 bpftrace 探针脚本

6.1 脚本模板

以下是一个通用模板,替换 TARGET_BINARYTARGET_PID 和函数名即可:

#!/usr/bin/env bpftrace

BEGIN { printf("=== edge tracing started (PID=%d) ===\n", pid); }

// ---- 复制此 block 并替换函数名 ----
uprobe:TARGET_BINARY:*FUNC_NAME_1*
/pid == TARGET_PID/
{
    printf("CALL FUNC_NAME_1\n");
    printf("%s\n", ustack());
}
// ---- end block ----

// ---- 重复添加更多函数 ----
uprobe:TARGET_BINARY:*FUNC_NAME_2*
/pid == TARGET_PID/
{
    printf("CALL FUNC_NAME_2\n");
    printf("%s\n", ustack());
}

END { printf("=== edge tracing stopped ===\n"); }

6.2 关键语法说明

# uprobe 语法
uprobe:<binary_path>:<symbol_pattern>

# PID 过滤(只捕获目标进程)
/pid == TARGET_PID/

# 用户态调用栈
ustack()

# 输出标记(用于后续解析)
printf("CALL <function_name>\n")

6.3 C++ 符号匹配

C++ 函数经过 name mangling,直接使用源码函数名可能匹配不到。解决方案:

# 方法 1:使用 wildcard 匹配(推荐)
uprobe:/path/to/binary:*my_function*  # 匹配 _Z11my_functionv 等

# 方法 2:查看 mangled 名称
nm /path/to/binary | grep my_function

# 方法 3:使用 demangled 名称(bpftrace 0.17+)
# bpftrace 自动 demangle,但 wildcard 更可靠

6.4 模板实例化脚本

生产中使用占位符 + sed 替换,避免硬编码:

# edge.bt 模板内容
TEMPLATE='#!/usr/bin/env bpftrace
BEGIN { printf("=== edge tracing started (PID=%d) ===\n", pid); }
uprobe:BIN_PATH:*dispatch_command*
/pid == TARGET_PID/
{
    printf("CALL dispatch_command\n");
    printf("%s\n", ustack());
}
END { printf("=== edge tracing stopped ===\n"); }'

# 运行时实例化
sed -e "s|BIN_PATH|/usr/bin/mysqld|g" \
    -e "s/TARGET_PID/12345/g" \
    edge.bt > edge_run.bt

sudo bpftrace edge_run.bt > edges.log 2>&1 &

6.5 验证探针是否正常工作

# 快速测试:手动触发一次
sudo bpftrace -e '
uprobe:/usr/bin/mysqld:*dispatch_command*
/pid == 12345/
{
    printf("HIT dispatch_command\n");
    printf("%s\n", ustack());
}
' -c 'sleep 1'  # 1 秒超时

# 预期输出:如果在 1 秒内有 SQL 查询执行,会看到 HIT 行 + 调用栈
# 如果无输出:检查 PID 是否正确、符号是否匹配、进程是否在执行

7. Step 3:实现探针挂载/卸载逻辑

7.1 挂载流程

edge_trace_attach() {
    local binary_path="$1"
    local target_pid="$2"
    local output_dir="$3"
    local template="$4"

    # 前置检查
    if ! sudo -n bpftrace --version &>/dev/null; then
        echo "ERROR: bpftrace not available or no sudo permission"
        return 1
    fi

    if ! sudo test -f "$binary_path"; then
        echo "ERROR: binary not found: $binary_path"
        return 1
    fi

    # 生成实例化脚本
    local ts=$(date +%Y%m%d_%H%M%S)
    local run_bt="${output_dir}/edge_run_${ts}.bt"
    sed -e "s|BIN_PATH|${binary_path}|g" \
        -e "s/TARGET_PID/${target_pid}/g" \
        "$template" > "$run_bt"

    # 后台启动 bpftrace
    local log_file="${output_dir}/edges_${ts}.log"
    sudo bpftrace "$run_bt" >> "$log_file" 2>&1 &
    BPF_PID=$!

    # 验证启动成功
    sleep 3
    if ! kill -0 "$BPF_PID" 2>/dev/null; then
        echo "ERROR: bpftrace failed to start, check $log_file"
        BPF_PID=""
        return 1
    fi

    echo "bpftrace attached (pid=$BPF_PID, target=$target_pid)"
    LOG_FILE="$log_file"
    LOG_TS="$ts"
}

7.2 卸载流程

edge_trace_detach() {
    if [[ -z "${BPF_PID:-}" ]]; then
        return 0
    fi

    echo "Detaching bpftrace (pid=$BPF_PID)"

    # 优雅停止(SIGINT 让 bpftrace 输出 END 块)
    sudo kill -INT "$BPF_PID" 2>/dev/null || true

    # 等待退出,最多 10 秒
    local waited=0
    while [[ $waited -lt 10 ]]; do
        if ! kill -0 "$BPF_PID" 2>/dev/null; then
            echo "bpftrace stopped after ${waited}s"
            break
        fi
        sleep 1
        ((waited++))
    done

    # 强制兜底
    sudo kill -9 "$BPF_PID" 2>/dev/null || true
    BPF_PID=""
}

7.3 容器内进程的 PID 解析

如果被测程序运行在 Docker 容器中:

get_container_pid() {
    local container="$1"
    docker inspect --format '' "$container"
}

get_target_pid_in_container() {
    local container="$1"
    local target_comm="$2"  # 如 "mysqld"

    local container_pid=$(get_container_pid "$container")

    # 遍历容器 init 进程的子进程,精确匹配进程名
    ps --ppid "$container_pid" -o pid=,comm= 2>/dev/null \
        | awk -v target="$target_comm" '$2 == target {print $1; exit}'
}

注意:使用 comm 精确匹配而非进程名模糊匹配,避免误匹配辅助进程(如 mysqld_safe vs mysqld)。

7.4 容器内二进制路径解析

get_binary_path_in_container() {
    local container="$1"
    local relative_path="$2"  # 如 "/usr/local/mysql/bin/mysqld"

    # 方法 1:Docker OverlayFS 物理路径(推荐,无 I/O 竞争)
    local merged_dir
    merged_dir=$(docker inspect --format '' "$container" 2>/dev/null)
    local physical_path="${merged_dir}${relative_path}"
    if [[ -n "$merged_dir" ]] && sudo test -f "$physical_path"; then
        echo "$physical_path"
        return 0
    fi

    # 方法 2:通过 /proc/PID/root/ 访问(回退方案)
    local container_pid=$(get_container_pid "$container")
    local proc_path="/proc/${container_pid}/root${relative_path}"
    if sudo test -f "$proc_path"; then
        echo "$proc_path"
        return 0
    fi

    echo "ERROR: cannot resolve binary path" >&2
    return 1
}

8. Step 4:实现日志去重与累积

8.1 为什么需要去重

bpftrace 每次函数调用都输出一行 CALL + 完整调用栈。在高频调用场景下(如 buf_page_get_gen 每秒可能触发数千次),原始日志会非常大。去重可以:

  • 将数千条重复日志压缩为少量唯一路径
  • 实现跨时间窗口的累积统计
  • 支持增量分析(每个 epoch 新增了多少条路径)

8.2 去重算法

核心思想:将每条调用路径视为一个有序字符串列表,用 Python set 的 tuple 去重。

#!/usr/bin/env python3
"""
edge_dedup.py - 边覆盖度日志去重工具

用法:
    python3 edge_dedup.py <raw_log> [--cumulative <cumulative_file>]

输出:
    <raw_log>.uniq         - 单次去重结果
    <cumulative_file>      - 累积去重结果(可选)
    stdout: EDGE_RAW=N EDGE_UNIQUE=M EDGE_CUMULATIVE=K
"""

import sys
import os


def parse_edges(filepath):
    """将原始日志解析为边的列表。每条边 = [CALL行, 栈帧1, 栈帧2, ...]"""
    edges = []
    current_edge = []

    with open(filepath) as f:
        for line in f:
            # 过滤 bpftrace 元数据
            if line.startswith('Attaching') or \
               line.startswith('=== edge') or \
               line.startswith('=== eBPF') or \
               not line.strip():
                continue

            if line.startswith('CALL '):
                if current_edge:
                    edges.append(current_edge)
                current_edge = [line]
            elif current_edge:
                current_edge.append(line)

        if current_edge:
            edges.append(current_edge)

    return edges


def dedup_edges(edges):
    """去重:将边列表转为 set of tuples"""
    return set(tuple(e) for e in edges)


def write_edges(edges_set, filepath):
    """将去重后的边集合写入文件"""
    with open(filepath, 'w') as f:
        for edge in edges_set:
            f.write(''.join(edge) + '\n')


def merge_cumulative(cumulative_file, new_edges):
    """合并新边到累积文件"""
    existing = set()
    if os.path.exists(cumulative_file):
        existing = dedup_edges(parse_edges(cumulative_file))

    merged = existing | new_edges
    write_edges(merged, cumulative_file)
    return merged


def main():
    if len(sys.argv) < 2:
        print("Usage: edge_dedup.py <raw_log> [--cumulative <file>]")
        sys.exit(1)

    raw_log = sys.argv[1]
    cumulative_file = None
    if '--cumulative' in sys.argv:
        idx = sys.argv.index('--cumulative')
        if idx + 1 < len(sys.argv):
            cumulative_file = sys.argv[idx + 1]

    # 解析原始日志
    edges = parse_edges(raw_log)
    raw_count = len(edges)

    # 单次去重
    unique = dedup_edges(edges)
    unique_count = len(unique)

    # 写入 .uniq 文件
    uniq_file = raw_log + '.uniq'
    write_edges(unique, uniq_file)

    # 累积合并
    cumulative_count = unique_count
    if cumulative_file:
        merged = merge_cumulative(cumulative_file, unique)
        cumulative_count = len(merged)

    # 输出统计
    print(f"EDGE_RAW={raw_count} EDGE_UNIQUE={unique_count} EDGE_CUMULATIVE={cumulative_count}")


if __name__ == '__main__':
    main()

8.3 去重效果示例

原始日志:  15,234 行 (1,203 条 CALL 记录)
    ↓ 去重
单 epoch:  487 条唯一边
    ↓ 累积合并 (与前 5 个 epoch 合并)
累积总计:  2,707 条唯一边

压缩比通常在 10:1 ~ 100:1 之间。

8.4 调用栈规范化(进阶)

不同次编译或不同容器可能导致栈帧地址微调。如果需要跨环境比较,可以规范化:

def normalize_stack_frame(frame):
    """移除地址偏移,只保留函数名"""
    # 输入: "        0x7f3a2c1234 buf_page_get_gen+0x1a\n"
    # 输出: "buf_page_get_gen"
    parts = frame.strip().split()
    if len(parts) >= 2:
        func = parts[1].split('+')[0]  # 去掉 +0x1a 偏移
        return func
    return frame.strip()

注意:规范化会降低粒度。同名函数在不同调用深度的调用栈可能被错误合并。建议仅在需要跨环境对比时使用。


9. Step 5:构建采集守护进程

9.1 Daemon 架构

                    ┌─────────────────────┐
                    │    测试编排脚本       │
                    │  (run_scenario.sh)  │
                    └─────────┬───────────┘
                              │ 后台启动
                              ▼
                    ┌─────────────────────┐
                    │   coverage_daemon   │
                    │   (后台进程)         │
                    └─────────┬───────────┘
                              │
          ┌───────────────────┼───────────────────┐
          ▼                   ▼                   ▼
   ┌──────────────┐  ┌──────────────┐  ┌──────────────┐
   │   Epoch 1    │  │   Epoch 2    │  │   Epoch N    │
   │              │  │              │  │              │
   │ attach()     │  │ attach()     │  │ attach()     │
   │ sleep(N)     │  │ sleep(N)     │  │ sleep(N)     │
   │ detach()     │  │ detach()     │  │ detach()     │
   │ dedup()      │  │ dedup()      │  │ dedup()      │
   │ persist()    │  │ persist()    │  │ persist()    │
   └──────────────┘  └──────────────┘  └──────────────┘

9.2 Daemon 实现

#!/usr/bin/env bash
# edge_coverage_daemon.sh - 边覆盖度采集守护进程

set -euo pipefail

# 配置参数(通过环境变量传入)
BINARY_PATH="${BINARY_PATH:?需要设置 BINARY_PATH}"
TARGET_PID="${TARGET_PID:?需要设置 TARGET_PID}"
OUTPUT_DIR="${OUTPUT_DIR:?需要设置 OUTPUT_DIR}"
TEMPLATE="${TEMPLATE:-./edge.bt}"
EPOCH_DURATION="${EPOCH_DURATION:-3600}"     # 每 epoch 秒数,默认 1 小时
FIRST_DELAY="${FIRST_DELAY:-0}"             # 首次采集前等待秒数
CUMULATIVE_FILE="${OUTPUT_DIR}/edges_cumulative.log"

mkdir -p "$OUTPUT_DIR"

# 全局变量
BPF_PID=""
LOG_FILE=""
LOG_TS=""

# ... (edge_trace_attach 和 edge_trace_detach 函数同上) ...

# 去重函数
dedup_current_log() {
    if [[ -z "${LOG_FILE:-}" ]] || [[ ! -s "$LOG_FILE" ]]; then
        EDGE_RAW=0; EDGE_UNIQUE=0; EDGE_CUMULATIVE=0
        return 0
    fi

    local output
    output=$(python3 edge_dedup.py "$LOG_FILE" --cumulative "$CUMULATIVE_FILE")

    # 解析输出
    EDGE_RAW=$(echo "$output" | grep -oP 'EDGE_RAW=\K\d+')
    EDGE_UNIQUE=$(echo "$output" | grep -oP 'EDGE_UNIQUE=\K\d+')
    EDGE_CUMULATIVE=$(echo "$output" | grep -oP 'EDGE_CUMULATIVE=\K\d+')

    echo "Epoch stats: raw=$EDGE_RAW unique=$EDGE_UNIQUE cumulative=$EDGE_CUMULATIVE"

    # 删除原始日志,保留 .uniq
    rm -f "$LOG_FILE"
}

# 主循环
main() {
    echo "Edge coverage daemon starting"
    echo "  binary=$BINARY_PATH"
    echo "  target_pid=$TARGET_PID"
    echo "  epoch_duration=${EPOCH_DURATION}s"
    echo "  first_delay=${FIRST_DELAY}s"

    EDGE_CUMULATIVE=0

    if [[ "$FIRST_DELAY" -gt 0 ]]; then
        echo "Waiting ${FIRST_DELAY}s before first sample..."
        sleep "$FIRST_DELAY"
    fi

    local epoch=0
    while true; do
        epoch=$((epoch + 1))
        echo "=== Epoch $epoch start ==="

        edge_trace_attach "$BINARY_PATH" "$TARGET_PID" "$OUTPUT_DIR" "$TEMPLATE"

        sleep "$EPOCH_DURATION"

        edge_trace_detach

        dedup_current_log

        # 写入历史记录
        local ts=$(date +%s)
        echo "${epoch},${ts},${EDGE_RAW},${EDGE_UNIQUE},${EDGE_CUMULATIVE}" \
            >> "${OUTPUT_DIR}/edge_history.csv"

        echo "=== Epoch $epoch complete ==="
    done
}

# 信号处理:优雅退出
trap 'echo "Daemon stopping..."; edge_trace_detach; exit 0' SIGTERM SIGINT

main "$@"

9.3 调用方式

# 直接运行
BINARY_PATH=/usr/bin/mysqld \
TARGET_PID=12345 \
OUTPUT_DIR=./edges \
EPOCH_DURATION=1800 \
FIRST_DELAY=300 \
bash edge_coverage_daemon.sh &

# 与测试编排集成
run_test_with_coverage() {
    local container="$1"

    # 获取目标进程 PID
    local target_pid=$(get_target_pid_in_container "$container" "mysqld")
    local binary_path=$(get_binary_path_in_container "$container" "/usr/local/mysql/bin/mysqld")

    # 启动 daemon
    BINARY_PATH="$binary_path" \
    TARGET_PID="$target_pid" \
    OUTPUT_DIR="./scenarios/my_scenario/edges" \
    bash edge_coverage_daemon.sh &
    DAEMON_PID=$!

    # 运行测试
    run_fuzzer "$container"

    # 停止 daemon
    kill "$DAEMON_PID" 2>/dev/null
    wait "$DAEMON_PID" 2>/dev/null
}

9.4 Epoch 时长选择建议

测试类型 建议 Epoch 时长 理由
短期冒烟测试 5-10 分钟 快速反馈
标准模糊测试 30-60 分钟 平衡精度与开销
长期压力测试 2-4 小时 减少 attach/detach 频率
持续集成 15-30 分钟 匹配 CI pipeline 时长

10. Step 6:数据持久化与历史记录

10.1 推荐的目录结构

output/
├── edges/
│   ├── edge.bt                          # 探针模板
│   ├── edge_run.bt                      # 实例化脚本(运行时生成)
│   ├── edges_20260511_100000.log        # 原始日志(去重后删除)
│   ├── edges_20260511_100000.log.uniq   # 单 epoch 去重结果
│   ├── edges_20260511_110000.log.uniq
│   ├── edges_cumulative.log             # 跨 epoch 累积去重
│   └── edge_history.csv                 # 时序统计记录
└── coverage/
    └── coverage_history.csv             # 如果集成了行覆盖

10.2 edge_history.csv 格式

epoch,timestamp,raw_calls,unique_calls,cumulative_unique
1,1715300000,1523,487,487
2,1715303600,2891,1203,1456
3,1715307200,4102,892,2108
4,1715310800,3567,654,2507
  • raw_calls:本 epoch 内 bpftrace 捕获的总调用次数
  • unique_calls:本 epoch 内去重后的唯一路径数
  • cumulative_unique:跨所有 epoch 的累积唯一路径数

10.3 持久化脚本

#!/usr/bin/env python3
"""edge_history.py - 读取 edge_history.csv 并生成摘要"""

import csv
import sys


def summarize(csv_path):
    with open(csv_path) as f:
        reader = csv.DictReader(f)
        rows = list(reader)

    if not rows:
        print("No data")
        return

    first = rows[0]
    last = rows[-1]
    total_epochs = len(rows)

    print(f"Total epochs: {total_epochs}")
    print(f"Duration: {int(last['timestamp']) - int(first['timestamp'])} seconds")
    print(f"Raw calls (last epoch): {last['raw_calls']}")
    print(f"Unique edges (last epoch): {last['unique_calls']}")
    print(f"Cumulative unique edges: {last['cumulative_unique']}")

    # 计算增长速率
    if total_epochs >= 2:
        edges_per_epoch = (int(last['cumulative_unique']) - int(first['cumulative_unique'])) / (total_epochs - 1)
        print(f"Average new edges per epoch: {edges_per_epoch:.1f}")


if __name__ == '__main__':
    summarize(sys.argv[1])

11. Step 7:可视化与报告

11.1 推荐图表类型

图表 用途 X 轴 Y 轴
累积边数 vs 时间 观察路径探索趋势 小时 累积唯一边数
新增边数 vs 时间 观察边际收益 小时 每 epoch 新增边数
边数 vs case 数 评估测试效率 累积 case 数 累积唯一边数
多场景对比 比较不同测试策略 小时 累积唯一边数

11.2 matplotlib 绘图模板

#!/usr/bin/env python3
"""plot_edge_coverage.py - 边覆盖度可视化"""

import csv
import sys
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker


def load_edge_history(csv_path):
    """加载 edge_history.csv"""
    data = []
    with open(csv_path) as f:
        reader = csv.DictReader(f)
        for row in reader:
            data.append({
                'epoch': int(row['epoch']),
                'timestamp': int(row['timestamp']),
                'raw': int(row['raw_calls']),
                'unique': int(row['unique_calls']),
                'cumulative': int(row['cumulative_unique']),
            })
    return data


def plot_cumulative(data, output_path, label="scenario"):
    """绘制累积边数曲线"""
    if not data:
        return

    t0 = data[0]['timestamp']
    hours = [(d['timestamp'] - t0) / 3600.0 for d in data]
    cumulative = [d['cumulative'] for d in data]

    fig, ax = plt.subplots(figsize=(10, 5))
    ax.plot(hours, cumulative, marker='.', markersize=4, linewidth=1.5, label=label)
    ax.set_xlabel("Hours elapsed")
    ax.set_ylabel("Cumulative unique edges")
    ax.set_title("Edge Coverage Over Time")
    ax.grid(True, alpha=0.3)
    ax.legend()
    ax.yaxis.set_major_formatter(
        mticker.FuncFormatter(lambda x, _: f'{x/1000:.0f}k' if x >= 1000 else str(int(x)))
    )

    plt.tight_layout()
    fig.savefig(output_path, dpi=150)
    plt.close(fig)
    print(f"Saved: {output_path}")


def plot_new_edges_per_epoch(data, output_path, label="scenario"):
    """绘制每 epoch 新增边数柱状图"""
    if len(data) < 2:
        return

    t0 = data[0]['timestamp']
    hours = [(d['timestamp'] - t0) / 3600.0 for d in data[1:]]
    new_edges = [data[i]['cumulative'] - data[i-1]['cumulative'] for i in range(1, len(data))]

    fig, ax = plt.subplots(figsize=(10, 5))
    ax.bar(hours, new_edges, width=0.8, alpha=0.7, label=label)
    ax.set_xlabel("Hours elapsed")
    ax.set_ylabel("New edges in epoch")
    ax.set_title("New Edge Discovery Rate")
    ax.grid(True, alpha=0.3, axis='y')
    ax.legend()

    plt.tight_layout()
    fig.savefig(output_path, dpi=150)
    plt.close(fig)
    print(f"Saved: {output_path}")


def plot_multi_scenario(scenario_dirs, output_path):
    """多场景对比图"""
    COLORS = ["#e74c3c", "#2ecc71", "#3498db", "#9b59b6", "#f39c12", "#1abc9c"]

    fig, ax = plt.subplots(figsize=(12, 6))

    for i, (name, csv_path) in enumerate(scenario_dirs.items()):
        data = load_edge_history(csv_path)
        if not data:
            continue
        t0 = data[0]['timestamp']
        hours = [(d['timestamp'] - t0) / 3600.0 for d in data]
        cumulative = [d['cumulative'] for d in data]
        color = COLORS[i % len(COLORS)]
        ax.plot(hours, cumulative, marker='.', markersize=3,
                linewidth=1.5, label=name, color=color)

    ax.set_xlabel("Hours elapsed")
    ax.set_ylabel("Cumulative unique edges")
    ax.set_title("Edge Coverage Comparison")
    ax.grid(True, alpha=0.3)
    ax.legend()
    ax.yaxis.set_major_formatter(
        mticker.FuncFormatter(lambda x, _: f'{x/1000:.0f}k' if x >= 1000 else str(int(x)))
    )

    plt.tight_layout()
    fig.savefig(output_path, dpi=150)
    plt.close(fig)
    print(f"Saved: {output_path}")


if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("csv_path", help="edge_history.csv path")
    parser.add_argument("-o", "--output", default="./charts")
    parser.add_argument("--label", default="scenario")
    args = parser.parse_args()

    import os
    os.makedirs(args.output, exist_ok=True)

    data = load_edge_history(args.csv_path)
    plot_cumulative(data, os.path.join(args.output, "edge_cumulative.png"), args.label)
    plot_new_edges_per_epoch(data, os.path.join(args.output, "edge_new_per_epoch.png"), args.label)

12. Step 8:与行覆盖度集成

12.1 集成架构

边覆盖度和行覆盖度可以共享同一个 daemon 和同一个 history 文件:

# coverage_history.csv(统一格式)
epoch,timestamp,scope,value1,value2,value3
epoch_001,1715300000,overall,45.23,38.10,22.56
epoch_001,1715300000,innodb,52.10,45.30,28.90
epoch_001,1715300000,edge,1523,487,487

12.2 统一 daemon 模板

coverage_daemon() {
    local container="$1"
    local output_dir="$2"
    local epoch_duration="${EPOCH_DURATION:-3600}"

    sleep "${FIRST_DELAY:-0}"

    local epoch=0
    while true; do
        epoch=$((epoch + 1))
        local label="epoch_$(printf '%03d' $epoch)"

        # 1. eBPF 边覆盖:attach
        edge_trace_attach "$BINARY_PATH" "$TARGET_PID" "$output_dir/edges" "$TEMPLATE"

        # 2. 等待一个 epoch
        sleep "$epoch_duration"

        # 3. eBPF 边覆盖:detach + 去重
        edge_trace_detach
        dedup_current_log

        # 4. gcov 行覆盖:flush + capture(如果启用)
        if [[ "${GCOV_ENABLED:-0}" == "1" ]]; then
            flush_gcov "$container"
            lcov_capture "$container" "$output_dir/coverage" "$label"
        fi

        # 5. 写入统一 history
        local ts=$(date +%s)
        echo "${label},${ts},edge,${EDGE_RAW},${EDGE_UNIQUE},${EDGE_CUMULATIVE}" \
            >> "$output_dir/coverage_history.csv"
        # ... 写入 overall/innodb/tx 行 ...
    done
}

13. 容器化部署注意事项

13.1 权限要求

宿主机上运行 bpftrace 需要:

# 方法 1:以 root 运行
sudo bpftrace edge_run.bt

# 方法 2:授予必要 capabilities(更安全)
# 需要 CAP_BPF 和 CAP_TRACING(Linux 5.8+)
# 或者 CAP_SYS_ADMIN(旧内核)

13.2 Docker 特殊处理

# 容器需要 --privileged 或特定 capabilities
docker run --cap-add=SYS_PTRACE --cap-add=SYS_ADMIN ...

# 或者在宿主机上直接 attach 容器内进程(推荐)
# 不需要容器内安装 bpftrace

13.3 二进制路径访问

容器内二进制在宿主机上的访问方式:

# 方法 1:Docker OverlayFS 物理路径(推荐)
MergedDir=$(docker inspect --format '' <container>)
BINARY="${MergedDir}/usr/local/bin/myapp"

# 方法 2:/proc/PID/root/ 路径(可能有 I/O 竞争)
CONTAINER_PID=$(docker inspect --format '' <container>)
BINARY="/proc/${CONTAINER_PID}/root/usr/local/bin/myapp"

# 方法 3:bind mount(最稳定但需要改 docker run)
docker run -v /usr/local/bin/myapp:/host/myapp:ro ...

13.4 Kubernetes 环境

在 K8s 中,需要在 DaemonSet 中部署 eBPF 采集器:

apiVersion: apps/v1
kind: DaemonSet
spec:
  template:
    spec:
      hostPID: true  # 关键:共享宿主机 PID namespace
      containers:
      - name: edge-collector
        securityContext:
          privileged: true  # 或使用 capabilities
        volumeMounts:
        - name: proc
          mountPath: /host/proc
          readOnly: true
      volumes:
      - name: proc
        hostPath:
          path: /proc

14. 常见问题与排错

14.1 bpftrace 启动失败

ERROR: failed to attach uprobe

排查步骤

# 1. 检查符号是否存在
nm /path/to/binary | grep target_function

# 2. 检查二进制是否可读
sudo file /path/to/binary
sudo test -r /path/to/binary && echo "OK" || echo "NOT READABLE"

# 3. 检查 PID 是否存活
ps -p <target_pid>

# 4. 手动测试单个 uprobe
sudo bpftrace -e 'uprobe:/path/to/binary:*target_func* { printf("HIT\n"); }' -c 'sleep 1'

14.2 日志为空(无 CALL 输出)

可能原因:

  1. 目标进程在采样期间没有执行被插桩的函数
  2. PID 过滤器不匹配(进程重启后 PID 变化)
  3. uprobe 未成功 attach(bpftrace 启动时有 warning)
# 检查 bpftrace 是否成功 attach
# 看日志第一行是否有 "=== edge tracing started ==="
head -1 edges.log

# 检查 uprobe 是否注册成功
sudo cat /sys/kernel/debug/tracing/uprobes | grep target_function

14.3 调用栈符号显示为地址

CALL buf_page_get_gen
        0x7f3a2c1234
        0x7f3a2c5678

原因:二进制被 strip 或缺少符号表。

# 检查是否有符号
file /path/to/binary
# 应显示 "not stripped"

# 如果是 stripped,使用 debuginfo 包
# Debian/Ubuntu: apt install mysql-server-dbgsym
# RHEL/CentOS: debuginfo-install mysql-server

14.4 性能开销过大

bpftrace uprobe 在高频函数上可能有显著开销。

# 降低开销的方法:

# 1. 减少插桩函数数量(只保留最关键的)

# 2. 使用采样过滤(每 N 次调用才采样一次)
uprobe:/path/to/binary:*hot_func*
/pid == TARGET_PID && rand() % 100 == 0/
{
    printf("CALL hot_func\n");
    printf("%s\n", ustack());
}

# 3. 限制调用栈深度
ustake(8)  # 只取前 8 帧

# 4. 缩短 epoch 时长,增加 attach/detach 间隔

14.5 非 C/C++ 语言的适配

语言 适配方式
Go 直接可用,Go 编译为原生二进制,符号名如 main.myFunc
Rust 直接可用,使用 *func_name* 匹配 mangled 符号
JVM (Java/Kotlin) 使用 usym() + 需要 -XX:+PreserveFramePointer 或 Async-Profiler 方案
Python/Node.js 不适用 uprobe,考虑使用 USDT probes 或 perf
C# (.NET) 需要 DOTNET_EnableDiagnostics=1,使用 USDT

14.6 内核版本兼容性

功能 最低内核 说明
基本 uprobe 3.5+ 核心功能
ustack() 4.6+ 用户态栈回溯
usym() 4.7+ 用户态符号解析
Container PID 过滤 4.9+ pid namespace 感知
bpftrace wildcard 4.18+ *func* 通配符匹配

15. 附录:可复用脚本模板

15.1 一键部署脚本

#!/usr/bin/env bash
# deploy_edge_coverage.sh - 一键部署边覆盖度采集

set -euo pipefail

# 配置
BINARY="${1:?用法: $0 <binary_path> <target_pid> [output_dir]}"
TARGET_PID="${2:?用法: $0 <binary_path> <target_pid> [output_dir]}"
OUTPUT_DIR="${3:-./edge_coverage_$(date +%Y%m%d_%H%M%S)}"
TEMPLATE_DIR="$(dirname "$0")/templates"

mkdir -p "$OUTPUT_DIR/edges"

# 检查依赖
command -v bpftrace >/dev/null 2>&1 || { echo "ERROR: bpftrace not found"; exit 1; }
command -v python3 >/dev/null 2>&1 || { echo "ERROR: python3 not found"; exit 1; }

# 生成探针模板(根据用户指定的函数列表)
cat > "$OUTPUT_DIR/edges/edge.bt" << 'BTPL'
#!/usr/bin/env bpftrace
BEGIN { printf("=== edge tracing started (PID=%d) ===\n", pid); }

# 在此添加 uprobe 块,每个目标函数一个
# uprobe:BIN_PATH:*your_function*
# /pid == TARGET_PID/
# {
#     printf("CALL your_function\n");
#     printf("%s\n", ustack());
# }

END { printf("=== edge tracing stopped ===\n"); }
BTPL

echo "Deployed to $OUTPUT_DIR"
echo "Edit $OUTPUT_DIR/edges/edge.bt to add target functions"
echo "Then run: bash edge_coverage_daemon.sh"

15.2 完整的 edge.bt 生成器

#!/usr/bin/env bash
# generate_edge_bt.sh - 从函数列表生成 bpftrace 脚本

# 用法: $0 <binary_path> <pid> <func1> [func2] [func3] ...

BINARY="$1"
PID="$2"
shift 2

cat << EOF
#!/usr/bin/env bpftrace

BEGIN { printf("=== edge tracing started (PID=%d) ===\n", pid); }

EOF

for func in "$@"; do
cat << EOF
uprobe:${BINARY}:*${func}*
/pid == ${PID}/
{
    printf("CALL ${func}\\n");
    printf("%s\\n", ustack());
}

EOF
done

cat << 'EOF'
END { printf("=== edge tracing stopped ===\n"); }
EOF

使用示例:

# 生成脚本
bash generate_edge_bt.sh /usr/bin/mysqld 12345 \
    dispatch_command \
    mysql_parse \
    trx_commit \
    buf_page_get_gen \
    row_search_mvcc \
    > edge.bt

# 直接使用
sudo bpftrace edge.bt > edges.log 2>&1 &

15.3 快速验证脚本

#!/usr/bin/env bash
# verify_edge_trace.sh - 快速验证边覆盖度采集是否正常工作

BINARY="${1:?用法: $0 <binary_path> <target_pid>}"
TARGET_PID="${2:?}"

echo "=== 快速验证 ==="
echo "目标二进制: $BINARY"
echo "目标 PID: $TARGET_PID"

# 1. 检查 bpftrace
echo -n "检查 bpftrace... "
if sudo -n bpftrace --version &>/dev/null; then
    echo "OK ($(sudo bpftrace --version 2>&1 | head -1))"
else
    echo "FAIL (需要 sudo 权限或 bpftrace 未安装)"
    exit 1
fi

# 2. 检查目标进程
echo -n "检查目标进程... "
if ps -p "$TARGET_PID" >/dev/null 2>&1; then
    echo "OK ($(ps -p "$TARGET_PID" -o comm=))"
else
    echo "FAIL (PID $TARGET_PID 不存在)"
    exit 1
fi

# 3. 检查二进制可访问性
echo -n "检查二进制... "
if sudo test -r "$BINARY"; then
    echo "OK"
else
    echo "FAIL ($BINARY 不可读)"
    exit 1
fi

# 4. 测试 uprobe attach
echo -n "测试 uprobe... "
TEST_LOG=$(mktemp)
sudo timeout 5 bpftrace -e "
uprobe:${BINARY}:*dispatch_command*
/pid == ${TARGET_PID}/
{
    printf("TEST_HIT\\n");
    printf(\"%s\\n\", ustack());
}
" > "$TEST_LOG" 2>&1 &
TEST_PID=$!
sleep 3
sudo kill -INT "$TEST_PID" 2>/dev/null
wait "$TEST_PID" 2>/dev/null

if [[ -s "$TEST_LOG" ]]; then
    echo "OK (收到 $(grep -c 'TEST_HIT' "$TEST_LOG") 次命中)"
else
    echo "WARN (3 秒内未捕获到事件,可能目标函数未被调用)"
fi
rm -f "$TEST_LOG"

echo "=== 验证完成 ==="

快速参考卡

┌─────────────────────────────────────────────────────────────┐
│                   边覆盖度实施清单                            │
│                                                             │
│  □ 确认内核版本 >= 5.4,bpftrace 已安装                      │
│  □ 确认被测二进制未 strip,符号可读                           │
│  □ 选择 5-15 个关键插桩函数                                  │
│  □ 编写 edge.bt 探针模板(使用占位符)                        │
│  □ 实现 attach/detach 函数(PID 解析 + 路径解析)             │
│  □ 实现日志去重脚本(Python tuple set)                       │
│  □ 构建 epoch 循环 daemon                                    │
│  □ 设计 edge_history.csv 持久化格式                          │
│  □ 实现可视化(累积曲线 + 新增速率)                          │
│  □ 验证:手动触发 → 检查日志 → 确认去重正确                   │
│  □ 长期运行:监控磁盘空间、bpftrace 内存占用                  │
└─────────────────────────────────────────────────────────────┘

Updated: