Claude's "Permanent Brain": Deep Analysis of Dual-Mode Memory System and Conway Agent Architecture

Abstract

In May 2026, the AI field witnessed a major technological breakthrough. Anthropic introduced a new dual-mode memory system for Claude—Memory Files and Dreams—along with the 7×24 always-on Conway Agent platform. This marks a crucial step for AI Agents to evolve from the “use and forget” conversation mode to a “persistent memory” intelligent assistant mode. This article provides an in-depth analysis of the technical principles and implementation details of this architecture, with complete Python/Go code examples to help developers understand and build similar AI memory systems.

Keywords: AI Agent, Memory System, Claude, Conway, Memory Files, Dreams, Persistent Memory


1. Background: From “Rolling Notes” to “Permanent Brain”

1.1 The Dilemma of Traditional AI Memory

Before Claude Memory Files, the memory mechanism of most AI assistants (including Claude itself) was essentially a “rolling note”—compressing all user preferences, background, and habits into a single unified summary. This approach is simple and effective, but problems arise:

Problems with Traditional Memory Mode:
1. Limited Information Capacity - Summary length has an upper limit
2. Old Information Overwritten - New conversations overwrite important old memories
3. Topic Interference - Information from different projects gets mixed together
4. Low Retrieval Efficiency - Finding specific information is like finding a needle in a haystack

When a user discusses a long-term writing project with Claude, it may have completely forgotten details about another project they discussed last week. This “amnesia” severely limits AI’s capability boundary as a true intelligent assistant.

1.2 Why Not Just Expand the Context Window?

Many people ask: Can’t we just expand the context window to solve this problem? The answer is: No. The context window has several fundamental limitations:

DimensionLimitation
Single SessionData is destroyed after conversation closes
Linear GrowthMore memory means lower effective information density
Retrieval EfficiencyNeedle-in-haystack search, high latency
Capacity CostToken costs grow linearly with context

Claude’s Memory Files chose a different path: External Storage + Structured Indexing. This is not just a change in technical solution, but a paradigm shift.


2. Core Principles: Dual-Mode Memory System Architecture

System Architecture Diagram

2.1 Overall System Architecture

Claude’s dual-mode memory system consists of three core components:

