首页 / 文章 / Python Agent + DuckDB:71 行黑盒崩溃调试实战
← 返回
AI技术

Python Agent + DuckDB:71 行黑盒崩溃调试实战

✍️ zhirenhun 📅 2026/6/1 👁 30 阅读 ⏱ 55 分钟
Python Agent + DuckDB:71 行黑盒崩溃调试实战
Python Agent + DuckDB:71 行黑盒崩溃调试实战

起因:一个看似普通的自动化任务

事情的起因是一个再普通不过的运维自动化任务:接收用户请求、搜索内部文档索引、汇总答案、交给审核人。没有什么特别之处。就是那种演示结束后、真正的业务流程开始时你才会写的 Python Agent。

然后某一次运行陷入了重试循环。

好在我在它烧掉 200 美元之前就发现了。实际的测试运行成本并不高。真正的问题在于规模——同样的错误循环、同样的文档搜索、同样的模型调用,被留在了一夜批处理任务中。估算下来,一个可以避免的缺陷,成本接近 200 美元。

它产出的答案看起来足够精致,足以通过一个疲惫审阅者的眼睛。但它背后的轨迹一点也不精致。Agent 调了正确的工具,却给了错误的输入;基于过时的上下文重试;汇总了旧的结果——每一轮都在支付代价。

就在那一刻,我停止了把 Agent 当成聊天功能来用。

我开始把它当成一个需要黑盒的系统。

不是一个仪表盘。不是一整套可观测性栈。不是又一个托管服务。

只是一个本地文件,能够回答:

我们将用纯 Python 构建这个黑盒,然后用 DuckDB 来查询它——就像一个迷你的崩溃数据库。

修复前后对比

修复前,调试是这样的:

最终答案错了。
模型可能产生了幻觉。
也许是搜索工具返回了脏数据。
也许是重试循环复用了旧消息。
也许是模型调用导致了成本飙升。

那不是调试。那是带着语法高亮的瞎猜。

修复后,调试变成了这样:

第 1 轮调用了 search_docs,但传入了错误的查询。
工具在 147.82 ms 后超时。
重试使用了过时的上下文。
守卫在 $0.0124 处终止了运行。
DuckDB 显示一次 tool_error 和一次 guard_stop。

同样的 bug。完全不同的体验。

问题的本质

一个普通的 Python 脚本通常只在一个地方失败。

一个 Agent 在整个链条上失败。

用户请求 → 模型决策 → 工具调用 → 工具结果 → 下一轮 → 最终答案
Agent 运行流程示意图
图1:Agent 运行流程——失败可以发生在链条上的任何一点

如果你只记录最终答案,你得到的是一篇日记。

如果你记录整个链条,你得到的是证据。

最简单实用的格式是 JSONL。每个事件一行。

{"type":"tool_start","tool":"search_docs","input":{"query":"rate limits"}}
{"type":"tool_end","tool":"search_docs","duration_ms":83.4,"ok":true}
{"type":"turn_end","turn":2,"total_cost_usd":0.0041}

JSONL 以一种恰到好处的方式"无聊"。它可以干净地追加写入,比一个大 JSON 文档更能承受崩溃,而且可以用常规工具搜索。

失败运行的 JSONL 轨迹截图
图2:一次失败运行的 JSONL 轨迹记录

一个真正干活的小型记录器

这就是记录器的代码。

它做四件事:

from __future__ import annotations

import json
import re
import time
import traceback
from contextlib import contextmanager
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Iterator
from uuid import uuid4


SECRET_KEYS = re.compile(
    r"(api[_-]?key|token|password|secret|authorization|cookie)",
    re.IGNORECASE,
)


@dataclass
class Event:
    run_id: str
    event_id: str
    type: str
    timestamp: float
    data: dict[str, Any] = field(default_factory=dict)


def sanitize(value: Any) -> Any:
    if isinstance(value, dict):
        cleaned = {}
        for key, item in value.items():
            if SECRET_KEYS.search(str(key)):
                cleaned[key] = "[redacted]"
            else:
                cleaned[key] = sanitize(item)
        return cleaned

    if isinstance(value, list):
        return [sanitize(item) for item in value]

    return value


