模块 08 - 生产部署 | 前置知识:createAgent 入门、流式输出深入
把 Agent 包成 HTTP 服务
写完 Agent 之后下一步:包成一个能被前端、移动端、其他服务调用的 HTTP API。这一节用 Hono 搭一个生产可用的服务,包括同步接口、SSE 流式接口、请求验证、错误处理。
为什么用 Hono?
- 体积小(~12kb),冷启动快
- TypeScript 优先,类型推导完整
- 同一份代码能跑在 Node.js / Bun / Cloudflare Workers / Vercel Edge
- 内置 SSE / WebSocket helper
Express 也行,写法略不同;本节末尾会给一个 Express 等价示例。
项目结构
src/
├── index.ts # 入口,挂中间件 + 路由
├── routes/
│ ├── chat.ts # 同步对话接口
│ ├── chat-stream.ts # SSE 流式接口
│ └── health.ts # 健康检查
├── lib/
│ ├── agent.ts # Agent 单例
│ ├── validation.ts # Zod schema
│ └── env.ts # 环境变量校验
└── middleware/
├── auth.ts
├── rate-limit.ts
└── error.tsAgent 单例与冷启动
Agent 创建本身不重,但 model client 内部有连接池、warm-up。一个 process 内复用同一个 Agent 实例:
// src/lib/agent.ts
import { createAgent } from "langchain";
import { ChatAnthropic } from "@langchain/anthropic";
import { tool } from "@langchain/core/tools";
import { z } from "zod";
const getWeather = tool(
async ({ city }) => {
// 真实场景调天气 API
return `${city} 22°C,多云`;
},
{
name: "get_weather",
description: "查询某个城市的实时天气",
schema: z.object({
city: z.string().describe("城市名"),
}),
}
);
// 模块级单例 —— import 时初始化一次
export const agent = createAgent({
model: new ChatAnthropic({
model: "claude-sonnet-4-6",
temperature: 0,
}),
tools: [getWeather],
systemPrompt: "你是一个简洁的助手。涉及实时信息时必须用工具。",
});Serverless 场景下每次冷启动会重建 agent,这是不可避免的;常驻进程(Docker / EC2)就完全没问题。
请求 Schema 与 Zod 验证
所有外部输入必须先过 schema。Zod 既做运行时验证又做 TS 类型推导:
// src/lib/validation.ts
import { z } from "zod";
export const ChatRequest = z.object({
message: z.string().min(1).max(10_000),
threadId: z.string().uuid().optional(), // 复用对话用
metadata: z.record(z.string()).optional(),
});
export type ChatRequest = z.infer<typeof ChatRequest>;
export const StreamRequest = ChatRequest;message 强制非空 + 限长(防长消息攻击)。threadId 用于复用对话历史,配合 checkpointer 用。
同步对话接口
// src/routes/chat.ts
import { Hono } from "hono";
import { zValidator } from "@hono/zod-validator";
import { agent } from "../lib/agent";
import { ChatRequest } from "../lib/validation";
const chat = new Hono();
chat.post("/chat", zValidator("json", ChatRequest), async (c) => {
const body = c.req.valid("json");
const result = await agent.invoke(
{ messages: [{ role: "user", content: body.message }] },
{
configurable: body.threadId ? { thread_id: body.threadId } : undefined,
metadata: body.metadata,
runName: "ChatTurn",
}
);
const lastMessage = result.messages.at(-1);
return c.json({
ok: true,
data: {
message: lastMessage?.content,
threadId: body.threadId,
steps: result.messages.length,
},
});
});
export default chat;几个要点:
zValidator校验失败时自动返回 400,body 不合法不会进 handlerconfigurable.thread_id传给 LangGraph 的 checkpointer 复用历史runName在 LangSmith 上做 trace 名字(参考 LangSmith Tracing)
SSE 流式接口
流式接口是另一个文件,因为响应模式完全不同。详细 SSE 部署细节在 流式接口部署,这里给最小实现:
// src/routes/chat-stream.ts
import { Hono } from "hono";
import { zValidator } from "@hono/zod-validator";
import { agent } from "../lib/agent";
import { StreamRequest } from "../lib/validation";
const stream = new Hono();
stream.post("/chat/stream", zValidator("json", StreamRequest), async (c) => {
const body = c.req.valid("json");
const abortController = new AbortController();
// 客户端断开就 abort,避免 Agent 跑完整个循环浪费 token
c.req.raw.signal.addEventListener("abort", () => abortController.abort());
// stream() 的 encoding 选项直接产出 SSE 格式的 ReadableStream
const sseStream = await agent.stream(
{ messages: [{ role: "user", content: body.message }] },
{
streamMode: "messages",
encoding: "text/event-stream",
configurable: body.threadId ? { thread_id: body.threadId } : undefined,
signal: abortController.signal,
runName: "ChatStream",
}
);
return new Response(sseStream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", // Nginx 禁 buffering
},
});
});
export default stream;注意这里直接用了 stream({ encoding: "text/event-stream" })——LangGraph 1.x 原生支持把流编码成 SSE,不需要手写 \n\ndata: 拼接。
鉴权中间件
最简单的 API Key 鉴权:
// src/middleware/auth.ts
import type { Context, Next } from "hono";
import { HTTPException } from "hono/http-exception";
const VALID_KEYS = new Set(
(process.env.API_KEYS ?? "").split(",").filter(Boolean)
);
export async function apiKeyAuth(c: Context, next: Next) {
const apiKey =
c.req.header("X-API-Key") ?? c.req.header("Authorization")?.replace("Bearer ", "");
if (!apiKey) {
throw new HTTPException(401, { message: "Missing API key" });
}
if (!VALID_KEYS.has(apiKey)) {
throw new HTTPException(403, { message: "Invalid API key" });
}
c.set("apiKey", apiKey);
await next();
}JWT 鉴权用 Hono 内置 jwt 中间件:
import { jwt } from "hono/jwt";
export const jwtAuth = jwt({
secret: process.env.JWT_SECRET!,
});生产中两种通常并存:B2B 用 API Key,C 端用 JWT。
限流
防止单用户耗尽你的 LLM 配额:
// src/middleware/rate-limit.ts
import type { Context, Next } from "hono";
import { RateLimiterMemory } from "rate-limiter-flexible";
// 单进程内存限流(多副本部署要换 RateLimiterRedis)
const limiter = new RateLimiterMemory({
points: 30, // 每个 key 允许 30 次
duration: 60, // 每 60 秒
});
export async function rateLimit(c: Context, next: Next) {
const key =
c.get("apiKey") ??
c.req.header("X-Forwarded-For") ??
c.req.header("CF-Connecting-IP") ??
"anonymous";
try {
const res = await limiter.consume(key);
c.header("X-RateLimit-Limit", "30");
c.header("X-RateLimit-Remaining", String(res.remainingPoints));
await next();
} catch (rej: any) {
c.header("Retry-After", String(Math.ceil(rej.msBeforeNext / 1000)));
return c.json({ ok: false, error: "Too many requests" }, 429);
}
}多副本部署必须换 Redis 版本(RateLimiterRedis),否则限流不准。
错误处理
统一拦截各类错误:
// src/middleware/error.ts
import type { Context } from "hono";
import { HTTPException } from "hono/http-exception";
import { ZodError } from "zod";
export function errorHandler(err: Error, c: Context) {
// Zod 验证错误(zValidator 抛出)
if (err instanceof ZodError) {
return c.json(
{
ok: false,
error: "Validation failed",
details: err.errors.map((e) => ({
path: e.path.join("."),
message: e.message,
})),
},
400
);
}
// Hono HTTP 异常(鉴权抛的 401/403 等)
if (err instanceof HTTPException) {
return c.json({ ok: false, error: err.message }, err.status);
}
// LLM provider 限流
if (err.message?.includes("rate_limit") || err.message?.includes("429")) {
return c.json(
{ ok: false, error: "Upstream LLM rate limit, please retry" },
429
);
}
// 上下文超长
if (err.message?.includes("context") && err.message?.includes("length")) {
return c.json({ ok: false, error: "Input too long" }, 400);
}
// 未知错误:日志记录详情,对外只暴露通用错误
console.error("[unhandled]", err);
return c.json({ ok: false, error: "Internal server error" }, 500);
}重要:错误消息绝对不要透传给客户端原始内容。LLM provider 的错误里经常带 API Key 片段、上游 URL 等敏感信息,必须脱敏。详细的输出脱敏在 安全防御。
健康检查
K8s / 容器编排靠两个探针:
// src/routes/health.ts
import { Hono } from "hono";
const health = new Hono();
// Liveness:进程是否活着,不检查依赖
health.get("/health/live", (c) => {
return c.json({ status: "ok", ts: Date.now() });
});
// Readiness:能否对外服务(依赖是否就绪)
health.get("/health/ready", async (c) => {
const checks: Record<string, boolean> = {};
// 检查 Anthropic 可达
try {
const r = await fetch("https://api.anthropic.com/v1/messages", {
method: "HEAD",
signal: AbortSignal.timeout(3000),
});
// HEAD 返回 405 也算可达
checks.anthropic = r.status > 0;
} catch {
checks.anthropic = false;
}
// 这里也可以加 Redis / Postgres 检查
// checks.redis = await pingRedis();
const allOk = Object.values(checks).every(Boolean);
return c.json(
{ status: allOk ? "ready" : "not_ready", checks, ts: Date.now() },
allOk ? 200 : 503
);
});
export default health;Liveness 失败 K8s 会重启 pod;Readiness 失败 K8s 会把 pod 从 Service 摘除但不重启。区分两者很重要。
完整入口
// src/index.ts
import "dotenv/config";
import { Hono } from "hono";
import { logger } from "hono/logger";
import { cors } from "hono/cors";
import { secureHeaders } from "hono/secure-headers";
import { serve } from "@hono/node-server";
import chat from "./routes/chat";
import stream from "./routes/chat-stream";
import health from "./routes/health";
import { apiKeyAuth } from "./middleware/auth";
import { rateLimit } from "./middleware/rate-limit";
import { errorHandler } from "./middleware/error";
const app = new Hono();
// 全局中间件
app.use("*", logger());
app.use("*", secureHeaders());
app.use(
"*",
cors({
origin: (process.env.CORS_ORIGINS ?? "*").split(","),
allowMethods: ["GET", "POST"],
allowHeaders: ["Content-Type", "X-API-Key", "Authorization"],
})
);
// 健康检查不需要鉴权
app.route("/", health);
// API 路由统一鉴权 + 限流
const api = new Hono();
api.use("*", apiKeyAuth);
api.use("*", rateLimit);
api.route("/", chat);
api.route("/", stream);
app.route("/api/v1", api);
// 全局错误兜底
app.onError(errorHandler);
const port = parseInt(process.env.PORT ?? "3000", 10);
console.log(`server listening on :${port}`);
serve({ fetch: app.fetch, port });启动:
npm install hono @hono/node-server @hono/zod-validator \
langchain @langchain/anthropic @langchain/core \
rate-limiter-flexible zod dotenv
ANTHROPIC_API_KEY=sk-ant-xxx \
API_KEYS=test-key-1,test-key-2 \
npx tsx src/index.ts测试:
# 同步
curl -X POST http://localhost:3000/api/v1/chat \
-H "Content-Type: application/json" \
-H "X-API-Key: test-key-1" \
-d '{"message": "北京今天天气"}'
# 流式
curl -N -X POST http://localhost:3000/api/v1/chat/stream \
-H "Content-Type: application/json" \
-H "X-API-Key: test-key-1" \
-d '{"message": "北京今天天气"}'Express 等价示例
不熟悉 Hono 的话,Express 写法差不多:
import "dotenv/config";
import express from "express";
import { z } from "zod";
import { agent } from "./lib/agent";
const app = express();
app.use(express.json({ limit: "100kb" }));
const ChatBody = z.object({ message: z.string().min(1).max(10_000) });
app.post("/api/v1/chat", async (req, res, next) => {
try {
const body = ChatBody.parse(req.body);
const result = await agent.invoke({
messages: [{ role: "user", content: body.message }],
});
res.json({ ok: true, data: { message: result.messages.at(-1)?.content } });
} catch (e) {
next(e);
}
});
app.listen(3000);功能跟 Hono 版一致,少几行类型推导。
实战建议
- 请求 body 限大小:
express.json({ limit: "100kb" })或 Hono 的bodyLimit,防止超长 payload - 超时设置:HTTP server 超时 > Agent 最长执行时间(一般 30-60s),但要小于负载均衡器的超时(避免静默断开)
- 优雅退出:进程收 SIGTERM 时先停止接收新请求、等 in-flight 请求完成、再关进程
- 日志结构化:用
pino或winston输出 JSON 日志,方便 ELK / Datadog 采集 - 不要把 stack trace 暴露给客户端:开发环境可以,生产一律返回 generic error message
小结
把 Agent 包成 HTTP 服务的关键不是路由本身,而是配套的鉴权 / 限流 / 验证 / 错误处理 / 健康检查这一套基础设施。Hono 给的脚手架够好,单文件入口能跑、能上 Cloudflare Workers。同步接口用 agent.invoke,流式用 agent.stream({ encoding: "text/event-stream" })。
下一节 流式接口部署 展开讲 SSE 在 Nginx / Cloudflare 后面的生产部署细节,以及 WebSocket 什么时候用得上。
本文摘自《LangChain.js Agent 开发权威指南》,作者递归客。
本书资源
- 源码仓库 · github.com/diguike/book-langchain-agent
- 在线阅读 · inferloop.dev/langchain-agent
- 所有书目 · inferloop.dev
继续阅读 · 同作者其他书
- 《Transformer 工程实战》从注意力机制到生产部署
- 《自己动手写 AI Agent》从 Claude Code 开源架构到你的第一个编程助手
- 《AI 时代的 CLI 工具开发实战》用 TypeScript 构建现代 CLI 工具
- 《LLM Infra 工程实战》从入门到实践
- 《Hermes Agent 实战》构建会成长的个人 AI Agent
- 《OpenClaw 源码解析》现代 Agent 系统的架构设计与工程实践
- 《Agent Memory 工程实战》从 claude-mem 源码到企业级记忆平台
- 《AI Token 中转站实战》从 0 搭建企业级 LLM 网关
- 《百万级 AI Agent 平台架构》智能客服 SaaS 实战
- 《Claude Code Skill 指南》
- 《Claude 插件官方指南》