┌─────────────────────────────────────────────────────────────┐
│                  Claude Dual-Mode Memory System              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐  │
│  │ Classic      │    │ Memory Files │    │    Dreams    │  │
│  │ Memory       │    │ (File Memory)│    │   (Dreams)   │  │
│  │ (Classic Mode│    │              │    │              │  │
│  │              │    │ • Topic-based│    │ • Async      │  │
│  │ Single summary│    │ • Unlimited │    │   consolidation │
│  │ Rolling overwrite│ │ • AI-organized│  │ • Merge duplicates│
│  └──────────────┘    └──────────────┘    └──────────────┘  │
│                                                             │
│         ▲              Trigger              ▲               │
│         │    (5 conversations or 24 hours) │               │
│         │                                    │              │
│  ┌──────────────────────────────────────────────────────┐  │
│  │              Memory Stores API                       │  │
│  └──────────────────────────────────────────────────────┘  │
│                                                             │
└─────────────────────────────────────────────────────────────┘

2.2 Classic Memory: Traditional Single Summary Mode

Classic Memory is the memory mode Claude has used since its launch. Its working principle is as follows:

# Simplified implementation of Classic Memory
class ClassicMemory:
    """
    Traditional single summary memory mode
    After each conversation, compress all new information into a summary
    """
    
    def __init__(self, max_summary_length: int = 4096):
        self.summary = ""  # Single summary
        self.max_length = max_summary_length
        self.conversation_history = []
    
    def add_interaction(self, user_input: str, assistant_response: str):
        """Add new conversation interaction"""
        self.conversation_history.append({
            "user": user_input,
            "assistant": assistant_response,
            "timestamp": datetime.now()
        })
        
        # When enough information accumulates, generate new summary
        if len(self.conversation_history) >= 5:
            self._generate_summary()
    
    def _generate_summary(self):
        """Compress conversation history into a single summary (overwrites old)"""
        prompt = f"""
        Compress the following conversation into a brief summary:
        {self.conversation_history}
        
        Summary should include:
        - User's main preferences
        - Important project background
        - Long-term goals or plans
        """
        # Call LLM to generate summary...
        self.summary = generated_summary
        # Clear history (information compressed into summary)
        self.conversation_history = []
    
    def get_context(self) -> str:
        """Get memory context"""
        return self.summary

Problem Analysis:

  • ✅ Simple implementation
  • ✅ Low token consumption during inference
  • ❌ Limited capacity (max_summary_length)
  • ❌ Information loss (compression is irreversible)
  • ❌ Topic interference (different projects mixed together)

2.3 Memory Files: Structured Document Memory

Memory Files is the revolutionary new architecture launched by Anthropic. It provides Claude with a “built-in personal Wiki”:

# Core implementation of Memory Files
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
import hashlib

@dataclass
class MemoryDocument:
    """Memory document structure"""
    doc_id: str
    topic: str  # Topic classification
    title: str  # Document title
    content: str  # Document content
    created_at: datetime
    updated_at: datetime
    tags: List[str] = field(default_factory=list)
    metadata: Dict = field(default_factory=dict)

class MemoryFiles:
    """
    File Memory System
    Structured document storage organized by topic
    """
    
    def __init__(self):
        # Document storage (grouped by topic)
        self.documents: Dict[str, List[MemoryDocument]] = {}
        # Global index
        self.global_index: Dict[str, str] = {}  # keyword -> doc_id
        # Vector index (for semantic retrieval)
        self.vector_index: Dict[str, List[float]] = {}
    
    def store(self, conversation: str, context: Optional[dict] = None):
        """
        Store conversation information, automatically extract and organize
        """
        # 1. Use LLM to extract key information and classify
        extracted = self._extract_key_info(conversation, context)
        
        # 2. Determine topic classification
        topic = self._classify_topic(extracted)
        
        # 3. Create or update document
        doc = self._create_or_update_document(topic, extracted)
        
        # 4. Update global index
        self._update_index(doc)
        
        return doc
    
    def _extract_key_info(self, conversation: str, context: Optional[dict]) -> dict:
        """
        Use LLM to extract key information from conversation
        """
        prompt = f"""
        Extract key information from the following conversation, return JSON format:
        
        Conversation:
        {conversation}
        
        Context: {context or {}}
        
        Return format:
        {{
            "key_facts": ["list of key facts"],
            "preferences": ["user preferences list"],
            "action_items": ["to-do list"],
            "entities": ["mentioned entities"],
            "emotional_tone": "emotional tone"
        }}
        """
        # Call Claude API to extract information
        extracted = call_claude_api(prompt)
        return extracted
    
    def _classify_topic(self, extracted: dict) -> str:
        """
        Use LLM to classify information into topics
        """
        topics = [
            "Technical Development", "Product Design", "Project Management", 
            "Daily Life", "Financial Investment", "Health & Fitness",
            "Travel Planning", "Learning Research", "Social Relations", "Entertainment"
        ]
        
        prompt = f"""
        Classify the following information into the most suitable topic:
        
        Extracted information: {extracted}
        
        Available topics: {topics}
        
        Return only one topic name.
        """
        return call_claude_api(prompt).strip()
    
    def _create_or_update_document(
        self, 
        topic: str, 
        extracted: dict
    ) -> MemoryDocument:
        """
        Create new document or update existing document
        """
        # Check if document for this topic exists
        if topic in self.documents and self.documents[topic]:
            # Update existing document
            doc = self.documents[topic][-1]
            doc.content = self._merge_content(doc.content, extracted)
            doc.updated_at = datetime.now()
        else:
            # Create new document
            doc = MemoryDocument(
                doc_id=self._generate_doc_id(),
                topic=topic,
                title=self._generate_title(extracted),
                content=str(extracted),
                created_at=datetime.now(),
                updated_at=datetime.now(),
                tags=self._extract_tags(extracted)
            )
            if topic not in self.documents:
                self.documents[topic] = []
            self.documents[topic].append(doc)
        
        return doc
    
    def _merge_content(self, existing: str, new: dict) -> str:
        """
        Merge old and new content, preserve important historical information
        """
        prompt = f"""
        Integrate new information into the existing document:
        
        Existing document:
        {existing}
        
        New information:
        {new}
        
        Rules:
        1. Preserve all important historical information
        2. Update outdated content with new information
        3. Merge duplicate content
        4. Maintain clear document structure
        
        Return the updated complete document content.
        """
        return call_claude_api(prompt)
    
    def retrieve(self, query: str, max_docs: int = 5) -> List[MemoryDocument]:
        """
        Retrieve relevant memories on demand
        """
        # 1. Determine query-related topics
        relevant_topics = self._find_relevant_topics(query)
        
        # 2. Collect relevant documents
        candidates = []
        for topic in relevant_topics:
            if topic in self.documents:
                candidates.extend(self.documents[topic])
        
        # 3. Semantic ranking
        ranked = self._rank_documents(query, candidates)
        
        return ranked[:max_docs]
    
    def _find_relevant_topics(self, query: str) -> List[str]:
        """
        Find topics related to the query
        """
        all_topics = list(self.documents.keys())
        prompt = f"""
        Determine which topics are related to the following query:
        
        Query: {query}
        
        Available topics: {all_topics}
        
        Return the 3 most relevant topics, ranked by relevance.
        """
        return call_claude_api(prompt).split(',')
    
    def _rank_documents(
        self, 
        query: str, 
        documents: List[MemoryDocument]
    ) -> List[MemoryDocument]:
        """
        Rank documents using vector similarity
        """
        query_vector = self._embed_text(query)
        
        scored = []
        for doc in documents:
            if doc.doc_id in self.vector_index:
                similarity = cosine_similarity(
                    query_vector, 
                    self.vector_index[doc.doc_id]
                )
                scored.append((similarity, doc))
        
        scored.sort(key=lambda x: x[0], reverse=True)
        return [doc for _, doc in scored]
    
    def _embed_text(self, text: str) -> List[float]:
        """Text vectorization"""
        # Use Embedding API
        return embedding_api.embed(text)
    
    def _generate_doc_id(self) -> str:
        """Generate unique document ID"""
        return hashlib.md5(
            str(datetime.now()).encode()
        ).hexdigest()[:12]
    
    def _generate_title(self, content: dict) -> str:
        """Generate document title"""
        prompt = f"Generate a short descriptive title for the following content: {content}"
        return call_claude_api(prompt)
    
    def _extract_tags(self, content: dict) -> List[str]:
        """Extract tags"""
        return content.get('entities', [])

Core Advantages of Memory Files:

FeatureClassic MemoryMemory Files
Capacity4K Token limitTheoretical unlimited
OrganizationSingle summaryTopic-based classification
Update MethodOverwrite old infoIncremental integration
Retrieval EfficiencyO(n)O(1) topic lookup
User ControlLimitedFull control

2.4 Dreams: REM Sleep-style Memory Consolidation

Dreams is Anthropic’s inspiration from human neuroscience. During REM sleep, the human brain replays daytime experiences, reinforces important memories, and discards useless noise information. Anthropic brought this mechanism to Claude.

# Dreams Memory Consolidation System
import asyncio
from enum import Enum
from typing import List, Optional
from dataclasses import dataclass
import httpx

class DreamStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELED = "canceled"

@dataclass
class DreamInput:
    """Dream input"""
    type: str  # "memory_store" or "sessions"
    memory_store_id: Optional[str] = None
    session_ids: Optional[List[str]] = None

@dataclass
class DreamOutput:
    """Dream output"""
    type: str
    memory_store_id: str

@dataclass
class Dream:
    """Dream task"""
    id: str
    status: DreamStatus
    inputs: List[DreamInput]
    outputs: List[DreamOutput]
    model: str
    instructions: str
    created_at: datetime
    ended_at: Optional[datetime]
    usage: dict

class DreamsService:
    """
    Dreams Memory Consolidation Service
    Async background execution, periodic memory organization
    """
    
    def __init__(self, api_key: str):
        self.client = httpx.AsyncClient(
            base_url="https://api.anthropic.com",
            headers={"x-api-key": api_key}
        )
        self.trigger_conditions = {
            "min_conversations": 5,
            "min_hours": 24
        }
    
    async def create_dream(
        self,
        memory_store_id: str,
        session_ids: List[str],
        model: str = "claude-opus-4-7",
        instructions: Optional[str] = None
    ) -> Dream:
        """
        Create a dream task
        
        Args:
            memory_store_id: Existing memory store ID
            session_ids: List of session IDs to analyze
            model: Model to use for dreaming
            instructions: Additional consolidation guidance
        
        Returns:
            Dream: Dream task object
        """
        response = await self.client.post(
            "/v1/beta/dreams/create",
            json={
                "inputs": [
                    {"type": "memory_store", "memory_store_id": memory_store_id},
                    {"type": "sessions", "session_ids": session_ids}
                ],
                "model": model,
                "instructions": instructions or "Consolidate memories, deduplicate and update"
            }
        )
        return Dream(**response.json())
    
    async def get_dream(self, dream_id: str) -> Dream:
        """Get dream task status"""
        response = await self.client.get(f"/v1/beta/dreams/{dream_id}")
        return Dream(**response.json())
    
    async def list_dreams(
        self, 
        limit: int = 20,
        status: Optional[DreamStatus] = None
    ) -> List[Dream]:
        """List dream tasks"""
        params = {"limit": limit}
        if status:
            params["status"] = status.value
        
        response = await self.client.get(
            "/v1/beta/dreams/list",
            params=params
        )
        return [Dream(**d) for d in response.json()["dreams"]]
    
    async def cancel_dream(self, dream_id: str) -> bool:
        """Cancel dream task"""
        response = await self.client.post(
            f"/v1/beta/dreams/{dream_id}/cancel"
        )
        return response.status_code == 200
    
    def should_trigger_dream(self, memory_store) -> bool:
        """
        Determine if a dream should be triggered
        
        Trigger conditions:
        1. 5 conversations accumulated
        2. OR more than 24 hours since last consolidation
        """
        conversation_count = memory_store.get_conversation_count()
        if conversation_count >= self.trigger_conditions["min_conversations"]:
            return True
        
        last_dream_time = memory_store.get_last_dream_time()
        if last_dream_time:
            hours_since = (datetime.now() - last_dream_time).total_seconds() / 3600
            if hours_since >= self.trigger_conditions["min_hours"]:
                return True
        
        return False
    
    async def consolidation_logic(
        self,
        memory_store,
        sessions: List[dict]
    ) -> dict:
        """
        Core logic for memory consolidation
        
        Steps:
        1. Merge duplicate entries
        2. Update outdated information
        3. Resolve logical contradictions
        4. Discover hidden patterns
        """
        all_memories = []
        
        # Collect existing memories
        for doc in memory_store.documents.values():
            all_memories.extend(doc)
        
        # Collect session history
        for session in sessions:
            all_memories.extend(session.get("memories", []))
        
        # Step 1: Merge duplicate entries
        merged = self._merge_duplicates(all_memories)
        
        # Step 2: Update outdated information
        updated = self._update_stale_entries(merged)
        
        # Step 3: Resolve logical contradictions
        resolved = self._resolve_conflicts(updated)
        
        # Step 4: Optimize index structure
        optimized = self._optimize_index(resolved)
        
        return optimized
    
    def _merge_duplicates(self, memories: List[dict]) -> List[dict]:
        """
        Merge duplicate memory entries
        Use semantic similarity to identify duplicates
        """
        merged = []
        seen_signatures = {}
        
        for memory in memories:
            signature = self._generate_signature(memory)
            
            if signature in seen_signatures:
                existing = seen_signatures[signature]
                merged_memory = self._combine_memories(existing, memory)
                seen_signatures[signature] = merged_memory
            else:
                seen_signatures[signature] = memory
        
        return list(seen_signatures.values())
    
    def _generate_signature(self, memory: dict) -> str:
        """
        Generate memory signature
        Used to identify duplicate memories
        """
        key_info = f"{memory.get('topic', '')}_{memory.get('entity', '')}"
        return hashlib.md5(key_info.encode()).hexdigest()[:8]
    
    def _combine_memories(self, existing: dict, new: dict) -> dict:
        """
        Merge two memories
        Keep latest and most important information
        """
        prompt = f"""
        Merge the following two related memories, preserving all important information:
        
        Memory 1: {existing}
        Memory 2: {new}
        
        Rules:
        1. Use the most recent timestamp
        2. Preserve all unique details
        3. Merge duplicate descriptions
        """
        return call_claude_api(prompt)
    
    def _update_stale_entries(self, memories: List[dict]) -> List[dict]:
        """
        Update outdated memory entries
        For example: Change "we decided to use Redis yesterday" to
              "On May 15, 2026, we decided to use Redis"
        """
        updated = []
        current_time = datetime.now()
        
        for memory in memories:
            if self._is_stale(memory, current_time):
                memory = self._refresh_timestamp(memory, current_time)
            updated.append(memory)
        
        return updated
    
    def _is_stale(self, memory: dict, current_time: datetime) -> bool:
        """Determine if memory is outdated"""
        last_update = memory.get("updated_at")
        if not last_update:
            return True
        
        days_since = (current_time - last_update).days
        return days_since > 7  # Memories not updated in 7 days are considered outdated
    
    def _refresh_timestamp(self, memory: dict, current_time: datetime) -> dict:
        """Refresh memory timestamp"""
        prompt = f"""
        Update relative time in the following memory to absolute time:
        
        Memory: {memory}
        Current time: {current_time.strftime('%Y-%m-%d')}
        
        Only update time-related content, keep everything else unchanged.
        """
        memory["content"] = call_claude_api(prompt)
        memory["updated_at"] = current_time
        return memory
    
    def _resolve_conflicts(self, memories: List[dict]) -> List[dict]:
        """
        Resolve logical contradictions in memories
        For example: User said they like coffee, but later said they quit coffee
        """
        conflicts = self._find_conflicts(memories)
        
        for conflict_group in conflicts:
            resolved = self._resolve_conflict_group(conflict_group)
            memories = [m for m in memories if m not in conflict_group]
            memories.append(resolved)
        
        return memories
    
    def _find_conflicts(self, memories: List[dict]) -> List[List[dict]]:
        """Find groups of contradictory memories"""
        conflicts = []
        for i, mem1 in enumerate(memories):
            for mem2 in memories[i+1:]:
                if self._are_contradictory(mem1, mem2):
                    conflicts.append([mem1, mem2])
        return conflicts
    
    def _are_contradictory(self, mem1: dict, mem2: dict) -> bool:
        """Determine if two memories are contradictory"""
        prompt = f"""
        Determine if the following two memories have logical contradictions:
        
        Memory 1: {mem1}
        Memory 2: {mem2}
        
        Answer only "yes" or "no".
        """
        result = call_claude_api(prompt).strip()
        return result.lower() == "yes"
    
    def _resolve_conflict_group(self, conflict_group: List[dict]) -> dict:
        """Resolve a group of conflicting memories"""
        prompt = f"""
        Analyze and resolve the following contradictory memories, provide a reasonable comprehensive judgment:
        
        Conflicting memories: {conflict_group}
        
        Consider:
        1. Which memory is the most recent
        2. Which memory is more logical
        3. User may have changed preferences
        
        Return the resolved memory.
        """
        return call_claude_api(prompt)
    
    def _optimize_index(self, memories: List[dict]) -> dict:
        """
        Optimize index structure
        Improve retrieval efficiency
        """
        # Regroup by topic
        topics = {}
        for memory in memories:
            topic = memory.get("topic", "Uncategorized")
            if topic not in topics:
                topics[topic] = []
            topics[topic].append(memory)
        
        # Generate new global index
        new_index = {}
        for topic, mems in topics.items():
            for mem in mems:
                keywords = self._extract_keywords(mem)
                for kw in keywords:
                    if kw not in new_index:
                        new_index[kw] = []
                    new_index[kw].append(mem.get("id"))
        
        return {
            "topics": topics,
            "index": new_index,
            "total_count": len(memories)
        }

3. Conway Agent: 7×24 Always-On Agent Platform

3.1 Conway Architecture Overview

Conway is the ultimate Agent platform launched by Anthropic. Deeply integrated with Memory Files, it achieves a truly “always-on” intelligent agent:

┌─────────────────────────────────────────────────────────────┐
│                    Conway Agent Platform                      │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────────────────────────────────────────────┐    │
│  │                  Conway Runtime                      │    │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐           │    │
│  │  │ Search  │  │  Chat   │  │ System  │           │    │
│  │  └─────────┘  └─────────┘  └─────────┘           │    │
│  │                                                     │    │
│  │  Always-on │ Event Listening │ Webhook │ Auto-trigger│    │
│  └─────────────────────────────────────────────────────┘    │
│                                                             │
│  ┌─────────────────────────────────────────────────────┐    │
│  │              Core Capabilities                        │    │
│  │  • Browser automation control                        │    │
│  │  • Claude Code execution                             │    │
│  │  • CNW ZIP extension format                         │    │
│  │  • Webhook signal reception                          │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                             │
│  ┌─────────────────────────────────────────────────────┐    │
│  │              Memory Files Integration                 │    │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐           │    │
│  │  │ Storage │◄─►│ Recall │◄─►│ Dreams  │           │    │
│  │  └─────────┘  └─────────┘  └─────────┘           │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                             │
└─────────────────────────────────────────────────────────────┘

3.2 Conway Core Implementation

// Conway Agent Implementation in Go
package conway

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "sync"
    "time"
)

