Claude「永久大脑」:双模记忆系统与Conway Agent架构深度解析

摘要

2026年5月,AI领域迎来一次重大技术突破。Anthropic为Claude引入了全新的双模记忆系统——Memory Files与Dreams,配合7×24小时永不下线的Conway Agent平台,标志着AI Agent从「即用即忘」的对话模式,向「持久记忆」的智能助手模式迈出了关键一步。本文将深入剖析这一架构的技术原理、实现细节,并提供完整的Python/Go代码示例,帮助开发者理解并构建类似的AI记忆系统。

关键词:AI Agent、记忆系统、Claude、Conway、Memory Files、Dreams、持久记忆


一、背景:从「滚动便签」到「永久大脑」

1.1 传统AI记忆的困境

在Claude Memory Files之前,大多数AI助手(包括Claude本身)的记忆机制本质上是一张「滚动便签」——将用户的所有偏好、背景和习惯压缩成一段统一的总结性记忆。这种方式简单有效,但问题也随之而来:

传统记忆模式的问题:
1. 信息容量有限 - 摘要长度有上限
2. 旧信息被覆盖 - 新对话会覆盖重要旧记忆
3. 话题互相干扰 - 不同项目的信息混杂在一起
4. 检索效率低下 - 需要大海捞针式查找

当用户与Claude讨论一个长期写作项目时,它可能已经完全忘记用户上周聊过的另一个项目细节。这种「健忘症」严重制约了AI作为真正智能助手的能力边界。

1.2 为什么不能只靠扩大上下文窗口?

很多人会问:扩大上下文窗口不就能解决问题了吗?答案是:不行。上下文窗口有几个本质局限:

维度局限性
单次会话对话关闭后数据即销毁
线性增长记忆越多,有效信息密度越低
检索效率大海捞针式查找,延迟高
容量成本Token费用随上下文线性增长

Claude的Memory Files选择了一条不同的路:外部化存储 + 结构化索引。这不仅是技术方案的改变,更是一种范式转换。


二、核心原理:双模记忆系统架构

系统架构图

2.1 系统整体架构

Claude的双模记忆系统由三大核心组件构成:

┌─────────────────────────────────────────────────────────────┐
│                    Claude 双模记忆系统                        │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐  │
│  │ Classic      │    │ Memory Files │    │    Dreams    │  │
│  │ Memory       │    │ (文件记忆)    │    │   (梦境)     │  │
│  │ (经典模式)    │    │              │    │              │  │
│  │              │    │ • 话题组织    │    │ • 异步整合    │  │
│  │ 单一摘要     │    │ • 无限容量    │    │ • 合并重复    │  │
│  │ 滚动覆盖     │    │ • AI自组织   │    │ • 更新过期    │  │
│  └──────────────┘    └──────────────┘    └──────────────┘  │
│                                                             │
│         ▲              触发条件              ▲              │
│         │    (5次对话或24小时)              │              │
│         │                                    │              │
│  ┌──────────────────────────────────────────────────────┐  │
│  │              Memory Stores API                       │  │
│  │              (记忆存储接口)                           │  │
│  └──────────────────────────────────────────────────────┘  │
│                                                             │
└─────────────────────────────────────────────────────────────┘

2.2 Classic Memory:传统单摘要模式

Classic Memory是Claude从上线之初沿用至今的记忆模式。它的工作原理如下:

# Classic Memory 的简化实现
class ClassicMemory:
    """
    传统单摘要记忆模式
    每次对话后,将所有新信息压缩进一个摘要
    """
    
    def __init__(self, max_summary_length: int = 4096):
        self.summary = ""  # 单一摘要
        self.max_length = max_summary_length
        self.conversation_history = []
    
    def add_interaction(self, user_input: str, assistant_response: str):
        """添加新的对话交互"""
        self.conversation_history.append({
            "user": user_input,
            "assistant": assistant_response,
            "timestamp": datetime.now()
        })
        
        # 当累积足够信息时,生成新摘要
        if len(self.conversation_history) >= 5:
            self._generate_summary()
    
    def _generate_summary(self):
        """将对话历史压缩成单一摘要(覆盖旧摘要)"""
        # 这里会调用LLM进行摘要生成
        # 问题:旧信息会被新信息覆盖
        prompt = f"""
        将以下对话压缩成一个简短的摘要:
        {self.conversation_history}
        
        摘要应包含:
        - 用户的主要偏好
        - 重要的项目背景
        - 长期目标或计划
        """
        # 调用LLM生成摘要...
        self.summary = generated_summary
        # 清空历史(信息已压缩进摘要)
        self.conversation_history = []
    
    def get_context(self) -> str:
        """获取记忆上下文"""
        return self.summary

问题分析

  • ✅ 实现简单
  • ✅ 推理时Token消耗低
  • ❌ 容量有限(max_summary_length)
  • ❌ 信息丢失(压缩过程不可逆)
  • ❌ 话题干扰(不同项目混杂)

2.3 Memory Files:结构化文档记忆

Memory Files是Anthropic推出的革命性新架构。它为Claude提供了一个「内置个人Wiki」:

# Memory Files 的核心实现
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
import hashlib

@dataclass
class MemoryDocument:
    """记忆文档结构"""
    doc_id: str
    topic: str  # 话题分类
    title: str  # 文档标题
    content: str  # 文档内容
    created_at: datetime
    updated_at: datetime
    tags: List[str] = field(default_factory=list)
    metadata: Dict = field(default_factory=dict)

