Claude「永久大脑」:双模记忆系统与Conway Agent架构深度解析
摘要
2026年5月,AI领域迎来一次重大技术突破。Anthropic为Claude引入了全新的双模记忆系统——Memory Files与Dreams,配合7×24小时永不下线的Conway Agent平台,标志着AI Agent从「即用即忘」的对话模式,向「持久记忆」的智能助手模式迈出了关键一步。本文将深入剖析这一架构的技术原理、实现细节,并提供完整的Python/Go代码示例,帮助开发者理解并构建类似的AI记忆系统。
关键词:AI Agent、记忆系统、Claude、Conway、Memory Files、Dreams、持久记忆
一、背景:从「滚动便签」到「永久大脑」
1.1 传统AI记忆的困境
在Claude Memory Files之前,大多数AI助手(包括Claude本身)的记忆机制本质上是一张「滚动便签」——将用户的所有偏好、背景和习惯压缩成一段统一的总结性记忆。这种方式简单有效,但问题也随之而来:
传统记忆模式的问题:
1. 信息容量有限 - 摘要长度有上限
2. 旧信息被覆盖 - 新对话会覆盖重要旧记忆
3. 话题互相干扰 - 不同项目的信息混杂在一起
4. 检索效率低下 - 需要大海捞针式查找
当用户与Claude讨论一个长期写作项目时,它可能已经完全忘记用户上周聊过的另一个项目细节。这种「健忘症」严重制约了AI作为真正智能助手的能力边界。
1.2 为什么不能只靠扩大上下文窗口?
很多人会问:扩大上下文窗口不就能解决问题了吗?答案是:不行。上下文窗口有几个本质局限:
| 维度 | 局限性 |
|---|---|
| 单次会话 | 对话关闭后数据即销毁 |
| 线性增长 | 记忆越多,有效信息密度越低 |
| 检索效率 | 大海捞针式查找,延迟高 |
| 容量成本 | Token费用随上下文线性增长 |
Claude的Memory Files选择了一条不同的路:外部化存储 + 结构化索引。这不仅是技术方案的改变,更是一种范式转换。
二、核心原理:双模记忆系统架构