// Event represents event types Conway can process
type EventType string

const (
    EventWebhook       EventType = "webhook"
    EventSchedule      EventType = "schedule"
    EventUserMessage   EventType = "user_message"
    EventSystemNotify  EventType = "system_notification"
)

// Event structure
type Event struct {
    ID        string                 `json:"id"`
    Type      EventType              `json:"type"`
    Payload   map[string]interface{} `json:"payload"`
    Timestamp time.Time              `json:"timestamp"`
    Source    string                 `json:"source"`
}

// Action represents actions Conway can execute
type Action struct {
    Type       string                 `json:"type"`
    Target     string                 `json:"target"`
    Parameters map[string]interface{} `json:"parameters"`
    Result     interface{}           `json:"result,omitempty"`
    Error      error                 `json:"error,omitempty"`
}

// MemoryStore interface for memory operations
type MemoryStore interface {
    Store(ctx context.Context, event Event) error
    Retrieve(ctx context.Context, query string, limit int) ([]MemoryDocument, error)
    List(ctx context.Context, topic string) ([]MemoryDocument, error)
}

// MemoryDocument represents a memory document
type MemoryDocument struct {
    ID        string                 `json:"id"`
    Topic     string                 `json:"topic"`
    Title     string                 `json:"title"`
    Content   string                 `json:"content"`
    CreatedAt time.Time              `json:"created_at"`
    UpdatedAt time.Time              `json:"updated_at"`
    Tags      []string               `json:"tags"`
    Metadata  map[string]interface{} `json:"metadata"`
}