class MemoryFiles:
    """
    文件记忆系统
    按话题组织的结构化文档存储
    """
    
    def __init__(self):
        # 文档存储(按话题分组)
        self.documents: Dict[str, List[MemoryDocument]] = {}
        # 全局索引
        self.global_index: Dict[str, str] = {}  # keyword -> doc_id
        # 向量索引(用于语义检索)
        self.vector_index: Dict[str, List[float]] = {}
    
    def store(self, conversation: str, context: Optional[dict] = None):
        """
        存储对话信息,自动提取并归档
        """
        # 1. 使用LLM提取关键信息并分类
        extracted = self._extract_key_info(conversation, context)
        
        # 2. 确定话题分类
        topic = self._classify_topic(extracted)
        
        # 3. 创建或更新文档
        doc = self._create_or_update_document(topic, extracted)
        
        # 4. 更新全局索引
        self._update_index(doc)
        
        return doc
    
    def _extract_key_info(self, conversation: str, context: Optional[dict]) -> dict:
        """
        使用LLM提取对话中的关键信息
        """
        prompt = f"""
        从以下对话中提取关键信息,返回JSON格式:
        
        对话内容:
        {conversation}
        
        上下文:{context or {}}
        
        返回格式:
        {{
            "key_facts": ["关键事实列表"],
            "preferences": ["用户偏好列表"],
            "action_items": ["待办事项列表"],
            "entities": ["提到的实体列表"],
            "emotional_tone": "情感基调"
        }}
        """
        # 调用Claude API提取信息
        extracted = call_claude_api(prompt)
        return extracted
    
    def _classify_topic(self, extracted: dict) -> str:
        """
        使用LLM对信息进行话题分类
        """
        topics = [
            "技术开发", "产品设计", "项目管理", "日常生活",
            "财务投资", "健康运动", "旅行计划", "学习研究",
            "社交关系", "娱乐休闲"
        ]
        
        prompt = f"""
        将以下信息分类到最合适的话题中:
        
        提取的信息:{extracted}
        
        可选话题:{topics}
        
        只返回一个话题名称。
        """
        return call_claude_api(prompt).strip()
    
    def _create_or_update_document(
        self, 
        topic: str, 
        extracted: dict
    ) -> MemoryDocument:
        """
        创建新文档或更新已有文档
        """
        # 检查是否存在该话题的文档
        if topic in self.documents and self.documents[topic]:
            # 更新现有文档
            doc = self.documents[topic][-1]
            doc.content = self._merge_content(doc.content, extracted)
            doc.updated_at = datetime.now()
        else:
            # 创建新文档
            doc = MemoryDocument(
                doc_id=self._generate_doc_id(),
                topic=topic,
                title=self._generate_title(extracted),
                content=str(extracted),
                created_at=datetime.now(),
                updated_at=datetime.now(),
                tags=self._extract_tags(extracted)
            )
            if topic not in self.documents:
                self.documents[topic] = []
            self.documents[topic].append(doc)
        
        return doc
    
    def _merge_content(self, existing: str, new: dict) -> str:
        """
        合并新旧内容,保留重要历史信息
        """
        prompt = f"""
        将新信息整合到现有文档中:
        
        现有文档:
        {existing}
        
        新信息:
        {new}
        
        规则:
        1. 保留所有重要的历史信息
        2. 用新信息更新过时的内容
        3. 合并重复内容
        4. 保持文档结构清晰
        
        返回更新后的完整文档内容。
        """
        return call_claude_api(prompt)
    
    def retrieve(self, query: str, max_docs: int = 5) -> List[MemoryDocument]:
        """
        按需检索相关记忆
        """
        # 1. 确定查询相关的话题
        relevant_topics = self._find_relevant_topics(query)
        
        # 2. 收集相关文档
        candidates = []
        for topic in relevant_topics:
            if topic in self.documents:
                candidates.extend(self.documents[topic])
        
        # 3. 语义排序
        ranked = self._rank_documents(query, candidates)
        
        return ranked[:max_docs]
    
    def _find_relevant_topics(self, query: str) -> List[str]:
        """
        找出与查询相关的话题
        """
        all_topics = list(self.documents.keys())
        prompt = f"""
        判断以下查询与哪些话题相关:
        
        查询:{query}
        
        可选话题:{all_topics}
        
        返回最相关的3个话题,按相关性排序。
        """
        return call_claude_api(prompt).split(',')
    
    def _rank_documents(
        self, 
        query: str, 
        documents: List[MemoryDocument]
    ) -> List[MemoryDocument]:
        """
        使用向量相似度对文档排序
        """
        query_vector = self._embed_text(query)
        
        scored = []
        for doc in documents:
            if doc.doc_id in self.vector_index:
                similarity = cosine_similarity(
                    query_vector, 
                    self.vector_index[doc.doc_id]
                )
                scored.append((similarity, doc))
        
        scored.sort(key=lambda x: x[0], reverse=True)
        return [doc for _, doc in scored]
    
    def _embed_text(self, text: str) -> List[float]:
        """文本向量化"""
        # 使用Embedding API
        return embedding_api.embed(text)
    
    def _generate_doc_id(self) -> str:
        """生成唯一文档ID"""
        return hashlib.md5(
            str(datetime.now()).encode()
        ).hexdigest()[:12]
    
    def _generate_title(self, content: dict) -> str:
        """生成文档标题"""
        prompt = f"为以下内容生成一个简短的描述性标题:{content}"
        return call_claude_api(prompt)
    
    def _extract_tags(self, content: dict) -> List[str]:
        """提取标签"""
        return content.get('entities', [])