2.1 系统整体架构
Claude的双模记忆系统由三大核心组件构成:
┌─────────────────────────────────────────────────────────────┐
│ Claude 双模记忆系统 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Classic │ │ Memory Files │ │ Dreams │ │
│ │ Memory │ │ (文件记忆) │ │ (梦境) │ │
│ │ (经典模式) │ │ │ │ │ │
│ │ │ │ • 话题组织 │ │ • 异步整合 │ │
│ │ 单一摘要 │ │ • 无限容量 │ │ • 合并重复 │ │
│ │ 滚动覆盖 │ │ • AI自组织 │ │ • 更新过期 │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
│ ▲ 触发条件 ▲ │
│ │ (5次对话或24小时) │ │
│ │ │ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Memory Stores API │ │
│ │ (记忆存储接口) │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
2.2 Classic Memory:传统单摘要模式
Classic Memory是Claude从上线之初沿用至今的记忆模式。它的工作原理如下:
# Classic Memory 的简化实现
class ClassicMemory:
"""
传统单摘要记忆模式
每次对话后,将所有新信息压缩进一个摘要
"""
def __init__(self, max_summary_length: int = 4096):
self.summary = "" # 单一摘要
self.max_length = max_summary_length
self.conversation_history = []
def add_interaction(self, user_input: str, assistant_response: str):
"""添加新的对话交互"""
self.conversation_history.append({
"user": user_input,
"assistant": assistant_response,
"timestamp": datetime.now()
})
# 当累积足够信息时,生成新摘要
if len(self.conversation_history) >= 5:
self._generate_summary()
def _generate_summary(self):
"""将对话历史压缩成单一摘要(覆盖旧摘要)"""
# 这里会调用LLM进行摘要生成
# 问题:旧信息会被新信息覆盖
prompt = f"""
将以下对话压缩成一个简短的摘要:
{self.conversation_history}
摘要应包含:
- 用户的主要偏好
- 重要的项目背景
- 长期目标或计划
"""
# 调用LLM生成摘要...
self.summary = generated_summary
# 清空历史(信息已压缩进摘要)
self.conversation_history = []
def get_context(self) -> str:
"""获取记忆上下文"""
return self.summary
问题分析:
- ✅ 实现简单
- ✅ 推理时Token消耗低
- ❌ 容量有限(max_summary_length)
- ❌ 信息丢失(压缩过程不可逆)
- ❌ 话题干扰(不同项目混杂)
2.3 Memory Files:结构化文档记忆
Memory Files是Anthropic推出的革命性新架构。它为Claude提供了一个「内置个人Wiki」:
# Memory Files 的核心实现
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
import hashlib
@dataclass
class MemoryDocument:
"""记忆文档结构"""
doc_id: str
topic: str # 话题分类
title: str # 文档标题
content: str # 文档内容
created_at: datetime
updated_at: datetime
tags: List[str] = field(default_factory=list)
metadata: Dict = field(default_factory=dict)
class MemoryFiles:
"""
文件记忆系统
按话题组织的结构化文档存储
"""
def __init__(self):
# 文档存储(按话题分组)
self.documents: Dict[str, List[MemoryDocument]] = {}
# 全局索引
self.global_index: Dict[str, str] = {} # keyword -> doc_id
# 向量索引(用于语义检索)
self.vector_index: Dict[str, List[float]] = {}
def store(self, conversation: str, context: Optional[dict] = None):
"""
存储对话信息,自动提取并归档
"""
# 1. 使用LLM提取关键信息并分类
extracted = self._extract_key_info(conversation, context)
# 2. 确定话题分类
topic = self._classify_topic(extracted)
# 3. 创建或更新文档
doc = self._create_or_update_document(topic, extracted)
# 4. 更新全局索引
self._update_index(doc)
return doc
def _extract_key_info(self, conversation: str, context: Optional[dict]) -> dict:
"""
使用LLM提取对话中的关键信息
"""
prompt = f"""
从以下对话中提取关键信息,返回JSON格式:
对话内容:
{conversation}
上下文:{context or {}}
返回格式:
{{
"key_facts": ["关键事实列表"],
"preferences": ["用户偏好列表"],
"action_items": ["待办事项列表"],
"entities": ["提到的实体列表"],
"emotional_tone": "情感基调"
}}
"""
# 调用Claude API提取信息
extracted = call_claude_api(prompt)
return extracted
def _classify_topic(self, extracted: dict) -> str:
"""
使用LLM对信息进行话题分类
"""
topics = [
"技术开发", "产品设计", "项目管理", "日常生活",
"财务投资", "健康运动", "旅行计划", "学习研究",
"社交关系", "娱乐休闲"
]
prompt = f"""
将以下信息分类到最合适的话题中:
提取的信息:{extracted}
可选话题:{topics}
只返回一个话题名称。
"""
return call_claude_api(prompt).strip()
def _create_or_update_document(
self,
topic: str,
extracted: dict
) -> MemoryDocument:
"""
创建新文档或更新已有文档
"""
# 检查是否存在该话题的文档
if topic in self.documents and self.documents[topic]:
# 更新现有文档
doc = self.documents[topic][-1]
doc.content = self._merge_content(doc.content, extracted)
doc.updated_at = datetime.now()
else:
# 创建新文档
doc = MemoryDocument(
doc_id=self._generate_doc_id(),
topic=topic,
title=self._generate_title(extracted),
content=str(extracted),
created_at=datetime.now(),
updated_at=datetime.now(),
tags=self._extract_tags(extracted)
)
if topic not in self.documents:
self.documents[topic] = []
self.documents[topic].append(doc)
return doc
def _merge_content(self, existing: str, new: dict) -> str:
"""
合并新旧内容,保留重要历史信息
"""
prompt = f"""
将新信息整合到现有文档中:
现有文档:
{existing}
新信息:
{new}
规则:
1. 保留所有重要的历史信息
2. 用新信息更新过时的内容
3. 合并重复内容
4. 保持文档结构清晰
返回更新后的完整文档内容。
"""
return call_claude_api(prompt)
def retrieve(self, query: str, max_docs: int = 5) -> List[MemoryDocument]:
"""
按需检索相关记忆
"""
# 1. 确定查询相关的话题
relevant_topics = self._find_relevant_topics(query)
# 2. 收集相关文档
candidates = []
for topic in relevant_topics:
if topic in self.documents:
candidates.extend(self.documents[topic])
# 3. 语义排序
ranked = self._rank_documents(query, candidates)
return ranked[:max_docs]
def _find_relevant_topics(self, query: str) -> List[str]:
"""
找出与查询相关的话题
"""
all_topics = list(self.documents.keys())
prompt = f"""
判断以下查询与哪些话题相关:
查询:{query}
可选话题:{all_topics}
返回最相关的3个话题,按相关性排序。
"""
return call_claude_api(prompt).split(',')
def _rank_documents(
self,
query: str,
documents: List[MemoryDocument]
) -> List[MemoryDocument]:
"""
使用向量相似度对文档排序
"""
query_vector = self._embed_text(query)
scored = []
for doc in documents:
if doc.doc_id in self.vector_index:
similarity = cosine_similarity(
query_vector,
self.vector_index[doc.doc_id]
)
scored.append((similarity, doc))
scored.sort(key=lambda x: x[0], reverse=True)
return [doc for _, doc in scored]
def _embed_text(self, text: str) -> List[float]:
"""文本向量化"""
# 使用Embedding API
return embedding_api.embed(text)
def _generate_doc_id(self) -> str:
"""生成唯一文档ID"""
return hashlib.md5(
str(datetime.now()).encode()
).hexdigest()[:12]
def _generate_title(self, content: dict) -> str:
"""生成文档标题"""
prompt = f"为以下内容生成一个简短的描述性标题:{content}"
return call_claude_api(prompt)
def _extract_tags(self, content: dict) -> List[str]:
"""提取标签"""
return content.get('entities', [])
Memory Files的核心优势:
| 特性 | Classic Memory | Memory Files |
|---|---|---|
| 容量 | 4K Token上限 | 理论无限 |
| 组织方式 | 单一摘要 | 按话题分类 |
| 更新方式 | 覆盖旧信息 | 增量整合 |
| 检索效率 | O(n) | O(1) 话题定位 |
| 用户控制 | 有限 | 完全可控 |
2.4 Dreams:REM睡眠式记忆整合
Dreams是Anthropic从人类神经科学中获得的灵感。在人类睡眠的REM阶段,大脑会回放白天的经历、强化重要记忆、丢弃无用的噪声信息。Anthropic把这个机制搬到了Claude身上。
# Dreams 记忆整合系统
import asyncio
from enum import Enum
from typing import List, Optional
from dataclasses import dataclass
import httpx
class DreamStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELED = "canceled"
@dataclass
class DreamInput:
"""梦境输入"""
type: str # "memory_store" 或 "sessions"
memory_store_id: Optional[str] = None
session_ids: Optional[List[str]] = None
@dataclass
class DreamOutput:
"""梦境输出"""
type: str
memory_store_id: str
@dataclass
class Dream:
"""梦境任务"""
id: str
status: DreamStatus
inputs: List[DreamInput]
outputs: List[DreamOutput]
model: str
instructions: str
created_at: datetime
ended_at: Optional[datetime]
usage: dict
class DreamsService:
"""
梦境记忆整合服务
异步后台执行,定期整理记忆
"""
def __init__(self, api_key: str):
self.client = httpx.AsyncClient(
base_url="https://api.anthropic.com",
headers={"x-api-key": api_key}
)
self.trigger_conditions = {
"min_conversations": 5,
"min_hours": 24
}
async def create_dream(
self,
memory_store_id: str,
session_ids: List[str],
model: str = "claude-opus-4-7",
instructions: Optional[str] = None
) -> Dream:
"""
创建梦境任务
参数:
memory_store_id: 现有记忆存储ID
session_ids: 要分析的会话ID列表
model: 用于做梦的模型
instructions: 额外的整合指导
返回:
Dream: 梦境任务对象
"""
response = await self.client.post(
"/v1/beta/dreams/create",
json={
"inputs": [
{"type": "memory_store", "memory_store_id": memory_store_id},
{"type": "sessions", "session_ids": session_ids}
],
"model": model,
"instructions": instructions or "整合记忆,去重更新"
}
)
return Dream(**response.json())
async def get_dream(self, dream_id: str) -> Dream:
"""获取梦境任务状态"""
response = await self.client.get(f"/v1/beta/dreams/{dream_id}")
return Dream(**response.json())
async def list_dreams(
self,
limit: int = 20,
status: Optional[DreamStatus] = None
) -> List[Dream]:
"""列出梦境任务"""
params = {"limit": limit}
if status:
params["status"] = status.value
response = await self.client.get(
"/v1/beta/dreams/list",
params=params
)
return [Dream(**d) for d in response.json()["dreams"]]
async def cancel_dream(self, dream_id: str) -> bool:
"""取消梦境任务"""
response = await self.client.post(
f"/v1/beta/dreams/{dream_id}/cancel"
)
return response.status_code == 200
def should_trigger_dream(self, memory_store) -> bool:
"""
判断是否应该触发梦境
触发条件:
1. 累积5次对话
2. 或距离上次整合超过24小时
"""
# 检查对话数量
conversation_count = memory_store.get_conversation_count()
if conversation_count >= self.trigger_conditions["min_conversations"]:
return True
# 检查时间间隔
last_dream_time = memory_store.get_last_dream_time()
if last_dream_time:
hours_since = (datetime.now() - last_dream_time).total_seconds() / 3600
if hours_since >= self.trigger_conditions["min_hours"]:
return True
return False
async def consolidation_logic(
self,
memory_store,
sessions: List[dict]
) -> dict:
"""
记忆整合的核心逻辑
执行步骤:
1. 合并重复条目
2. 更新过时信息
3. 解决逻辑矛盾
4. 挖掘隐藏模式
"""
all_memories = []
# 收集现有记忆
for doc in memory_store.documents.values():
all_memories.extend(doc)
# 收集会话历史
for session in sessions:
all_memories.extend(session.get("memories", []))
# 第一步:合并重复条目
merged = self._merge_duplicates(all_memories)
# 第二步:更新过时信息
updated = self._update_stale_entries(merged)
# 第三步:解决逻辑矛盾
resolved = self._resolve_conflicts(updated)
# 第四步:优化索引结构
optimized = self._optimize_index(resolved)
return optimized
def _merge_duplicates(self, memories: List[dict]) -> List[dict]:
"""
合并重复的记忆条目
使用语义相似度判断重复
"""
merged = []
seen_signatures = {}
for memory in memories:
# 生成记忆签名
signature = self._generate_signature(memory)
# 检查是否已存在相似记忆
if signature in seen_signatures:
# 合并到已有记忆
existing = seen_signatures[signature]
merged_memory = self._combine_memories(existing, memory)
seen_signatures[signature] = merged_memory
else:
seen_signatures[signature] = memory
return list(seen_signatures.values())
def _generate_signature(self, memory: dict) -> str:
"""
生成记忆签名
用于识别重复记忆
"""
# 使用关键字段生成哈希
key_info = f"{memory.get('topic', '')}_{memory.get('entity', '')}"
return hashlib.md5(key_info.encode()).hexdigest()[:8]
def _combine_memories(self, existing: dict, new: dict) -> dict:
"""
合并两条记忆
保留最新和最重要的信息
"""
prompt = f"""
合并以下两条相关记忆,保留所有重要信息:
记忆1:{existing}
记忆2:{new}
规则:
1. 时间信息以最新的为准
2. 保留所有独特的细节
3. 合并重复描述
"""
return call_claude_api(prompt)
def _update_stale_entries(self, memories: List[dict]) -> List[dict]:
"""
更新过时的记忆条目
例如:将「昨天我们决定用Redis」更新为
「2026年5月15日我们决定用Redis」
"""
updated = []
current_time = datetime.now()
for memory in memories:
if self._is_stale(memory, current_time):
memory = self._refresh_timestamp(memory, current_time)
updated.append(memory)
return updated
def _is_stale(self, memory: dict, current_time: datetime) -> bool:
"""判断记忆是否过时"""
last_update = memory.get("updated_at")
if not last_update:
return True
days_since = (current_time - last_update).days
return days_since > 7 # 7天未更新的记忆视为过时
def _refresh_timestamp(self, memory: dict, current_time: datetime) -> dict:
"""刷新记忆时间戳"""
prompt = f"""
将以下记忆中的相对时间更新为绝对时间:
记忆:{memory}
当前时间:{current_time.strftime('%Y年%m月%d日')}
只更新时间相关的内容,其他不变。
"""
memory["content"] = call_claude_api(prompt)
memory["updated_at"] = current_time
return memory
def _resolve_conflicts(self, memories: List[dict]) -> List[dict]:
"""
解决记忆中的逻辑矛盾
例如:用户曾说喜欢喝咖啡,但后来又说戒了咖啡
"""
conflicts = self._find_conflicts(memories)
for conflict_group in conflicts:
resolved = self._resolve_conflict_group(conflict_group)
# 用解决后的记忆替换原有记忆
memories = [m for m in memories if m not in conflict_group]
memories.append(resolved)
return memories
def _find_conflicts(self, memories: List[dict]) -> List[List[dict]]:
"""找出相互矛盾的记忇组"""
conflicts = []
for i, mem1 in enumerate(memories):
for mem2 in memories[i+1:]:
if self._are_contradictory(mem1, mem2):
conflicts.append([mem1, mem2])
return conflicts
def _are_contradictory(self, mem1: dict, mem2: dict) -> bool:
"""判断两条记忆是否矛盾"""
# 使用LLM判断逻辑矛盾
prompt = f"""
判断以下两条记忆是否存在逻辑矛盾:
记忆1:{mem1}
记忆2:{mem2}
只回答「是」或「否」。
"""
result = call_claude_api(prompt).strip()
return result == "是"
def _resolve_conflict_group(self, conflict_group: List[dict]) -> dict:
"""解决一组矛盾记忆"""
prompt = f"""
分析并解决以下矛盾的记忆,给出一个合理的综合判断:
矛盾记忆:{conflict_group}
考虑:
1. 哪条记忆是最新发生的
2. 哪条记忆更符合常理
3. 用户可能改变了偏好
返回解决后的记忆。
"""
return call_claude_api(prompt)
def _optimize_index(self, memories: List[dict]) -> dict:
"""
优化索引结构
提高检索效率
"""
# 按话题重新分组
topics = {}
for memory in memories:
topic = memory.get("topic", "未分类")
if topic not in topics:
topics[topic] = []
topics[topic].append(memory)
# 生成新的全局索引
new_index = {}
for topic, mems in topics.items():
for mem in mems:
keywords = self._extract_keywords(mem)
for kw in keywords:
if kw not in new_index:
new_index[kw] = []
new_index[kw].append(mem.get("id"))
return {
"topics": topics,
"index": new_index,
"total_count": len(memories)
}
三、Conway Agent:7×24永不下线的Agent平台
3.1 Conway架构概述
Conway是Anthropic推出的终极Agent平台,它与Memory Files深度整合,实现了真正的「永不下线」智能体:
┌─────────────────────────────────────────────────────────────┐
│ Conway Agent Platform │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Conway Runtime │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Search │ │ Chat │ │ System │ │ │
│ │ │ 搜索 │ │ 对话 │ │ 系统 │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ │ │ │
│ │ 常驻后台 │ 事件监听 │ Webhook接收 │ 自主触发 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 核心能力 │ │
│ │ • 浏览器自动化操控 │ │
│ │ • Claude Code执行 │ │
│ │ • CNW ZIP扩展包格式 │ │
│ │ • Webhook信号接收 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Memory Files 集成 │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ 存储 │◄─►│ 检索 │◄─►│ 整合 │ │ │
│ │ │ Storage │ │ Recall │ │ Dreams │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
3.2 Conway核心实现
// Conway Agent 的 Go 实现
package conway
import (
"context"
"encoding/json"
"fmt"
"log"
"sync"
"time"
)
// Event 代表Conway可以处理的事件类型
type EventType string
const (
EventWebhook EventType = "webhook"
EventSchedule EventType = "schedule"
EventUserMessage EventType = "user_message"
EventSystemNotify EventType = "system_notification"
)
// Event 事件结构
type Event struct {
ID string `json:"id"`
Type EventType `json:"type"`
Payload map[string]interface{} `json:"payload"`
Timestamp time.Time `json:"timestamp"`
Source string `json:"source"`
}
// Action Conway执行的动作
type Action struct {
Type string `json:"type"`
Target string `json:"target"`
Parameters map[string]interface{} `json:"parameters"`
Result interface{} `json:"result,omitempty"`
Error error `json:"error,omitempty"`
}
// MemoryStore 记忆存储接口
type MemoryStore interface {
Store(ctx context.Context, event Event) error
Retrieve(ctx context.Context, query string, limit int) ([]MemoryDocument, error)
List(ctx context.Context, topic string) ([]MemoryDocument, error)
}
// MemoryDocument 记忆文档
type MemoryDocument struct {
ID string `json:"id"`
Topic string `json:"topic"`
Title string `json:"title"`
Content string `json:"content"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
Tags []string `json:"tags"`
Metadata map[string]interface{} `json:"metadata"`
}
// ConwayConfig Conway配置
type ConwayConfig struct {
MemoryStore MemoryStore
ClaudeEndpoint string
WebhookSecret string
EnableBrowser bool
EnableCodeExec bool
}
// Conway Agent主结构
type Conway struct {
config ConwayConfig
eventQueue chan Event
handlers map[EventType][]EventHandler
memory MemoryStore
mu sync.RWMutex
running bool
cancel context.CancelFunc
}
// EventHandler 事件处理器
type EventHandler interface {
Handle(ctx context.Context, event Event) ([]Action, error)
CanHandle(event Event) bool
}
// NewConway 创建新的Conway实例
func NewConway(config ConwayConfig) *Conway {
return &Conway{
config: config,
eventQueue: make(chan Event, 1000),
handlers: make(map[EventType][]EventHandler),
memory: config.MemoryStore,
}
}
// RegisterHandler 注册事件处理器
func (c *Conway) RegisterHandler(handler EventHandler) {
c.mu.Lock()
defer c.mu.Unlock()
// 根据handler能力自动注册到对应事件类型
// 实际实现中需要反射或接口方法判断
c.handlers[EventWebhook] = append(c.handlers[EventWebhook], handler)
}
// Start 启动Conway Agent
func (c *Conway) Start(ctx context.Context) error {
c.mu.Lock()
if c.running {
c.mu.Unlock()
return fmt.Errorf("Conway is already running")
}
c.running = true
c.mu.Unlock()
// 创建派生context
ctx, cancel := context.WithCancel(ctx)
c.cancel = cancel
// 启动事件处理循环
go c.eventLoop(ctx)
// 启动记忆整合协程
go c.dreamLoop(ctx)
log.Println("Conway Agent started")
return nil
}
// Stop 停止Conway Agent
func (c *Conway) Stop() error {
c.mu.Lock()
defer c.mu.Unlock()
if !c.running {
return fmt.Errorf("Conway is not running")
}
c.cancel()
c.running = false
close(c.eventQueue)
log.Println("Conway Agent stopped")
return nil
}
// eventLoop 事件处理主循环
func (c *Conway) eventLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case event, ok := <-c.eventQueue:
if !ok {
return
}
c.processEvent(ctx, event)
}
}
}
// processEvent 处理单个事件
func (c *Conway) processEvent(ctx context.Context, event Event) {
log.Printf("Processing event: %s (%s)", event.ID, event.Type)
// 1. 存储事件到记忆
if err := c.memory.Store(ctx, event); err != nil {
log.Printf("Failed to store event: %v", err)
}
// 2. 检索相关记忆
relevantMemories, err := c.memory.Retrieve(ctx, event.Payload["query"], 5)
if err != nil {
log.Printf("Failed to retrieve memories: %v", err)
relevantMemories = []MemoryDocument{}
}
// 3. 分发给handlers处理
c.mu.RLock()
handlers := c.handlers[event.Type]
c.mu.RUnlock()
for _, handler := range handlers {
if handler.CanHandle(event) {
actions, err := handler.Handle(ctx, event)
if err != nil {
log.Printf("Handler error: %v", err)
continue
}
// 执行actions
for _, action := range actions {
c.executeAction(ctx, action)
}
}
}
// 4. 生成新的Conway事件供后续处理
c.emitEvent(ctx, Event{
ID: fmt.Sprintf("derived_%d", time.Now().UnixNano()),
Type: EventSystemNotify,
Payload: map[string]interface{}{
"original_event": event.ID,
"memories_used": len(relevantMemories),
"actions_taken": len(handlers),
},
Timestamp: time.Now(),
Source: "conway",
})
}
// executeAction 执行动作
func (c *Conway) executeAction(ctx context.Context, action Action) {
log.Printf("Executing action: %s -> %s", action.Type, action.Target)
switch action.Type {
case "browser_navigate":
// 浏览器导航
c.browserNavigate(ctx, action.Target)
case "claude_code":
// 执行Claude Code
c.runClaudeCode(ctx, action.Parameters)
case "webhook_send":
// 发送webhook
c.sendWebhook(ctx, action.Target, action.Parameters)
case "memory_update":
// 更新记忆
c.updateMemory(ctx, action.Parameters)
default:
log.Printf("Unknown action type: %s", action.Type)
}
}
// EmitEvent 向Conway发送事件
func (c *Conway) EmitEvent(event Event) {
c.eventQueue <- event
}
// emitEvent 内部事件发射
func (c *Conway) emitEvent(ctx context.Context, event Event) {
select {
case c.eventQueue <- event:
default:
log.Println("Event queue full, dropping event")
}
}
// dreamLoop 梦境整合循环
func (c *Conway) dreamLoop(ctx context.Context) {
ticker := time.NewTicker(24 * time.Hour)
defer ticker.Stop()
conversationCount := 0
maxConversations := 5
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
c.triggerDream(ctx)
}
conversationCount++
if conversationCount >= maxConversations {
c.triggerDream(ctx)
conversationCount = 0
}
}
}
// triggerDream 触发梦境整合
func (c *Conway) triggerDream(ctx context.Context) {
log.Println("Triggering dream consolidation...")
// 获取所有记忆
memories, err := c.consolidateMemories(ctx)
if err != nil {
log.Printf("Dream consolidation failed: %v", err)
return
}
log.Printf("Dream completed: %d memories consolidated", len(memories))
}
// consolidateMemories 执行记忆整合
func (c *Conway) consolidateMemories(ctx context.Context) ([]MemoryDocument, error) {
// 1. 获取所有会话
sessions := c.getRecentSessions(ctx)
// 2. 获取现有记忆存储
allMemories, err := c.getAllMemories(ctx)
if err != nil {
return nil, err
}
// 3. 执行整合逻辑(合并重复、更新过期、解决冲突)
consolidated := c.mergeAndOptimize(allMemories, sessions)
// 4. 保存整合后的记忆
for _, mem := range consolidated {
if err := c.memory.Store(ctx, mem); err != nil {
return nil, err
}
}
return consolidated, nil
}
func (c *Conway) getRecentSessions(ctx context.Context) []map[string]interface{} {
// 实现获取最近会话
return []map[string]interface{}{}
}
func (c *Conway) getAllMemories(ctx context.Context) ([]MemoryDocument, error) {
// 实现获取所有记忆
return []MemoryDocument{}, nil
}
func (c *Conway) mergeAndOptimize(memories []MemoryDocument, sessions []map[string]interface{}) []MemoryDocument {
// 实现记忆合并与优化
return memories
}
四、API使用示例
4.1 Python SDK使用示例
# Claude Memory Files API 使用示例
from anthropic import Anthropic
from datetime import datetime
# 初始化客户端
client = Anthropic(api_key="sk-ant-api03-...")
# ========== Memory Stores API ==========
# 创建记忆存储
memory_store = client.beta.memory_stores.create(
name="my-project-memory"
)
print(f"Created memory store: {memory_store.id}")
# 向记忆存储添加内容
client.beta.memory_stores.messages.create(
memory_store_id=memory_store.id,
role="user",
content="我们决定使用 PostgreSQL 作为主数据库"
)
client.beta.memory_stores.messages.create(
memory_store_id=memory_store.id,
role="assistant",
content="好的,我已经记录了这个决定。PostgreSQL 支持 JSON 类型,适合存储半结构化数据。"
)
# 检索记忆
results = client.beta.memory_stores.search(
memory_store_id=memory_store.id,
query="数据库选择",
limit=5
)
print(f"Found {len(results.results)} relevant memories")
# ========== Dreams API ==========
# 创建梦境任务
dream = client.beta.dreams.create(
inputs=[
{"type": "memory_store", "memory_store_id": memory_store.id},
{"type": "sessions", "session_ids": ["session_123", "session_456"]}
],
model="claude-opus-4-7",
instructions="重点关注技术决策相关的记忆,忽略调试笔记"
)
print(f"Dream created: {dream.id}")
# 轮询梦境状态
import asyncio
async def wait_for_dream(dream_id):
while True:
status = client.beta.dreams.get(dream_id)
print(f"Dream status: {status.status}")
if status.status == "completed":
# 获取整合后的记忆
output_store_id = status.outputs[0].memory_store_id
print(f"New memory store: {output_store_id}")
return output_store_id
elif status.status == "failed":
print(f"Dream failed: {status.error}")
return None
await asyncio.sleep(30)
# 运行梦境
new_store_id = asyncio.run(wait_for_dream(dream.id))
# ========== 使用新的记忆存储创建会话 ==========
if new_store_id:
session = client.beta.sessions.create(
agent="my-agent-id",
environment_id="env-123",
resources=[
{"type": "memory_store", "memory_store_id": new_store_id}
]
)
print(f"Session with memory: {session.id}")
4.2 完整记忆管理系统实现
# 完整的记忆管理系统
from anthropic import Anthropic
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum
import json
class MemoryMode(Enum):
CLASSIC = "classic"
FILES = "files"
@dataclass
class UserProfile:
"""用户画像"""
user_id: str
name: str
preferences: Dict = field(default_factory=dict)
projects: List[str] = field(default_factory=list)
active_project: Optional[str] = None
class ClaudeMemoryManager:
"""
Claude记忆管理器
支持双模记忆切换
"""
def __init__(self, api_key: str, mode: MemoryMode = MemoryMode.FILES):
self.client = Anthropic(api_key=api_key)
self.mode = mode
self.memory_stores: Dict[str, str] = {} # project -> store_id
self.user_profile: Optional[UserProfile] = None
# ========== 记忆存储管理 ==========
def create_memory_store(self, name: str) -> str:
"""创建记忆存储"""
store = self.client.beta.memory_stores.create(name=name)
self.memory_stores[name] = store.id
return store.id
def get_memory_store(self, name: str) -> Optional[str]:
"""获取记忆存储ID"""
return self.memory_stores.get(name)
def store_conversation(
self,
store_name: str,
messages: List[Dict],
metadata: Optional[Dict] = None
):
"""
存储对话到记忆
参数:
store_name: 记忆存储名称
messages: 对话消息列表
metadata: 元数据(如项目信息)
"""
store_id = self.get_memory_store(store_name)
if not store_id:
store_id = self.create_memory_store(store_name)
for msg in messages:
self.client.beta.memory_stores.messages.create(
memory_store_id=store_id,
role=msg["role"],
content=msg["content"],
metadata=metadata
)
def retrieve_memories(
self,
store_name: str,
query: str,
limit: int = 5
) -> List[Dict]:
"""检索相关记忆"""
store_id = self.get_memory_store(store_name)
if not store_id:
return []
results = self.client.beta.memory_stores.search(
memory_store_id=store_id,
query=query,
limit=limit
)
return [
{
"content": r.content,
"relevance": r.relevance,
"topic": r.topic
}
for r in results.results
]
# ========== 梦境整合 ==========
def trigger_dream(
self,
store_name: str,
session_ids: List[str],
focus_areas: Optional[List[str]] = None
) -> str:
"""
触发梦境整合
参数:
store_name: 记忆存储名称
session_ids: 要整合的会话ID列表
focus_areas: 重点关注领域
返回:
新记忆存储ID
"""
store_id = self.get_memory_store(store_name)
if not store_id:
raise ValueError(f"Memory store not found: {store_name}")
instructions = ""
if focus_areas:
instructions = f"重点关注: {', '.join(focus_areas)}"
dream = self.client.beta.dreams.create(
inputs=[
{"type": "memory_store", "memory_store_id": store_id},
{"type": "sessions", "session_ids": session_ids}
],
model="claude-opus-4-7",
instructions=instructions
)
return dream.id
def wait_for_dream(self, dream_id: str, timeout: int = 600) -> Optional[str]:
"""
等待梦境完成并返回新记忆存储
参数:
dream_id: 梦境ID
timeout: 超时时间(秒)
返回:
新记忆存储ID,失败返回None
"""
import time
start = time.time()
while time.time() - start < timeout:
status = self.client.beta.dreams.get(dream_id)
if status.status == "completed":
if status.outputs:
return status.outputs[0].memory_store_id
return None
elif status.status == "failed":
print(f"Dream failed: {status.error}")
return None
time.sleep(30) # 每30秒检查一次
return None
# ========== 会话创建 ==========
def create_session_with_memory(
self,
agent_id: str,
environment_id: str,
store_name: str
) -> Dict:
"""
创建带有记忆的会话
"""
store_id = self.get_memory_store(store_name)
if not store_id:
store_id = self.create_memory_store(store_name)
session = self.client.beta.sessions.create(
agent=agent_id,
environment_id=environment_id,
resources=[
{"type": "memory_store", "memory_store_id": store_id}
]
)
return {
"session_id": session.id,
"memory_store_id": store_id
}
# ========== 自动记忆管理 ==========
def should_trigger_dream(self, store_name: str) -> bool:
"""
判断是否应该触发梦境
基于对话数量和时间
"""
# 获取会话列表
# 简化实现
conversation_count = len(self.client.beta.sessions.list())
hours_since_last = 24 # 简化
return conversation_count >= 5 or hours_since_last >= 24
def auto_memory_management(self, store_name: str):
"""
自动记忆管理
根据条件自动触发梦境整合
"""
if self.should_trigger_dream(store_name):
sessions = self.client.beta.sessions.list(limit=10)
session_ids = [s.id for s in sessions]
dream_id = self.trigger_dream(store_name, session_ids)
return self.wait_for_dream(dream_id)
return self.get_memory_store(store_name)
# ========== 使用示例 ==========
def main():
# 初始化管理器
manager = ClaudeMemoryManager(
api_key="sk-ant-api03-...",
mode=MemoryMode.FILES
)
# 创建项目记忆存储
project_store = "my-ai-project"
manager.create_memory_store(project_store)
# 存储对话
messages = [
{"role": "user", "content": "我想要实现一个RAG系统"},
{"role": "assistant", "content": "RAG系统需要三个核心组件:向量数据库、检索器和生成器"},
{"role": "user", "content": "我们用ChromaDB作为向量数据库"},
{"role": "assistant", "content": "好的,ChromaDB易于部署,支持多种嵌入模型"},
]
manager.store_conversation(
project_store,
messages,
metadata={"project": "my-ai-project", "topic": "RAG"}
)
# 检索记忆
memories = manager.retrieve_memories(
project_store,
"向量数据库选择"
)
print(f"Found {len(memories)} relevant memories")
# 触发梦境整合
dream_id = manager.trigger_dream(
project_store,
session_ids=["session-1", "session-2"],
focus_areas=["技术选型", "数据库"]
)
# 等待梦境完成
new_store_id = manager.wait_for_dream(dream_id)
if new_store_id:
print(f"Memory consolidated: {new_store_id}")
if __name__ == "__main__":
main()
五、应用场景
5.1 企业级应用
场景1:智能客服系统
需求:
- 跨会话记住用户偏好
- 理解用户历史问题
- 提供连贯的对话体验
实现:
- 每个用户维护独立记忆存储
- 定期梦境整合优化记忆
- 检索历史偏好加速响应
场景2:代码开发助手
# 代码助手的记忆实现
class CodeAssistantMemory:
"""代码开发助手的专业记忆"""
def __init__(self, manager: ClaudeMemoryManager):
self.manager = manager
self.project_store = "code-assistant"
# 初始化项目记忆
manager.create_memory_store(self.project_store)
def remember_code_style(self, project: str, style_rules: dict):
"""记忆代码风格偏好"""
self.manager.store_conversation(
self.project_store,
messages=[{
"role": "system",
"content": f"代码风格规则: {json.dumps(style_rules)}"
}],
metadata={"project": project, "type": "code_style"}
)
def remember_project_context(self, project: str, context: dict):
"""记忆项目上下文"""
self.manager.store_conversation(
self.project_store,
messages=[{
"role": "system",
"content": f"项目上下文: {json.dumps(context)}"
}],
metadata={"project": project, "type": "context"}
)
def get_code_guidance(self, query: str) -> List[Dict]:
"""获取代码指导记忆"""
memories = self.manager.retrieve_memories(
self.project_store,
query=f"代码风格 {query}"
)
return [m for m in memories if m.get("topic") == "code_style"]
5.2 个人助手应用
场景3:个人知识管理
架构:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 阅读输入 │───▶│ 记忆存储 │───▶│ 检索输出 │
│ (文章/笔记) │ │ (按主题) │ │ (知识问答) │
└─────────────┘ └─────────────┘ └─────────────┘
▲
│
┌─────────────┐
│ Dreams整合 │
│ (定期整理) │
└─────────────┘
六、与传统RAG的对比
| 维度 | 传统RAG | Claude Memory |
|---|---|---|
| 数据来源 | 外部文档 | 对话历史+项目文件 |
| 组织方式 | 原始chunk | AI重构的结构化文档 |
| 更新方式 | 手动重新索引 | 自动整合(Dreams) |
| 个性化程度 | 弱 | 强 |
| 适用场景 | 企业知识库 | 个人助手/工作搭档 |
| 上下文利用 | 被动检索 | 主动学习 |
七、总结与展望
Claude的双模记忆系统代表了AI Agent记忆架构的一次重大突破。通过将记忆外部化、结构化,并配合自动化的整合机制,Claude终于拥有了「永久大脑」的能力。
核心创新:
- Memory Files:打破容量限制,实现无限记忆
- Dreams:模拟人类睡眠,实现自动记忆整理
- Conway:7×24永不下线的Agent运行时
对开发者的启示:
- 不要依赖纯上下文做记忆——容量有限,成本高
- 设计好记忆的存储结构——按话题组织比流水账好检索
- 考虑「记忆整合」机制——长期使用会产生碎片,需要定期清理
- 平衡个性化和泛化能力——让AI记住用户,但不要被用户限制
未来展望:
- 记忆跨设备同步
- 多Agent共享记忆
- 记忆可视化编辑
- 记忆安全与隐私保护
记忆,是AI Agent走向「真正的助手」的必经之路。Claude的双模记忆系统为整个行业指明了方向。
参考资料
- Anthropic Official Documentation - Memory Files & Dreams
- TestingCatalog - Claude Dual-Mode Memory System Report
- Claude Platform API Documentation
- Managed Agents Beta Documentation