// ConwayConfig holds Conway configuration
type ConwayConfig struct {
    MemoryStore    MemoryStore
    ClaudeEndpoint string
    WebhookSecret  string
    EnableBrowser  bool
    EnableCodeExec bool
}

// Conway represents the main agent structure
type Conway struct {
    config     ConwayConfig
    eventQueue chan Event
    handlers   map[EventType][]EventHandler
    memory     MemoryStore
    mu         sync.RWMutex
    running    bool
    cancel     context.CancelFunc
}

// EventHandler interface for event handlers
type EventHandler interface {
    Handle(ctx context.Context, event Event) ([]Action, error)
    CanHandle(event Event) bool
}

// NewConway creates a new Conway instance
func NewConway(config ConwayConfig) *Conway {
    return &Conway{
        config:     config,
        eventQueue: make(chan Event, 1000),
        handlers:   make(map[EventType][]EventHandler),
        memory:     config.MemoryStore,
    }
}

// RegisterHandler registers an event handler
func (c *Conway) RegisterHandler(handler EventHandler) {
    c.mu.Lock()
    defer c.mu.Unlock()
    
    c.handlers[EventWebhook] = append(c.handlers[EventWebhook], handler)
}

// Start starts the Conway Agent
func (c *Conway) Start(ctx context.Context) error {
    c.mu.Lock()
    if c.running {
        c.mu.Unlock()
        return fmt.Errorf("Conway is already running")
    }
    c.running = true
    c.mu.Unlock()
    
    ctx, cancel := context.WithCancel(ctx)
    c.cancel = cancel
    
    // Start event processing loop
    go c.eventLoop(ctx)
    
    // Start memory consolidation goroutine
    go c.dreamLoop(ctx)
    
    log.Println("Conway Agent started")
    return nil
}