Memory Files的核心优势

特性Classic MemoryMemory Files
容量4K Token上限理论无限
组织方式单一摘要按话题分类
更新方式覆盖旧信息增量整合
检索效率O(n)O(1) 话题定位
用户控制有限完全可控

2.4 Dreams:REM睡眠式记忆整合

Dreams是Anthropic从人类神经科学中获得的灵感。在人类睡眠的REM阶段,大脑会回放白天的经历、强化重要记忆、丢弃无用的噪声信息。Anthropic把这个机制搬到了Claude身上。

# Dreams 记忆整合系统
import asyncio
from enum import Enum
from typing import List, Optional
from dataclasses import dataclass
import httpx

class DreamStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELED = "canceled"

@dataclass
class DreamInput:
    """梦境输入"""
    type: str  # "memory_store" 或 "sessions"
    memory_store_id: Optional[str] = None
    session_ids: Optional[List[str]] = None

@dataclass
class DreamOutput:
    """梦境输出"""
    type: str
    memory_store_id: str

@dataclass
class Dream:
    """梦境任务"""
    id: str
    status: DreamStatus
    inputs: List[DreamInput]
    outputs: List[DreamOutput]
    model: str
    instructions: str
    created_at: datetime
    ended_at: Optional[datetime]
    usage: dict

