添加 AI 压缩(接入 Claude API)
mini-mem 的基础版使用规则提取标题,信息密度有限。接入 Claude API 后可以实现真正的语义压缩。
实现思路
// src/services/compressor.ts
import Anthropic from '@anthropic-ai/sdk';
const client = new Anthropic(); // 读取 ANTHROPIC_API_KEY 环境变量
export async function compressObservation(
toolName: string,
toolInput: Record<string, unknown>,
toolResponse: Record<string, unknown>
): Promise<{ type: string; title: string; narrative: string; facts: string[] }> {
const response = await client.messages.create({
model: 'claude-haiku-4-5-20251001', // 用最新的 Haiku 模型降低成本(请替换为当前可用的模型 ID)
max_tokens: 300,
messages: [{
role: 'user',
content: `分析以下工具调用,提取一条结构化观察。
工具: ${toolName}
输入: ${JSON.stringify(toolInput).slice(0, 1000)}
输出: ${JSON.stringify(toolResponse).slice(0, 500)}
以 JSON 格式返回:
{"type": "change|bugfix|discovery|decision|how-it-works", "title": "10字以内的标题", "narrative": "50字以内的叙述", "facts": ["事实1", "事实2"]}`
}]
});
const text = response.content[0].type === 'text' ? response.content[0].text : '';
return JSON.parse(text);
}异步化改造
AI 调用需要 5-15 秒,不能放在 Hook 同步路径中。改造方案:将 save-hook 改为入队模式,单独启动一个 Worker 处理队列。
// src/worker/processor.ts
import { ObservationStore } from '../db/store.js';
import { compressObservation } from '../services/compressor.js';
export async function processQueue(): Promise<void> {
const store = new ObservationStore();
const pending = store.getPending(10); // 取最多 10 条待处理
for (const item of pending) {
try {
const compressed = await compressObservation(
item.tool_name,
JSON.parse(item.tool_input),
JSON.parse(item.tool_response)
);
store.updateObservation(item.id, compressed);
store.markProcessed(item.id);
} catch (error) {
// 失败不阻塞,下次重试
console.error(`Failed to compress #${item.id}:`, error);
}
}
store.close();
}Worker 用 setInterval 定时拉取队列:
// src/worker/index.ts
import { processQueue } from './processor.js';
// 每 5 秒处理一次队列
setInterval(processQueue, 5000);实现 Timeline 时间线视图
Timeline 提供某条 Observation 前后的上下文视图:
// src/db/store.ts 中添加
getTimeline(anchorId: number, before: number = 3, after: number = 3) {
const anchor = this.db.prepare('SELECT created_at, project FROM observations WHERE id = ?').get(anchorId) as any;
if (!anchor) return { before: [], anchor: null, after: [] };
const beforeRows = this.db.prepare(`
SELECT id, type, title, created_at FROM observations
WHERE project = ? AND created_at < ? ORDER BY created_at DESC LIMIT ?
`).all(anchor.project, anchor.created_at, before);
const afterRows = this.db.prepare(`
SELECT id, type, title, created_at FROM observations
WHERE project = ? AND created_at > ? ORDER BY created_at ASC LIMIT ?
`).all(anchor.project, anchor.created_at, after);
return {
before: beforeRows.reverse(),
anchor: this.db.prepare('SELECT id, type, title, narrative, created_at FROM observations WHERE id = ?').get(anchorId),
after: afterRows
};
}在 MCP Server 中注册 timeline 工具:
{
name: 'timeline',
description: 'Get observations before/after a specific observation.',
inputSchema: {
type: 'object',
properties: {
anchor: { type: 'number', description: 'Observation ID' },
before: { type: 'number', description: 'Count before (default 3)' },
after: { type: 'number', description: 'Count after (default 3)' }
},
required: ['anchor']
}
}向量搜索集成
用 ChromaDB 或 Qdrant 实现语义搜索。以下示例使用 ChromaDB 的 Node.js 客户端:
// src/services/vector-store.ts
import { ChromaClient, Collection } from 'chromadb';
let collection: Collection | null = null;
async function getCollection(): Promise<Collection> {
if (collection) return collection;
const client = new ChromaClient({ path: 'http://localhost:8000' });
collection = await client.getOrCreateCollection({ name: 'mini-mem-observations' });
return collection;
}
export async function addObservation(id: number, text: string, metadata: Record<string, string>): Promise<void> {
const coll = await getCollection();
await coll.add({
ids: [`obs_${id}`],
documents: [text],
metadatas: [metadata]
});
}
export async function semanticSearch(query: string, limit: number = 10): Promise<Array<{ id: number; score: number }>> {
const coll = await getCollection();
const results = await coll.query({
queryTexts: [query],
nResults: limit
});
return (results.ids[0] || []).map((id, i) => ({
id: parseInt(id.replace('obs_', '')),
score: results.distances?.[0]?.[i] || 0
}));
}ChromaDB 需要单独启动服务:
# 使用 Docker
docker run -p 8000:8000 chromadb/chroma
# 或用 pip
pip install chromadb
chroma run --path ./chroma-dataViewer UI:React 实时展示面板
添加一个简单的 Web UI 展示 Observation 流:
// src/viewer/server.ts
import express from 'express';
import { ObservationStore } from '../db/store.js';
const app = express();
// SSE 端点:实时推送新 Observation
app.get('/api/stream', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// 每 2 秒检查新数据
let lastId = 0;
const interval = setInterval(() => {
const store = new ObservationStore();
const newObs = store.getAfter(lastId, 10);
store.close();
for (const obs of newObs) {
res.write(`data: ${JSON.stringify(obs)}\n\n`);
lastId = Math.max(lastId, obs.id);
}
}, 2000);
req.on('close', () => clearInterval(interval));
});
// API 端点
app.get('/api/observations', (req, res) => {
const store = new ObservationStore();
const observations = store.getRecentByProject(req.query.project as string || '', 50);
store.close();
res.json(observations);
});
app.listen(37800, () => console.log('Viewer UI: http://localhost:37800'));前端用一个简单的 React 组件消费 SSE:
// src/viewer/App.tsx
function ObservationFeed() {
const [observations, setObservations] = useState<Observation[]>([]);
useEffect(() => {
const es = new EventSource('/api/stream');
es.onmessage = (event) => {
const obs = JSON.parse(event.data);
setObservations(prev => [obs, ...prev].slice(0, 100));
};
return () => es.close();
}, []);
return (
<div className="feed">
{observations.map(obs => (
<div key={obs.id} className="observation-card">
<span className="type">{obs.type}</span>
<h3>{obs.title}</h3>
<p>{obs.narrative}</p>
</div>
))}
</div>
);
}多项目隔离与切换
支持同时追踪多个项目的记忆,在 Context Injection 时只注入当前项目相关的内容:
// context-hook.ts 中的项目识别
function getProjectName(cwd: string): string {
// 策略 1:如果有 package.json,用其 name 字段
const pkgPath = path.join(cwd, 'package.json');
if (existsSync(pkgPath)) {
const pkg = JSON.parse(readFileSync(pkgPath, 'utf-8'));
if (pkg.name) return pkg.name;
}
// 策略 2:用 git 仓库根目录名
try {
const gitRoot = execSync('git rev-parse --show-toplevel', { cwd }).toString().trim();
return path.basename(gitRoot);
} catch {}
// 策略 3:用当前目录名
return path.basename(cwd);
}查询时自动按项目过滤:
SELECT * FROM observations WHERE project = ? ORDER BY created_at DESC LIMIT ?思考题
- AI 压缩每条 Observation 的成本约 $0.001(基于 Claude Haiku)。如果一天产生 200 条 Observation,一年的压缩成本是多少?如何在质量和成本之间取舍(比如只压缩超过一定长度的 Observation)?
- 向量搜索和 FTS5 全文搜索各有优劣。设计一个混合排序策略:什么情况下优先用向量搜索,什么情况下优先用 FTS5?
- 多项目支持中,项目识别依赖 package.json name 和 git 仓库名。如果用户在 monorepo 中工作(多个子项目共享一个 git root),如何正确识别当前子项目?
本书开源发布于 inferloop.dev,转载请注明出处。
至此,mini-mem 从基础的同步版本扩展到了具备 AI 压缩、向量搜索、实时 UI 和多项目支持的完整系统。下一部分将讨论如何将这类系统从个人工具升级到企业级平台。
Last updated on