// Stop stops the Conway Agent
func (c *Conway) Stop() error {
    c.mu.Lock()
    defer c.mu.Unlock()
    
    if !c.running {
        return fmt.Errorf("Conway is not running")
    }
    
    c.cancel()
    c.running = false
    close(c.eventQueue)
    
    log.Println("Conway Agent stopped")
    return nil
}

// eventLoop is the main event processing loop
func (c *Conway) eventLoop(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        case event, ok := <-c.eventQueue:
            if !ok {
                return
            }
            c.processEvent(ctx, event)
        }
    }
}

// processEvent processes a single event
func (c *Conway) processEvent(ctx context.Context, event Event) {
    log.Printf("Processing event: %s (%s)", event.ID, event.Type)
    
    // 1. Store event to memory
    if err := c.memory.Store(ctx, event); err != nil {
        log.Printf("Failed to store event: %v", err)
    }
    
    // 2. Retrieve relevant memories
    relevantMemories, err := c.memory.Retrieve(ctx, event.Payload["query"], 5)
    if err != nil {
        log.Printf("Failed to retrieve memories: %v", err)
        relevantMemories = []MemoryDocument{}
    }
    
    // 3. Dispatch to handlers
    c.mu.RLock()
    handlers := c.handlers[event.Type]
    c.mu.RUnlock()
    
    for _, handler := range handlers {
        if handler.CanHandle(event) {
            actions, err := handler.Handle(ctx, event)
            if err != nil {
                log.Printf("Handler error: %v", err)
                continue
            }
            
            for _, action := range actions {
                c.executeAction(ctx, action)
            }
        }
    }
    
    // 4. Generate new Conway event for subsequent processing
    c.emitEvent(ctx, Event{
        ID:        fmt.Sprintf("derived_%d", time.Now().UnixNano()),
        Type:      EventSystemNotify,
        Payload: map[string]interface{}{
            "original_event": event.ID,
            "memories_used":  len(relevantMemories),
            "actions_taken":  len(handlers),
        },
        Timestamp: time.Now(),
        Source:    "conway",
    })
}