class DreamsService:
    """
    梦境记忆整合服务
    异步后台执行,定期整理记忆
    """
    
    def __init__(self, api_key: str):
        self.client = httpx.AsyncClient(
            base_url="https://api.anthropic.com",
            headers={"x-api-key": api_key}
        )
        self.trigger_conditions = {
            "min_conversations": 5,
            "min_hours": 24
        }
    
    async def create_dream(
        self,
        memory_store_id: str,
        session_ids: List[str],
        model: str = "claude-opus-4-7",
        instructions: Optional[str] = None
    ) -> Dream:
        """
        创建梦境任务
        
        参数:
            memory_store_id: 现有记忆存储ID
            session_ids: 要分析的会话ID列表
            model: 用于做梦的模型
            instructions: 额外的整合指导
        
        返回:
            Dream: 梦境任务对象
        """
        response = await self.client.post(
            "/v1/beta/dreams/create",
            json={
                "inputs": [
                    {"type": "memory_store", "memory_store_id": memory_store_id},
                    {"type": "sessions", "session_ids": session_ids}
                ],
                "model": model,
                "instructions": instructions or "整合记忆,去重更新"
            }
        )
        return Dream(**response.json())
    
    async def get_dream(self, dream_id: str) -> Dream:
        """获取梦境任务状态"""
        response = await self.client.get(f"/v1/beta/dreams/{dream_id}")
        return Dream(**response.json())
    
    async def list_dreams(
        self, 
        limit: int = 20,
        status: Optional[DreamStatus] = None
    ) -> List[Dream]:
        """列出梦境任务"""
        params = {"limit": limit}
        if status:
            params["status"] = status.value
        
        response = await self.client.get(
            "/v1/beta/dreams/list",
            params=params
        )
        return [Dream(**d) for d in response.json()["dreams"]]
    
    async def cancel_dream(self, dream_id: str) -> bool:
        """取消梦境任务"""
        response = await self.client.post(
            f"/v1/beta/dreams/{dream_id}/cancel"
        )
        return response.status_code == 200
    
    def should_trigger_dream(self, memory_store) -> bool:
        """
        判断是否应该触发梦境
        
        触发条件:
        1. 累积5次对话
        2. 或距离上次整合超过24小时
        """
        # 检查对话数量
        conversation_count = memory_store.get_conversation_count()
        if conversation_count >= self.trigger_conditions["min_conversations"]:
            return True
        
        # 检查时间间隔
        last_dream_time = memory_store.get_last_dream_time()
        if last_dream_time:
            hours_since = (datetime.now() - last_dream_time).total_seconds() / 3600
            if hours_since >= self.trigger_conditions["min_hours"]:
                return True
        
        return False
    
    async def consolidation_logic(
        self,
        memory_store,
        sessions: List[dict]
    ) -> dict:
        """
        记忆整合的核心逻辑
        
        执行步骤:
        1. 合并重复条目
        2. 更新过时信息
        3. 解决逻辑矛盾
        4. 挖掘隐藏模式
        """
        all_memories = []
        
        # 收集现有记忆
        for doc in memory_store.documents.values():
            all_memories.extend(doc)
        
        # 收集会话历史
        for session in sessions:
            all_memories.extend(session.get("memories", []))
        
        # 第一步:合并重复条目
        merged = self._merge_duplicates(all_memories)
        
        # 第二步:更新过时信息
        updated = self._update_stale_entries(merged)
        
        # 第三步:解决逻辑矛盾
        resolved = self._resolve_conflicts(updated)
        
        # 第四步:优化索引结构
        optimized = self._optimize_index(resolved)
        
        return optimized
    
    def _merge_duplicates(self, memories: List[dict]) -> List[dict]:
        """
        合并重复的记忆条目
        使用语义相似度判断重复
        """
        merged = []
        seen_signatures = {}
        
        for memory in memories:
            # 生成记忆签名
            signature = self._generate_signature(memory)
            
            # 检查是否已存在相似记忆
            if signature in seen_signatures:
                # 合并到已有记忆
                existing = seen_signatures[signature]
                merged_memory = self._combine_memories(existing, memory)
                seen_signatures[signature] = merged_memory
            else:
                seen_signatures[signature] = memory
        
        return list(seen_signatures.values())
    
    def _generate_signature(self, memory: dict) -> str:
        """
        生成记忆签名
        用于识别重复记忆
        """
        # 使用关键字段生成哈希
        key_info = f"{memory.get('topic', '')}_{memory.get('entity', '')}"
        return hashlib.md5(key_info.encode()).hexdigest()[:8]
    
    def _combine_memories(self, existing: dict, new: dict) -> dict:
        """
        合并两条记忆
        保留最新和最重要的信息
        """
        prompt = f"""
        合并以下两条相关记忆,保留所有重要信息:
        
        记忆1:{existing}
        记忆2:{new}
        
        规则:
        1. 时间信息以最新的为准
        2. 保留所有独特的细节
        3. 合并重复描述
        """
        return call_claude_api(prompt)
    
    def _update_stale_entries(self, memories: List[dict]) -> List[dict]:
        """
        更新过时的记忆条目
        例如:将「昨天我们决定用Redis」更新为
              「2026年5月15日我们决定用Redis」
        """
        updated = []
        current_time = datetime.now()
        
        for memory in memories:
            if self._is_stale(memory, current_time):
                memory = self._refresh_timestamp(memory, current_time)
            updated.append(memory)
        
        return updated
    
    def _is_stale(self, memory: dict, current_time: datetime) -> bool:
        """判断记忆是否过时"""
        last_update = memory.get("updated_at")
        if not last_update:
            return True
        
        days_since = (current_time - last_update).days
        return days_since > 7  # 7天未更新的记忆视为过时
    
    def _refresh_timestamp(self, memory: dict, current_time: datetime) -> dict:
        """刷新记忆时间戳"""
        prompt = f"""
        将以下记忆中的相对时间更新为绝对时间:
        
        记忆:{memory}
        当前时间:{current_time.strftime('%Y年%m月%d日')}
        
        只更新时间相关的内容,其他不变。
        """
        memory["content"] = call_claude_api(prompt)
        memory["updated_at"] = current_time
        return memory
    
    def _resolve_conflicts(self, memories: List[dict]) -> List[dict]:
        """
        解决记忆中的逻辑矛盾
        例如:用户曾说喜欢喝咖啡,但后来又说戒了咖啡
        """
        conflicts = self._find_conflicts(memories)
        
        for conflict_group in conflicts:
            resolved = self._resolve_conflict_group(conflict_group)
            # 用解决后的记忆替换原有记忆
            memories = [m for m in memories if m not in conflict_group]
            memories.append(resolved)
        
        return memories
    
    def _find_conflicts(self, memories: List[dict]) -> List[List[dict]]:
        """找出相互矛盾的记忇组"""
        conflicts = []
        for i, mem1 in enumerate(memories):
            for mem2 in memories[i+1:]:
                if self._are_contradictory(mem1, mem2):
                    conflicts.append([mem1, mem2])
        return conflicts
    
    def _are_contradictory(self, mem1: dict, mem2: dict) -> bool:
        """判断两条记忆是否矛盾"""
        # 使用LLM判断逻辑矛盾
        prompt = f"""
        判断以下两条记忆是否存在逻辑矛盾:
        
        记忆1:{mem1}
        记忆2:{mem2}
        
        只回答「是」或「否」。
        """
        result = call_claude_api(prompt).strip()
        return result == "是"
    
    def _resolve_conflict_group(self, conflict_group: List[dict]) -> dict:
        """解决一组矛盾记忆"""
        prompt = f"""
        分析并解决以下矛盾的记忆,给出一个合理的综合判断:
        
        矛盾记忆:{conflict_group}
        
        考虑:
        1. 哪条记忆是最新发生的
        2. 哪条记忆更符合常理
        3. 用户可能改变了偏好
        
        返回解决后的记忆。
        """
        return call_claude_api(prompt)
    
    def _optimize_index(self, memories: List[dict]) -> dict:
        """
        优化索引结构
        提高检索效率
        """
        # 按话题重新分组
        topics = {}
        for memory in memories:
            topic = memory.get("topic", "未分类")
            if topic not in topics:
                topics[topic] = []
            topics[topic].append(memory)
        
        # 生成新的全局索引
        new_index = {}
        for topic, mems in topics.items():
            for mem in mems:
                keywords = self._extract_keywords(mem)
                for kw in keywords:
                    if kw not in new_index:
                        new_index[kw] = []
                    new_index[kw].append(mem.get("id"))
        
        return {
            "topics": topics,
            "index": new_index,
            "total_count": len(memories)
        }

三、Conway Agent:7×24永不下线的Agent平台

3.1 Conway架构概述

Conway是Anthropic推出的终极Agent平台,它与Memory Files深度整合,实现了真正的「永不下线」智能体:

┌─────────────────────────────────────────────────────────────┐
│                    Conway Agent Platform                      │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────────────────────────────────────────────┐    │
│  │                  Conway Runtime                      │    │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐           │    │
│  │  │ Search  │  │  Chat   │  │ System  │           │    │
│  │  │  搜索   │  │  对话   │  │  系统   │           │    │
│  │  └─────────┘  └─────────┘  └─────────┘           │    │
│  │                                                     │    │
│  │  常驻后台 │ 事件监听 │ Webhook接收 │ 自主触发    │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                             │
│  ┌─────────────────────────────────────────────────────┐    │
│  │              核心能力                                 │    │
│  │  • 浏览器自动化操控                                  │    │
│  │  • Claude Code执行                                   │    │
│  │  • CNW ZIP扩展包格式                                 │    │
│  │  • Webhook信号接收                                  │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                             │
│  ┌─────────────────────────────────────────────────────┐    │
│  │              Memory Files 集成                        │    │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐           │    │
│  │  │ 存储    │◄─►│ 检索    │◄─►│ 整合    │           │    │
│  │  │ Storage │  │ Recall  │  │ Dreams  │           │    │
│  │  └─────────┘  └─────────┘  └─────────┘           │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                             │
└─────────────────────────────────────────────────────────────┘

