模块 09 - 综合项目 | 前置:Tool 接口与定义、Output Parsers
这一章要做什么
做一个自然语言数据分析 Agent。用户给一个 CSV 或 SQLite 文件,再用中文提问,比如”上个月每个城市的订单量是多少?“,Agent 自己写 SQL、跑查询、生成图表配置、再用一段话把结论说出来。
技术挑战只有三件:
- 让 Agent 写正确的 SQL——比起业务复杂度,这其实是 prompt + schema 描述的工程问题
- 沙箱执行——只允许 SELECT,禁掉一切修改语句,绑定查询超时
- 结果可视化 + 解读——返回前端能直接渲染的 ECharts 配置,外加一段人话总结
架构
整个 Agent 只有 3 个工具,但每个工具都包含了不少防御性逻辑。模型选 Sonnet 4.6 而不是 Opus——SQL 生成是相对结构化的任务,Sonnet 已经够用且更便宜。
项目骨架
data-analysis/
├── package.json
├── tsconfig.json
├── src/
│ ├── cli.ts # 命令行入口
│ ├── agent.ts # createAgent 装配
│ ├── db.ts # 进程内 SQLite 会话
│ └── tools/
│ ├── load.ts # load_data
│ ├── sql.ts # run_sql(含安全验证)
│ └── chart.ts # plot_chart
└── data/
└── orders.csv # 示例数据进程内 SQLite 会话
数据加载和查询都在同一个内存 SQLite 实例里完成,避免文件状态在工具之间走丢:
// src/db.ts
import Database from "better-sqlite3";
// 全局单例:整个 CLI 会话共用一个 :memory: 数据库
let _db: Database.Database | null = null;
export function getDb(): Database.Database {
if (_db) return _db;
_db = new Database(":memory:");
_db.pragma("journal_mode = WAL");
// 安全开关:禁止附加新数据库、禁止用户函数
_db.pragma("trusted_schema = OFF");
return _db;
}
export function listTables(): string[] {
const db = getDb();
const rows = db.prepare(
"SELECT name FROM sqlite_master WHERE type='table'"
).all() as Array<{ name: string }>;
return rows.map((r) => r.name);
}
export function describeTable(name: string): Array<{
name: string;
type: string;
}> {
const db = getDb();
// pragma_table_info 是 SQLite 内置元数据,安全
return db.prepare(`PRAGMA table_info(${quoteIdent(name)})`).all() as Array<{
name: string;
type: string;
}>;
}
// 简单防御 identifier 注入
export function quoteIdent(name: string): string {
if (!/^[A-Za-z_][A-Za-z0-9_]*$/.test(name)) {
throw new Error(`非法标识符:${name}`);
}
return `"${name}"`;
}load_data 工具
// src/tools/load.ts
import { tool } from "@langchain/core/tools";
import { z } from "zod";
import { readFile } from "node:fs/promises";
import { getDb, quoteIdent } from "../db.js";
export const loadData = tool(
async ({ path, tableName }) => {
const db = getDb();
if (path.endsWith(".csv")) {
const raw = await readFile(path, "utf-8");
const lines = raw.trim().split("\n");
if (lines.length < 2) return "CSV 行数过少";
const header = parseCsvLine(lines[0]);
const sample = parseCsvLine(lines[1]);
// 简单类型推断
const cols = header.map((h, i) => ({
name: h,
type: inferType(sample[i]),
}));
const ddl =
`CREATE TABLE ${quoteIdent(tableName)} (` +
cols.map((c) => `${quoteIdent(c.name)} ${c.type}`).join(", ") +
")";
db.exec(`DROP TABLE IF EXISTS ${quoteIdent(tableName)}`);
db.exec(ddl);
const insert = db.prepare(
`INSERT INTO ${quoteIdent(tableName)} VALUES (${header.map(() => "?").join(",")})`
);
const tx = db.transaction((rows: string[][]) => {
for (const r of rows) insert.run(...r);
});
tx(lines.slice(1).map(parseCsvLine));
return JSON.stringify({
table: tableName,
rows: lines.length - 1,
columns: cols,
});
}
if (path.endsWith(".sqlite") || path.endsWith(".db")) {
// 用 ATTACH 把外部数据库挂进来然后复制表
const tmp = new (await import("better-sqlite3")).default(path, {
readonly: true,
});
const tables = tmp
.prepare("SELECT name FROM sqlite_master WHERE type='table'")
.all() as Array<{ name: string }>;
const summary: Array<{ table: string; rows: number }> = [];
for (const t of tables) {
const rows = tmp.prepare(`SELECT * FROM ${quoteIdent(t.name)}`).all();
if (rows.length === 0) continue;
const cols = Object.keys(rows[0]);
db.exec(`DROP TABLE IF EXISTS ${quoteIdent(t.name)}`);
db.exec(
`CREATE TABLE ${quoteIdent(t.name)} (${cols.map((c) => quoteIdent(c)).join(", ")})`
);
const insert = db.prepare(
`INSERT INTO ${quoteIdent(t.name)} VALUES (${cols.map(() => "?").join(",")})`
);
const tx = db.transaction((rs: Array<Record<string, unknown>>) => {
for (const r of rs) insert.run(...cols.map((c) => r[c]));
});
tx(rows as Array<Record<string, unknown>>);
summary.push({ table: t.name, rows: rows.length });
}
tmp.close();
return JSON.stringify({ source: path, imported: summary });
}
return "不支持的文件类型,目前支持 .csv / .sqlite / .db";
},
{
name: "load_data",
description: `把外部数据文件加载到内存 SQLite 中以便 SQL 查询。
- CSV:需要指定 tableName
- SQLite/DB:直接导入所有表,tableName 参数被忽略`,
schema: z.object({
path: z.string().describe("文件路径"),
tableName: z.string().describe("CSV 加载后的表名(如 'orders')"),
}),
}
);
function parseCsvLine(line: string): string[] {
// 简化版 CSV 解析;生产环境用 csv-parse
const result: string[] = [];
let cur = "";
let inQuote = false;
for (let i = 0; i < line.length; i++) {
const c = line[i];
if (c === '"') inQuote = !inQuote;
else if (c === "," && !inQuote) {
result.push(cur);
cur = "";
} else cur += c;
}
result.push(cur);
return result;
}
function inferType(value: string): string {
if (/^-?\d+$/.test(value)) return "INTEGER";
if (/^-?\d+\.\d+$/.test(value)) return "REAL";
if (/^\d{4}-\d{2}-\d{2}/.test(value)) return "TEXT"; // SQLite 日期都用 TEXT
return "TEXT";
}run_sql 工具(沙箱)
这是最关键的工具。安全验证写得严,模型可以放心写 SQL:
// src/tools/sql.ts
import { tool } from "@langchain/core/tools";
import { z } from "zod";
import { getDb, listTables, describeTable } from "../db.js";
const FORBIDDEN = [
/\bDROP\b/i,
/\bDELETE\b/i,
/\bTRUNCATE\b/i,
/\bALTER\b/i,
/\bCREATE\b/i,
/\bINSERT\b/i,
/\bUPDATE\b/i,
/\bREPLACE\b/i,
/\bATTACH\b/i,
/\bDETACH\b/i,
/\bPRAGMA\b/i,
/\bVACUUM\b/i,
];
function validateReadOnly(sql: string): void {
const trimmed = sql.trim();
if (!/^(SELECT|WITH)\b/i.test(trimmed)) {
throw new Error("只允许 SELECT 或 WITH 查询");
}
for (const pattern of FORBIDDEN) {
if (pattern.test(trimmed)) {
throw new Error(`SQL 包含被禁用的关键字:${pattern.source}`);
}
}
// 防止多语句
if (trimmed.replace(/;\s*$/, "").includes(";")) {
throw new Error("不允许多条语句");
}
}
export const runSql = tool(
async ({ sql }) => {
validateReadOnly(sql);
const db = getDb();
// 强制套上 LIMIT,避免一次拉百万行
let safe = sql.trim().replace(/;\s*$/, "");
if (!/\bLIMIT\b/i.test(safe)) safe += " LIMIT 1000";
// better-sqlite3 同步 API 不支持语句级超时;
// 生产环境需在 worker_threads 中运行或换 async 驱动(参见已知限制第 3 条)
const start = Date.now();
const rows = db.prepare(safe).all() as Array<Record<string, unknown>>;
const elapsed = Date.now() - start;
return JSON.stringify({
sql: safe,
rowCount: rows.length,
elapsedMs: elapsed,
rows: rows.slice(0, 200), // 截断回模型,避免吃太多 token
truncated: rows.length > 200,
});
},
{
name: "run_sql",
description: `在内存 SQLite 上执行一条只读 SQL(SELECT / WITH)。
重要:调用前必须先用 inspect_schema 看清楚表结构。结果集会自动加 LIMIT 1000。`,
schema: z.object({
sql: z.string().describe("一条只读 SQL(不要写分号结尾的多语句)"),
}),
}
);
export const inspectSchema = tool(
async () => {
const tables = listTables();
const result = tables.map((t) => ({
table: t,
columns: describeTable(t),
}));
return JSON.stringify(result);
},
{
name: "inspect_schema",
description: "列出当前数据库中所有表和它们的字段。写 SQL 前必看。",
schema: z.object({}),
}
);plot_chart 工具
// src/tools/chart.ts
import { tool } from "@langchain/core/tools";
import { z } from "zod";
export const plotChart = tool(
async ({ chartType, title, xField, yField, data }) => {
// 返回前端可直接渲染的 ECharts option
const option: Record<string, unknown> = {
title: { text: title },
tooltip: { trigger: "axis" },
};
if (chartType === "pie") {
option.series = [
{
type: "pie",
data: data.map((d) => ({ name: String(d[xField]), value: d[yField] })),
},
];
} else {
option.xAxis = { type: "category", data: data.map((d) => String(d[xField])) };
option.yAxis = { type: "value" };
option.series = [
{
type: chartType,
data: data.map((d) => d[yField]),
},
];
}
return JSON.stringify({ chartType, option });
},
{
name: "plot_chart",
description: `把 run_sql 的结果做成 ECharts 配置。
xField/yField 是 SQL 结果集中的字段名。data 是 run_sql 返回的 rows。`,
schema: z.object({
chartType: z.enum(["bar", "line", "pie", "scatter"]),
title: z.string(),
xField: z.string(),
yField: z.string(),
data: z.array(z.record(z.unknown())),
}),
}
);Agent 装配
// src/agent.ts
import { createAgent } from "langchain";
import { ChatAnthropic } from "@langchain/anthropic";
import { loadData } from "./tools/load.js";
import { runSql, inspectSchema } from "./tools/sql.js";
import { plotChart } from "./tools/chart.js";
const SYSTEM_PROMPT = `你是数据分析助手。用户给你一个数据文件路径和一个中文问题,你的任务:
1. 用 load_data 把文件导入内存数据库(如果还没导入)
2. 用 inspect_schema 看清楚表结构(绝不要凭想象写 SQL)
3. 写一条 SELECT 完成查询;只能 SELECT,没有写权限
4. 必要时用 plot_chart 生成图表配置
5. 用中文给出一段简洁的解读(具体数字,不要套话)
输出要求:
- 最后一条消息里同时包含:SQL、行数、图表配置 JSON、3-5 句业务解读
- 解读基于查询结果,不要编造数据`;
export const dataAgent = createAgent({
model: new ChatAnthropic({ model: "claude-sonnet-4-6", temperature: 0 }),
tools: [loadData, inspectSchema, runSql, plotChart],
systemPrompt: SYSTEM_PROMPT,
});CLI 入口
// src/cli.ts
import readline from "node:readline/promises";
import { stdin, stdout } from "node:process";
import { dataAgent } from "./agent.js";
const rl = readline.createInterface({ input: stdin, output: stdout });
const threadId = `cli-${Date.now()}`;
console.log("数据分析 Agent 已启动。先告诉我你要分析的文件路径,再问问题。\n");
while (true) {
const input = await rl.question("> ");
if (!input.trim()) continue;
if (input.trim() === "/exit") break;
for await (const chunk of dataAgent.stream(
{ messages: [{ role: "user", content: input }] },
{ configurable: { thread_id: threadId }, streamMode: "messages" }
)) {
const [msg] = chunk as [
{ contentBlocks?: Array<{ type: string; text?: string; name?: string }> }
];
for (const block of msg.contentBlocks ?? []) {
if (block.type === "text" && block.text) process.stdout.write(block.text);
if (block.type === "tool_call") process.stdout.write(`\n[调用 ${block.name}]\n`);
}
}
process.stdout.write("\n\n");
}
rl.close();package.json
{
"name": "data-analysis-agent",
"private": true,
"type": "module",
"engines": { "node": ">=20" },
"scripts": {
"dev": "tsx src/cli.ts"
},
"dependencies": {
"@langchain/anthropic": "^1.4.0",
"@langchain/core": "^1.4.0",
"@langchain/langgraph": "^1.0.0",
"better-sqlite3": "^11.3.0",
"langchain": "^1.4.0",
"zod": "^3.23.0"
},
"devDependencies": {
"@types/better-sqlite3": "^7.6.0",
"tsx": "^4.19.0",
"typescript": "^5.5.0"
}
}.env:
ANTHROPIC_API_KEY=示例数据
随便造一份 data/orders.csv 测试:
order_id,city,amount,created_at
O001,北京,299.00,2026-04-12
O002,上海,1599.00,2026-04-15
O003,广州,89.00,2026-04-20
O004,北京,499.00,2026-04-22
O005,上海,239.00,2026-05-01
O006,深圳,1099.00,2026-05-03跑起来
npm install
npm run dev交互示例:
> 加载 data/orders.csv 到 orders 表
[调用 load_data]
已导入 6 行,字段:order_id, city, amount, created_at。
> 每个城市的订单数和总金额是多少?画个柱状图
[调用 inspect_schema]
[调用 run_sql]
SELECT city, COUNT(*) AS order_count, SUM(amount) AS total_amount
FROM orders GROUP BY city ORDER BY total_amount DESC LIMIT 1000
[调用 plot_chart]
查询结果:
- 上海:2 单,¥1838.00
- 北京:2 单,¥798.00
- 深圳:1 单,¥1099.00
- 广州:1 单,¥89.00
解读:上海和深圳客单价显著高于北京、广州,但订单频次上海最高。
如果做活动投放,上海是首选;深圳值得做客单价稳定性分析。
图表配置(前端 ECharts.setOption 直接吃):
{"chartType":"bar","option":{...}}已知限制
- CSV 解析是手写的简化版:碰到带换行的引号字段会断。生产换 csv-parse。
- 类型推断只看第一行:第一行是整数后面有小数会被截断,要么人工传 schema,要么扫描多行。
- 没有真正的查询超时:better-sqlite3 是同步 API,超时控制需要在 worker 里跑或者换 async 驱动。
- 没接 PostgreSQL / MySQL:换驱动后
validateReadOnly的方言要重新评估(例如 PG 的CTE写法和 SQLite 不完全一致)。 - 图表是 ECharts only:要支持 vega-lite 或 Chart.js 得加分支。
- 大表会爆内存:
:memory:数据库放千万行级别数据会拖死进程。生产应该让 Agent 直接连只读副本数据库。 - 没有 PII 脱敏:示例数据干净,但真实场景里”列出所有用户手机号”这种问法要拦下来。加一个
wrapToolCallmiddleware 扫 SELECT 字段名。
小结
数据分析 Agent 看着简单,其实考验的是工具边界:
inspect_schema是写 SQL 前的强制前置步骤——在 system prompt 里反复强调,模型才会乖乖先看再写run_sql是只读、限语句、限行数、限时长——四道防线缺一不可,缺一道线下就会出生产事故- 结果集要截断再喂回模型——前 200 行足够 LLM 总结,全量给会浪费 token 且容易 OOM
下一节多 Agent 工作流平台 把视角拉高:当 Agent 不止一个、工作流可由用户自定义时,平台层要怎么设计。
本文摘自《LangChain.js Agent 开发权威指南》,作者递归客。
本书资源
- 源码仓库 · github.com/diguike/book-langchain-agent
- 在线阅读 · inferloop.dev/langchain-agent
- 所有书目 · inferloop.dev
继续阅读 · 同作者其他书
- 《Transformer 工程实战》从注意力机制到生产部署
- 《自己动手写 AI Agent》从 Claude Code 开源架构到你的第一个编程助手
- 《AI 时代的 CLI 工具开发实战》用 TypeScript 构建现代 CLI 工具
- 《LLM Infra 工程实战》从入门到实践
- 《Hermes Agent 实战》构建会成长的个人 AI Agent
- 《OpenClaw 源码解析》现代 Agent 系统的架构设计与工程实践
- 《Agent Memory 工程实战》从 claude-mem 源码到企业级记忆平台
- 《AI Token 中转站实战》从 0 搭建企业级 LLM 网关
- 《百万级 AI Agent 平台架构》智能客服 SaaS 实战
- 《Claude Code Skill 指南》
- 《Claude 插件官方指南》
Last updated on