class AgentBlackBox:
    def __init__(self, path: str | Path, run_id: str | None = None) -> None:
        self.path = Path(path)
        self.run_id = run_id or uuid4().hex
        self.path.parent.mkdir(parents=True, exist_ok=True)

    def record(self, event_type: str, **data: Any) -> None:
        event = Event(
            run_id=self.run_id,
            event_id=uuid4().hex,
            type=event_type,
            timestamp=time.time(),
            data=sanitize(data),
        )

        with self.path.open("a", encoding="utf-8") as file:
            file.write(json.dumps(asdict(event), default=str) + "
")

    @contextmanager
    def tool(self, name: str, **tool_input: Any) -> Iterator[None]:
        started = time.perf_counter()
        self.record("tool_start", tool=name, input=tool_input)

        try:
            yield
        except Exception as exc:
            self.record(
                "tool_error",
                tool=name,
                error_type=type(exc).__name__,
                error=str(exc),
                traceback=traceback.format_exc(limit=6),
                duration_ms=round((time.perf_counter() - started) * 1000, 2),
            )
            raise
        else:
            self.record(
                "tool_end",
                tool=name,
                ok=True,
                duration_ms=round((time.perf_counter() - started) * 1000, 2),
            )

sanitize() 函数并不完美。它不是保险库,只是安全带。

但它阻止了最尴尬的情况:建好了一个有用的调试轨迹,却把 API 密钥也静静地存了进去。

先包装一个工具

从第一个工具开始。不要在第一天就把所有东西都装上仪表。

import random
import time


def search_docs(query: str, api_key: str) -> list[str]:
    time.sleep(random.uniform(0.05, 0.2))

    if "timeout" in query:
        raise TimeoutError("Document search timed out")

    return [
        "JSONL works well for append-only traces.",
        "Context managers are useful around tool calls.",
        "DuckDB can query JSON files without a server.",
    ]

现在记录这次调用:

box = AgentBlackBox("traces/run.jsonl")

query = "python agent trace format"

with box.tool("search_docs", query=query, api_key="***"):
    docs = search_docs(query=query, api_key="***")

box.record("tool_result", tool="search_docs", result_count=len(docs))

打开 traces/run.jsonl,密钥已被打码:

{"tool":"search_docs","input":{"query":"python agent trace format","api_key":"[redacted]"}}

这个小细节很重要。调试不应该制造第二次事故。

加一个低成本运行守卫

大多数 Agent 失控的故事,都始于一个看起来无害的循环。

所以黑盒不仅要记录发生了什么,还要记录它何时拒绝继续。

class RunStopped(RuntimeError):
    pass


def stop_if_needed(
    box: AgentBlackBox,
    *,
    turn: int,
    max_turns: int,
    spent_usd: float,
    max_usd: float,
) -> None:
    box.record(
        "guard_check",
        turn=turn,
        max_turns=max_turns,
        spent_usd=round(spent_usd, 6),
        max_usd=round(max_usd, 6),
    )

    if turn > max_turns:
        box.record("guard_stop", reason="max_turns", turn=turn)
        raise RunStopped(f"Stopped at turn {turn}. Max turns is {max_turns}.")

    if spent_usd > max_usd:
        box.record("guard_stop", reason="budget", spent_usd=spent_usd)
        raise RunStopped(f"Stopped at ${spent_usd:.4f}. Budget is ${max_usd:.4f}.")

这不是精确的账单。在有真实 token 计数数据的时候,请使用你的模型提供商的返回结果。

这里的目标只是一个本地的绊网。你需要在运行停止时,留下一个明确的原因。

一个微型 Agent 循环

这个模拟循环保持了最少的活动部件。

把其中的模拟模型部分替换成你真正的模型调用即可。

def estimate_cost(input_tokens: int, output_tokens: int) -> float:
    return input_tokens * 0.0000005 + output_tokens * 0.0000015


def run_agent(question: str) -> str:
    box = AgentBlackBox("traces/run.jsonl")
    messages = [{"role": "user", "content": question}]
    spent_usd = 0.0
    max_turns = 3
    max_usd = 0.01

    box.record("run_start", question=question, max_turns=max_turns, max_usd=max_usd)

    for turn in range(1, max_turns + 1):
        stop_if_needed(
            box,
            turn=turn,
            max_turns=max_turns,
            spent_usd=spent_usd,
            max_usd=max_usd,
        )

        box.record("turn_start", turn=turn, message_count=len(messages))

        # 模拟模型选择了这个工具输入
        query = question if turn == 1 else "python jsonl duckdb traces"

        with box.tool("search_docs", query=query, api_key="***"):
            docs = search_docs(query=query, api_key="***")

        messages.append({"role": "tool", "content": "
".join(docs)})

        turn_cost = estimate_cost(
            input_tokens=sum(len(message["content"].split()) for message in messages),
            output_tokens=120,
        )
        spent_usd += turn_cost

        box.record(
            "turn_end",
            turn=turn,
            message_count=len(messages),
            turn_cost_usd=round(turn_cost, 6),
            total_cost_usd=round(spent_usd, 6),
        )

    answer = "Record every tool call as JSONL, then query failures after the run."
    box.record("run_end", answer=answer, total_cost_usd=round(spent_usd, 6))
    return answer

先用一个正常问题运行一次:

print(run_agent("How should I debug Python agent tools?"))

再扔一个坏问题给它:

print(run_agent("timeout during document search"))

第二次运行应该会失败,但这一次,它带着一条完整的轨迹失败。

如果为了测试想强制触发预算停止,临时把 max_usd = 0.0001。下一次守卫检查就会写入一个 guard_stop 事件,而不是让循环悄无声息地继续。

用 DuckDB 查询崩溃

这才是让 JSONL 不再只是日志记录、而真正成为调试工具的部分。

安装 DuckDB:

pip install duckdb

然后查询轨迹:

import duckdb


def query_trace(path: str = "traces/run.jsonl") -> None:
    con = duckdb.connect()

    con.sql(
        f"""
        create or replace view events as
        select *
        from read_json_auto('{path}');
        """
    )

    print("Event counts")
    con.sql(
        """
        select type, count(*) as events
        from events
        group by type
        order by events desc;
        """
    ).show()

    print("Tool errors")
    con.sql(
        """
        select
            data.tool as tool,
            data.error_type as error_type,
            data.error as error,
            data.duration_ms as duration_ms
        from events
        where type = 'tool_error';
        """
    ).show()

    print("Slow tools")
    con.sql(
        """
        select
            data.tool as tool,
            data.duration_ms as duration_ms
        from events
        where type = 'tool_end'
        order by data.duration_ms desc
        limit 5;
        """
    ).show()

然后运行:

query_trace()

输出效果类似这样:

DuckDB 查询 Agent 崩溃输出截图
图3:DuckDB 查询输出——崩溃数据一目了然
Event counts
+-------------+--------+
| type        | events |
+-------------+--------+
| guard_check |      4 |
| turn_start  |      3 |
| tool_start  |      3 |
| tool_end    |      2 |
| tool_error  |      1 |
| guard_stop  |      1 |
+-------------+--------+

现在崩溃行是一个查询结果,而不是一个谜:

Tool errors
+-------------+--------------+---------------------------+-------------+
| tool        | error_type   | error                     | duration_ms |
+-------------+--------------+---------------------------+-------------+
| search_docs | TimeoutError | Document search timed out |      147.82 |
+-------------+--------------+---------------------------+-------------+

你现在可以回答那些普通的 print 日志让人头疼的问题了:

这就是升级。

不是"我有日志。"

而是"我可以审问这次运行。"

在真实项目中我会记录什么

对于一个演示来说,上面的轨迹已经够了。

对于一个真实项目,我会添加这些字段:

我不会默认记录的是:

那条无聊的安全规则很简单:

记录足够调试行为的信息。不要记录足够伤害他人的信息。

用一句话总结这个模式

每次 Agent 运行都应该生成一个本地、追加写入、安全可存、易于查询的事件流,当进程崩溃后依旧可用。

这句话没有新的提示词技巧那么令人兴奋。

但它更可能拯救你的周末。

完整代码文件

以下是完整的示例代码:

from __future__ import annotations

import json
import random
import re
import time
import traceback
from contextlib import contextmanager
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Iterator
from uuid import uuid4


SECRET_KEYS = re.compile(
    r"(api[_-]?key|token|password|secret|authorization|cookie)",
    re.IGNORECASE,
)


@dataclass
class Event:
    run_id: str
    event_id: str
    type: str
    timestamp: float
    data: dict[str, Any] = field(default_factory=dict)


def sanitize(value: Any) -> Any:
    if isinstance(value, dict):
        return {
            key: "[redacted]" if SECRET_KEYS.search(str(key)) else sanitize(item)
            for key, item in value.items()
        }

    if isinstance(value, list):
        return [sanitize(item) for item in value]

    return value


class AgentBlackBox:
    def __init__(self, path: str | Path, run_id: str | None = None) -> None:
        self.path = Path(path)
        self.run_id = run_id or uuid4().hex
        self.path.parent.mkdir(parents=True, exist_ok=True)

    def record(self, event_type: str, **data: Any) -> None:
        event = Event(
            run_id=self.run_id,
            event_id=uuid4().hex,
            type=event_type,
            timestamp=time.time(),
            data=sanitize(data),
        )

        with self.path.open("a", encoding="utf-8") as file:
            file.write(json.dumps(asdict(event), default=str) + "
")

    @contextmanager
    def tool(self, name: str, **tool_input: Any) -> Iterator[None]:
        started = time.perf_counter()
        self.record("tool_start", tool=name, input=tool_input)

        try:
            yield
        except Exception as exc:
            self.record(
                "tool_error",
                tool=name,
                error_type=type(exc).__name__,
                error=str(exc),
                traceback=traceback.format_exc(limit=6),
                duration_ms=round((time.perf_counter() - started) * 1000, 2),
            )
            raise
        else:
            self.record(
                "tool_end",
                tool=name,
                ok=True,
                duration_ms=round((time.perf_counter() - started) * 1000, 2),
            )


class RunStopped(RuntimeError):
    pass


def stop_if_needed(
    box: AgentBlackBox,
    *,
    turn: int,
    max_turns: int,
    spent_usd: float,
    max_usd: float,
) -> None:
    box.record(
        "guard_check",
        turn=turn,
        max_turns=max_turns,
        spent_usd=round(spent_usd, 6),
        max_usd=round(max_usd, 6),
    )

    if turn > max_turns:
        box.record("guard_stop", reason="max_turns", turn=turn)
        raise RunStopped(f"Stopped at turn {turn}. Max turns is {max_turns}.")

    if spent_usd > max_usd:
        box.record("guard_stop", reason="budget", spent_usd=spent_usd)
        raise RunStopped(f"Stopped at ${spent_usd:.4f}. Budget is ${max_usd:.4f}.")


def search_docs(query: str, api_key: str) -> list[str]:
    time.sleep(random.uniform(0.05, 0.2))

    if "timeout" in query:
        raise TimeoutError("Document search timed out")

    return [
        "JSONL works well for append-only traces.",
        "Context managers are useful around tool calls.",
        "DuckDB can query JSON files without a server.",
    ]


def estimate_cost(input_tokens: int, output_tokens: int) -> float:
    return input_tokens * 0.0000005 + output_tokens * 0.0000015


def run_agent(question: str) -> str:
    box = AgentBlackBox("traces/run.jsonl")
    messages = [{"role": "user", "content": question}]
    spent_usd = 0.0
    max_turns = 3
    max_usd = 0.01

    box.record("run_start", question=question, max_turns=max_turns, max_usd=max_usd)

    for turn in range(1, max_turns + 1):
        stop_if_needed(
            box,
            turn=turn,
            max_turns=max_turns,
            spent_usd=spent_usd,
            max_usd=max_usd,
        )

        box.record("turn_start", turn=turn, message_count=len(messages))

        query = question if turn == 1 else "python jsonl duckdb traces"

        with box.tool("search_docs", query=query, api_key="***"):
            docs = search_docs(query=query, api_key="***")

        messages.append({"role": "tool", "content": "
".join(docs)})

        turn_cost = estimate_cost(
            input_tokens=sum(len(message["content"].split()) for message in messages),
            output_tokens=120,
        )
        spent_usd += turn_cost

        box.record(
            "turn_end",
            turn=turn,
            message_count=len(messages),
            turn_cost_usd=round(turn_cost, 6),
            total_cost_usd=round(spent_usd, 6),
        )

    answer = "Record every tool call as JSONL, then query failures after the run."
    box.record("run_end", answer=answer, total_cost_usd=round(spent_usd, 6))
    return answer


if __name__ == "__main__":
    print(run_agent("How should I debug Python agent tools?"))

在那个完整文件里,有一行代码值得多看两眼:

box.record("run_start", question=question, max_turns=max_turns, max_usd=max_usd)

这一行改变了程序的姿态。

运行不再是与模型的私密对话。它是一次有记录的执行,带有一条你可以检查、查询和改进的轨迹。

这就是演示和可信任系统之间的区别。

接下来你会添加什么:prompt 哈希、token 计数、截图、检查点、还是可回放的工具夹具?

——

🧑‍💻

zhirenhun

一个热爱技术的程序员,喜欢分享前沿AI知识和开发经验。

← 上一篇
Human-on-the-Loop:AI 审查 AI 的 PR —— airCloset cortex 的自动化代码审查流水线

📌 相关推荐

Human-on-the-Loop:AI 审查 AI 的 PR —— airCloset cortex 的自动化代码审查流水线
2026/6/1
用 50 行代码构建一个容器(第一部分)
2026/6/1
走向 Agent 记忆的标准模型
2026/5/31
← 返回文章列表