3.2 Conway核心实现

// Conway Agent 的 Go 实现
package conway

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "sync"
    "time"
)

// Event 代表Conway可以处理的事件类型
type EventType string

const (
    EventWebhook       EventType = "webhook"
    EventSchedule      EventType = "schedule"
    EventUserMessage   EventType = "user_message"
    EventSystemNotify  EventType = "system_notification"
)

// Event 事件结构
type Event struct {
    ID        string                 `json:"id"`
    Type      EventType              `json:"type"`
    Payload   map[string]interface{} `json:"payload"`
    Timestamp time.Time              `json:"timestamp"`
    Source    string                 `json:"source"`
}

// Action Conway执行的动作
type Action struct {
    Type       string                 `json:"type"`
    Target     string                 `json:"target"`
    Parameters map[string]interface{} `json:"parameters"`
    Result     interface{}           `json:"result,omitempty"`
    Error      error                 `json:"error,omitempty"`
}

// MemoryStore 记忆存储接口
type MemoryStore interface {
    Store(ctx context.Context, event Event) error
    Retrieve(ctx context.Context, query string, limit int) ([]MemoryDocument, error)
    List(ctx context.Context, topic string) ([]MemoryDocument, error)
}

// MemoryDocument 记忆文档
type MemoryDocument struct {
    ID        string                 `json:"id"`
    Topic     string                 `json:"topic"`
    Title     string                 `json:"title"`
    Content   string                 `json:"content"`
    CreatedAt time.Time              `json:"created_at"`
    UpdatedAt time.Time              `json:"updated_at"`
    Tags      []string               `json:"tags"`
    Metadata  map[string]interface{} `json:"metadata"`
}

// ConwayConfig Conway配置
type ConwayConfig struct {
    MemoryStore    MemoryStore
    ClaudeEndpoint string
    WebhookSecret  string
    EnableBrowser  bool
    EnableCodeExec bool
}

// Conway Agent主结构
type Conway struct {
    config     ConwayConfig
    eventQueue chan Event
    handlers   map[EventType][]EventHandler
    memory     MemoryStore
    mu         sync.RWMutex
    running    bool
    cancel     context.CancelFunc
}

// EventHandler 事件处理器
type EventHandler interface {
    Handle(ctx context.Context, event Event) ([]Action, error)
    CanHandle(event Event) bool
}

// NewConway 创建新的Conway实例
func NewConway(config ConwayConfig) *Conway {
    return &Conway{
        config:     config,
        eventQueue: make(chan Event, 1000),
        handlers:   make(map[EventType][]EventHandler),
        memory:     config.MemoryStore,
    }
}

// RegisterHandler 注册事件处理器
func (c *Conway) RegisterHandler(handler EventHandler) {
    c.mu.Lock()
    defer c.mu.Unlock()
    
    // 根据handler能力自动注册到对应事件类型
    // 实际实现中需要反射或接口方法判断
    c.handlers[EventWebhook] = append(c.handlers[EventWebhook], handler)
}

// Start 启动Conway Agent
func (c *Conway) Start(ctx context.Context) error {
    c.mu.Lock()
    if c.running {
        c.mu.Unlock()
        return fmt.Errorf("Conway is already running")
    }
    c.running = true
    c.mu.Unlock()
    
    // 创建派生context
    ctx, cancel := context.WithCancel(ctx)
    c.cancel = cancel
    
    // 启动事件处理循环
    go c.eventLoop(ctx)
    
    // 启动记忆整合协程
    go c.dreamLoop(ctx)
    
    log.Println("Conway Agent started")
    return nil
}

// Stop 停止Conway Agent
func (c *Conway) Stop() error {
    c.mu.Lock()
    defer c.mu.Unlock()
    
    if !c.running {
        return fmt.Errorf("Conway is not running")
    }
    
    c.cancel()
    c.running = false
    close(c.eventQueue)
    
    log.Println("Conway Agent stopped")
    return nil
}

// eventLoop 事件处理主循环
func (c *Conway) eventLoop(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        case event, ok := <-c.eventQueue:
            if !ok {
                return
            }
            c.processEvent(ctx, event)
        }
    }
}

// processEvent 处理单个事件
func (c *Conway) processEvent(ctx context.Context, event Event) {
    log.Printf("Processing event: %s (%s)", event.ID, event.Type)
    
    // 1. 存储事件到记忆
    if err := c.memory.Store(ctx, event); err != nil {
        log.Printf("Failed to store event: %v", err)
    }
    
    // 2. 检索相关记忆
    relevantMemories, err := c.memory.Retrieve(ctx, event.Payload["query"], 5)
    if err != nil {
        log.Printf("Failed to retrieve memories: %v", err)
        relevantMemories = []MemoryDocument{}
    }
    
    // 3. 分发给handlers处理
    c.mu.RLock()
    handlers := c.handlers[event.Type]
    c.mu.RUnlock()
    
    for _, handler := range handlers {
        if handler.CanHandle(event) {
            actions, err := handler.Handle(ctx, event)
            if err != nil {
                log.Printf("Handler error: %v", err)
                continue
            }
            
            // 执行actions
            for _, action := range actions {
                c.executeAction(ctx, action)
            }
        }
    }
    
    // 4. 生成新的Conway事件供后续处理
    c.emitEvent(ctx, Event{
        ID:        fmt.Sprintf("derived_%d", time.Now().UnixNano()),
        Type:      EventSystemNotify,
        Payload: map[string]interface{}{
            "original_event": event.ID,
            "memories_used":  len(relevantMemories),
            "actions_taken":  len(handlers),
        },
        Timestamp: time.Now(),
        Source:    "conway",
    })
}

