Claude's "Permanent Brain": Deep Analysis of Dual-Mode Memory System and Conway Agent Architecture
Abstract
In May 2026, Anthropic introduced a new dual-mode memory system for Claude, made up of Memory Files and Dreams, along with Conway, an always-on (24/7) agent platform. Together they move AI agents from a “use and forget” conversation model toward an assistant with persistent memory. This article analyzes the technical principles and implementation details of this architecture, with Python and Go code examples to help developers understand and build similar AI memory systems.
Keywords: AI Agent, Memory System, Claude, Conway, Memory Files, Dreams, Persistent Memory
1. Background: From “Rolling Notes” to “Permanent Brain”
1.1 The Dilemma of Traditional AI Memory
Before Memory Files, the memory mechanism of most AI assistants (including Claude itself) was essentially a “rolling note”: all user preferences, background, and habits were compressed into a single unified summary. This approach is simple and cheap to run, but it breaks down as usage grows:
Problems with Traditional Memory Mode:
1. Limited Information Capacity - Summary length has an upper limit
2. Old Information Overwritten - New conversations overwrite important old memories
3. Topic Interference - Information from different projects gets mixed together
4. Low Retrieval Efficiency - Finding specific information is like finding a needle in a haystack
When a user discusses a long-term writing project with Claude, the assistant may already have forgotten the details of another project discussed the week before. This “amnesia” limits how useful the AI can be as a long-running intelligent assistant.
1.2 Why Not Just Expand the Context Window?
A common question is whether simply expanding the context window would solve this problem. It would not, because the context window has several fundamental limitations:
| Dimension | Limitation |
|---|---|
| Single Session | Data is destroyed after conversation closes |
| Linear Growth | More memory means lower effective information density |
| Retrieval Efficiency | Needle-in-haystack search, high latency |
| Capacity Cost | Token costs grow linearly with context |
Claude’s Memory Files takes a different path: external storage plus structured indexing. This is more than a change of technique; it moves memory out of the prompt and into a store that is read on demand.
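To make the cost row of the table concrete, here is a back-of-the-envelope sketch. It compares resending the full history on every request with sending one new turn plus a fixed-size retrieved slice. The function names and the token figures are illustrative assumptions, not measurements of any real system.

```python
def full_history_tokens(turns: int, tokens_per_turn: int) -> int:
    """Total input tokens when every request resends the whole history."""
    # Request k carries k turns, so the total is (1 + 2 + ... + turns) * tokens_per_turn.
    return tokens_per_turn * turns * (turns + 1) // 2


def retrieval_tokens(turns: int, tokens_per_turn: int, retrieved_tokens: int) -> int:
    """Total input tokens when each request carries one new turn plus a fixed retrieved slice."""
    return turns * (tokens_per_turn + retrieved_tokens)


if __name__ == "__main__":
    for turns in (10, 100, 1000):
        full = full_history_tokens(turns, tokens_per_turn=500)
        external = retrieval_tokens(turns, tokens_per_turn=500, retrieved_tokens=2000)
        print(f"{turns:>5} turns: full history={full:>12,}  external memory={external:>10,}")
```

Per-request cost grows linearly with the context, so the cumulative cost of a long conversation grows quadratically, while the retrieval variant stays linear in the number of turns.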
2. Core Principles: Dual-Mode Memory System Architecture

2.1 Overall System Architecture
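Claude’s dual-mode memory system has three core components, two memory modes (Classic Memory and Memory Files) and one consolidation process (Dreams), all built on the Memory Stores API: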
┌─────────────────────────────────────────────────────────────────┐
│                 Claude Dual-Mode Memory System                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐  │
│  │ Classic Memory  │  │ Memory Files    │  │ Dreams          │  │
│  │ (Classic Mode)  │  │ (File Memory)   │  │ (Dreams)        │  │
│  │                 │  │                 │  │                 │  │
│  │ • Single summary│  │ • Topic-based   │  │ • Async         │  │
│  │ • Rolling       │  │ • Unlimited     │  │   consolidation │  │
│  │   overwrite     │  │ • AI-organized  │  │ • Merge         │  │
│  │                 │  │                 │  │   duplicates    │  │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘  │
│                                                                 │
│           ▲                 Trigger                 ▲           │
│           │      (5 conversations or 24 hours)      │           │
│           │                                         │           │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                     Memory Stores API                     │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
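The trigger shown in the diagram (5 conversations or 24 hours) can be sketched as a pure function. The function name, parameter names, and the choice to pass the current time explicitly are assumptions made for illustration and testability; they are not part of any published API.

```python
from datetime import datetime, timedelta
from typing import Optional


def should_consolidate(
    conversations_since_last: int,
    last_run: Optional[datetime],
    now: datetime,
    min_conversations: int = 5,
    max_age: timedelta = timedelta(hours=24),
) -> bool:
    """Return True when either trigger condition from the diagram is met."""
    if conversations_since_last >= min_conversations:
        return True
    # With no previous run recorded, only the conversation count can trigger.
    return last_run is not None and now - last_run >= max_age


if __name__ == "__main__":
    now = datetime(2026, 5, 16, 12, 0)
    print(should_consolidate(5, now - timedelta(hours=1), now))
    print(should_consolidate(2, now - timedelta(hours=30), now))
    print(should_consolidate(2, now - timedelta(hours=3), now))
```

Passing `now` in rather than calling `datetime.now()` inside the function keeps the check deterministic, which makes it easy to unit test.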
2.2 Classic Memory: Traditional Single Summary Mode
Classic Memory is the memory mode Claude has used since its launch. Its working principle is as follows:
# Simplified implementation of Classic Memory
from datetime import datetime


def call_claude_api(prompt: str) -> str:
    """Placeholder for the LLM call; replace with a real client call."""
    raise NotImplementedError("connect this to an LLM client")


class ClassicMemory:
    """
    Traditional single-summary memory mode.
    After every five interactions, compress the new information into one summary.
    """

    def __init__(self, max_summary_length: int = 4096):
        self.summary = ""  # Single summary
        self.max_length = max_summary_length
        self.conversation_history = []

    def add_interaction(self, user_input: str, assistant_response: str):
        """Add a new conversation interaction"""
        self.conversation_history.append({
            "user": user_input,
            "assistant": assistant_response,
            "timestamp": datetime.now()
        })
        # When enough information accumulates, generate a new summary
        if len(self.conversation_history) >= 5:
            self._generate_summary()

    def _generate_summary(self):
        """Compress conversation history into a single summary (overwrites the old one)"""
        prompt = f"""
        Compress the following conversation into a brief summary:
        {self.conversation_history}
        Summary should include:
        - User's main preferences
        - Important project background
        - Long-term goals or plans
        """
        # Call the LLM, then enforce the length cap; anything beyond it is lost
        generated_summary = call_claude_api(prompt)
        self.summary = generated_summary[:self.max_length]
        # Clear history (information compressed into summary)
        self.conversation_history = []

    def get_context(self) -> str:
        """Get memory context"""
        return self.summary
Problem Analysis:
- ✅ Simple implementation
- ✅ Low token consumption during inference
- ❌ Limited capacity (max_summary_length)
- ❌ Information loss (compression is irreversible)
- ❌ Topic interference (different projects mixed together)
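The information-loss and topic-interference points can be reproduced without an LLM. The toy class below is a deliberately crude stand-in for a single-summary memory: it prepends each new fact and truncates to a fixed length. The class name, the truncation strategy, and the sample facts are assumptions chosen only to make the failure mode visible.

```python
class RollingSummary:
    """Toy single-summary memory: newest facts first, hard character cap."""

    def __init__(self, max_chars: int) -> None:
        self.max_chars = max_chars
        self.summary = ""

    def add(self, fact: str) -> None:
        # Prepend the new fact, then truncate; whatever falls off the end is gone for good.
        combined = fact if not self.summary else f"{fact}; {self.summary}"
        self.summary = combined[: self.max_chars]


if __name__ == "__main__":
    memory = RollingSummary(max_chars=60)
    memory.add("novel: protagonist is named Mara")
    memory.add("API project: uses PostgreSQL")
    memory.add("prefers concise answers")
    print(memory.summary)
    print("Mara" in memory.summary)
```

After three unrelated facts, the oldest one (the protagonist's name from the writing project) no longer fits in the summary, even though nothing contradicted it.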
2.3 Memory Files: Structured Document Memory
Memory Files is the new architecture launched by Anthropic. It gives Claude what amounts to a “built-in personal wiki”:
# Core implementation of Memory Files
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
import hashlib
@dataclass
class MemoryDocument:
"""Memory document structure"""
doc_id: str
topic: str # Topic classification
title: str # Document title
content: str # Document content
created_at: datetime
updated_at: datetime
tags: List[str] = field(default_factory=list)
metadata: Dict = field(default_factory=dict)
class MemoryFiles:
"""
File Memory System
Structured document storage organized by topic
"""
def __init__(self):
# Document storage (grouped by topic)
self.documents: Dict[str, List[MemoryDocument]] = {}
# Global index
self.global_index: Dict[str, str] = {} # keyword -> doc_id
# Vector index (for semantic retrieval)
self.vector_index: Dict[str, List[float]] = {}
def store(self, conversation: str, context: Optional[dict] = None):
"""
Store conversation information, automatically extract and organize
"""
# 1. Use LLM to extract key information and classify
extracted = self._extract_key_info(conversation, context)
# 2. Determine topic classification
topic = self._classify_topic(extracted)
# 3. Create or update document
doc = self._create_or_update_document(topic, extracted)
# 4. Update global index
self._update_index(doc)
return doc
def _extract_key_info(self, conversation: str, context: Optional[dict]) -> dict:
"""
Use LLM to extract key information from conversation
"""
prompt = f"""
Extract key information from the following conversation, return JSON format:
Conversation:
{conversation}
Context: {context or {}}
Return format:
{{
"key_facts": ["list of key facts"],
"preferences": ["user preferences list"],
"action_items": ["to-do list"],
"entities": ["mentioned entities"],
"emotional_tone": "emotional tone"
}}
"""
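# Call Claude API to extract information; the helper is assumed to return the raw JSON text
import json  # local import keeps this step self-contained
extracted = json.loads(call_claude_api(prompt))
return extracted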
def _classify_topic(self, extracted: dict) -> str:
"""
Use LLM to classify information into topics
"""
topics = [
"Technical Development", "Product Design", "Project Management",
"Daily Life", "Financial Investment", "Health & Fitness",
"Travel Planning", "Learning Research", "Social Relations", "Entertainment"
]
prompt = f"""
Classify the following information into the most suitable topic:
Extracted information: {extracted}
Available topics: {topics}
Return only one topic name.
"""
return call_claude_api(prompt).strip()
def _create_or_update_document(
self,
topic: str,
extracted: dict
) -> MemoryDocument:
"""
Create new document or update existing document
"""
# Check if document for this topic exists
if topic in self.documents and self.documents[topic]:
# Update existing document
doc = self.documents[topic][-1]
doc.content = self._merge_content(doc.content, extracted)
doc.updated_at = datetime.now()
else:
# Create new document
doc = MemoryDocument(
doc_id=self._generate_doc_id(),
topic=topic,
title=self._generate_title(extracted),
content=str(extracted),
created_at=datetime.now(),
updated_at=datetime.now(),
tags=self._extract_tags(extracted)
)
if topic not in self.documents:
self.documents[topic] = []
self.documents[topic].append(doc)
return doc
def _merge_content(self, existing: str, new: dict) -> str:
"""
Merge old and new content, preserve important historical information
"""
prompt = f"""
Integrate new information into the existing document:
Existing document:
{existing}
New information:
{new}
Rules:
1. Preserve all important historical information
2. Update outdated content with new information
3. Merge duplicate content
4. Maintain clear document structure
Return the updated complete document content.
"""
return call_claude_api(prompt)
def retrieve(self, query: str, max_docs: int = 5) -> List[MemoryDocument]:
"""
Retrieve relevant memories on demand
"""
# 1. Determine query-related topics
relevant_topics = self._find_relevant_topics(query)
# 2. Collect relevant documents
candidates = []
for topic in relevant_topics:
if topic in self.documents:
candidates.extend(self.documents[topic])
# 3. Semantic ranking
ranked = self._rank_documents(query, candidates)
return ranked[:max_docs]
def _find_relevant_topics(self, query: str) -> List[str]:
"""
Find topics related to the query
"""
all_topics = list(self.documents.keys())
prompt = f"""
Determine which topics are related to the following query:
Query: {query}
Available topics: {all_topics}
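Return the 3 most relevant topics as a comma-separated list, ranked by relevance.
"""
# Strip whitespace around each name so it matches the keys of self.documents
return [t.strip() for t in call_claude_api(prompt).split(',') if t.strip()]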
def _rank_documents(
self,
query: str,
documents: List[MemoryDocument]
) -> List[MemoryDocument]:
"""
Rank documents using vector similarity
"""
query_vector = self._embed_text(query)
scored = []
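for doc in documents:
# Embed lazily: nothing else populates vector_index, so unindexed
# documents would otherwise be silently dropped from the results
if doc.doc_id not in self.vector_index:
self.vector_index[doc.doc_id] = self._embed_text(doc.content)
similarity = cosine_similarity(
query_vector,
self.vector_index[doc.doc_id]
)
scored.append((similarity, doc))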
scored.sort(key=lambda x: x[0], reverse=True)
return [doc for _, doc in scored]
def _embed_text(self, text: str) -> List[float]:
"""Text vectorization"""
# Use Embedding API
return embedding_api.embed(text)
def _generate_doc_id(self) -> str:
"""Generate unique document ID"""
return hashlib.md5(
str(datetime.now()).encode()
).hexdigest()[:12]
def _generate_title(self, content: dict) -> str:
"""Generate document title"""
prompt = f"Generate a short descriptive title for the following content: {content}"
return call_claude_api(prompt)
def _extract_tags(self, content: dict) -> List[str]:
"""Extract tags"""
return content.get('entities', [])
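The `_rank_documents` method calls `cosine_similarity`, which the listing does not define. A minimal pure-Python sketch follows; returning 0.0 for a zero vector is a choice made here for illustration, and a production system would more likely use a vectorized library routine.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two equal-length vectors; 0.0 if either is all zeros."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


if __name__ == "__main__":
    query = [1.0, 0.0, 1.0]
    docs = {"doc-a": [1.0, 0.0, 0.9], "doc-b": [0.0, 1.0, 0.0]}
    # Rank document IDs from most to least similar to the query
    ranked = sorted(docs, key=lambda d: cosine_similarity(query, docs[d]), reverse=True)
    print(ranked)
```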
Core Advantages of Memory Files:
| Feature | Classic Memory | Memory Files |
|---|---|---|
| Capacity | 4K token limit | Theoretically unlimited |
| Organization | Single summary | Topic-based classification |
| Update Method | Overwrite old info | Incremental integration |
| Retrieval Efficiency | Entire summary loaded on every request | Dictionary lookup by topic, then ranking within the topic |
| User Control | Limited | Full control |
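The `store` method in the listing calls `_update_index`, which is never defined. One plausible shape for it is an inverted index from keyword to document IDs, sketched below. Note that this differs from the listing's `global_index`, which maps each keyword to a single `doc_id`; the class name and the use of a set per keyword are assumptions made for this sketch.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Set


class KeywordIndex:
    """Inverted index: lowercase keyword -> set of document IDs."""

    def __init__(self) -> None:
        self._postings: Dict[str, Set[str]] = defaultdict(set)

    def add(self, doc_id: str, keywords: Iterable[str]) -> None:
        for keyword in keywords:
            self._postings[keyword.strip().lower()].add(doc_id)

    def lookup(self, keyword: str) -> List[str]:
        # .get avoids creating empty entries for unknown keywords
        return sorted(self._postings.get(keyword.strip().lower(), set()))


if __name__ == "__main__":
    index = KeywordIndex()
    index.add("doc-2", ["Redis", "caching"])
    index.add("doc-1", ["redis", "PostgreSQL"])
    print(index.lookup("REDIS"))
    print(index.lookup("mongodb"))
```

Keeping a set per keyword means a later document mentioning the same entity does not overwrite the earlier one, which is the failure mode the article attributes to Classic Memory.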
2.4 Dreams: REM Sleep-style Memory Consolidation
Dreams draws its inspiration from human neuroscience. During REM sleep, the human brain is thought to replay daytime experiences, reinforce important memories, and discard noise. Anthropic brought an analogous mechanism to Claude.
# Dreams Memory Consolidation System
import asyncio
import hashlib
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import List, Optional
import httpx
class DreamStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELED = "canceled"
@dataclass
class DreamInput:
"""Dream input"""
type: str # "memory_store" or "sessions"
memory_store_id: Optional[str] = None
session_ids: Optional[List[str]] = None
@dataclass
class DreamOutput:
"""Dream output"""
type: str
memory_store_id: str
@dataclass
class Dream:
"""Dream task"""
id: str
status: DreamStatus
inputs: List[DreamInput]
outputs: List[DreamOutput]
model: str
instructions: str
created_at: datetime
ended_at: Optional[datetime]
usage: dict
class DreamsService:
"""
Dreams Memory Consolidation Service
Async background execution, periodic memory organization
"""
def __init__(self, api_key: str):
self.client = httpx.AsyncClient(
base_url="https://api.anthropic.com",
headers={"x-api-key": api_key}
)
self.trigger_conditions = {
"min_conversations": 5,
"min_hours": 24
}
async def create_dream(
self,
memory_store_id: str,
session_ids: List[str],
model: str = "claude-opus-4-7",
instructions: Optional[str] = None
) -> Dream:
"""
Create a dream task
Args:
memory_store_id: Existing memory store ID
session_ids: List of session IDs to analyze
model: Model to use for dreaming
instructions: Additional consolidation guidance
Returns:
Dream: Dream task object
"""
response = await self.client.post(
"/v1/beta/dreams/create",
json={
"inputs": [
{"type": "memory_store", "memory_store_id": memory_store_id},
{"type": "sessions", "session_ids": session_ids}
],
"model": model,
"instructions": instructions or "Consolidate memories, deduplicate and update"
}
)
return Dream(**response.json())
async def get_dream(self, dream_id: str) -> Dream:
"""Get dream task status"""
response = await self.client.get(f"/v1/beta/dreams/{dream_id}")
return Dream(**response.json())
async def list_dreams(
self,
limit: int = 20,
status: Optional[DreamStatus] = None
) -> List[Dream]:
"""List dream tasks"""
params = {"limit": limit}
if status:
params["status"] = status.value
response = await self.client.get(
"/v1/beta/dreams/list",
params=params
)
return [Dream(**d) for d in response.json()["dreams"]]
async def cancel_dream(self, dream_id: str) -> bool:
"""Cancel dream task"""
response = await self.client.post(
f"/v1/beta/dreams/{dream_id}/cancel"
)
return response.status_code == 200
def should_trigger_dream(self, memory_store) -> bool:
"""
Determine if a dream should be triggered
Trigger conditions:
1. 5 conversations accumulated
2. OR more than 24 hours since last consolidation
"""
conversation_count = memory_store.get_conversation_count()
if conversation_count >= self.trigger_conditions["min_conversations"]:
return True
last_dream_time = memory_store.get_last_dream_time()
if last_dream_time:
hours_since = (datetime.now() - last_dream_time).total_seconds() / 3600
if hours_since >= self.trigger_conditions["min_hours"]:
return True
return False
async def consolidation_logic(
self,
memory_store,
sessions: List[dict]
) -> dict:
"""
Core logic for memory consolidation
Steps:
1. Merge duplicate entries
2. Update outdated information
3. Resolve logical contradictions
4. Optimize the index structure
"""
all_memories = []
# Collect existing memories
for docs in memory_store.documents.values():
all_memories.extend(docs)  # each value is the list of documents for one topic
# Collect session history
for session in sessions:
all_memories.extend(session.get("memories", []))
# Step 1: Merge duplicate entries
merged = self._merge_duplicates(all_memories)
# Step 2: Update outdated information
updated = self._update_stale_entries(merged)
# Step 3: Resolve logical contradictions
resolved = self._resolve_conflicts(updated)
# Step 4: Optimize index structure
optimized = self._optimize_index(resolved)
return optimized
def _merge_duplicates(self, memories: List[dict]) -> List[dict]:
"""
Merge duplicate memory entries
Duplicates are identified by an exact topic/entity signature, not by semantic similarity
"""
seen_signatures = {}
for memory in memories:
signature = self._generate_signature(memory)
if signature in seen_signatures:
existing = seen_signatures[signature]
merged_memory = self._combine_memories(existing, memory)
seen_signatures[signature] = merged_memory
else:
seen_signatures[signature] = memory
return list(seen_signatures.values())
def _generate_signature(self, memory: dict) -> str:
"""
Generate memory signature
Used to identify duplicate memories
"""
key_info = f"{memory.get('topic', '')}_{memory.get('entity', '')}"
return hashlib.md5(key_info.encode()).hexdigest()[:8]
def _combine_memories(self, existing: dict, new: dict) -> dict:
"""
Merge two memories
Keep latest and most important information
"""
prompt = f"""
Merge the following two related memories, preserving all important information:
Memory 1: {existing}
Memory 2: {new}
Rules:
1. Use the most recent timestamp
2. Preserve all unique details
3. Merge duplicate descriptions
"""
return call_claude_api(prompt)
def _update_stale_entries(self, memories: List[dict]) -> List[dict]:
"""
Update outdated memory entries
For example: Change "we decided to use Redis yesterday" to
"On May 15, 2026, we decided to use Redis"
"""
updated = []
current_time = datetime.now()
for memory in memories:
if self._is_stale(memory, current_time):
memory = self._refresh_timestamp(memory, current_time)
updated.append(memory)
return updated
def _is_stale(self, memory: dict, current_time: datetime) -> bool:
"""Determine if memory is outdated"""
last_update = memory.get("updated_at")
if not last_update:
return True
days_since = (current_time - last_update).days
return days_since > 7 # Memories not updated in 7 days are considered outdated
def _refresh_timestamp(self, memory: dict, current_time: datetime) -> dict:
"""Refresh memory timestamp"""
prompt = f"""
Update relative time in the following memory to absolute time:
Memory: {memory}
Current time: {current_time.strftime('%Y-%m-%d')}
Only update time-related content, keep everything else unchanged.
"""
memory["content"] = call_claude_api(prompt)
memory["updated_at"] = current_time
return memory
def _resolve_conflicts(self, memories: List[dict]) -> List[dict]:
"""
Resolve logical contradictions in memories
For example: User said they like coffee, but later said they quit coffee
"""
conflicts = self._find_conflicts(memories)
for conflict_group in conflicts:
resolved = self._resolve_conflict_group(conflict_group)
memories = [m for m in memories if m not in conflict_group]
memories.append(resolved)
return memories
def _find_conflicts(self, memories: List[dict]) -> List[List[dict]]:
"""Find groups of contradictory memories"""
conflicts = []
for i, mem1 in enumerate(memories):
for mem2 in memories[i+1:]:
if self._are_contradictory(mem1, mem2):
conflicts.append([mem1, mem2])
return conflicts
def _are_contradictory(self, mem1: dict, mem2: dict) -> bool:
"""Determine if two memories are contradictory"""
prompt = f"""
Determine if the following two memories have logical contradictions:
Memory 1: {mem1}
Memory 2: {mem2}
Answer only "yes" or "no".
"""
result = call_claude_api(prompt).strip()
return result.lower() == "yes"
def _resolve_conflict_group(self, conflict_group: List[dict]) -> dict:
"""Resolve a group of conflicting memories"""
prompt = f"""
Analyze and resolve the following contradictory memories, provide a reasonable comprehensive judgment:
Conflicting memories: {conflict_group}
Consider:
1. Which memory is the most recent
2. Which memory is more logical
3. User may have changed preferences
Return the resolved memory.
"""
return call_claude_api(prompt)
def _optimize_index(self, memories: List[dict]) -> dict:
"""
Optimize index structure
Improve retrieval efficiency
"""
# Regroup by topic
topics = {}
for memory in memories:
topic = memory.get("topic", "Uncategorized")
if topic not in topics:
topics[topic] = []
topics[topic].append(memory)
# Generate new global index
new_index = {}
for topic, mems in topics.items():
for mem in mems:
keywords = self._extract_keywords(mem)
for kw in keywords:
if kw not in new_index:
new_index[kw] = []
new_index[kw].append(mem.get("id"))
return {
"topics": topics,
"index": new_index,
"total_count": len(memories)
}
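The `_merge_duplicates` step above hands every pair of duplicates to an LLM. For comparison, here is a deterministic sketch of the same idea that needs no model call: group memories by (topic, entity) and keep the most recently updated entry. The function is a simplification written for this article; it assumes each memory is a dict with `topic`, `entity`, and a comparable `updated_at`, and it discards the older entry instead of merging its details.

```python
from datetime import datetime
from typing import Dict, List, Tuple


def merge_duplicates(memories: List[dict]) -> List[dict]:
    """Collapse memories sharing (topic, entity), keeping the most recently updated one."""
    newest: Dict[Tuple[str, str], dict] = {}
    for memory in memories:
        key = (memory.get("topic", ""), memory.get("entity", ""))
        current = newest.get(key)
        if current is None or memory["updated_at"] > current["updated_at"]:
            newest[key] = memory
    return list(newest.values())


if __name__ == "__main__":
    memories = [
        {"topic": "db", "entity": "redis", "content": "use Redis",
         "updated_at": datetime(2026, 5, 1)},
        {"topic": "db", "entity": "redis", "content": "use Redis for caching",
         "updated_at": datetime(2026, 5, 15)},
        {"topic": "ui", "entity": "theme", "content": "dark mode",
         "updated_at": datetime(2026, 5, 2)},
    ]
    for memory in merge_duplicates(memories):
        print(memory["topic"], memory["entity"], memory["content"])
```

A cheap pass like this could run first, leaving the LLM to handle only the cases where the surviving entry should absorb details from the ones it replaces.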
3. Conway Agent: 24/7 Always-On Agent Platform
3.1 Conway Architecture Overview
Conway is the agent platform launched by Anthropic. Deeply integrated with Memory Files, it is designed to run as a truly “always-on” intelligent agent:
┌─────────────────────────────────────────────────────────────┐
│                    Conway Agent Platform                    │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌───────────────────────────────────────────────────────┐  │
│  │                    Conway Runtime                     │  │
│  │     ┌─────────┐      ┌─────────┐      ┌─────────┐     │  │
│  │     │ Search  │      │  Chat   │      │ System  │     │  │
│  │     └─────────┘      └─────────┘      └─────────┘     │  │
│  │                                                       │  │
│  │ Always-on │ Event Listening │ Webhook │ Auto-trigger  │  │
│  └───────────────────────────────────────────────────────┘  │
│                                                             │
│  ┌───────────────────────────────────────────────────────┐  │
│  │                   Core Capabilities                   │  │
│  │  • Browser automation control                         │  │
│  │  • Claude Code execution                              │  │
│  │  • CNW ZIP extension format                           │  │
│  │  • Webhook signal reception                           │  │
│  └───────────────────────────────────────────────────────┘  │
│                                                             │
│  ┌───────────────────────────────────────────────────────┐  │
│  │               Memory Files Integration                │  │
│  │        ┌─────────┐   ┌─────────┐   ┌─────────┐        │  │
│  │        │ Storage │◄─►│ Recall  │◄─►│ Dreams  │        │  │
│  │        └─────────┘   └─────────┘   └─────────┘        │  │
│  └───────────────────────────────────────────────────────┘  │
│                                                             │
└─────────────────────────────────────────────────────────────┘
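The diagram lists webhook signal reception, and the Go configuration in section 3.2 carries a `WebhookSecret` field, but the article never shows how the secret is used. A common pattern for webhook receivers in general is to verify an HMAC-SHA256 signature of the request body. The sketch below shows that pattern using only the Python standard library; the function names are hypothetical, and nothing here is confirmed to be how Conway itself signs or verifies requests.

```python
import hashlib
import hmac


def sign_payload(secret: str, body: bytes) -> str:
    """Hex-encoded HMAC-SHA256 of the raw request body."""
    return hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()


def verify_webhook(secret: str, body: bytes, received_signature: str) -> bool:
    """Compare signatures in constant time to avoid leaking information via timing."""
    expected = sign_payload(secret, body)
    return hmac.compare_digest(
        expected.encode("utf-8"), received_signature.encode("utf-8")
    )


if __name__ == "__main__":
    secret = "example-secret"  # illustrative value only
    body = b'{"event": "push", "repo": "demo"}'
    signature = sign_payload(secret, body)
    print(verify_webhook(secret, body, signature))
    print(verify_webhook(secret, b'{"event": "tampered"}', signature))
```

The signature must be computed over the raw bytes as received, before any JSON parsing, because re-serializing the payload can change whitespace or key order and invalidate the check.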
3.2 Conway Core Implementation
// Conway Agent Implementation in Go
package conway
import (
"context"
"fmt"
"log"
"sync"
"time"
)
// EventType represents the event types Conway can process
type EventType string
const (
EventWebhook EventType = "webhook"
EventSchedule EventType = "schedule"
EventUserMessage EventType = "user_message"
EventSystemNotify EventType = "system_notification"
)
// Event structure
type Event struct {
ID string `json:"id"`
Type EventType `json:"type"`
Payload map[string]interface{} `json:"payload"`
Timestamp time.Time `json:"timestamp"`
Source string `json:"source"`
}
// Action represents actions Conway can execute
type Action struct {
Type string `json:"type"`
Target string `json:"target"`
Parameters map[string]interface{} `json:"parameters"`
Result interface{} `json:"result,omitempty"`
Error string `json:"error,omitempty"` // error values do not marshal to useful JSON, so store the message
}
// MemoryStore interface for memory operations
type MemoryStore interface {
Store(ctx context.Context, event Event) error
Retrieve(ctx context.Context, query string, limit int) ([]MemoryDocument, error)
List(ctx context.Context, topic string) ([]MemoryDocument, error)
}
// MemoryDocument represents a memory document
type MemoryDocument struct {
ID string `json:"id"`
Topic string `json:"topic"`
Title string `json:"title"`
Content string `json:"content"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
Tags []string `json:"tags"`
Metadata map[string]interface{} `json:"metadata"`
}
// ConwayConfig holds Conway configuration
type ConwayConfig struct {
MemoryStore MemoryStore
ClaudeEndpoint string
WebhookSecret string
EnableBrowser bool
EnableCodeExec bool
}
// Conway represents the main agent structure
type Conway struct {
config ConwayConfig
eventQueue chan Event
handlers map[EventType][]EventHandler
memory MemoryStore
mu sync.RWMutex
running bool
cancel context.CancelFunc
}
// EventHandler interface for event handlers
type EventHandler interface {
Handle(ctx context.Context, event Event) ([]Action, error)
CanHandle(event Event) bool
}
// NewConway creates a new Conway instance
func NewConway(config ConwayConfig) *Conway {
return &Conway{
config: config,
eventQueue: make(chan Event, 1000),
handlers: make(map[EventType][]EventHandler),
memory: config.MemoryStore,
}
}
// RegisterHandler registers an event handler for the given event type
func (c *Conway) RegisterHandler(eventType EventType, handler EventHandler) {
c.mu.Lock()
defer c.mu.Unlock()
c.handlers[eventType] = append(c.handlers[eventType], handler)
}
// Start starts the Conway Agent
func (c *Conway) Start(ctx context.Context) error {
c.mu.Lock()
if c.running {
c.mu.Unlock()
return fmt.Errorf("Conway is already running")
}
c.running = true
c.mu.Unlock()
ctx, cancel := context.WithCancel(ctx)
c.cancel = cancel
// Start event processing loop
go c.eventLoop(ctx)
// Start memory consolidation goroutine
go c.dreamLoop(ctx)
log.Println("Conway Agent started")
return nil
}
// Stop stops the Conway Agent
func (c *Conway) Stop() error {
c.mu.Lock()
defer c.mu.Unlock()
if !c.running {
return fmt.Errorf("Conway is not running")
}
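c.cancel()
c.running = false
// Do not close eventQueue: producers may still call EmitEvent, and sending on a
// closed channel panics. Cancelling the context is enough to stop both loops.
log.Println("Conway Agent stopped")
return nil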
}
// eventLoop is the main event processing loop
func (c *Conway) eventLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case event, ok := <-c.eventQueue:
if !ok {
return
}
c.processEvent(ctx, event)
}
}
}
// processEvent processes a single event
func (c *Conway) processEvent(ctx context.Context, event Event) {
log.Printf("Processing event: %s (%s)", event.ID, event.Type)
// 1. Store event to memory
if err := c.memory.Store(ctx, event); err != nil {
log.Printf("Failed to store event: %v", err)
}
// 2. Retrieve relevant memories (payload values are untyped, so assert the query to a string)
query, _ := event.Payload["query"].(string)
relevantMemories, err := c.memory.Retrieve(ctx, query, 5)
if err != nil {
log.Printf("Failed to retrieve memories: %v", err)
relevantMemories = []MemoryDocument{}
}
// 3. Dispatch to handlers
c.mu.RLock()
handlers := c.handlers[event.Type]
c.mu.RUnlock()
actionsTaken := 0
for _, handler := range handlers {
if handler.CanHandle(event) {
actions, err := handler.Handle(ctx, event)
if err != nil {
log.Printf("Handler error: %v", err)
continue
}
for _, action := range actions {
c.executeAction(ctx, action)
actionsTaken++
}
}
}
// 4. Generate a derived Conway event for subsequent processing.
// Skip events Conway emitted itself; otherwise every derived event
// would produce another one and the loop would never terminate.
if event.Source == "conway" {
return
}
c.emitEvent(ctx, Event{
ID: fmt.Sprintf("derived_%d", time.Now().UnixNano()),
Type: EventSystemNotify,
Payload: map[string]interface{}{
"original_event": event.ID,
"memories_used": len(relevantMemories),
"actions_taken": actionsTaken,
},
Timestamp: time.Now(),
Source: "conway",
})
}
// executeAction executes an action
func (c *Conway) executeAction(ctx context.Context, action Action) {
log.Printf("Executing action: %s -> %s", action.Type, action.Target)
switch action.Type {
case "browser_navigate":
c.browserNavigate(ctx, action.Target)
case "claude_code":
c.runClaudeCode(ctx, action.Parameters)
case "webhook_send":
c.sendWebhook(ctx, action.Target, action.Parameters)
case "memory_update":
c.updateMemory(ctx, action.Parameters)
default:
log.Printf("Unknown action type: %s", action.Type)
}
}
// EmitEvent sends an event to Conway
func (c *Conway) EmitEvent(event Event) {
c.eventQueue <- event
}
// emitEvent internally emits an event
func (c *Conway) emitEvent(ctx context.Context, event Event) {
select {
case c.eventQueue <- event:
default:
log.Println("Event queue full, dropping event")
}
}
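// dreamLoop is the dream consolidation loop.
// It triggers a dream when 5 sessions have accumulated or 24 hours have passed.
func (c *Conway) dreamLoop(ctx context.Context) {
const maxConversations = 5
const maxInterval = 24 * time.Hour
// Poll periodically; the 10-minute interval is an illustrative choice
ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()
lastDream := time.Now()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// Assumes getRecentSessions returns the sessions since the last consolidation
pending := len(c.getRecentSessions(ctx))
if pending >= maxConversations || time.Since(lastDream) >= maxInterval {
c.triggerDream(ctx)
lastDream = time.Now()
}
}
}
}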
// triggerDream triggers memory consolidation
func (c *Conway) triggerDream(ctx context.Context) {
log.Println("Triggering dream consolidation...")
memories, err := c.consolidateMemories(ctx)
if err != nil {
log.Printf("Dream consolidation failed: %v", err)
return
}
log.Printf("Dream completed: %d memories consolidated", len(memories))
}
// consolidateMemories performs memory consolidation
func (c *Conway) consolidateMemories(ctx context.Context) ([]MemoryDocument, error) {
// 1. Get all sessions
sessions := c.getRecentSessions(ctx)
// 2. Get existing memory store
allMemories, err := c.getAllMemories(ctx)
if err != nil {
return nil, err
}
// 3. Perform consolidation (merge duplicates, update stale, resolve conflicts)
consolidated := c.mergeAndOptimize(allMemories, sessions)
// 4. Save consolidated memories
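for _, mem := range consolidated {
// MemoryStore.Store takes an Event, so wrap each consolidated document in one
wrapped := Event{ID: "dream_" + mem.ID, Type: EventSystemNotify, Payload: map[string]interface{}{"document": mem}, Timestamp: time.Now(), Source: "dream"}
if err := c.memory.Store(ctx, wrapped); err != nil {
return nil, err
}
}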
return consolidated, nil
}
func (c *Conway) getRecentSessions(ctx context.Context) []map[string]interface{} {
return []map[string]interface{}{}
}
func (c *Conway) getAllMemories(ctx context.Context) ([]MemoryDocument, error) {
return []MemoryDocument{}, nil
}
func (c *Conway) mergeAndOptimize(memories []MemoryDocument, sessions []map[string]interface{}) []MemoryDocument {
return memories
}
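Two design choices in the Go listing are worth isolating: handlers are looked up by event type, and internal events are dropped (not blocked on) when the bounded queue is full. The following Python sketch reproduces just those two behaviors with the standard-library `queue` module. The `EventBus` class and its method names are hypothetical and exist only for this illustration; it is single-threaded and omits the memory and dream steps.

```python
import queue
from typing import Callable, Dict, List

Handler = Callable[[dict], None]


class EventBus:
    """Bounded event queue with per-type handlers; drops events when full."""

    def __init__(self, capacity: int = 1000) -> None:
        self._queue: queue.Queue = queue.Queue(maxsize=capacity)
        self._handlers: Dict[str, List[Handler]] = {}
        self.dropped = 0

    def register(self, event_type: str, handler: Handler) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    def emit(self, event: dict) -> bool:
        """Enqueue without blocking; return False if the queue is full."""
        try:
            self._queue.put_nowait(event)
            return True
        except queue.Full:
            self.dropped += 1
            return False

    def drain(self) -> int:
        """Dispatch every queued event to its handlers; return the number processed."""
        processed = 0
        while True:
            try:
                event = self._queue.get_nowait()
            except queue.Empty:
                return processed
            for handler in self._handlers.get(event["type"], []):
                handler(event)
            processed += 1


if __name__ == "__main__":
    bus = EventBus(capacity=2)
    bus.register("webhook", lambda e: print("handled", e["id"]))
    for i in range(3):
        print("accepted:", bus.emit({"type": "webhook", "id": i}))
    print("processed:", bus.drain(), "dropped:", bus.dropped)
```

Dropping on overflow keeps the processing loop from deadlocking on its own derived events, at the cost of losing data under load; counting the drops at least makes that loss observable.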
4. API Usage Examples
4.1 Python SDK Usage Example
# Claude Memory Files API Usage Example
from anthropic import Anthropic
from datetime import datetime
# Initialize client (the SDK reads ANTHROPIC_API_KEY from the environment when no key is passed)
client = Anthropic()
# ========== Memory Stores API ==========
# Create memory store
memory_store = client.beta.memory_stores.create(
name="my-project-memory"
)
print(f"Created memory store: {memory_store.id}")
# Add content to memory store
client.beta.memory_stores.messages.create(
memory_store_id=memory_store.id,
role="user",
content="We decided to use PostgreSQL as the main database"
)
client.beta.memory_stores.messages.create(
memory_store_id=memory_store.id,
role="assistant",
content="Got it, I've recorded this decision. PostgreSQL supports JSON type, suitable for storing semi-structured data."
)
# Search memories
results = client.beta.memory_stores.search(
memory_store_id=memory_store.id,
query="database selection",
limit=5
)
print(f"Found {len(results.results)} relevant memories")
# ========== Dreams API ==========
# Create dream task
dream = client.beta.dreams.create(
inputs=[
{"type": "memory_store", "memory_store_id": memory_store.id},
{"type": "sessions", "session_ids": ["session_123", "session_456"]}
],
model="claude-opus-4-7",
instructions="Focus on technology decision-related memories, ignore debugging notes"
)
print(f"Dream created: {dream.id}")
# Poll dream status
import asyncio
async def wait_for_dream(dream_id):
while True:
status = client.beta.dreams.get(dream_id)
print(f"Dream status: {status.status}")
if status.status == "completed":
output_store_id = status.outputs[0].memory_store_id
print(f"New memory store: {output_store_id}")
return output_store_id
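elif status.status in ("failed", "canceled"):
# "canceled" is also a terminal state; without this check the loop never exits
print(f"Dream {status.status}: {getattr(status, 'error', None)}")
return None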
await asyncio.sleep(30)
# Run dream
new_store_id = asyncio.run(wait_for_dream(dream.id))
# ========== Create session with new memory ==========
if new_store_id:
session = client.beta.sessions.create(
agent="my-agent-id",
environment_id="env-123",
resources=[
{"type": "memory_store", "memory_store_id": new_store_id}
]
)
print(f"Session with memory: {session.id}")
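The polling loop in the example above has no upper bound on how long it waits. A generic helper with a timeout, sketched below, is one way to bound it. `poll_until` and its parameters are names invented for this sketch; the `sleep` and `clock` arguments are injectable so the helper can be tested without real delays.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def poll_until(
    fetch: Callable[[], T],
    is_terminal: Callable[[T], bool],
    timeout: float = 600.0,
    interval: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Optional[T]:
    """Call fetch() until is_terminal(result) is true; return None if the timeout would be exceeded."""
    deadline = clock() + timeout
    while True:
        result = fetch()
        if is_terminal(result):
            return result
        # Give up if another wait would take us past the deadline
        if clock() + interval > deadline:
            return None
        sleep(interval)


if __name__ == "__main__":
    statuses = iter(["pending", "running", "completed"])
    final = poll_until(
        fetch=lambda: next(statuses),
        is_terminal=lambda s: s in ("completed", "failed", "canceled"),
        timeout=5.0,
        interval=0.01,
    )
    print(final)
```

With a real client, `fetch` would wrap the status call and `is_terminal` would check for the completed, failed, and canceled states.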
4.2 Complete Memory Management System Implementation
# Complete Memory Management System
from anthropic import Anthropic
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum
import json
class MemoryMode(Enum):
CLASSIC = "classic"
FILES = "files"
@dataclass
class UserProfile:
"""User profile"""
user_id: str
name: str
preferences: Dict = field(default_factory=dict)
projects: List[str] = field(default_factory=list)
active_project: Optional[str] = None
class ClaudeMemoryManager:
"""
Claude Memory Manager
Supports dual-mode memory switching
"""
def __init__(self, api_key: str, mode: MemoryMode = MemoryMode.FILES):
self.client = Anthropic(api_key=api_key)
self.mode = mode
self.memory_stores: Dict[str, str] = {} # project -> store_id
self.user_profile: Optional[UserProfile] = None
    # ========== Memory Store Management ==========
    def create_memory_store(self, name: str) -> str:
        """Create memory store"""
        store = self.client.beta.memory_stores.create(name=name)
        self.memory_stores[name] = store.id
        return store.id

    def get_memory_store(self, name: str) -> Optional[str]:
        """Get memory store ID"""
        return self.memory_stores.get(name)

    def store_conversation(
        self,
        store_name: str,
        messages: List[Dict],
        metadata: Optional[Dict] = None
    ):
        """
        Store conversation to memory

        Args:
            store_name: Memory store name
            messages: Conversation message list
            metadata: Metadata (e.g., project info)
        """
        store_id = self.get_memory_store(store_name)
        if not store_id:
            store_id = self.create_memory_store(store_name)
        for msg in messages:
            self.client.beta.memory_stores.messages.create(
                memory_store_id=store_id,
                role=msg["role"],
                content=msg["content"],
                metadata=metadata
            )

    def retrieve_memories(
        self,
        store_name: str,
        query: str,
        limit: int = 5
    ) -> List[Dict]:
        """Retrieve relevant memories"""
        store_id = self.get_memory_store(store_name)
        if not store_id:
            return []
        results = self.client.beta.memory_stores.search(
            memory_store_id=store_id,
            query=query,
            limit=limit
        )
        return [
            {
                "content": r.content,
                "relevance": r.relevance,
                "topic": r.topic
            }
            for r in results.results
        ]
    # ========== Dream Consolidation ==========
    def trigger_dream(
        self,
        store_name: str,
        session_ids: List[str],
        focus_areas: Optional[List[str]] = None
    ) -> str:
        """
        Trigger dream consolidation

        Args:
            store_name: Memory store name
            session_ids: List of session IDs to consolidate
            focus_areas: Areas to focus on

        Returns:
            ID of the dream job (pass it to wait_for_dream)
        """
        store_id = self.get_memory_store(store_name)
        if not store_id:
            raise ValueError(f"Memory store not found: {store_name}")
        instructions = ""
        if focus_areas:
            instructions = f"Focus on: {', '.join(focus_areas)}"
        dream = self.client.beta.dreams.create(
            inputs=[
                {"type": "memory_store", "memory_store_id": store_id},
                {"type": "sessions", "session_ids": session_ids}
            ],
            model="claude-opus-4-7",
            instructions=instructions
        )
        return dream.id

    def wait_for_dream(self, dream_id: str, timeout: int = 600) -> Optional[str]:
        """
        Wait for dream completion and return new memory store

        Args:
            dream_id: Dream ID
            timeout: Timeout in seconds

        Returns:
            New memory store ID, None on failure or timeout
        """
        import time
        start = time.time()
        # Poll every 30 seconds until the dream finishes or the timeout elapses
        while time.time() - start < timeout:
            status = self.client.beta.dreams.get(dream_id)
            if status.status == "completed":
                if status.outputs:
                    return status.outputs[0].memory_store_id
                return None
            elif status.status == "failed":
                print(f"Dream failed: {status.error}")
                return None
            time.sleep(30)
        return None
    # ========== Session Creation ==========
    def create_session_with_memory(
        self,
        agent_id: str,
        environment_id: str,
        store_name: str
    ) -> Dict:
        """
        Create session with memory
        """
        store_id = self.get_memory_store(store_name)
        if not store_id:
            store_id = self.create_memory_store(store_name)
        session = self.client.beta.sessions.create(
            agent=agent_id,
            environment_id=environment_id,
            resources=[
                {"type": "memory_store", "memory_store_id": store_id}
            ]
        )
        return {
            "session_id": session.id,
            "memory_store_id": store_id
        }
    # ========== Auto Memory Management ==========
    def should_trigger_dream(self, store_name: str) -> bool:
        """
        Determine if dream should be triggered
        Based on conversation count and time
        """
        # Materialize the listing before counting; a paginated result may not support len()
        conversation_count = len(list(self.client.beta.sessions.list()))
        # Placeholder: a real implementation would derive this from the time of the
        # last consolidation. With the fixed value of 24, the check below always passes.
        hours_since_last = 24
        return conversation_count >= 5 or hours_since_last >= 24

    def auto_memory_management(self, store_name: str):
        """
        Auto memory management
        Automatically trigger dream consolidation based on conditions
        """
        if self.should_trigger_dream(store_name):
            # Consolidate the ten most recently listed sessions
            sessions = self.client.beta.sessions.list(limit=10)
            session_ids = [s.id for s in sessions]
            dream_id = self.trigger_dream(store_name, session_ids)
            return self.wait_for_dream(dream_id)
        return self.get_memory_store(store_name)
# ========== Usage Example ==========
def main():
    # Initialize manager
    manager = ClaudeMemoryManager(
        api_key="sk-ant-api03-...",
        mode=MemoryMode.FILES
    )

    # Create project memory store
    project_store = "my-ai-project"
    manager.create_memory_store(project_store)

    # Store conversation
    messages = [
        {"role": "user", "content": "I want to implement a RAG system"},
        {"role": "assistant", "content": "A RAG system needs three core components: vector database, retriever, and generator"},
        {"role": "user", "content": "We use ChromaDB as the vector database"},
        {"role": "assistant", "content": "Good, ChromaDB is easy to deploy and supports multiple embedding models"},
    ]
    manager.store_conversation(
        project_store,
        messages,
        metadata={"project": "my-ai-project", "topic": "RAG"}
    )

    # Retrieve memories
    memories = manager.retrieve_memories(
        project_store,
        "vector database selection"
    )
    print(f"Found {len(memories)} relevant memories")

    # Trigger dream consolidation
    dream_id = manager.trigger_dream(
        project_store,
        session_ids=["session-1", "session-2"],
        focus_areas=["Technology Selection", "Database"]
    )

    # Wait for dream completion
    new_store_id = manager.wait_for_dream(dream_id)
    if new_store_id:
        print(f"Memory consolidated: {new_store_id}")


if __name__ == "__main__":
    main()
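The should_trigger_dream method above hard-codes hours_since_last to 24, so its time condition is always satisfied. A minimal sketch of a stateful alternative follows, assuming the caller records each finished session and each completed consolidation. The class DreamTriggerPolicy and its method names are hypothetical and not part of any SDK; the thresholds mirror the 5-conversation and 24-hour values used above.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class DreamTriggerPolicy:
    """Hypothetical helper: decides when a consolidation run is due."""
    min_sessions: int = 5                          # sessions needed to trigger by count
    max_interval: timedelta = timedelta(hours=24)  # time needed to trigger by age
    sessions_since_last: int = 0
    last_run: Optional[datetime] = None

    def record_session(self) -> None:
        """Call once per finished session."""
        self.sessions_since_last += 1

    def record_dream(self, now: datetime) -> None:
        """Call when a consolidation run completes."""
        self.sessions_since_last = 0
        self.last_run = now

    def should_trigger(self, now: datetime) -> bool:
        """True when enough sessions have accumulated or enough time has passed."""
        if self.sessions_since_last == 0:
            return False  # nothing new to consolidate
        if self.sessions_since_last >= self.min_sessions:
            return True
        if self.last_run is None:
            return False  # never consolidated yet: wait for the session threshold
        return now - self.last_run >= self.max_interval
```

Passing the current time in as an argument, rather than reading the clock inside the method, keeps the policy easy to test and leaves scheduling decisions to the caller.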
5. Application Scenarios
5.1 Enterprise Applications
Scenario 1: Intelligent Customer Service System
Requirements:
- Remember user preferences across sessions
- Understand user's historical questions
- Provide a coherent conversation experience
Implementation:
- Maintain an independent memory store for each user
- Run regular dream consolidation to optimize memories
- Retrieve historical preferences for faster response
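One way to keep an independent memory store per user is a small registry that creates a store the first time a user appears and reuses it afterwards. The sketch below assumes the caller supplies a create_store callable (for example, a wrapper around the manager's create_memory_store method); UserStoreRegistry and the "support-user-" naming scheme are illustrative, not part of any API.

```python
from typing import Callable, Dict, List


class UserStoreRegistry:
    """Hypothetical registry mapping each user to exactly one memory store."""

    def __init__(self, create_store: Callable[[str], str]):
        self._create_store = create_store     # takes a store name, returns a store ID
        self._store_ids: Dict[str, str] = {}  # user_id -> store_id

    def store_for(self, user_id: str) -> str:
        """Return the user's store ID, creating the store on first use."""
        if user_id not in self._store_ids:
            self._store_ids[user_id] = self._create_store(f"support-user-{user_id}")
        return self._store_ids[user_id]


# Usage with a stand-in factory; a real one would call the memory store API.
created: List[str] = []


def fake_create(name: str) -> str:
    created.append(name)
    return f"store-{len(created)}"


registry = UserStoreRegistry(fake_create)
first = registry.store_for("alice")
again = registry.store_for("alice")   # reuses the store created above
other = registry.store_for("bob")     # a different user gets a separate store
```

In a real deployment the user-to-store mapping would need to be persisted; this sketch keeps it in process memory only.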
Scenario 2: Code Development Assistant
# Memory implementation for code assistant
# Builds on ClaudeMemoryManager from section 4.2
import json
from typing import Dict, List


class CodeAssistantMemory:
    """Professional memory for code development assistant"""

    def __init__(self, manager: ClaudeMemoryManager):
        self.manager = manager
        self.project_store = "code-assistant"
        # Initialize project memory
        manager.create_memory_store(self.project_store)

    def remember_code_style(self, project: str, style_rules: dict):
        """Remember code style preferences"""
        self.manager.store_conversation(
            self.project_store,
            messages=[{
                "role": "system",
                "content": f"Code style rules: {json.dumps(style_rules)}"
            }],
            metadata={"project": project, "type": "code_style"}
        )

    def remember_project_context(self, project: str, context: dict):
        """Remember project context"""
        self.manager.store_conversation(
            self.project_store,
            messages=[{
                "role": "system",
                "content": f"Project context: {json.dumps(context)}"
            }],
            metadata={"project": project, "type": "context"}
        )

    def get_code_guidance(self, query: str) -> List[Dict]:
        """Get code guidance memories"""
        memories = self.manager.retrieve_memories(
            self.project_store,
            query=f"code style {query}"
        )
        # Note: entries are stored with metadata key "type", while retrieve_memories
        # returns only content, relevance, and topic. This filter therefore assumes
        # the search result's topic mirrors the stored "type" value.
        return [m for m in memories if m.get("topic") == "code_style"]
5.2 Personal Assistant Applications
Scenario 3: Personal Knowledge Management
Architecture:
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Reading   │───▶│   Memory    │───▶│  Retrieval  │
│    Input    │    │   Storage   │    │   Output    │
│ (Articles)  │    │  (Topics)   │    │    (Q&A)    │
└─────────────┘    └─────────────┘    └─────────────┘
                          ▲
                          │
                   ┌─────────────┐
                   │   Dreams    │
                   │ (Periodic)  │
                   └─────────────┘
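The flow in the diagram can be sketched with a toy in-memory model. This is an illustration under simplifying assumptions, not how Claude's memory works internally: consolidation here is plain case-insensitive deduplication standing in for Dreams, and retrieval is keyword overlap rather than semantic search. The class KnowledgeBase and its method names are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List


class KnowledgeBase:
    """Toy in-memory model of the Reading -> Memory -> Retrieval flow."""

    def __init__(self) -> None:
        self.topics: Dict[str, List[str]] = defaultdict(list)

    def ingest(self, topic: str, note: str) -> None:
        """Reading input: file a note under its topic."""
        self.topics[topic].append(note.strip())

    def consolidate(self) -> int:
        """Periodic cleanup: drop case-insensitive duplicate notes per topic.

        Returns the number of notes removed.
        """
        removed = 0
        for topic, notes in self.topics.items():
            seen = set()
            unique = []
            for note in notes:
                key = note.lower()
                if key in seen:
                    removed += 1
                else:
                    seen.add(key)
                    unique.append(note)
            self.topics[topic] = unique  # existing key, so safe during iteration
        return removed

    def ask(self, question: str) -> List[str]:
        """Retrieval output: notes sharing at least one word with the question."""
        words = set(question.lower().split())
        return [
            note
            for notes in self.topics.values()
            for note in notes
            if words & set(note.lower().split())
        ]


kb = KnowledgeBase()
kb.ingest("rag", "ChromaDB is a vector database")
kb.ingest("rag", "chromadb is a vector database")   # duplicate in different case
kb.ingest("agents", "Agents need persistent memory")
removed = kb.consolidate()
hits = kb.ask("which vector database")
```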
6. Comparison with Traditional RAG
| Dimension | Traditional RAG | Claude Memory |
|---|---|---|
| Data Source | External documents | Conversation history + project files |
| Organization | Raw chunks | AI-reconstructed structured documents |
| Update Method | Manual re-indexing | Auto consolidation (Dreams) |
| Personalization | Weak | Strong |
| Applicable Scenarios | Enterprise knowledge base | Personal assistant/work partner |
| Context Utilization | Passive retrieval | Active learning |
7. Conclusion and Outlook
Claude’s dual-mode memory system represents a major breakthrough in AI Agent memory architecture. By externalizing and structuring memory, combined with automated consolidation mechanisms, Claude finally has the capability of a “permanent brain”.
Core Innovations:
- Memory Files: Lifts the capacity ceiling of a single rolling summary by externalizing memory into structured files
- Dreams: Simulates human sleep, achieves automatic memory organization
- Conway: 7×24 always-on Agent runtime
Implications for Developers:
- Don’t rely on the context window alone for memory: capacity is limited and costs are high
- Design the memory storage structure deliberately: topic-based organization works better than chronological logs
- Plan for memory consolidation: long-term use fragments memory, which then needs regular cleanup
- Balance personalization and generalization: let the AI remember its users, but don’t let one user’s habits narrow what it can do
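The second point can be made concrete with a small sketch that regroups a chronological log into a topic-keyed index, so that a lookup touches only the entries for one project. The Entry type, the group_by_topic function, and the sample data are made up for illustration.

```python
from collections import defaultdict
from typing import Dict, List, NamedTuple


class Entry(NamedTuple):
    day: int     # position in the chronological log
    topic: str
    text: str


def group_by_topic(log: List[Entry]) -> Dict[str, List[Entry]]:
    """Regroup a chronological log by topic, preserving order within each topic."""
    index: Dict[str, List[Entry]] = defaultdict(list)
    for entry in log:
        index[entry.topic].append(entry)
    return dict(index)


log = [
    Entry(1, "novel", "Protagonist is named Mara"),
    Entry(2, "rag", "Vector database: ChromaDB"),
    Entry(3, "novel", "Chapter 2 is set in Lisbon"),
]
index = group_by_topic(log)
novel_notes = [e.text for e in index["novel"]]
```

With the log alone, answering a question about the novel means scanning every entry; with the index, unrelated projects never enter the lookup.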
Future Outlook:
- Cross-device memory synchronization
- Multi-Agent shared memory
- Memory visualization editing
- Memory security and privacy protection
Memory is the essential path for AI Agents to become “true assistants”. Claude’s dual-mode memory system points the way for the entire industry.