// executeAction executes an action
func (c *Conway) executeAction(ctx context.Context, action Action) {
    log.Printf("Executing action: %s -> %s", action.Type, action.Target)
    
    switch action.Type {
    case "browser_navigate":
        c.browserNavigate(ctx, action.Target)
    case "claude_code":
        c.runClaudeCode(ctx, action.Parameters)
    case "webhook_send":
        c.sendWebhook(ctx, action.Target, action.Parameters)
    case "memory_update":
        c.updateMemory(ctx, action.Parameters)
    default:
        log.Printf("Unknown action type: %s", action.Type)
    }
}

// EmitEvent sends an event to Conway
func (c *Conway) EmitEvent(event Event) {
    c.eventQueue <- event
}

// emitEvent internally emits an event
func (c *Conway) emitEvent(ctx context.Context, event Event) {
    select {
    case c.eventQueue <- event:
    default:
        log.Println("Event queue full, dropping event")
    }
}

// dreamLoop is the dream consolidation loop
func (c *Conway) dreamLoop(ctx context.Context) {
    ticker := time.NewTicker(24 * time.Hour)
    defer ticker.Stop()
    
    conversationCount := 0
    maxConversations := 5
    
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            c.triggerDream(ctx)
        }
        
        conversationCount++
        if conversationCount >= maxConversations {
            c.triggerDream(ctx)
            conversationCount = 0
        }
    }
}

// triggerDream triggers memory consolidation
func (c *Conway) triggerDream(ctx context.Context) {
    log.Println("Triggering dream consolidation...")
    
    memories, err := c.consolidateMemories(ctx)
    if err != nil {
        log.Printf("Dream consolidation failed: %v", err)
        return
    }
    
    log.Printf("Dream completed: %d memories consolidated", len(memories))
}

// consolidateMemories performs memory consolidation
func (c *Conway) consolidateMemories(ctx context.Context) ([]MemoryDocument, error) {
    // 1. Get all sessions
    sessions := c.getRecentSessions(ctx)
    
    // 2. Get existing memory store
    allMemories, err := c.getAllMemories(ctx)
    if err != nil {
        return nil, err
    }
    
    // 3. Perform consolidation (merge duplicates, update stale, resolve conflicts)
    consolidated := c.mergeAndOptimize(allMemories, sessions)
    
    // 4. Save consolidated memories
    for _, mem := range consolidated {
        if err := c.memory.Store(ctx, mem); err != nil {
            return nil, err
        }
    }
    
    return consolidated, nil
}

func (c *Conway) getRecentSessions(ctx context.Context) []map[string]interface{} {
    return []map[string]interface{}{}
}

func (c *Conway) getAllMemories(ctx context.Context) ([]MemoryDocument, error) {
    return []MemoryDocument{}, nil
}

func (c *Conway) mergeAndOptimize(memories []MemoryDocument, sessions []map[string]interface{}) []MemoryDocument {
    return memories
}

4. API Usage Examples

4.1 Python SDK Usage Example

# Claude Memory Files API Usage Example
from anthropic import Anthropic
from datetime import datetime

# Initialize client
client = Anthropic(api_key="sk-ant-api03-...")

# ========== Memory Stores API ==========

# Create memory store
memory_store = client.beta.memory_stores.create(
    name="my-project-memory"
)
print(f"Created memory store: {memory_store.id}")

# Add content to memory store
client.beta.memory_stores.messages.create(
    memory_store_id=memory_store.id,
    role="user",
    content="We decided to use PostgreSQL as the main database"
)

client.beta.memory_stores.messages.create(
    memory_store_id=memory_store.id,
    role="assistant", 
    content="Got it, I've recorded this decision. PostgreSQL supports JSON type, suitable for storing semi-structured data."
)

# Search memories
results = client.beta.memory_stores.search(
    memory_store_id=memory_store.id,
    query="database selection",
    limit=5
)
print(f"Found {len(results.results)} relevant memories")

# ========== Dreams API ==========

# Create dream task
dream = client.beta.dreams.create(
    inputs=[
        {"type": "memory_store", "memory_store_id": memory_store.id},
        {"type": "sessions", "session_ids": ["session_123", "session_456"]}
    ],
    model="claude-opus-4-7",
    instructions="Focus on technology decision-related memories, ignore debugging notes"
)

print(f"Dream created: {dream.id}")

# Poll dream status
import asyncio

async def wait_for_dream(dream_id):
    while True:
        status = client.beta.dreams.get(dream_id)
        print(f"Dream status: {status.status}")
        
        if status.status == "completed":
            output_store_id = status.outputs[0].memory_store_id
            print(f"New memory store: {output_store_id}")
            return output_store_id
        elif status.status == "failed":
            print(f"Dream failed: {status.error}")
            return None
        
        await asyncio.sleep(30)

# Run dream
new_store_id = asyncio.run(wait_for_dream(dream.id))