// executeAction 执行动作
func (c *Conway) executeAction(ctx context.Context, action Action) {
    log.Printf("Executing action: %s -> %s", action.Type, action.Target)
    
    switch action.Type {
    case "browser_navigate":
        // 浏览器导航
        c.browserNavigate(ctx, action.Target)
    case "claude_code":
        // 执行Claude Code
        c.runClaudeCode(ctx, action.Parameters)
    case "webhook_send":
        // 发送webhook
        c.sendWebhook(ctx, action.Target, action.Parameters)
    case "memory_update":
        // 更新记忆
        c.updateMemory(ctx, action.Parameters)
    default:
        log.Printf("Unknown action type: %s", action.Type)
    }
}

// EmitEvent 向Conway发送事件
func (c *Conway) EmitEvent(event Event) {
    c.eventQueue <- event
}

// emitEvent 内部事件发射
func (c *Conway) emitEvent(ctx context.Context, event Event) {
    select {
    case c.eventQueue <- event:
    default:
        log.Println("Event queue full, dropping event")
    }
}

// dreamLoop 梦境整合循环
func (c *Conway) dreamLoop(ctx context.Context) {
    ticker := time.NewTicker(24 * time.Hour)
    defer ticker.Stop()
    
    conversationCount := 0
    maxConversations := 5
    
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            c.triggerDream(ctx)
        }
        
        conversationCount++
        if conversationCount >= maxConversations {
            c.triggerDream(ctx)
            conversationCount = 0
        }
    }
}

// triggerDream 触发梦境整合
func (c *Conway) triggerDream(ctx context.Context) {
    log.Println("Triggering dream consolidation...")
    
    // 获取所有记忆
    memories, err := c.consolidateMemories(ctx)
    if err != nil {
        log.Printf("Dream consolidation failed: %v", err)
        return
    }
    
    log.Printf("Dream completed: %d memories consolidated", len(memories))
}

// consolidateMemories 执行记忆整合
func (c *Conway) consolidateMemories(ctx context.Context) ([]MemoryDocument, error) {
    // 1. 获取所有会话
    sessions := c.getRecentSessions(ctx)
    
    // 2. 获取现有记忆存储
    allMemories, err := c.getAllMemories(ctx)
    if err != nil {
        return nil, err
    }
    
    // 3. 执行整合逻辑(合并重复、更新过期、解决冲突)
    consolidated := c.mergeAndOptimize(allMemories, sessions)
    
    // 4. 保存整合后的记忆
    for _, mem := range consolidated {
        if err := c.memory.Store(ctx, mem); err != nil {
            return nil, err
        }
    }
    
    return consolidated, nil
}

func (c *Conway) getRecentSessions(ctx context.Context) []map[string]interface{} {
    // 实现获取最近会话
    return []map[string]interface{}{}
}

func (c *Conway) getAllMemories(ctx context.Context) ([]MemoryDocument, error) {
    // 实现获取所有记忆
    return []MemoryDocument{}, nil
}

func (c *Conway) mergeAndOptimize(memories []MemoryDocument, sessions []map[string]interface{}) []MemoryDocument {
    // 实现记忆合并与优化
    return memories
}

四、API使用示例

4.1 Python SDK使用示例

# Claude Memory Files API 使用示例
from anthropic import Anthropic
from datetime import datetime

# 初始化客户端
client = Anthropic(api_key="sk-ant-api03-...")

# ========== Memory Stores API ==========

# 创建记忆存储
memory_store = client.beta.memory_stores.create(
    name="my-project-memory"
)
print(f"Created memory store: {memory_store.id}")

# 向记忆存储添加内容
client.beta.memory_stores.messages.create(
    memory_store_id=memory_store.id,
    role="user",
    content="我们决定使用 PostgreSQL 作为主数据库"
)

client.beta.memory_stores.messages.create(
    memory_store_id=memory_store.id,
    role="assistant", 
    content="好的,我已经记录了这个决定。PostgreSQL 支持 JSON 类型,适合存储半结构化数据。"
)

# 检索记忆
results = client.beta.memory_stores.search(
    memory_store_id=memory_store.id,
    query="数据库选择",
    limit=5
)
print(f"Found {len(results.results)} relevant memories")

# ========== Dreams API ==========

# 创建梦境任务
dream = client.beta.dreams.create(
    inputs=[
        {"type": "memory_store", "memory_store_id": memory_store.id},
        {"type": "sessions", "session_ids": ["session_123", "session_456"]}
    ],
    model="claude-opus-4-7",
    instructions="重点关注技术决策相关的记忆,忽略调试笔记"
)

print(f"Dream created: {dream.id}")

# 轮询梦境状态
import asyncio

async def wait_for_dream(dream_id):
    while True:
        status = client.beta.dreams.get(dream_id)
        print(f"Dream status: {status.status}")
        
        if status.status == "completed":
            # 获取整合后的记忆
            output_store_id = status.outputs[0].memory_store_id
            print(f"New memory store: {output_store_id}")
            return output_store_id
        elif status.status == "failed":
            print(f"Dream failed: {status.error}")
            return None
        
        await asyncio.sleep(30)

# 运行梦境
new_store_id = asyncio.run(wait_for_dream(dream.id))

# ========== 使用新的记忆存储创建会话 ==========

if new_store_id:
    session = client.beta.sessions.create(
        agent="my-agent-id",
        environment_id="env-123",
        resources=[
            {"type": "memory_store", "memory_store_id": new_store_id}
        ]
    )
    print(f"Session with memory: {session.id}")

4.2 完整记忆管理系统实现

# 完整的记忆管理系统
from anthropic import Anthropic
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum
import json

class MemoryMode(Enum):
    CLASSIC = "classic"
    FILES = "files"

@dataclass
class UserProfile:
    """用户画像"""
    user_id: str
    name: str
    preferences: Dict = field(default_factory=dict)
    projects: List[str] = field(default_factory=list)
    active_project: Optional[str] = None

class ClaudeMemoryManager:
    """
    Claude记忆管理器
    支持双模记忆切换
    """
    
    def __init__(self, api_key: str, mode: MemoryMode = MemoryMode.FILES):
        self.client = Anthropic(api_key=api_key)
        self.mode = mode
        self.memory_stores: Dict[str, str] = {}  # project -> store_id
        self.user_profile: Optional[UserProfile] = None
    
    # ========== 记忆存储管理 ==========
    
    def create_memory_store(self, name: str) -> str:
        """创建记忆存储"""
        store = self.client.beta.memory_stores.create(name=name)
        self.memory_stores[name] = store.id
        return store.id
    
    def get_memory_store(self, name: str) -> Optional[str]:
        """获取记忆存储ID"""
        return self.memory_stores.get(name)
    
    def store_conversation(
        self, 
        store_name: str, 
        messages: List[Dict],
        metadata: Optional[Dict] = None
    ):
        """
        存储对话到记忆
        
        参数:
            store_name: 记忆存储名称
            messages: 对话消息列表
            metadata: 元数据(如项目信息)
        """
        store_id = self.get_memory_store(store_name)
        if not store_id:
            store_id = self.create_memory_store(store_name)
        
        for msg in messages:
            self.client.beta.memory_stores.messages.create(
                memory_store_id=store_id,
                role=msg["role"],
                content=msg["content"],
                metadata=metadata
            )
    
    def retrieve_memories(
        self, 
        store_name: str, 
        query: str,
        limit: int = 5
    ) -> List[Dict]:
        """检索相关记忆"""
        store_id = self.get_memory_store(store_name)
        if not store_id:
            return []
        
        results = self.client.beta.memory_stores.search(
            memory_store_id=store_id,
            query=query,
            limit=limit
        )
        
        return [
            {
                "content": r.content,
                "relevance": r.relevance,
                "topic": r.topic
            }
            for r in results.results
        ]
    
    # ========== 梦境整合 ==========
    
    def trigger_dream(
        self, 
        store_name: str,
        session_ids: List[str],
        focus_areas: Optional[List[str]] = None
    ) -> str:
        """
        触发梦境整合
        
        参数:
            store_name: 记忆存储名称
            session_ids: 要整合的会话ID列表
            focus_areas: 重点关注领域
        
        返回:
            新记忆存储ID
        """
        store_id = self.get_memory_store(store_name)
        if not store_id:
            raise ValueError(f"Memory store not found: {store_name}")
        
        instructions = ""
        if focus_areas:
            instructions = f"重点关注: {', '.join(focus_areas)}"
        
        dream = self.client.beta.dreams.create(
            inputs=[
                {"type": "memory_store", "memory_store_id": store_id},
                {"type": "sessions", "session_ids": session_ids}
            ],
            model="claude-opus-4-7",
            instructions=instructions
        )
        
        return dream.id
    
    def wait_for_dream(self, dream_id: str, timeout: int = 600) -> Optional[str]:
        """
        等待梦境完成并返回新记忆存储
        
        参数:
            dream_id: 梦境ID
            timeout: 超时时间(秒)
        
        返回:
            新记忆存储ID,失败返回None
        """
        import time
        start = time.time()
        
        while time.time() - start < timeout:
            status = self.client.beta.dreams.get(dream_id)
            
            if status.status == "completed":
                if status.outputs:
                    return status.outputs[0].memory_store_id
                return None
            elif status.status == "failed":
                print(f"Dream failed: {status.error}")
                return None
            
            time.sleep(30)  # 每30秒检查一次
        
        return None
    
    # ========== 会话创建 ==========
    
    def create_session_with_memory(
        self,
        agent_id: str,
        environment_id: str,
        store_name: str
    ) -> Dict:
        """
        创建带有记忆的会话
        """
        store_id = self.get_memory_store(store_name)
        if not store_id:
            store_id = self.create_memory_store(store_name)
        
        session = self.client.beta.sessions.create(
            agent=agent_id,
            environment_id=environment_id,
            resources=[
                {"type": "memory_store", "memory_store_id": store_id}
            ]
        )
        
        return {
            "session_id": session.id,
            "memory_store_id": store_id
        }
    
    # ========== 自动记忆管理 ==========
    
    def should_trigger_dream(self, store_name: str) -> bool:
        """
        判断是否应该触发梦境
        基于对话数量和时间
        """
        # 获取会话列表
        # 简化实现
        conversation_count = len(self.client.beta.sessions.list())
        hours_since_last = 24  # 简化
        
        return conversation_count >= 5 or hours_since_last >= 24
    
    def auto_memory_management(self, store_name: str):
        """
        自动记忆管理
        根据条件自动触发梦境整合
        """
        if self.should_trigger_dream(store_name):
            sessions = self.client.beta.sessions.list(limit=10)
            session_ids = [s.id for s in sessions]
            
            dream_id = self.trigger_dream(store_name, session_ids)
            return self.wait_for_dream(dream_id)
        
        return self.get_memory_store(store_name)