# ========== Create session with new memory ==========

if new_store_id:
    session = client.beta.sessions.create(
        agent="my-agent-id",
        environment_id="env-123",
        resources=[
            {"type": "memory_store", "memory_store_id": new_store_id}
        ]
    )
    print(f"Session with memory: {session.id}")

4.2 Complete Memory Management System Implementation

# Complete Memory Management System
from anthropic import Anthropic
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum
import json

class MemoryMode(Enum):
    CLASSIC = "classic"
    FILES = "files"

@dataclass
class UserProfile:
    """User profile"""
    user_id: str
    name: str
    preferences: Dict = field(default_factory=dict)
    projects: List[str] = field(default_factory=list)
    active_project: Optional[str] = None

class ClaudeMemoryManager:
    """
    Claude Memory Manager
    Supports dual-mode memory switching
    """
    
    def __init__(self, api_key: str, mode: MemoryMode = MemoryMode.FILES):
        self.client = Anthropic(api_key=api_key)
        self.mode = mode
        self.memory_stores: Dict[str, str] = {}  # project -> store_id
        self.user_profile: Optional[UserProfile] = None
    
    # ========== Memory Store Management ==========
    
    def create_memory_store(self, name: str) -> str:
        """Create memory store"""
        store = self.client.beta.memory_stores.create(name=name)
        self.memory_stores[name] = store.id
        return store.id
    
    def get_memory_store(self, name: str) -> Optional[str]:
        """Get memory store ID"""
        return self.memory_stores.get(name)
    
    def store_conversation(
        self, 
        store_name: str, 
        messages: List[Dict],
        metadata: Optional[Dict] = None
    ):
        """
        Store conversation to memory
        
        Args:
            store_name: Memory store name
            messages: Conversation message list
            metadata: Metadata (e.g., project info)
        """
        store_id = self.get_memory_store(store_name)
        if not store_id:
            store_id = self.create_memory_store(store_name)
        
        for msg in messages:
            self.client.beta.memory_stores.messages.create(
                memory_store_id=store_id,
                role=msg["role"],
                content=msg["content"],
                metadata=metadata
            )
    
    def retrieve_memories(
        self, 
        store_name: str, 
        query: str,
        limit: int = 5
    ) -> List[Dict]:
        """Retrieve relevant memories"""
        store_id = self.get_memory_store(store_name)
        if not store_id:
            return []
        
        results = self.client.beta.memory_stores.search(
            memory_store_id=store_id,
            query=query,
            limit=limit
        )
        
        return [
            {
                "content": r.content,
                "relevance": r.relevance,
                "topic": r.topic
            }
            for r in results.results
        ]
    
    # ========== Dream Consolidation ==========
    
    def trigger_dream(
        self, 
        store_name: str,
        session_ids: List[str],
        focus_areas: Optional[List[str]] = None
    ) -> str:
        """
        Trigger dream consolidation
        
        Args:
            store_name: Memory store name
            session_ids: List of session IDs to consolidate
            focus_areas: Areas to focus on
        
        Returns:
            New memory store ID
        """
        store_id = self.get_memory_store(store_name)
        if not store_id:
            raise ValueError(f"Memory store not found: {store_name}")
        
        instructions = ""
        if focus_areas:
            instructions = f"Focus on: {', '.join(focus_areas)}"
        
        dream = self.client.beta.dreams.create(
            inputs=[
                {"type": "memory_store", "memory_store_id": store_id},
                {"type": "sessions", "session_ids": session_ids}
            ],
            model="claude-opus-4-7",
            instructions=instructions
        )
        
        return dream.id
    
    def wait_for_dream(self, dream_id: str, timeout: int = 600) -> Optional[str]:
        """
        Wait for dream completion and return new memory store
        
        Args:
            dream_id: Dream ID
            timeout: Timeout in seconds
        
        Returns:
            New memory store ID, None on failure
        """
        import time
        start = time.time()
        
        while time.time() - start < timeout:
            status = self.client.beta.dreams.get(dream_id)
            
            if status.status == "completed":
                if status.outputs:
                    return status.outputs[0].memory_store_id
                return None
            elif status.status == "failed":
                print(f"Dream failed: {status.error}")
                return None
            
            time.sleep(30)
        
        return None
    
    # ========== Session Creation ==========
    
    def create_session_with_memory(
        self,
        agent_id: str,
        environment_id: str,
        store_name: str
    ) -> Dict:
        """
        Create session with memory
        """
        store_id = self.get_memory_store(store_name)
        if not store_id:
            store_id = self.create_memory_store(store_name)
        
        session = self.client.beta.sessions.create(
            agent=agent_id,
            environment_id=environment_id,
            resources=[
                {"type": "memory_store", "memory_store_id": store_id}
            ]
        )
        
        return {
            "session_id": session.id,
            "memory_store_id": store_id
        }
    
    # ========== Auto Memory Management ==========
    
    def should_trigger_dream(self, store_name: str) -> bool:
        """
        Determine if dream should be triggered
        Based on conversation count and time
        """
        conversation_count = len(self.client.beta.sessions.list())
        hours_since_last = 24
        
        return conversation_count >= 5 or hours_since_last >= 24
    
    def auto_memory_management(self, store_name: str):
        """
        Auto memory management
        Automatically trigger dream consolidation based on conditions
        """
        if self.should_trigger_dream(store_name):
            sessions = self.client.beta.sessions.list(limit=10)
            session_ids = [s.id for s in sessions]
            
            dream_id = self.trigger_dream(store_name, session_ids)
            return self.wait_for_dream(dream_id)
        
        return self.get_memory_store(store_name)