# ========== 使用示例 ==========

def main():
    # 初始化管理器
    manager = ClaudeMemoryManager(
        api_key="sk-ant-api03-...",
        mode=MemoryMode.FILES
    )
    
    # 创建项目记忆存储
    project_store = "my-ai-project"
    manager.create_memory_store(project_store)
    
    # 存储对话
    messages = [
        {"role": "user", "content": "我想要实现一个RAG系统"},
        {"role": "assistant", "content": "RAG系统需要三个核心组件:向量数据库、检索器和生成器"},
        {"role": "user", "content": "我们用ChromaDB作为向量数据库"},
        {"role": "assistant", "content": "好的,ChromaDB易于部署,支持多种嵌入模型"},
    ]
    
    manager.store_conversation(
        project_store,
        messages,
        metadata={"project": "my-ai-project", "topic": "RAG"}
    )
    
    # 检索记忆
    memories = manager.retrieve_memories(
        project_store,
        "向量数据库选择"
    )
    print(f"Found {len(memories)} relevant memories")
    
    # 触发梦境整合
    dream_id = manager.trigger_dream(
        project_store,
        session_ids=["session-1", "session-2"],
        focus_areas=["技术选型", "数据库"]
    )
    
    # 等待梦境完成
    new_store_id = manager.wait_for_dream(dream_id)
    if new_store_id:
        print(f"Memory consolidated: {new_store_id}")

if __name__ == "__main__":
    main()

五、应用场景

5.1 企业级应用

场景1:智能客服系统

需求:
- 跨会话记住用户偏好
- 理解用户历史问题
- 提供连贯的对话体验

实现:
- 每个用户维护独立记忆存储
- 定期梦境整合优化记忆
- 检索历史偏好加速响应

场景2:代码开发助手

# 代码助手的记忆实现
class CodeAssistantMemory:
    """代码开发助手的专业记忆"""
    
    def __init__(self, manager: ClaudeMemoryManager):
        self.manager = manager
        self.project_store = "code-assistant"
        
        # 初始化项目记忆
        manager.create_memory_store(self.project_store)
    
    def remember_code_style(self, project: str, style_rules: dict):
        """记忆代码风格偏好"""
        self.manager.store_conversation(
            self.project_store,
            messages=[{
                "role": "system",
                "content": f"代码风格规则: {json.dumps(style_rules)}"
            }],
            metadata={"project": project, "type": "code_style"}
        )
    
    def remember_project_context(self, project: str, context: dict):
        """记忆项目上下文"""
        self.manager.store_conversation(
            self.project_store,
            messages=[{
                "role": "system", 
                "content": f"项目上下文: {json.dumps(context)}"
            }],
            metadata={"project": project, "type": "context"}
        )
    
    def get_code_guidance(self, query: str) -> List[Dict]:
        """获取代码指导记忆"""
        memories = self.manager.retrieve_memories(
            self.project_store,
            query=f"代码风格 {query}"
        )
        return [m for m in memories if m.get("topic") == "code_style"]

5.2 个人助手应用

场景3:个人知识管理

架构:
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│ 阅读输入    │───▶│ 记忆存储    │───▶│ 检索输出    │
│ (文章/笔记) │    │ (按主题)    │    │ (知识问答)  │
└─────────────┘    └─────────────┘    └─────────────┘
                       ▲
                       │
                ┌─────────────┐
                │ Dreams整合  │
                │ (定期整理)  │
                └─────────────┘

六、与传统RAG的对比

维度传统RAGClaude Memory
数据来源外部文档对话历史+项目文件
组织方式原始chunkAI重构的结构化文档
更新方式手动重新索引自动整合(Dreams)
个性化程度
适用场景企业知识库个人助手/工作搭档
上下文利用被动检索主动学习

七、总结与展望

Claude的双模记忆系统代表了AI Agent记忆架构的一次重大突破。通过将记忆外部化、结构化,并配合自动化的整合机制,Claude终于拥有了「永久大脑」的能力。

核心创新

  1. Memory Files:打破容量限制,实现无限记忆
  2. Dreams:模拟人类睡眠,实现自动记忆整理
  3. Conway:7×24永不下线的Agent运行时

对开发者的启示

  • 不要依赖纯上下文做记忆——容量有限,成本高
  • 设计好记忆的存储结构——按话题组织比流水账好检索
  • 考虑「记忆整合」机制——长期使用会产生碎片,需要定期清理
  • 平衡个性化和泛化能力——让AI记住用户,但不要被用户限制

未来展望

  • 记忆跨设备同步
  • 多Agent共享记忆
  • 记忆可视化编辑
  • 记忆安全与隐私保护

记忆,是AI Agent走向「真正的助手」的必经之路。Claude的双模记忆系统为整个行业指明了方向。


参考资料

  1. Anthropic Official Documentation - Memory Files & Dreams
  2. TestingCatalog - Claude Dual-Mode Memory System Report
  3. Claude Platform API Documentation
  4. Managed Agents Beta Documentation