# ========== Usage Example ==========

def main():
    # Initialize manager
    manager = ClaudeMemoryManager(
        api_key="sk-ant-api03-...",
        mode=MemoryMode.FILES
    )
    
    # Create project memory store
    project_store = "my-ai-project"
    manager.create_memory_store(project_store)
    
    # Store conversation
    messages = [
        {"role": "user", "content": "I want to implement an RAG system"},
        {"role": "assistant", "content": "A RAG system needs three core components: vector database, retriever, and generator"},
        {"role": "user", "content": "We use ChromaDB as the vector database"},
        {"role": "assistant", "content": "Good, ChromaDB is easy to deploy and supports multiple embedding models"},
    ]
    
    manager.store_conversation(
        project_store,
        messages,
        metadata={"project": "my-ai-project", "topic": "RAG"}
    )
    
    # Retrieve memories
    memories = manager.retrieve_memories(
        project_store,
        "vector database selection"
    )
    print(f"Found {len(memories)} relevant memories")
    
    # Trigger dream consolidation
    dream_id = manager.trigger_dream(
        project_store,
        session_ids=["session-1", "session-2"],
        focus_areas=["Technology Selection", "Database"]
    )
    
    # Wait for dream completion
    new_store_id = manager.wait_for_dream(dream_id)
    if new_store_id:
        print(f"Memory consolidated: {new_store_id}")

if __name__ == "__main__":
    main()

5. Application Scenarios

5.1 Enterprise Applications

Scenario 1: Intelligent Customer Service System

Requirements:
- Remember user preferences across sessions
- Understand user's historical questions
- Provide coherent conversation experience

Implementation:
- Maintain independent memory store for each user
- Regular dream consolidation to optimize memories
- Retrieve historical preferences for faster response

Scenario 2: Code Development Assistant

# Memory implementation for code assistant
class CodeAssistantMemory:
    """Professional memory for code development assistant"""
    
    def __init__(self, manager: ClaudeMemoryManager):
        self.manager = manager
        self.project_store = "code-assistant"
        
        # Initialize project memory
        manager.create_memory_store(self.project_store)
    
    def remember_code_style(self, project: str, style_rules: dict):
        """Remember code style preferences"""
        self.manager.store_conversation(
            self.project_store,
            messages=[{
                "role": "system",
                "content": f"Code style rules: {json.dumps(style_rules)}"
            }],
            metadata={"project": project, "type": "code_style"}
        )
    
    def remember_project_context(self, project: str, context: dict):
        """Remember project context"""
        self.manager.store_conversation(
            self.project_store,
            messages=[{
                "role": "system", 
                "content": f"Project context: {json.dumps(context)}"
            }],
            metadata={"project": project, "type": "context"}
        )
    
    def get_code_guidance(self, query: str) -> List[Dict]:
        """Get code guidance memories"""
        memories = self.manager.retrieve_memories(
            self.project_store,
            query=f"code style {query}"
        )
        return [m for m in memories if m.get("topic") == "code_style"]

5.2 Personal Assistant Applications

Scenario 3: Personal Knowledge Management

Architecture:
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│ Reading    │───▶│  Memory     │───▶│  Retrieval  │
│ Input      │    │  Storage   │    │  Output     │
│(Articles)  │    │  (Topics)  │    │(Q&A)        │
└─────────────┘    └─────────────┘    └─────────────┘
                       ▲
                       │
                ┌─────────────┐
                │ Dreams      │
                │(Periodic)   │
                └─────────────┘

6. Comparison with Traditional RAG

DimensionTraditional RAGClaude Memory
Data SourceExternal documentsConversation history + project files
OrganizationRaw chunksAI-reconstructed structured documents
Update MethodManual re-indexingAuto consolidation (Dreams)
PersonalizationWeakStrong
Applicable ScenariosEnterprise knowledge basePersonal assistant/work partner
Context UtilizationPassive retrievalActive learning

7. Conclusion and Outlook

Claude’s dual-mode memory system represents a major breakthrough in AI Agent memory architecture. By externalizing and structuring memory, combined with automated consolidation mechanisms, Claude finally has the capability of a “permanent brain”.

Core Innovations:

  1. Memory Files: Breaks capacity limitations, enables unlimited memory
  2. Dreams: Simulates human sleep, achieves automatic memory organization
  3. Conway: 7×24 always-on Agent runtime

Implications for Developers:

  • Don’t rely solely on context for memory—capacity is limited, costs are high
  • Design good memory storage structure—topic-based organization is better than chronological logs
  • Consider “memory consolidation” mechanisms—long-term use creates fragmentation, needs regular cleanup
  • Balance personalization and generalization—let AI remember users, but don’t let users limit AI

Future Outlook:

  • Cross-device memory synchronization
  • Multi-Agent shared memory
  • Memory visualization editing
  • Memory security and privacy protection

Memory is the essential path for AI Agents to become “true assistants”. Claude’s dual-mode memory system points the way for the entire industry.


References

  1. Anthropic Official Documentation - Memory Files & Dreams
  2. TestingCatalog - Claude Dual-Mode Memory System Report
  3. Claude Platform API Documentation
  4. Managed Agents Beta Documentation