AI Agent Technology: The Paradigm Shift from Answering to Executing

Tags: AI Agent, large language models, agents, LangChain, ReAct, Function Calling


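📖 Preface

On May 20, 2026, Google I/O 2026 opened in Mountain View, California. Google CEO Sundar Pichai announced at the event: "We have officially entered the 'agentic Gemini era.'" On the same day, at Baidu Create 2026, Baidu founder Robin Li (李彦宏) proposed a yardstick for the AI era, DAA (Daily Active Agents), marking the industry's shift from a "parameter race" to a "value validation" phase.

From Google's Gemini Spark to Baidu's DuMate, and from Anthropic's Claude Code to OpenAI's GPT-5.5, **AI Agents** have moved from concept to mature commercial use and become the hottest technical direction of 2026. This article examines the technical architecture, core algorithms, and engineering implementation of AI Agents, with code examples to help developers get up to speed quickly.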


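1. The Essence of an AI Agent: The Perceive-Plan-Act-Reflect Loop

1.1 What Is an AI Agent?

An AI Agent is an AI system that autonomously understands a goal, plans a course of action, calls external tools, and carries out complex tasks. Unlike traditional question-answering AI, an agent has:

  • Autonomous decision-making: adjusts its execution strategy dynamically based on feedback from the environment
  • Tool use: interacts with external systems through Function Calling or APIs
  • Long-term memory: retains context and what it has learned across sessions
  • Self-reflection: evaluates the outcome of its actions and keeps improving

In one sentence: AI Agent = brain (LLM) + memory + tools + planning engine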

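1.2 The Paradigm Shift from "Chat" to "Act"

The interaction pattern of a traditional chatbot:

User input → LLM generates an answer → Done

The interaction pattern of an AI Agent:

User input → Understand the goal → Decompose the task → Call tools → Observe results → Self-reflect → Task complete

This ReAct (Reasoning + Acting) loop turns the AI from a passive answerer into an active executor. Google describes the shift this way: "In 2026 the pain point moves from 'hallucination' to 'laziness': users don't want to read a long summary, they want results."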
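
The difference between the two patterns can be sketched in a few lines. This is a minimal illustration, not the article's implementation: `chat`, `act`, `scripted_policy`, and the `get_weather` lambda are hypothetical names, and the "policy" is a hard-coded stand-in for an LLM.

```python
from typing import Any, Callable, Dict, List, Tuple

def chat(llm: Callable[[str], str], user_input: str) -> str:
    """Chat pattern: one model call, then stop."""
    return llm(user_input)

def act(
    policy: Callable[[str, List[str]], Tuple[str, Any]],
    tools: Dict[str, Callable[[str], str]],
    goal: str,
    max_steps: int = 5,
) -> str:
    """Agent pattern: decide, call a tool, observe, repeat until finished."""
    observations: List[str] = []
    for _ in range(max_steps):
        kind, payload = policy(goal, observations)
        if kind == "finish":
            return payload
        tool_name, argument = payload
        observations.append(tools[tool_name](argument))
    # Step budget exhausted: fall back to the last observation, if any
    return observations[-1] if observations else ""

def scripted_policy(goal: str, observations: List[str]) -> Tuple[str, Any]:
    """Stand-in for an LLM: call the weather tool once, then finish."""
    if not observations:
        return ("tool", ("get_weather", "Shanghai"))
    return ("finish", f"Done: {observations[-1]}")

tools = {"get_weather": lambda city: f"{city}: sunny, 25°C"}
print(act(scripted_policy, tools, "What is the weather in Shanghai?"))
```

The chat function returns after a single call, while the agent loop keeps feeding tool results back into the decision step until the policy reports that it is finished or the step budget runs out.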

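1.3 The Core Value of an Agent

Dimension | Traditional AI | AI Agent
Interaction | Q&A | Task execution
Tool use | None | Function Calling/API
Context | Single session | Long-term memory
Error handling | Returns an error | Self-reflects and retries
Value delivered | Information | Actionable results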

2. AI Agent Architecture in Depth

2.1 Architecture Overview

A complete AI Agent system consists of four core layers:

┌────────────────────────────────────────────────────────────┐
│                     Interaction Layer                      │
│   Chat UI │ API │ Multimodal input │ Webhook │ Scheduler   │
└────────────────────────────────────────────────────────────┘
                              ↓
┌────────────────────────────────────────────────────────────┐
│                      Agent Core Layer                      │
│  ┌─────────┐   ┌─────────┐   ┌─────────┐   ┌─────────────┐ │
│  │Planning │ → │Reasoning│ → │   LLM   │ ← │Tool Selector│ │
│  └─────────┘   └─────────┘   └─────────┘   └─────────────┘ │
│  ┌─────────┐   ┌─────────┐   ┌─────────────────────────┐   │
│  │ Memory  │   │ Execute │   │     Self-Reflection     │   │
│  └─────────┘   └─────────┘   └─────────────────────────┘   │
└────────────────────────────────────────────────────────────┘
                              ↓
┌────────────────────────────────────────────────────────────┐
│                        Tools Layer                         │
│ Search │ Calc │ Code exec │ API │ Files │ Database │ Email │
│                MCP protocol (tool standard)                │
└────────────────────────────────────────────────────────────┘
                              ↓
┌────────────────────────────────────────────────────────────┐
│                   Data & Knowledge Layer                   │
│   Vector DB │ Knowledge graph │ Docs │ Redis cache │ RAG   │
└────────────────────────────────────────────────────────────┘

2.2 Perception Module

The perception module handles multimodal input (text, voice, images, and so on) and converts it into standardized observations.

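# Python implementation: multimodal input perception module
import json
import time  # needed for time.time() in PerceptionModule.perceive
from dataclasses import dataclass
from typing import List, Optional, Dict, Any
from enum import Enum


def detect_language(text: str) -> str:
    """Placeholder language detector (an assumption for this example):
    returns "zh" if the text contains a CJK ideograph, otherwise "en"."""
    return "zh" if any("\u4e00" <= ch <= "\u9fff" for ch in text) else "en"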

class InputType(Enum):
    TEXT = "text"
    VOICE = "voice"
    IMAGE = "image"
    FILE = "file"
    API_EVENT = "api_event"
    SCHEDULED = "scheduled"

@dataclass
class Observation:
    """标准化观测数据"""
    input_type: InputType
    content: str
    raw_data: Any
    metadata: Dict[str, Any]
    timestamp: float
    session_id: str
    user_id: Optional[str] = None

class PerceptionModule:
    """
    感知模块:多模态信息输入与处理
    核心功能:
    1. 输入标准化
    2. 去噪与整理
    3. 高质量预处理
    """
    
    def __init__(self):
        self.input_handlers = {
            InputType.TEXT: self._handle_text,
            InputType.VOICE: self._handle_voice,
            InputType.IMAGE: self._handle_image,
            InputType.FILE: self._handle_file,
            InputType.API_EVENT: self._handle_api_event,
            InputType.SCHEDULED: self._handle_scheduled,
        }
    
    def perceive(self, raw_input: Any, input_type: InputType, 
                 session_id: str, user_id: Optional[str] = None) -> Observation:
        """
        主入口:处理各类输入并返回标准化观测
        """
        # 1. 选择对应的处理器
        handler = self.input_handlers.get(input_type)
        if not handler:
            raise ValueError(f"Unsupported input type: {input_type}")
        
        # 2. 调用处理器
        content, metadata = handler(raw_input)
        
        # 3. 构建标准化观测
        observation = Observation(
            input_type=input_type,
            content=content,
            raw_data=raw_input,
            metadata=metadata,
            timestamp=time.time(),
            session_id=session_id,
            user_id=user_id
        )
        
        return observation
    
    def _handle_text(self, raw_input: str) -> tuple[str, Dict]:
        """处理文本输入"""
        # 文本清洗
        cleaned = raw_input.strip()
        
        # 意图识别元数据
        metadata = {
            "length": len(cleaned),
            "has_url": "http" in cleaned,
            "has_code": "```" in cleaned,
            "language": detect_language(cleaned)
        }
        return cleaned, metadata
    
    def _handle_voice(self, raw_input: bytes) -> tuple[str, Dict]:
        """处理语音输入 - 需要ASR"""
        # 实际项目中调用语音识别服务
        # asr_result = asr_service.recognize(raw_input)
        asr_result = "语音转文字的识别结果"  # 模拟
        
        metadata = {
            "duration": len(raw_input),  # 模拟
            "sample_rate": 16000,
            "confidence": 0.95
        }
        return asr_result, metadata
    
    def _handle_image(self, raw_input: bytes) -> tuple[str, Dict]:
        """处理图像输入"""
        # OCR识别
        # text = ocr_service.extract(raw_input)
        extracted_text = "图像中的文字内容"  # 模拟
        
        metadata = {
            "image_size": len(raw_input),
            "format": "png/jpeg",
            "ocr_confidence": 0.92
        }
        return extracted_text, metadata
    
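    def _handle_file(self, raw_input: Dict) -> tuple[str, Dict]:
        """Handle file uploads"""
        # Parse the file content
        file_path = raw_input.get("path")
        file_type = raw_input.get("type")
        
        content = self._parse_file(file_path, file_type)
        
        metadata = {
            "file_name": raw_input.get("name"),
            "file_size": raw_input.get("size"),
            "file_type": file_type
        }
        return content, metadata
    
    def _parse_file(self, file_path: str, file_type: Optional[str]) -> str:
        """Minimal placeholder parser: reads the file as UTF-8 text.
        A real project would dispatch on file_type (PDF, DOCX, ...)."""
        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
            return f.read()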
    
    def _handle_api_event(self, raw_input: Dict) -> tuple[str, Dict]:
        """处理API事件触发"""
        event_type = raw_input.get("event_type")
        event_data = raw_input.get("data", {})
        
        content = f"Event: {event_type}, Data: {json.dumps(event_data)}"
        
        metadata = {
            "event_source": raw_input.get("source"),
            "event_id": raw_input.get("id")
        }
        return content, metadata
    
    def _handle_scheduled(self, raw_input: Dict) -> tuple[str, Dict]:
        """处理定时任务触发"""
        task_name = raw_input.get("task_name")
        schedule_time = raw_input.get("schedule_time")
        
        content = f"Scheduled task: {task_name}"
        
        metadata = {
            "schedule_type": raw_input.get("type"),
            "schedule_time": schedule_time
        }
        return content, metadata

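2.3 Memory Module

Memory is what turns an agent from a "one-off tool" into a "continuously learning assistant." The system uses a layered memory architecture:

# Python implementation: layered memory system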
from typing import List, Optional, Dict, Any
import time
import tiktoken
from dataclasses import dataclass, field

@dataclass
class MemoryItem:
    """记忆单元"""
    content: str
    memory_type: str  # 'short_term' | 'long_term' | 'experience'
    timestamp: float
    importance: float = 0.5  # 0-1,越高越重要
    access_count: int = 0
    embedding: Optional[List[float]] = None

class ShortTermMemory:
    """
    短期记忆:工作记忆,维护当前对话上下文
    使用滑动窗口机制,避免上下文溢出
    """
    
    def __init__(self, max_tokens: int = 4000):
        self.buffer: List[MemoryItem] = []
        self.max_tokens = max_tokens
        self.tokenizer = tiktoken.get_encoding("cl100k_base")
    
    def add(self, content: str, memory_type: str = "short_term"):
        """添加记忆"""
        item = MemoryItem(
            content=content,
            memory_type=memory_type,
            timestamp=time.time()
        )
        self.buffer.append(item)
        self._trim_by_tokens()
    
    def get_context(self, max_items: Optional[int] = None) -> str:
        """获取上下文"""
        items = self.buffer[-max_items:] if max_items else self.buffer
        return "\n".join([item.content for item in items])
    
    def get_all(self) -> List[MemoryItem]:
        """获取所有记忆"""
        return self.buffer.copy()
    
    def _trim_by_tokens(self):
        """按Token数量裁剪"""
        total = sum(
            len(self.tokenizer.encode(item.content)) 
            for item in self.buffer
        )
        
        while total > self.max_tokens and len(self.buffer) > 2:
            removed = self.buffer.pop(0)
            total -= len(self.tokenizer.encode(removed.content))
    
    def clear(self):
        """清空短期记忆"""
        self.buffer.clear()


class LongTermMemory:
    """
    长期记忆:持久化存储关键经验和知识
    基于向量数据库实现语义检索
    """
    
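    def __init__(self, vector_store, embeddings):
        # Assumed interface: an async wrapper (for example around Pinecone or
        # Weaviate) that exposes add_documents() and similarity_search(),
        # whose results carry a pageContent attribute
        self.store = vector_store
        self.embeddings = embeddings  # e.g. OpenAI embeddings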
    
    async def store_experience(
        self, 
        task: str, 
        outcome: str, 
        lesson: str,
        metadata: Optional[Dict] = None
    ):
        """
        存储经验到长期记忆
        """
        doc_content = f"""
        任务: {task}
        结果: {outcome}
        经验教训: {lesson}
        """
        
        doc = {
            "pageContent": doc_content.strip(),
            "metadata": {
                "type": "experience",
                "timestamp": time.time(),
                "task_type": metadata.get("task_type") if metadata else None,
                **(metadata or {})
            }
        }
        
        await self.store.add_documents([doc])
        
        # 同步到经验库
        await self._sync_to_experience_store(task, outcome, lesson)
    
    async def recall(self, query: str, k: int = 5) -> List[str]:
        """
        检索相关记忆
        """
        results = await self.store.similarity_search(query, k=k)
        return [r.pageContent for r in results]
    
    async def _sync_to_experience_store(self, task: str, outcome: str, lesson: str):
        """同步到结构化经验库"""
        # 提取关键模式
        pass
    
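    async def consolidate(self, session_summary: str):
        """
        Memory consolidation: move important short-term memories into long-term memory
        """
        # Identify the important information
        important_patterns = await self._extract_important_patterns(session_summary)
        
        for pattern in important_patterns:
            await self.store_experience(
                task=pattern["task"],
                outcome=pattern["outcome"],
                lesson=pattern["lesson"]
            )
    
    async def _extract_important_patterns(self, session_summary: str) -> List[Dict[str, str]]:
        """
        Placeholder, not implemented in this example. A real version might
        ask an LLM to extract {"task", "outcome", "lesson"} records.
        """
        return []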


class HierarchicalMemory:
    """
    分层记忆管理器
    协调短期记忆和长期记忆的交互
    """
    
    def __init__(
        self, 
        vector_store, 
        embeddings,
        short_term_max_tokens: int = 4000
    ):
        self.short_term = ShortTermMemory(max_tokens=short_term_max_tokens)
        self.long_term = LongTermMemory(vector_store, embeddings)
        self.experience_store = {}  # 简化版经验库
    
    async def remember(self, query: str, k: int = 3) -> List[str]:
        """
        统一检索接口:先查短期记忆,再查长期记忆
        """
        # 1. 检查短期记忆
        short_results = self._search_short_term(query)
        
        # 2. 查长期记忆
        long_results = await self.long_term.recall(query, k=k)
        
        # 3. Merge and deduplicate while preserving order
        combined = short_results + long_results
        return list(dict.fromkeys(combined))[:k]
    
    def _search_short_term(self, query: str) -> List[str]:
        """短期记忆检索(简单关键词匹配)"""
        results = []
        query_keywords = set(query.lower().split())
        
        for item in self.short_term.get_all():
            item_keywords = set(item.content.lower().split())
            if query_keywords & item_keywords:  # 有交集
                results.append(item.content)
        
        return results
    
    def update(self, content: str):
        """更新记忆"""
        self.short_term.add(content)
    
    async def consolidate_session(self, summary: str):
        """会话结束时巩固记忆"""
        await self.long_term.consolidate(summary)
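
The sliding-window trimming used by the short-term memory can be tried in isolation. The sketch below is a simplified stand-in, not the class above: `SlidingWindow` and `approx_tokens` are hypothetical names, and the whitespace word count is a crude substitute for a real tokenizer such as tiktoken. It caches each item's token count so that trimming never re-encodes the buffer.

```python
from collections import deque
from typing import Callable, Deque, List, Tuple

def approx_tokens(text: str) -> int:
    """Crude stand-in for a tokenizer: one token per whitespace-separated word."""
    return len(text.split())

class SlidingWindow:
    """Keeps the most recent messages within a token budget."""

    def __init__(
        self,
        max_tokens: int,
        count: Callable[[str], int] = approx_tokens,
        min_items: int = 2,
    ):
        self.max_tokens = max_tokens
        self.count = count
        self.min_items = min_items  # never trim below this many messages
        self.items: Deque[Tuple[str, int]] = deque()  # (text, cached token count)
        self.total = 0  # running token total

    def add(self, text: str) -> None:
        n = self.count(text)
        self.items.append((text, n))
        self.total += n
        # Evict the oldest messages until the budget is met
        while self.total > self.max_tokens and len(self.items) > self.min_items:
            _, removed = self.items.popleft()
            self.total -= removed

    def texts(self) -> List[str]:
        return [text for text, _ in self.items]

window = SlidingWindow(max_tokens=6)
for message in ["one two three", "four five", "six seven", "eight"]:
    window.add(message)
print(window.texts())  # ['four five', 'six seven', 'eight']
```

Like the `len(self.buffer) > 2` guard above, `min_items` keeps the last two messages even when they alone exceed the budget, so the caller should still check the total before sending the context to a model.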

2.4 Planning Module

The planning module breaks a complex goal down into a sequence of executable subtasks:

# Python implementation: planning module based on CoT/ToT
from typing import List, Optional, Dict, Any, Callable
from dataclasses import dataclass
from enum import Enum
import json

class TaskStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    BLOCKED = "blocked"

@dataclass
class SubTask:
    """Subtask definition"""
    id: str
    description: str
    status: TaskStatus = TaskStatus.PENDING
    dependencies: Optional[List[str]] = None  # IDs of prerequisite subtasks
    tool_required: Optional[str] = None
    result: Optional[Any] = None
    error: Optional[str] = None
    
    def __post_init__(self):
        # Give each instance its own list instead of a shared mutable default
        if self.dependencies is None:
            self.dependencies = []

class PlanningModule:
    """
    规划模块:任务分解与执行计划生成
    支持多种规划策略:
    1. Chain of Thought (CoT) - 链式思维
    2. Tree of Thoughts (ToT) - 思维树
    3. Subgoal Decomposition - 子目标分解
    """
    
    def __init__(self, llm):
        self.llm = llm
    
    async def create_plan(
        self, 
        goal: str, 
        strategy: str = "cot",
        context: Optional[Dict] = None
    ) -> List[SubTask]:
        """
        创建执行计划
        """
        if strategy == "cot":
            return await self._plan_with_cot(goal, context)
        elif strategy == "tot":
            return await self._plan_with_tot(goal, context)
        elif strategy == "simple":
            return await self._plan_simple(goal)
        else:
            raise ValueError(f"Unknown strategy: {strategy}")
    
    async def _plan_with_cot(
        self, 
        goal: str, 
        context: Optional[Dict]
    ) -> List[SubTask]:
        """
        Chain of Thought 规划
        让LLM展示完整推理过程
        """
        prompt = f"""
        目标:{goal}
        
        {f"上下文信息:{json.dumps(context, ensure_ascii=False)}" if context else ""}
        
        请按以下步骤思考并输出计划:
        1. 分析目标的核心需求
        2. 识别完成目标需要的步骤
        3. 确定步骤之间的依赖关系
        4. 输出结构化的任务列表
        
        输出格式(JSON):
        {{
            "reasoning": "你的推理过程",
            "tasks": [
                {{"id": "1", "description": "任务1", "dependencies": []}},
                {{"id": "2", "description": "任务2", "dependencies": ["1"]}}
            ]
        }}
        """
        
        response = await self.llm.agenerate([prompt])
        result = json.loads(response)
        
        tasks = [
            SubTask(
                id=t["id"],
                description=t["description"],
                dependencies=t.get("dependencies", [])
            )
            for t in result["tasks"]
        ]
        
        return tasks
    
    async def _plan_with_tot(
        self, 
        goal: str, 
        context: Optional[Dict]
    ) -> List[SubTask]:
        """
        Tree of Thoughts 规划
        探索多条执行路径,选择最优方案
        """
        # 第一阶段:生成多个可能的计划
        prompt = f"""
        目标:{goal}
        
        请生成3种不同的执行方案,每种方案从不同角度思考:
        方案A:从XX角度...
        方案B:从YY角度...
        方案C:从ZZ角度...
        
        对于每种方案,列出关键步骤。
        """
        
        response = await self.llm.agenerate([prompt])
        
        # 第二阶段:评估选择最优方案
        evaluation_prompt = f"""
        针对目标"{goal}",评估以下方案的优劣:
        
        {response}
        
        选择最佳方案,并转换为任务列表。
        输出格式(JSON):
        {{
            "best_plan": "方案X",
            "reasoning": "选择理由",
            "tasks": [...]
        }}
        """
        
        final_response = await self.llm.agenerate([evaluation_prompt])
        result = json.loads(final_response)
        
        return [
            SubTask(
                id=t["id"],
                description=t["description"],
                dependencies=t.get("dependencies", [])
            )
            for t in result["tasks"]
        ]
    
    async def _plan_simple(self, goal: str) -> List[SubTask]:
        """
        简单分解:逗号分隔的步骤
        """
        # 简单的关键词提取
        task_descriptions = [d.strip() for d in goal.split(",") if d.strip()]
        
        return [
            SubTask(id=str(i+1), description=desc)
            for i, desc in enumerate(task_descriptions)
        ]
    
    def get_ready_tasks(
        self, 
        tasks: List[SubTask]
    ) -> List[SubTask]:
        """
        获取所有依赖已满足、可以执行的任务
        """
        ready = []
        completed_ids = {
            t.id for t in tasks 
            if t.status == TaskStatus.COMPLETED
        }
        
        for task in tasks:
            if task.status != TaskStatus.PENDING:
                continue
            
            # 检查依赖
            deps_satisfied = all(
                dep_id in completed_ids 
                for dep_id in task.dependencies
            )
            
            if deps_satisfied:
                ready.append(task)
        
        return ready
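
The dependency check in `get_ready_tasks` is one step of a topological sort. As a minimal sketch of the same idea using only the standard library (Python 3.9+), the hypothetical helper `execution_batches` below groups task ids into batches that could run in parallel and rejects circular dependencies, a case the planner above does not detect.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List

def execution_batches(dependencies: Dict[str, List[str]]) -> List[List[str]]:
    """Group task ids into batches; each task's dependencies sit in an earlier batch.

    `dependencies` maps a task id to the ids it depends on.
    """
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises CycleError if the plan has a circular dependency
    batches: List[List[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every task whose dependencies are done
        batches.append(ready)
        sorter.done(*ready)
    return batches

plan = {"1": [], "2": ["1"], "3": ["1"], "4": ["2", "3"]}
print(execution_batches(plan))  # [['1'], ['2', '3'], ['4']]

try:
    execution_batches({"a": ["b"], "b": ["a"]})
except CycleError:
    print("circular plan rejected")
```

Validating the plan this way before execution catches a model-generated plan whose tasks wait on each other, which would otherwise leave every task pending.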

2.5 Reasoning Engine

The reasoning engine is the agent's "brain"; it ties planning, memory, and tool selection together:

# Python implementation: ReAct-based reasoning engine
from typing import List, Dict, Any, Optional, Callable
from dataclasses import dataclass
from enum import Enum
import json

class ReasoningStep(Enum):
    THINK = "think"      # 思考
    ACT = "act"          # 行动
    OBSERVE = "observe"  # 观察
    REFLECT = "reflect"  # 反思
    FINAL = "final"      # 最终答案

@dataclass
class ReasoningTrace:
    """推理轨迹"""
    step: ReasoningStep
    thought: str
    action: Optional[Dict] = None
    observation: Optional[str] = None
    reflection: Optional[str] = None

class ReasoningEngine:
    """
    推理引擎:实现ReAct循环
    Think → Act → Observe → Reflect → ...
    """
    
    def __init__(
        self, 
        llm,
        planning_module: PlanningModule,
        memory_module: HierarchicalMemory,
        tool_registry
    ):
        self.llm = llm
        self.planning = planning_module
        self.memory = memory_module
        self.tools = tool_registry
        self.max_iterations = 10
    
    async def think(
        self, 
        query: str, 
        plan: Optional[List[SubTask]] = None,
        context: Optional[Dict] = None
    ) -> str:
        """
        核心推理循环
        """
        trace: List[ReasoningTrace] = []
        
        # 初始化计划
        if not plan:
            plan = await self.planning.create_plan(query, strategy="cot")
        
        # 获取当前状态
        current_task_idx = self._get_current_task_index(plan)
        
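        # ReAct loop
        for iteration in range(self.max_iterations):
            # 1. Think: analyze the current state
            thought = await self._think_step(
                query, plan, current_task_idx, trace, context
            )
            
            # 2. Stop if the model reports that the task is complete
            if thought.get("is_complete"):
                final_trace = ReasoningTrace(
                    step=ReasoningStep.FINAL,
                    thought=thought.get("final_answer", "")
                )
                trace.append(final_trace)
                break
            
            # Record the thought so that later prompts can see it
            trace.append(ReasoningTrace(
                step=ReasoningStep.THINK,
                thought=str(thought.get("thought", ""))
            ))
            
            # 3. Act: decide on an action
            action = await self._act_step(
                thought, plan, current_task_idx
            )
            
            # 4. Execute the action
            result = await self._execute_action(action)
            
            # A direct answer ends the loop
            if action["type"] == "final_answer":
                trace.append(ReasoningTrace(
                    step=ReasoningStep.FINAL,
                    thought=str(result)
                ))
                break
            
            # 5. Observe: record the result in the trace
            observation = await self._observe_step(result, action)
            trace.append(ReasoningTrace(
                step=ReasoningStep.OBSERVE,
                thought="",
                action=action,
                observation=observation
            ))
            
            # 6. Reflect: update the plan state
            should_continue, updated_plan = await self._reflect_step(
                trace, observation, plan, current_task_idx
            )
            
            plan = updated_plan
            current_task_idx = self._get_current_task_index(plan)
            
            if not should_continue:
                break
        
        # Build the final answer
        return self._generate_final_answer(query, trace)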
    
    async def _think_step(
        self,
        query: str,
        plan: List[SubTask],
        current_idx: int,
        trace: List[ReasoningTrace],
        context: Optional[Dict]
    ) -> Dict[str, Any]:
        """思考步骤"""
        
        # 检索相关记忆
        relevant_memories = await self.memory.remember(query, k=3)
        
        # 构建思考提示
        prompt = f"""
        当前问题:{query}
        
        执行计划:
        {self._format_plan(plan, current_idx)}
        
        历史推理:
        {self._format_trace(trace)}
        
        相关记忆:
        {chr(10).join(relevant_memories)}
        
        {f"上下文:{json.dumps(context, ensure_ascii=False)}" if context else ""}
        
        请分析:
        1. 当前应该关注哪个任务?
        2. 需要调用什么工具?
        3. 是否可以给出最终答案?
        """
        
        response = await self.llm.agenerate([prompt])
        
        # 解析LLM响应
        return self._parse_thought_response(response)
    
    async def _act_step(
        self,
        thought: Dict[str, Any],
        plan: List[SubTask],
        current_idx: int
    ) -> Dict[str, Any]:
        """行动步骤"""
        
        action_type = thought.get("action_type")
        
        if action_type == "use_tool":
            # 确定使用哪个工具
            tool_name = thought.get("tool_name")
            tool_args = thought.get("tool_args", {})
            
            return {
                "type": "tool_call",
                "tool": tool_name,
                "args": tool_args
            }
        
        elif action_type == "update_plan":
            # 更新计划
            return {
                "type": "update_plan",
                "updates": thought.get("plan_updates", [])
            }
        
        elif action_type == "answer":
            # 直接回答
            return {
                "type": "final_answer",
                "answer": thought.get("answer")
            }
        
        else:
            raise ValueError(f"Unknown action type: {action_type}")
    
    async def _execute_action(self, action: Dict) -> Any:
        """执行行动"""
        
        if action["type"] == "tool_call":
            tool_name = action["tool"]
            tool_args = action["args"]
            
            # 从工具注册表获取工具
            tool = self.tools.get(tool_name)
            if not tool:
                raise ValueError(f"Tool not found: {tool_name}")
            
            # 执行工具
            result = await tool.execute(**tool_args)
            return result
        
        elif action["type"] == "update_plan":
            # 计划更新不需要执行
            return None
        
        elif action["type"] == "final_answer":
            return action["answer"]
    
    async def _observe_step(
        self, 
        result: Any, 
        action: Dict
    ) -> str:
        """观察步骤"""
        
        if result is None:
            return "计划已更新,等待下一步执行"
        
        # 将结果格式化为观察
        return str(result)
    
    async def _reflect_step(
        self,
        trace: List[ReasoningTrace],
        observation: str,
        plan: List[SubTask],
        current_idx: int
    ) -> tuple[bool, List[SubTask]]:
        """反思步骤"""
        
        # 更新计划状态
        if current_idx < len(plan):
            plan[current_idx].status = TaskStatus.COMPLETED
            plan[current_idx].result = observation
        
        # 检查是否还有待执行任务
        ready_tasks = self.planning.get_ready_tasks(plan)
        
        should_continue = len(ready_tasks) > 0
        
        return should_continue, plan
    
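    def _parse_thought_response(self, response: str) -> Dict[str, Any]:
        """Parse the LLM's thought response"""
        # Simplified: production code should request structured output
        try:
            parsed = json.loads(response)
        except (json.JSONDecodeError, TypeError):
            parsed = None
        if isinstance(parsed, dict):
            return parsed
        # Not valid JSON: treat the raw text as the final answer. Falling back
        # to a tool named "unknown" would raise in _execute_action.
        return {"is_complete": True, "final_answer": str(response)}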
    
    def _format_plan(self, plan: List[SubTask], current_idx: int) -> str:
        """格式化计划为文本"""
        lines = []
        for i, task in enumerate(plan):
            status = "✅" if task.status == TaskStatus.COMPLETED else "📋"
            prefix = "→" if i == current_idx else " "
            lines.append(f"{prefix} {status} [{task.id}] {task.description}")
        return "\n".join(lines)
    
    def _format_trace(self, trace: List[ReasoningTrace]) -> str:
        """格式化推理轨迹"""
        lines = []
        for t in trace:
            if t.step == ReasoningStep.THINK:
                lines.append(f"思考:{t.thought}")
            elif t.step == ReasoningStep.OBSERVE:
                lines.append(f"观察:{t.observation}")
        return "\n".join(lines) if lines else "(暂无)"
    
    def _get_current_task_index(self, plan: List[SubTask]) -> int:
        """Return the index of the next task that can run"""
        # Look dependencies up by id rather than assuming ids are "1", "2", ...
        completed_ids = {
            t.id for t in plan if t.status == TaskStatus.COMPLETED
        }
        for i, task in enumerate(plan):
            if task.status == TaskStatus.PENDING and all(
                d in completed_ids for d in task.dependencies
            ):
                return i
        return len(plan)  # no runnable task left
    
    def _generate_final_answer(self, query: str, trace: List[ReasoningTrace]) -> str:
        """生成最终答案"""
        for t in reversed(trace):
            if t.step == ReasoningStep.FINAL:
                return t.thought
        
        # 没有明确的最终答案,从观察中提取
        observations = [t.observation for t in trace if t.observation]
        return observations[-1] if observations else "任务完成"
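
Because `_parse_thought_response` relies on the model returning bare JSON, replies wrapped in explanatory text or a Markdown code fence fail to parse. One tolerant approach, sketched here under the assumption that the reply contains a single JSON object somewhere in the text, is to scan for an opening brace and let the standard decoder stop where the object ends. `extract_json` is a hypothetical helper, not part of the engine above.

```python
import json
from typing import Any, Dict, Optional

def extract_json(text: str) -> Optional[Dict[str, Any]]:
    """Return the first JSON object embedded in `text`, or None if there is none."""
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            # raw_decode parses one value starting at `start` and ignores the rest
            obj, _end = decoder.raw_decode(text, start)
            return obj
        except json.JSONDecodeError:
            # Not a valid object here; try the next opening brace
            start = text.find("{", start + 1)
    return None

reply = 'Here is my decision:\n{"action_type": "answer", "answer": "42"}\nHope that helps.'
print(extract_json(reply))  # {'action_type': 'answer', 'answer': '42'}
print(extract_json("no structured output"))  # None
```

Where the model provider supports structured output or native function calling, that is usually the more reliable route; a scanner like this is a fallback for plain-text replies.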

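3. The ReAct Pattern: Reasoning and Acting Together

3.1 How ReAct Works

ReAct (Reasoning + Acting) is a reasoning framework proposed by researchers at Princeton University and Google Research (Yao et al., 2022). Its core idea is to have the LLM alternate between "thinking" and "acting" during reasoning, using the observed result of each action to guide the next reasoning step. The original formulation alternates thought, action, and observation; the explicit reflection step in the loop below is an extension used in this article.

The ReAct loop:
┌─────────────┐
│  1. Thought │  Analyze the problem and the current state
└──────┬──────┘
       ↓
┌─────────────┐
│  2. Action  │  Select and run a tool
└──────┬──────┘
       ↓
┌─────────────┐
│ 3. Observe  │  Collect the execution result
└──────┬──────┘
       ↓
┌─────────────┐
│ 4. Reflect  │  Evaluate the result and adjust the strategy
└──────┬──────┘
       ↓
       └──→ (repeat until the task is complete)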

3.2 ReAct Implementation

# Python implementation: a complete ReAct Agent example
from typing import List, Dict, Any, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum
import json
import asyncio

class ActionType(Enum):
    TOOL_CALL = "tool_call"
    FINAL_ANSWER = "final_answer"
    ASK_CLARIFICATION = "ask_clarification"

@dataclass
class ToolCall:
    """工具调用"""
    name: str
    arguments: Dict[str, Any]
    result: Optional[Any] = None
    error: Optional[str] = None

@dataclass
class ReActStep:
    """ReAct循环的单步"""
    step_num: int
    thought: str
    action: Optional[ToolCall] = None
    observation: Optional[str] = None
    is_final: bool = False

class ReactAgent:
    """
    ReAct Agent 实现
    
    核心流程:
    1. 思考(Thought):分析问题,决定下一步行动
    2. 行动(Action):调用工具或生成答案
    3. 观察(Observation):获取行动结果
    4. 判断:是否完成?继续循环还是返回答案
    """
    
    def __init__(
        self,
        llm,
        tools: Dict[str, Callable],
        system_prompt: Optional[str] = None
    ):
        self.llm = llm
        self.tools = tools
        self.system_prompt = system_prompt or self._default_system_prompt()
        self.max_steps = 15
    
    def _default_system_prompt(self) -> str:
        return """你是一个智能助手,可以通过调用工具来完成复杂任务。

可用工具:
{available_tools}

输出格式(JSON):
{{
    "thought": "你的思考过程",
    "action": {{
        "type": "tool_call" | "final_answer",
        "tool": "工具名(仅tool_call时)",
        "arguments": {{...}}(仅tool_call时),
        "answer": "最终答案(仅final_answer时)"
    }}
}}

重要规则:
1. 如果需要外部信息,必须使用工具
2. 工具调用结果会作为observation返回
3. 连续调用同一工具不超过3次
4. 收集足够信息后给出最终答案
"""
    
    async def run(self, query: str, context: Optional[Dict] = None) -> str:
        """
        运行ReAct Agent
        """
        steps: List[ReActStep] = []
        history = []
        current_query = query
        
        for step_num in range(self.max_steps):
            # 1. 构建提示
            prompt = self._build_prompt(
                current_query, history, steps
            )
            
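            # 2. Call the LLM (assumed here to return the raw JSON text)
            response_text = await self.llm.agenerate([prompt])
            try:
                response = json.loads(response_text)
            except json.JSONDecodeError:
                # Feed the parse failure back so the model can correct itself
                history.append({
                    "thought": "",
                    "action": "invalid output",
                    "observation": "Error: the reply was not valid JSON"
                })
                continue
            
            thought = response.get("thought", "")
            action = response.get("action") or {}
            action.setdefault("type", None)  # avoid a KeyError on malformed actions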
            
            # 3. 执行行动
            step = ReActStep(
                step_num=step_num + 1,
                thought=thought
            )
            
            if action["type"] == "tool_call":
                tool_name = action.get("tool")
                tool_args = action.get("arguments", {})
                
                # 检查工具是否存在
                if tool_name not in self.tools:
                    observation = f"错误:工具 '{tool_name}' 不存在"
                    step.observation = observation
                    steps.append(step)
                    history.append({
                        "thought": thought,
                        "action": f"尝试调用工具 {tool_name}",
                        "observation": observation
                    })
                    continue
                
                # 执行工具
                try:
                    tool_func = self.tools[tool_name]
                    result = await self._execute_tool(tool_func, tool_args)
                    observation = str(result)
                    
                    step.action = ToolCall(
                        name=tool_name,
                        arguments=tool_args,
                        result=result
                    )
                    step.observation = observation
                    
                    history.append({
                        "thought": thought,
                        "action": f"调用 {tool_name}({tool_args})",
                        "observation": observation
                    })
                    
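                    # Keep the original question in the prompt. The tool result
                    # already reaches the model through `history`; overwriting
                    # the query would make the model lose sight of the task.
                    current_query = query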
                    
                except Exception as e:
                    observation = f"工具执行错误:{str(e)}"
                    step.observation = observation
                    history.append({
                        "thought": thought,
                        "action": f"调用 {tool_name} 失败",
                        "observation": observation
                    })
            
            elif action["type"] == "final_answer":
                step.is_final = True
                step.observation = action.get("answer", "")
                steps.append(step)
                return action.get("answer", "")
            
            steps.append(step)
        
        # 超过最大步数,返回最后结果
        return self._summarize_steps(steps)
    
    def _build_prompt(
        self, 
        query: str, 
        history: List[Dict],
        steps: List[ReActStep]
    ) -> str:
        """构建LLM提示"""
        
        # 格式化可用工具
        tool_descriptions = []
        for name, func in self.tools.items():
            desc = func.__doc__ or "无描述"
            tool_descriptions.append(f"- {name}: {desc.strip().split(chr(10))[0]}")
        
        tools_str = "\n".join(tool_descriptions)
        
        # 格式化历史
        history_str = ""
        for h in history[-5:]:  # 只保留最近5条
            history_str += f"\n思考:{h['thought']}\n"
            history_str += f"行动:{h['action']}\n"
            history_str += f"观察:{h['observation']}\n"
        
        prompt = f"""{self.system_prompt.format(available_tools=tools_str)}

当前问题:{query}
{'-' * 50}
历史执行记录:
{history_str or '(暂无)'}
{'-' * 50}

请分析当前状态,决定下一步行动:
"""
        return prompt
    
    async def _execute_tool(
        self, 
        tool_func: Callable, 
        args: Dict[str, Any]
    ) -> Any:
        """安全执行工具"""
        # 支持同步和异步工具
        if asyncio.iscoroutinefunction(tool_func):
            return await tool_func(**args)
        else:
            return tool_func(**args)
    
    def _summarize_steps(self, steps: List[ReActStep]) -> str:
        """总结步骤,返回最后观察"""
        for step in reversed(steps):
            if step.observation:
                return step.observation
        return "任务执行超时"


# 使用示例
async def demo():
    """ReAct Agent演示"""
    
    # 定义工具
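    async def search(query: str) -> str:
        """Web search tool"""
        return f"Search results: information about '{query}'...\nSource: search engine"
    
    async def calculator(expression: str) -> str:
        """Calculator tool"""
        # Demo only: eval() on model-generated text can run arbitrary code.
        # Stripping builtins narrows the risk but does not remove it; a real
        # tool should parse the expression instead of evaluating it.
        try:
            result = eval(expression, {"__builtins__": {}}, {})
            return f"Result: {result}"
        except Exception:
            return f"Calculation error: invalid expression '{expression}'"
    
    async def get_weather(location: str) -> str:
        """Weather lookup tool"""
        return f"Weather in {location}: sunny, 25°C, good for going out"
    
    async def query_database(sql: str) -> str:
        """Database query tool"""
        return f"Query result: data returned by SQL '{sql}'..."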
    
    # 创建工具注册表
    tools = {
        "search": search,
        "calculator": calculator,
        "get_weather": get_weather,
        "query_database": query_database
    }
    
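    # Initialize the LLM (mocked here)
    class MockLLM:
        """Scripted stand-in for an LLM: one tool call, then a final answer."""

        def __init__(self):
            self.calls = 0

        async def agenerate(self, prompts):
            self.calls += 1
            if self.calls == 1:
                return json.dumps({
                    "thought": "The user wants Shanghai's weather; call the weather tool",
                    "action": {
                        "type": "tool_call",
                        "tool": "get_weather",
                        "arguments": {"location": "上海"}
                    }
                })
            # Without a final_answer the agent would loop until the step limit
            return json.dumps({
                "thought": "The weather tool has returned; answer the user",
                "action": {"type": "final_answer", "answer": "Shanghai is sunny today, 25°C"}
            })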
    
    # 创建Agent
    agent = ReactAgent(
        llm=MockLLM(),
        tools=tools
    )
    
    # 运行
    result = await agent.run("上海今天天气怎么样?")
    print(f"最终结果:{result}")


# 运行演示
if __name__ == "__main__":
    asyncio.run(demo())
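
The calculator tool in the demo relies on `eval`, which is risky when the expression comes from a model or a user. As a hedged alternative, the sketch below walks the parsed syntax tree and accepts only numbers and a small whitelist of arithmetic operators. The name `safe_eval` and the operator set are assumptions made for this illustration, not part of the original agent; very large exponents can still be slow, so a production tool would also bound the operands.

```python
import ast
import operator

# Whitelisted operators; any other syntax is rejected.
_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Mod: operator.mod,
    ast.Pow: operator.pow,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def safe_eval(expression: str):
    """Evaluate a plain arithmetic expression without calling eval()."""

    def _eval(node):
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if (
            isinstance(node, ast.Constant)
            and isinstance(node.value, (int, float))
            and not isinstance(node.value, bool)
        ):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPS:
            return _BINARY_OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](_eval(node.operand))
        # Names, calls, attribute access, etc. all end up here
        raise ValueError(f"unsupported expression element: {type(node).__name__}")

    return _eval(ast.parse(expression, mode="eval"))
```

A tool wrapper would call `safe_eval(expression)` and turn `ValueError`, `SyntaxError`, or `ZeroDivisionError` into an error message for the model.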

4. Tool Modules and Function Calling

4.1 Tool Registration and Invocation

# Python implementation: tool registration and Function Calling
from typing import Dict, List, Any, Callable, Optional
from dataclasses import dataclass
from enum import Enum
import inspect
import json

@dataclass
class ToolDefinition:
    """工具定义"""
    name: str
    description: str
    parameters: Dict[str, Any]  # JSON Schema
    func: Callable

class ToolRegistry:
    """
    工具注册表
    管理所有可用的工具,提供Function Calling接口
    """
    
    def __init__(self):
        self._tools: Dict[str, ToolDefinition] = {}
    
    def register(self, func: Callable, name: Optional[str] = None):
        """
        注册工具函数
        支持装饰器或直接调用
        """
        tool_name = name or func.__name__
        
        # 从函数签名提取参数schema
        sig = inspect.signature(func)
        parameters = self._extract_parameters(sig)
        
        # 获取文档描述
        doc = func.__doc__ or ""
        description = doc.strip().split("\n")[0] if doc else "无描述"
        
        tool_def = ToolDefinition(
            name=tool_name,
            description=description,
            parameters=parameters,
            func=func
        )
        
        self._tools[tool_name] = tool_def
        return func  # 支持装饰器
    
    def _extract_parameters(self, sig: inspect.Signature) -> Dict[str, Any]:
        """从函数签名提取JSON Schema参数"""
        properties = {}
        required = []
        
        for param_name, param in sig.parameters.items():
            # 跳过self参数
            if param_name == "self":
                continue
            
            # 确定类型
            if param.annotation == str:
                param_type = "string"
            elif param.annotation == int:
                param_type = "integer"
            elif param.annotation == float:
                param_type = "number"
            elif param.annotation == bool:
                param_type = "boolean"
            elif param.annotation == list:
                param_type = "array"
            elif param.annotation == dict:
                param_type = "object"
            else:
                param_type = "string"  # 默认string
            
            properties[param_name] = {
                "type": param_type,
                "description": f"参数 {param_name}"
            }
            
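            # A parameter without a default value is required.
            # Compare by identity: `==` could invoke a custom __eq__ on the default.
            if param.default is inspect.Parameter.empty:
                required.append(param_name)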
        
        return {
            "type": "object",
            "properties": properties,
            "required": required
        }
    
    def get_tool_schemas(self) -> List[Dict[str, Any]]:
        """
        获取所有工具的Schema(用于Function Calling)
        """
        schemas = []
        for tool in self._tools.values():
            schemas.append({
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.parameters
                }
            })
        return schemas
    
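    async def call(self, name: str, arguments: Dict[str, Any]) -> Any:
        """
        Invoke a registered tool by name.
        Declared async so that sync and async tools can be called uniformly.
        """
        if name not in self._tools:
            raise ValueError(f"Tool not found: {name}")
        
        tool = self._tools[name]
        
        # Check that every required argument is present
        self._validate_arguments(tool, arguments)
        
        # Execute; await the result when the tool is a coroutine function
        result = tool.func(**arguments)
        if inspect.isawaitable(result):
            result = await result
        return result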
    
    def _validate_arguments(self, tool: ToolDefinition, args: Dict):
        """验证参数"""
        required = tool.parameters.get("required", [])
        for req in required:
            if req not in args:
                raise ValueError(f"Missing required argument: {req}")


# 使用示例:装饰器注册工具
registry = ToolRegistry()

@registry.register
async def search_web(query: str, max_results: int = 5) -> str:
    """
    搜索互联网获取实时信息
    适用于查询新闻、天气、股票价格等实时数据
    """
    # 实际实现调用搜索API
    return f"搜索'{query}'得到{max_results}条结果..."

@registry.register
async def send_email(to: str, subject: str, body: str) -> str:
    """
    发送电子邮件
    适用于通知、报告、提醒等场景
    """
    # 实际实现调用邮件API
    return f"邮件已发送至{to},主题:{subject}"

@registry.register
def calculate(expression: str) -> float:
    """
    数学计算器
    支持加减乘除、幂运算、括号等
    """
    try:
        result = eval(expression, {"__builtins__": {}}, {})
        return result
    except Exception as e:
        raise ValueError(f"计算错误:{e}")


# Tool definitions in the OpenAI Function Calling format
def get_openai_function_schemas():
    """Return the registry's tools in the OpenAI Function Calling format."""
    # get_tool_schemas() already emits this shape, so reuse it instead of
    # reaching into the private registry._tools dict
    return registry.get_tool_schemas()
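
The registry above exposes schemas to the model; the other half of Function Calling is executing what the model asks for. In the OpenAI Chat Completions format the arguments of a tool call arrive as a JSON string, so they need to be parsed and checked before dispatch. The sketch below shows one way to do that for both sync and async tools. `dispatch_tool_call`, the `{"ok": ...}` result shape, and the two sample tools are assumptions made for illustration.

```python
import asyncio
import inspect
import json
from typing import Any, Callable, Dict


async def dispatch_tool_call(
    tools: Dict[str, Callable[..., Any]], name: str, raw_arguments: str
) -> Dict[str, Any]:
    """Run one model-requested tool call and return a result the model can read."""
    if name not in tools:
        return {"ok": False, "error": f"unknown tool: {name}"}
    try:
        arguments = json.loads(raw_arguments or "{}")
    except json.JSONDecodeError as exc:
        return {"ok": False, "error": f"invalid JSON arguments: {exc}"}
    if not isinstance(arguments, dict):
        return {"ok": False, "error": "arguments must be a JSON object"}
    try:
        result = tools[name](**arguments)
        if inspect.isawaitable(result):  # async tools return an awaitable
            result = await result
    except Exception as exc:  # report failures to the model instead of crashing
        return {"ok": False, "error": f"{type(exc).__name__}: {exc}"}
    return {"ok": True, "result": result}


# Hypothetical tools, one sync and one async.
def add(a: int, b: int) -> int:
    return a + b


async def echo(text: str) -> str:
    return text


TOOLS = {"add": add, "echo": echo}
```

Returning errors as data rather than raising lets the agent loop feed the failure back to the model, which can then correct its arguments and retry.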

4.2 A LangChain Agent Implementation

# Python implementation: a LangChain-based agent (LangChain 1.0 API)
# AgentExecutor and langchain.prompts belong to the pre-1.0 API and are not used here.
from langchain.agents import create_agent
from langchain.agents.middleware import (
    PIIMiddleware,
    SummarizationMiddleware,
    ToolRetryMiddleware
)
from langchain.tools import tool
from langchain_openai import ChatOpenAI
from typing import List, Dict, Any

# 1. 定义工具
@tool
def search_knowledge_base(query: str) -> str:
    """
    在企业知识库中搜索相关信息
    适用于查询公司政策、产品文档、历史案例等
    """
    # 实际实现连接向量数据库
    return f"知识库搜索结果:{query}的相关内容..."

@tool
def create_calendar_event(
    title: str, 
    start_time: str, 
    duration_minutes: int = 30,
    attendees: List[str] = None
) -> str:
    """
    创建日历事件/会议
    适用于安排会议、设置提醒、预约时间等
    """
    attendees = attendees or []
    return f"已创建会议:{title}\n时间:{start_time}\n时长:{duration_minutes}分钟\n参会人:{', '.join(attendees)}"

@tool
def send_notification(
    channel: str,
    message: str,
    recipients: List[str] = None
) -> str:
    """
    发送通知消息
    支持邮件、Slack、企业微信等渠道
    """
    return f"已通过{channel}发送通知:{message}\n接收人:{', '.join(recipients or ['全员'])}"

@tool
def query_database(sql: str) -> str:
    """
    执行SQL查询数据库
    适用于获取业务数据、生成报表等
    """
    # 实际实现连接数据库
    return f"查询结果:执行 {sql} 返回的数据..."

# 2. 初始化LLM
llm = ChatOpenAI(
    model="gpt-4o",
    temperature=0,
    api_key="your-api-key"
)

# 3. 收集所有工具
tools = [
    search_knowledge_base,
    create_calendar_event,
    send_notification,
    query_database
]

# 4. 定义系统提示
system_prompt = """你是一个智能办公助手,帮助用户完成日常工作任务。

你的能力:
1. 搜索企业知识库获取相关信息
2. 创建日历事件和会议
3. 发送通知给团队成员
4. 查询数据库获取业务数据

工作原则:
- 在执行操作前,先理解用户的需求
- 如果信息不足,主动询问用户
- 执行后向用户确认结果
- 遇到错误时,说明原因并提供替代方案
"""

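# 5. Create the agent with middleware (production features).
#    In LangChain 1.0, middleware is passed to create_agent; the legacy
#    AgentExecutor wrapper is not needed and takes no `middleware` argument.
#    Parameter names follow the LangChain 1.0 docs; check them against the
#    version you have installed.
agent = create_agent(
    model=llm,
    tools=tools,
    system_prompt=system_prompt,
    middleware=[
        # PII detection: redact email addresses in user input
        PIIMiddleware(
            "email",
            strategy="redact",
            apply_to_input=True
        ),
        # "phone" is not a built-in PII type, so a detector regex is supplied
        # (the pattern is an assumption for mainland China mobile numbers)
        PIIMiddleware(
            "phone",
            detector=r"1[3-9]\d{9}",
            strategy="mask",
            apply_to_input=True
        ),
        
        # Summarization: keep the context length under control
        SummarizationMiddleware(
            model=llm,
            max_tokens_before_summary=4000,
            messages_to_keep=20
        ),
        
        # Tool retry: retry failed tool calls with exponential backoff
        ToolRetryMiddleware(
            max_retries=3,
            backoff_factor=2.0,
            initial_delay=1.0,
            max_delay=60.0,
            jitter=True
        )
    ]
)

# 6. Run tasks
async def run_demo():
    """Run two example tasks through the agent."""
    
    # Task 1: schedule a meeting and notify attendees
    result1 = await agent.ainvoke({
        "messages": [{
            "role": "user",
            "content": "Schedule a product review meeting for 3 pm tomorrow, lasting 1 hour, "
                       "and invite product manager Zhang San and tech lead Li Si"
        }]
    })
    # The agent returns its state; the last message holds the final reply
    print("Task 1 result:", result1["messages"][-1].content)
    
    # Task 2: query data and notify conditionally
    result2 = await agent.ainvoke({
        "messages": [{
            "role": "user",
            "content": "Look up this month's sales; if they are more than 10% above the same "
                       "period last year, send the sales team a congratulatory notice"
        }]
    })
    print("Task 2 result:", result2["messages"][-1].content)


# 7. Streaming (shows the reasoning process as it happens)
async def run_streaming():
    """Stream the run and print each new message."""
    
    async for chunk in agent.astream(
        {"messages": [{
            "role": "user",
            "content": "Search for the latest AI Agent technology trends, then write a report"
        }]},
        stream_mode="values"
    ):
        # With stream_mode="values", each chunk is the full state dict
        message = chunk["messages"][-1]
        print(f"[{message.type}]", str(message.content)[:100], "...")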


# LangChain 1.0 新API:简化的Agent创建
def create_simple_agent():
    """使用LangChain 1.0的简化API创建Agent"""
    
    from langchain.agents import create_agent
    
    # 一个函数搞定Agent创建
    agent = create_agent(
        model="openai:gpt-4o",
        tools=[search_knowledge_base, create_calendar_event],
        system_prompt="你是一个专业的助手,帮助用户完成知识查询和日程安排。"
    )
    
    # 直接调用
    result = agent.invoke({
        "messages": [{"role": "user", "content": "帮我安排明天的周会"}]
    })
    
    return result
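
The retry settings used above (`max_retries`, `initial_delay`, `backoff_factor`, `max_delay`, `jitter`) describe an exponential backoff schedule. To make the arithmetic concrete, here is a minimal sketch that computes the wait before each retry. It is an illustration of the general technique, not LangChain's implementation, and the jitter range of ±25% is an assumption.

```python
import random
from typing import List, Optional


def backoff_delays(
    max_retries: int = 3,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0,
    max_delay: float = 60.0,
    jitter: bool = False,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return the wait time in seconds before each retry attempt."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        # Delay grows geometrically and is capped at max_delay
        delay = min(initial_delay * backoff_factor ** attempt, max_delay)
        if jitter:
            # Assumed jitter scheme: scale by a random factor in [0.75, 1.25]
            delay = min(delay * rng.uniform(0.75, 1.25), max_delay)
        delays.append(delay)
    return delays


print(backoff_delays())               # [1.0, 2.0, 4.0]
print(backoff_delays(max_retries=8))  # the last entries are capped at 60.0
```

Jitter spreads retries from many concurrent agents over time, so a recovering tool backend is not hit by all of them at the same instant.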

4.3 Multi-Agent Collaboration

# Python implementation: a multi-agent collaboration system
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
from enum import Enum
import asyncio

class AgentRole(Enum):
    SUPERVISOR = "supervisor"      # 监督者:协调其他Agent
    RESEARCHER = "researcher"      # 研究者:信息检索
    CODER = "coder"                # 开发者:代码编写
    WRITER = "writer"              # 写手:内容创作
    ANALYST = "analyst"           # 分析师:数据分析

@dataclass
class AgentMessage:
    """Agent间消息"""
    from_agent: str
    to_agent: str  # or "broadcast"
    content: Any
    msg_type: str  # "task" | "result" | "status" | "error"

@dataclass
class AgentConfig:
    """Agent配置"""
    name: str
    role: AgentRole
    system_prompt: str
    tools: List[str]
    capabilities: List[str]

class MultiAgentSystem:
    """
    多Agent协作系统
    支持Supervisor模式:监督者协调多个专业Agent
    """
    
    def __init__(self, llm_factory):
        self.llm_factory = llm_factory
        self.agents: Dict[str, AgentConfig] = {}
        self.message_queue: asyncio.Queue = asyncio.Queue()
    
    def register_agent(self, config: AgentConfig):
        """注册Agent"""
        self.agents[config.name] = config
    
    async def run_supervisor_workflow(
        self,
        task: str,
        agent_configs: List[AgentConfig]
    ) -> str:
        """
        Supervisor工作流:
        1. 监督者分解任务
        2. 分派给专业Agent
        3. 收集结果整合输出
        """
        
        # 注册所有Agent
        for config in agent_configs:
            self.register_agent(config)
        
        # 1. Supervisor分解任务
        supervisor_llm = self.llm_factory.create(role="supervisor")
        task_plan = await supervisor_llm.decompose_task(task)
        
        # 2. Run the subtasks one after another (independent subtasks could be
        #    run concurrently with asyncio.gather)
        results = {}
        for subtask in task_plan["subtasks"]:
            agent_name = subtask["assigned_to"]
            agent_config = self.agents[agent_name]
            
            # 创建专门的Agent实例
            agent_llm = self.llm_factory.create(
                role=agent_config.role,
                system_prompt=agent_config.system_prompt
            )
            
            # 执行
            result = await agent_llm.execute(
                task=subtask["description"],
                tools=agent_config.tools
            )
            
            results[subtask["id"]] = {
                "agent": agent_name,
                "result": result
            }
        
        # 3. Supervisor整合结果
        final_result = await supervisor_llm.synthesize(task, results)
        
        return final_result


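# A CrewAI-style multi-agent implementation
class CrewAIAgent:
    """A role-based agent in the style of CrewAI."""
    
    def __init__(
        self,
        role: str,
        goal: str,
        backstory: str,
        tools: Optional[List] = None,
        llm: Any = None
    ):
        self.role = role
        self.goal = goal
        self.backstory = backstory
        self.tools = tools or []
        # Optional async callable (prompt -> str); an assumption of this sketch
        self.llm = llm
    
    async def execute_task(self, task: str, context: Optional[Dict] = None) -> str:
        """Execute a task and return the LLM's report."""
        context_line = f"Context: {context}" if context else ""
        prompt = f"""
        Role: {self.role}
        Goal: {self.goal}
        Backstory: {self.backstory}
        
        Task: {task}
        
        {context_line}
        
        Complete the task and report the result.
        """
        
        # Call the LLM
        result = await self._call_llm(prompt)
        return result
    
    async def _call_llm(self, prompt: str) -> str:
        """Send the prompt to the configured LLM, or return a placeholder."""
        if self.llm is None:
            return f"[{self.role}] placeholder result (no LLM configured)"
        return await self.llm(prompt)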


class CrewAIOrchestrator:
    """
    CrewAI编排器
    协调多个角色化Agent完成任务
    """
    
    def __init__(self, agents: List[CrewAIAgent], verbose: bool = True):
        self.agents = {a.role: a for a in agents}
        self.verbose = verbose
    
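    async def kickoff(self, task: str) -> Dict[str, str]:
        """
        Start the crew.
        Agents run in registration order; each agent's output is passed to the next.
        Returns a dict mapping each role to its output.
        """
        current_input = task
        context: Dict[str, str] = {}
        
        for agent in self.agents.values():
            if self.verbose:
                print(f"[{agent.role}] running...")
            
            result = await agent.execute_task(current_input, context)
            
            if self.verbose:
                print(f"[{agent.role}] done")
            
            # Hand the result to the next agent, keeping the original task in view
            context[agent.role] = result
            current_input = f"{task}\n\nBuild on the output of {agent.role}: {result}"
        
        return context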
    
    async def kickoff_parallel(self, task: str) -> Dict[str, str]:
        """
        并行执行所有Agent
        然后聚合结果
        """
        async def run_agent(agent):
            return agent.role, await agent.execute_task(task)
        
        # 并行执行
        results = await asyncio.gather(
            *[run_agent(agent) for agent in self.agents.values()]
        )
        
        return dict(results)


# 使用示例
async def demo_multi_agent():
    """多Agent协作演示"""
    
    # 定义专业Agent
    researcher = CrewAIAgent(
        role="研究员",
        goal="收集并分析相关信息",
        backstory="你是一位专业的研究员,擅长信息检索和分析",
        tools=["search"]
    )
    
    analyst = CrewAIAgent(
        role="分析师",
        goal="从数据中提取洞察",
        backstory="你是一位资深数据分析师,擅长数据解读",
        tools=["analyze"]
    )
    
    writer = CrewAIAgent(
        role="撰稿人",
        goal="将分析结果转化为清晰报告",
        backstory="你是一位专业撰稿人,擅长撰写技术报告",
        tools=["write"]
    )
    
    # 创建Crew
    crew = CrewAIOrchestrator(
        agents=[researcher, analyst, writer],
        verbose=True
    )
    
    # 执行任务
    result = await crew.kickoff(
        "分析2026年AI Agent市场趋势并撰写报告"
    )
    
    print("最终报告:", result["撰稿人"])
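
The supervisor workflow above runs subtasks strictly in sequence, and `kickoff_parallel` runs everything at once. A middle ground is to run subtasks in waves: everything whose dependencies are already satisfied runs concurrently, then the next wave starts. The sketch below illustrates this under stated assumptions: the `Subtask` dataclass, its `depends_on` field, and the `worker` callback are hypothetical and not part of the classes above.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable, Dict, List


@dataclass
class Subtask:
    id: str
    depends_on: List[str] = field(default_factory=list)


async def run_in_waves(
    subtasks: List[Subtask],
    worker: Callable[[str, Dict[str, str]], Awaitable[str]],
) -> Dict[str, str]:
    """Run subtasks concurrently, wave by wave, once their dependencies are done."""
    pending = {t.id: set(t.depends_on) for t in subtasks}
    results: Dict[str, str] = {}
    while pending:
        # A subtask is ready when all of its dependencies have results
        ready = [tid for tid, deps in pending.items() if deps.issubset(results)]
        if not ready:
            raise ValueError(f"cyclic or unknown dependencies: {sorted(pending)}")
        outputs = await asyncio.gather(
            *(worker(tid, {d: results[d] for d in pending[tid]}) for tid in ready)
        )
        for tid, output in zip(ready, outputs):
            results[tid] = output
            del pending[tid]
    return results
```

The `worker` receives the subtask id and the outputs of its dependencies, which is where a call into a specialist agent would go.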

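5. Comparing the Mainstream Agent Frameworks of 2026

5.1 LangGraph vs AutoGen vs CrewAI

| Feature | LangGraph | AutoGen | CrewAI |
| --- | --- | --- | --- |
| Core idea | Graph-structured workflows | Conversation-driven collaboration | Role-based teams |
| Typical use | Complex decision chains, production applications | Code generation, multi-agent conversation | Task chains, content creation |
| Learning curve | Steep (programming background needed) | Moderate | Gentle (friendly to non-engineers) |
| Flow control | Deterministic (flowchart style) | Highly flexible (LLM decides) | Sequential plus conditional branches |
| Multi-agent support | Strong (graph nodes) | Strong (conversation orchestration) | Strong (role division) |
| State management | Built-in persistence | Must be implemented by the user | Built in |
| Adoption | Uber, LinkedIn, Klarna | Microsoft ecosystem | Startups, small and medium businesses |
| Code style | Complex but controllable | Concise but implicit | Intuitive but limited |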

5.2 Workflow Orchestration with LangGraph

# Python implementation: a LangGraph workflow
from langgraph.graph import StateGraph, END, START
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict, List
from pydantic import BaseModel

# 1. 定义状态
class AgentState(TypedDict):
    messages: List[str]
    current_task: str
    task_results: dict
    next_step: str

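# 2. Define the node functions
# The three helpers below are placeholders so the example runs end to end;
# replace them with real retrieval, analysis, and writing logic.
def search_knowledge(query: str) -> str:
    return f"search results for: {query}"

def analyze_data(data: str) -> str:
    return f"analysis of: {data}"

def write_report(analysis: str) -> str:
    return f"report based on: {analysis}"
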
def research_node(state: AgentState) -> AgentState:
    """研究节点:搜索相关信息"""
    query = state["current_task"]
    results = search_knowledge(query)
    
    return {
        **state,
        "task_results": {"research": results},
        "next_step": "analyze"
    }

def analyze_node(state: AgentState) -> AgentState:
    """分析节点:分析研究结果"""
    research_results = state["task_results"]["research"]
    analysis = analyze_data(research_results)
    
    return {
        **state,
        "task_results": {
            **state["task_results"],
            "analysis": analysis
        },
        "next_step": "write"
    }

def write_node(state: AgentState) -> AgentState:
    """写作节点:撰写报告"""
    analysis = state["task_results"]["analysis"]
    report = write_report(analysis)
    
    return {
        **state,
        "task_results": {
            **state["task_results"],
            "report": report
        },
        "messages": state["messages"] + [report],
        "next_step": END
    }

# 3. 构建图
workflow = StateGraph(AgentState)

# 添加节点
workflow.add_node("research", research_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("write", write_node)

# 设置入口和出口
workflow.add_edge(START, "research")
workflow.add_edge("research", "analyze")
workflow.add_edge("analyze", "write")
workflow.add_edge("write", END)

# 4. 编译(支持检查点和恢复)
checkpointer = MemorySaver()
app = workflow.compile(checkpointer=checkpointer)

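# 5. Run
async def run_graph():
    initial_state = {
        "messages": [],
        "current_task": "Analyze AI Agent technology trends in 2026",
        "task_results": {},
        "next_step": ""
    }
    
    # A graph compiled with a checkpointer needs a thread_id on every call
    config = {"configurable": {"thread_id": "123"}}
    
    # Plain invocation
    result = await app.ainvoke(initial_state, config)
    print("Final report:", result["task_results"]["report"])
    
    # Streaming invocation; astream() returns an async iterator, so use `async for`.
    # A separate thread_id keeps this run's checkpoints apart from the one above.
    stream_config = {"configurable": {"thread_id": "124"}}
    async for chunk in app.astream(initial_state, stream_config):
        print("Step:", chunk)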

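# 6. Conditional branching
def should_analyze(state: AgentState) -> str:
    """Decide whether to move on to analysis or retry research."""
    if "error" in str(state.get("task_results", {}).get("research", "")):
        return "research"  # retry research
    return "analyze"

# Edges must be added before compile(): the `app` compiled above will not pick up
# later changes. The conditional edge also replaces the fixed research -> analyze
# edge, so a separate graph is built here. Repeated retries are bounded only by
# LangGraph's recursion limit.
branching = StateGraph(AgentState)
branching.add_node("research", research_node)
branching.add_node("analyze", analyze_node)
branching.add_node("write", write_node)
branching.add_edge(START, "research")
branching.add_conditional_edges(
    "research",
    should_analyze,
    {
        "analyze": "analyze",
        "research": "research"
    }
)
branching.add_edge("analyze", "write")
branching.add_edge("write", END)
branching_app = branching.compile()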
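
To see what nodes, state updates, and conditional edges amount to without the framework, the following plain-Python sketch runs a tiny graph by hand. It is a conceptual model only, not how LangGraph is implemented; `run_state_graph`, the router functions, and the retry cap of three attempts are assumptions made for illustration.

```python
from typing import Callable, Dict

State = Dict[str, object]
END = "__end__"


def run_state_graph(
    nodes: Dict[str, Callable[[State], State]],
    routers: Dict[str, Callable[[State], str]],
    entry: str,
    state: State,
    max_steps: int = 10,
) -> State:
    """Run nodes until a router returns END or max_steps is exceeded."""
    current = entry
    for _ in range(max_steps):
        # Each node returns a partial update that is merged into the state
        state = {**state, **nodes[current](state)}
        current = routers[current](state)
        if current == END:
            return state
    raise RuntimeError(f"graph did not finish within {max_steps} steps")


# Hypothetical nodes: research fails on the first attempt, then succeeds.
def research(state: State) -> State:
    attempts = state.get("attempts", 0) + 1
    result = "error: timeout" if attempts == 1 else "3 sources found"
    return {"attempts": attempts, "research": result}


def analyze(state: State) -> State:
    return {"analysis": f"summary of {state['research']}"}


def route_after_research(state: State) -> str:
    # Retry on error, but give up retrying after three attempts
    if "error" in state["research"] and state["attempts"] < 3:
        return "research"
    return "analyze"


final = run_state_graph(
    nodes={"research": research, "analyze": analyze},
    routers={"research": route_after_research, "analyze": lambda s: END},
    entry="research",
    state={},
)
```

Capping retries in the router, rather than relying on a global step limit, makes the failure mode explicit: after the third failed attempt the graph moves on instead of aborting.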

6. Production-Grade Agent System Architecture

6.1 Enterprise Agent Architecture Design

┌──────────────────────────────────────────────────────────────────────────────┐
│                            User Interaction Layer                            │
│  ┌───────────────┐  ┌───────────────┐  ┌───────────────┐  ┌───────────────┐  │
│  │    Web UI     │  │  API Gateway  │  │    Webhook    │  │Scheduled Jobs │  │
│  └───────────────┘  └───────────────┘  └───────────────┘  └───────────────┘  │
└──────────────────────────────────────────────────────────────────────────────┘
                                       ↓
┌──────────────────────────────────────────────────────────────────────────────┐
│                          Security and Gateway Layer                          │
│  ┌───────────────┐  ┌───────────────┐  ┌───────────────┐  ┌───────────────┐  │
│  │Authentication │  │ Rate Limiting │  │     Input     │  │     Audit     │  │
│  │ Authorization │  │Circuit Breaker│  │   Filtering   │  │    Logging    │  │
│  └───────────────┘  └───────────────┘  └───────────────┘  └───────────────┘  │
└──────────────────────────────────────────────────────────────────────────────┘
                                       ↓
┌──────────────────────────────────────────────────────────────────────────────┐
│                          Agent Orchestration Layer                           │
│  ┌────────────────────────────────────────────────────────────────────────┐  │
│  │                      LangGraph / CrewAI / AutoGen                      │  │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐                │  │
│  │  │Supervisor│  │Researcher│  │  Coder   │  │  Writer  │  ...           │  │
│  │  └──────────┘  └──────────┘  └──────────┘  └──────────┘                │  │
│  └────────────────────────────────────────────────────────────────────────┘  │
│                                ↑ Tool Layer ↑                                │
│  ┌───────────────┐  ┌───────────────┐  ┌───────────────┐  ┌───────────────┐  │
│  │      MCP      │  │   Function    │  │      RAG      │  │   Database    │  │
│  │    Server     │  │    Calling    │  │    Engine     │  │    Access     │  │
│  └───────────────┘  └───────────────┘  └───────────────┘  └───────────────┘  │
└──────────────────────────────────────────────────────────────────────────────┘
                                       ↓
┌──────────────────────────────────────────────────────────────────────────────┐
│                           Memory and Storage Layer                           │
│  ┌───────────────┐  ┌───────────────┐  ┌───────────────┐  ┌───────────────┐  │
│  │     Redis     │  │   Pinecone    │  │   Postgres    │  │   S3/MinIO    │  │
│  │     Cache     │  │ Vector Store  │  │Structured Data│  │ File Storage  │  │
│  └───────────────┘  └───────────────┘  └───────────────┘  └───────────────┘  │
└──────────────────────────────────────────────────────────────────────────────┘
                                       ↓
┌──────────────────────────────────────────────────────────────────────────────┐
│                             Infrastructure Layer                             │
│  ┌───────────────┐  ┌───────────────┐  ┌───────────────┐  ┌───────────────┐  │
│  │  Kubernetes   │  │  GPU Cluster  │  │   vLLM/TGI    │  │  Monitoring   │  │
│  │ Orchestration │  │   Inference   │  │ Model Serving │  │ (Prometheus)  │  │
│  └───────────────┘  └───────────────┘  └───────────────┘  └───────────────┘  │
└──────────────────────────────────────────────────────────────────────────────┘
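
As one concrete piece of the security and gateway layer, rate limiting is often implemented as a token bucket placed in front of the agent. The sketch below is a minimal single-process version, offered as an illustration only; the class name `TokenBucket`, its parameters, and the injectable clock are assumptions, and a real gateway would usually keep this state in a shared store such as Redis.

```python
import time
from typing import Callable


class TokenBucket:
    """Allow bursts of up to `capacity` requests, refilled at `rate` tokens per second."""

    def __init__(
        self,
        rate: float,
        capacity: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock          # injectable so the limiter can be tested
        self.tokens = capacity      # start with a full bucket
        self.updated = clock()

    def allow(self, cost: float = 1.0) -> bool:
        """Return True and consume tokens if the request may proceed."""
        now = self.clock()
        # Refill in proportion to the elapsed time, never above capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False
```

Because an agent request can fan out into many LLM and tool calls, the `cost` argument can be used to charge expensive requests more than one token.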

6.2 The MCP Protocol: Standardizing Tools

# A simplified sketch of MCP (Model Context Protocol)
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
import json

@dataclass
class MCPResource:
    """MCP资源定义"""
    uri: str
    name: str
    mime_type: str
    description: str

@dataclass
class MCPTool:
    """MCP工具定义"""
    name: str
    description: str
    input_schema: Dict[str, Any]

class MCPServer:
    """
    MCP服务器实现
    提供标准化的工具和资源访问接口
    """
    
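    def __init__(self, name: str):
        self.name = name
        self.tools: Dict[str, MCPTool] = {}
        self.handlers: Dict[str, Any] = {}  # tool name -> callable implementing it
        self.resources: Dict[str, MCPResource] = {}
    
    def register_tool(self, tool: MCPTool, handler: Any = None):
        """Register a tool and, optionally, the callable that implements it."""
        self.tools[tool.name] = tool
        if handler is not None:
            self.handlers[tool.name] = handler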
    
    def register_resource(self, resource: MCPResource):
        """注册资源"""
        self.resources[resource.uri] = resource
    
    async def handle_request(self, request: Dict) -> Dict:
        """
        处理MCP请求
        """
        method = request.get("method")
        
        if method == "tools/list":
            return self._list_tools()
        elif method == "tools/call":
            return await self._call_tool(request["params"])
        elif method == "resources/list":
            return self._list_resources()
        elif method == "resources/read":
            return self._read_resource(request["params"]["uri"])
        else:
            return {"error": f"Unknown method: {method}"}
    
    def _list_tools(self) -> Dict:
        """列出所有工具"""
        return {
            "tools": [
                {
                    "name": tool.name,
                    "description": tool.description,
                    "inputSchema": tool.input_schema
                }
                for tool in self.tools.values()
            ]
        }
    
    async def _call_tool(self, params: Dict) -> Dict:
        """调用工具"""
        tool_name = params["name"]
        arguments = params.get("arguments", {})
        
        if tool_name not in self.tools:
            return {"error": f"Tool not found: {tool_name}"}
        
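        # Look up the callable that implements this tool
        handler = self._get_tool_handler(tool_name)
        if handler is None:
            return {"error": f"No handler registered for tool: {tool_name}"}
        
        try:
            result = handler(**arguments)
            if hasattr(result, "__await__"):  # support both sync and async handlers
                result = await result
            return {"content": [{"type": "text", "text": str(result)}]}
        except Exception as e:
            return {"error": str(e)}
    
    def _get_tool_handler(self, name: str):
        """Return the handler for a tool, or None if none is registered."""
        # Assumes handlers are kept in a `handlers` dict keyed by tool name
        return getattr(self, "handlers", {}).get(name)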
    
    def _list_resources(self) -> Dict:
        """列出所有资源"""
        return {
            "resources": [
                {
                    "uri": r.uri,
                    "name": r.name,
                    "mimeType": r.mime_type,
                    "description": r.description
                }
                for r in self.resources.values()
            ]
        }
    
    def _read_resource(self, uri: str) -> Dict:
        """读取资源"""
        if uri not in self.resources:
            return {"error": f"Resource not found: {uri}"}
        
        resource = self.resources[uri]
        content = self._load_resource_content(uri)
        
        return {
            "contents": [{
                "uri": uri,
                "mimeType": resource.mime_type,
                "text": content
            }]
        }
    
    def _load_resource_content(self, uri: str) -> str:
        """加载资源内容"""
        pass


# MCP客户端:连接MCP服务器
class MCPClient:
    """MCP客户端"""
    
    def __init__(self, server_url: str):
        self.server_url = server_url
        self.session_id = None
    
    async def connect(self) -> str:
        """连接服务器"""
        # 建立连接,获取session_id
        self.session_id = "session-123"
        return self.session_id
    
    async def list_tools(self) -> List[MCPTool]:
        """列出可用工具"""
        response = await self._send_request({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tools/list",
            "params": {}
        })
        
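        # The wire format uses "inputSchema", while the dataclass field is input_schema
        return [
            MCPTool(
                name=t["name"],
                description=t.get("description", ""),
                input_schema=t.get("inputSchema", {})
            )
            for t in response["tools"]
        ]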
    
    async def call_tool(self, name: str, arguments: Dict) -> Any:
        """调用工具"""
        response = await self._send_request({
            "jsonrpc": "2.0",
            "id": 2,
            "method": "tools/call",
            "params": {
                "name": name,
                "arguments": arguments
            }
        })
        
        return response.get("content", [{}])[0].get("text")
    
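    async def _send_request(self, request: Dict) -> Dict:
        """Send a request to the server and return the decoded response."""
        # Transport is left unimplemented here; a real client would send the
        # request over stdio or HTTP and parse the JSON-RPC response.
        raise NotImplementedError("transport not implemented")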
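
MCP messages are carried as JSON-RPC 2.0, so every response is wrapped in an envelope that echoes the request `id` and holds either a `result` or an `error` object. The server sketch above returns bare dictionaries for brevity; the helper below shows, under stated assumptions, what the envelope could look like. The function names and the handler table are hypothetical; the error codes are the standard JSON-RPC ones.

```python
from typing import Any, Callable, Dict

METHOD_NOT_FOUND = -32601  # standard JSON-RPC 2.0 error codes
INTERNAL_ERROR = -32603


def make_result(request_id: Any, result: Dict[str, Any]) -> Dict[str, Any]:
    """Wrap a successful result in a JSON-RPC 2.0 response."""
    return {"jsonrpc": "2.0", "id": request_id, "result": result}


def make_error(request_id: Any, code: int, message: str) -> Dict[str, Any]:
    """Wrap a failure in a JSON-RPC 2.0 error response."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {"code": code, "message": message},
    }


def handle_jsonrpc(
    request: Dict[str, Any],
    handlers: Dict[str, Callable[[Dict[str, Any]], Dict[str, Any]]],
) -> Dict[str, Any]:
    """Dispatch a request by method name and always answer with an envelope."""
    request_id = request.get("id")
    method = request.get("method")
    handler = handlers.get(method)
    if handler is None:
        return make_error(request_id, METHOD_NOT_FOUND, f"Method not found: {method}")
    try:
        return make_result(request_id, handler(request.get("params") or {}))
    except Exception as exc:
        return make_error(request_id, INTERNAL_ERROR, str(exc))
```

A client built this way reads `response["result"]["tools"]` rather than `response["tools"]`, and can tell protocol errors apart from tool output.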

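7. Case Study: A Customer Service Agent System

7.1 Requirements Analysis

The goal is a customer service agent that can:

  1. Understand the user's request (FAQ lookup, order handling, technical support)
  2. Retrieve answers from the knowledge base
  3. Look up order status
  4. Create support tickets
  5. Send notifications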
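
Before an LLM-based classifier is wired in, the first requirement can be prototyped with plain rules, which also gives a cheap fallback when the model is unavailable. The sketch below routes a message to an intent by keyword and pulls an order number out with a regular expression. The keyword lists and the order-number format (two capital letters followed by eight digits) are assumptions made for illustration; the intent names match the values of the `Intent` enum used in the implementation.

```python
import re
from typing import List, Optional, Tuple

# Hypothetical keyword rules, checked in order; the first match wins.
INTENT_RULES: List[Tuple[str, List[str]]] = [
    ("human_handover", ["人工", "human agent"]),
    ("order_cancel", ["取消订单", "cancel"]),
    ("order_status", ["订单", "物流", "order"]),
    ("complaint", ["投诉", "complain"]),
    ("technical_support", ["报错", "无法", "error", "crash"]),
]

# Assumed order-number format, e.g. "SO20260001". Lookarounds are used instead
# of \b because CJK characters count as word characters in Python regexes.
ORDER_ID = re.compile(r"(?<![A-Za-z0-9])[A-Z]{2}\d{8}(?!\d)")


def classify(message: str) -> str:
    """Return the first intent whose keywords appear in the message."""
    text = message.lower()
    for intent, keywords in INTENT_RULES:
        if any(keyword in text for keyword in keywords):
            return intent
    return "faq_query"  # default: treat the message as a knowledge-base question


def extract_order_id(message: str) -> Optional[str]:
    """Return the first order number found in the message, if any."""
    match = ORDER_ID.search(message)
    return match.group(0) if match else None
```

Rule order matters here: "取消订单" has to be checked before the more general "订单", otherwise every cancellation request would be routed to the order status handler.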

7.2 Full Implementation

# Python implementation: a customer service agent system
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum
import asyncio
import json

class Intent(Enum):
    FAQ_QUERY = "faq_query"
    ORDER_STATUS = "order_status"
    ORDER_CANCEL = "order_cancel"
    COMPLAINT = "complaint"
    TECHNICAL_SUPPORT = "technical_support"
    HUMAN_HANDOVER = "human_handover"
    UNKNOWN = "unknown"

@dataclass
class ConversationContext:
    """对话上下文"""
    user_id: str
    session_id: str
    intent: Intent = Intent.UNKNOWN
    entities: Dict[str, Any] = None
    history: List[Dict] = None
    state: str = "initial"
    
    def __post_init__(self):
        if self.entities is None:
            self.entities = {}
        if self.history is None:
            self.history = []

class CustomerServiceAgent:
    """
    智能客服Agent
    支持意图识别、知识库问答、订单处理、工单创建
    """
    
    def __init__(self, config: Dict):
        self.config = config
        self.llm = self._init_llm(config.get("llm"))
        self.tools = self._init_tools(config.get("tools"))
        self.rag = self._init_rag(config.get("vector_store"))
        self.nlu = IntentClassifier(self.llm)
    
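    def _init_llm(self, config: Optional[Dict]):
        """Initialize the LLM client."""
        config = config or {}  # tolerate a missing "llm" section
        return ChatLLM(
            model=config.get("model", "gpt-4o"),
            api_key=config.get("api_key")
        )
    
    def _init_tools(self, config: Optional[Dict]) -> Dict:
        """Initialize the tools that are present in the config."""
        config = config or {}  # tolerate a missing "tools" section
        tools = {}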
        
        # 知识库查询
        if "knowledge_base" in config:
            tools["search_knowledge"] = KnowledgeBaseTool(
                config["knowledge_base"]
            )
        
        # 订单服务
        if "order_service" in config:
            tools["query_order"] = OrderServiceTool(
                config["order_service"]
            )
            tools["cancel_order"] = OrderServiceTool(
                config["order_service"],
                action="cancel"
            )
        
        # 工单系统
        if "ticket_system" in config:
            tools["create_ticket"] = TicketTool(
                config["ticket_system"]
            )
        
        # 通知服务
        if "notification" in config:
            tools["send_notification"] = NotificationTool(
                config["notification"]
            )
        
        return tools
    
    def _init_rag(self, config: Optional[Dict]):
        """Initialize the RAG engine."""
        config = config or {}  # tolerate a missing "vector_store" section
        return RAGEngine(
            vector_store=config.get("url"),
            embedding_model=config.get("embedding_model")
        )
    
    async def handle_message(
        self,
        user_id: str,
        message: str,
        session_id: Optional[str] = None
    ) -> Dict:
        """
        处理用户消息
        """
        # 1. 创建/恢复上下文
        context = await self._get_or_create_context(
            user_id, session_id
        )
        
        # 2. 意图识别
        intent = await self.nlu.classify(message, context)
        context.intent = intent
        
        # 3. 实体抽取
        entities = await self._extract_entities(message, intent)
        context.entities.update(entities)
        
        # 4. 根据意图执行
        if intent == Intent.FAQ_QUERY:
            result = await self._handle_faq(message, context)
        elif intent == Intent.ORDER_STATUS:
            result = await self._handle_order_status(context)
        elif intent == Intent.ORDER_CANCEL:
            result = await self._handle_order_cancel(context)
        elif intent == Intent.COMPLAINT:
            result = await self._handle_complaint(message, context)
        elif intent == Intent.TECHNICAL_SUPPORT:
            result = await self._handle_technical_support(message, context)
        elif intent == Intent.HUMAN_HANDOVER:
            result = await self._handover_to_human(context)
        else:
            result = await self._handle_unknown(message, context)
        
        # 5. 更新上下文
        context.history.append({
            "user": message,
            "agent": result["message"],
            "intent": intent.value,
            "entities": entities
        })
        await self._save_context(context)
        
        # 6. 返回结果
        return {
            "message": result["message"],
            "intent": intent.value,
            "suggestions": result.get("suggestions", []),
            "requires_handover": result.get("requires_handover", False)
        }
    
    async def _handle_faq(
        self,
        query: str,
        context: ConversationContext
    ) -> Dict:
        """处理FAQ查询"""
        
        # RAG检索
        relevant_docs = await self.rag.search(query, top_k=3)
        
        if relevant_docs:
            # 合成答案
            answer = await self.llm.generate(
                prompt=f"""
                用户问题:{query}
                
                相关知识:
                {chr(10).join([d['content'] for d in relevant_docs])}
                
                请用友好的语气回答用户问题。如果知识库中的信息不完整,请明确说明。
                """
            )
            
            return {
                "message": answer,
                "suggestions": [
                    "还有什么可以帮您?",
                    "查看订单状态",
                    "联系人工客服"
                ]
            }
        else:
            return {
                "message": "抱歉,我暂时没有找到相关信息。让我为您转接人工客服。",
                "requires_handover": True
            }
    
    async def _handle_order_status(
        self,
        context: ConversationContext
    ) -> Dict:
        """处理订单状态查询"""
        
        order_id = context.entities.get("order_id")
        
        if not order_id:
            # 需要用户提供订单号
            return {
                "message": "请提供您的订单号,我可以帮您查询订单状态。",
                "suggestions": []
            }
        
        # 调用订单服务
        tool = self.tools["query_order"]
        order_info = await tool.execute(order_id=order_id)
        
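        # Build the reply line by line so it carries no source-code indentation
        lines = [
            f"Status of order {order_id}:",
            f"- Order status: {order_info['status']}",
            f"- Placed at: {order_info['create_time']}",
            f"- Estimated delivery: {order_info.get('delivery_time', 'to be confirmed')}",
            "",
            "Is there anything else I can look up for you?",
        ]
        return {
            "message": "\n".join(lines),
            "suggestions": [
                "Cancel order",
                "Track shipment",
                "Request refund"
            ]
        }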
    
    async def _handle_order_cancel(
        self,
        context: ConversationContext
    ) -> Dict:
        """Handle order cancellation."""
        
        order_id = context.entities.get("order_id")
        
        if not order_id:
            return {
                "message": "Please provide the number of the order you want to cancel.",
                "suggestions": []
            }
        
        # Ask the order service to cancel the order
        tool = self.tools["cancel_order"]
        result = await tool.execute(order_id=order_id)
        
        if result["success"]:
            return {
                "message": f"Order {order_id} has been cancelled. Your payment will be refunded within 3-5 business days.",
                "suggestions": [
                    "Browse other products",
                    "Contact a human agent"
                ]
            }
        else:
            return {
                "message": f"Sorry, the order could not be cancelled: {result.get('reason', 'please contact a human agent')}",
                "suggestions": [
                    "Contact a human agent",
                    "View order details"
                ]
            }
    
    async def _handle_complaint(
        self,
        message: str,
        context: ConversationContext
    ) -> Dict:
        """Handle complaints."""
        
        # Create a high-priority ticket
        tool = self.tools["create_ticket"]
        ticket = await tool.execute(
            title=f"Customer complaint - {context.user_id}",
            description=message,
            category="complaint",
            priority="high"
        )
        
        return {
            "message": f"""
            I have recorded your feedback and created a ticket (ID: {ticket['id']}).
            Our support team will contact you within 24 hours.
            
            For urgent issues, please call the support hotline: 400-xxx-xxxx
            """,
            "suggestions": [
                "View other questions",
                "Contact a human agent"
            ]
        }
    
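    async def _handle_technical_support(
        self,
        message: str,
        context: ConversationContext
    ) -> Dict:
        """Handle technical support requests."""
        
        # Try the knowledge base first
        relevant_docs = await self.rag.search(
            message, 
            top_k=3,
            filter={"category": "technical"}
        )
        
        if relevant_docs:
            # Estimate how complex the question is
            complexity = await self._assess_complexity(message, relevant_docs)
            
            if complexity < 0.7:
                # Simple question: answer from the knowledge base
                answer = await self.llm.generate(
                    prompt=f"""
                    User's technical question: {message}
                    
                    Relevant documents:
                    {chr(10).join([d['content'] for d in relevant_docs])}
                    
                    Provide detailed resolution steps.
                    """
                )
                
                return {
                    "message": answer,
                    "suggestions": [
                        "Did this solve the problem?",
                        "Any other questions?"
                    ]
                }
        
        # No documents found, or the question is too complex: hand over to a human
        return {
            "message": """
            This question needs more specialized technical support.
            I will transfer you to a technical specialist, please wait...
            """,
            "requires_handover": True
        }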
    
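    async def _handover_to_human(self, context: ConversationContext) -> Dict:
        """Hand the conversation over to a human agent."""
        
        # Record the handover
        await self._save_handover_record(context)
        
        # The queue length and wait time below are hard-coded placeholders
        return {
            "message": """
            Transferring you to a human agent, please wait...
            People ahead of you in the queue: 3; estimated wait: 2 minutes
            """,
            "requires_handover": True,
            "suggestions": []
        }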
    
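    async def _handle_unknown(
        self,
        message: str,
        context: ConversationContext
    ) -> Dict:
        """Handle unknown intents."""
        
        clarification = await self.llm.generate(
            prompt=f"""
            User message: {message}
            
            Determine the user's intent. Possible intents include:
            - General question (FAQ)
            - Order query
            - Complaint or suggestion
            - Technical support
            
            If the intent cannot be determined, politely ask the user what they need.
            """
        )
        
        return {
            "message": clarification,
            "suggestions": [
                "I can look up an order for you",
                "I can answer common questions",
                "I can create a ticket for you"
            ]
        }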
    
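    async def _get_or_create_context(
        self,
        user_id: str,
        session_id: Optional[str]
    ) -> ConversationContext:
        """Fetch the conversation context, creating it if needed."""
        # A real implementation would load it from a cache or database
        raise NotImplementedError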
    
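    async def _save_context(self, context: ConversationContext):
        """Save the conversation context."""
        # A real implementation would write to a cache or database
        pass
    
    async def _assess_complexity(
        self,
        query: str,
        documents: List[Dict]
    ) -> float:
        """Estimate question complexity in [0, 1]; higher means harder."""
        # Simple heuristic: one minus the mean retrieval score.
        # Defined on this class because _handle_technical_support calls it via self.
        if not documents:
            return 1.0
        avg_relevance = sum(d.get("score", 0) for d in documents) / len(documents)
        return 1 - avg_relevance


# Intent classifier
class IntentClassifier:
    """LLM-based intent classifier."""
    
    def __init__(self, llm):
        self.llm = llm
        self.intents = [i.value for i in Intent]
    
    async def classify(
        self,
        message: str,
        context: ConversationContext
    ) -> Intent:
        """Classify the user's intent."""
        
        prompt = f"""
        User message: {message}
        
        Available intents:
        {chr(10).join(self.intents)}
        
        Pick the single intent that best matches the user message.
        Return only the intent name, with nothing else.
        """
        
        result = await self.llm.generate(prompt)
        
        try:
            return Intent(result.strip())
        except ValueError:
            # The model returned something that is not a known intent value
            return Intent.UNKNOWN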


# Tool class definitions
class KnowledgeBaseTool:
    """Knowledge base query tool."""
    def __init__(self, config: Dict):
        self.endpoint = config["endpoint"]
    
    async def execute(self, query: str, **kwargs) -> List[Dict]:
        # Placeholder: call the knowledge base API here
        raise NotImplementedError

class OrderServiceTool:
    """Order service tool."""
    def __init__(self, config: Dict, action: str = "query"):
        self.endpoint = config["endpoint"]
        self.action = action
    
    async def execute(self, **kwargs) -> Dict:
        # Placeholder: call the order service API here
        raise NotImplementedError

class TicketTool:
    """Ticket creation tool."""
    def __init__(self, config: Dict):
        self.endpoint = config["endpoint"]
    
    async def execute(self, **kwargs) -> Dict:
        # Placeholder: call the ticketing system API here
        raise NotImplementedError

class NotificationTool:
    """Notification tool."""
    def __init__(self, config: Dict):
        self.endpoint = config["endpoint"]
    
    async def execute(self, **kwargs) -> Dict:
        # Placeholder: call the notification service here
        raise NotImplementedError


# RAG engine
class RAGEngine:
    """Retrieval-augmented generation engine."""
    def __init__(self, vector_store, embedding_model):
        self.vector_store = vector_store
        self.embedding_model = embedding_model
    
    async def search(
        self,
        query: str,
        top_k: int = 5,
        filter: Optional[Dict] = None
    ) -> List[Dict]:
        # Placeholder: run the vector search here
        raise NotImplementedError


# Chat LLM wrapper
class ChatLLM:
    """Thin wrapper around a chat LLM."""
    def __init__(self, model: str, api_key: str):
        self.model = model
        self.api_key = api_key
    
    async def generate(self, prompt: str) -> str:
        # Placeholder: call the LLM API here
        raise NotImplementedError
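
The technical-support handler in the listing above routes on a complexity score derived from retrieval relevance. The sketch below reproduces that arithmetic in isolation so the threshold can be checked by hand. It assumes each retrieved document carries a `score` between 0 and 1 (the listing reads this key but never states its range); the `route` helper, the clamping, and the empty-list guard are illustrative additions, not part of the original service.

```python
from typing import Dict, List

COMPLEXITY_THRESHOLD = 0.7  # same cut-off used in _handle_technical_support


def assess_complexity(documents: List[Dict]) -> float:
    """Return 1 - mean relevance; an empty result set counts as maximally complex."""
    if not documents:
        return 1.0
    avg_relevance = sum(d.get("score", 0.0) for d in documents) / len(documents)
    # Clamp so out-of-range scores cannot push the result outside [0, 1].
    return min(1.0, max(0.0, 1.0 - avg_relevance))


def route(documents: List[Dict]) -> str:
    """Answer from the knowledge base below the threshold, otherwise hand over."""
    if assess_complexity(documents) < COMPLEXITY_THRESHOLD:
        return "knowledge_base"
    return "handover"


if __name__ == "__main__":
    strong = [{"score": 0.9}, {"score": 0.7}]  # mean 0.8, complexity about 0.2
    weak = [{"score": 0.2}, {"score": 0.3}]    # mean 0.25, complexity 0.75
    print(route(strong), route(weak), route([]))
```

With a threshold of 0.7, the knowledge base answers only when the mean relevance exceeds 0.3, which is a permissive setting; whether that is appropriate depends on how the vector store scales its scores.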

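8. AI Agent Technology Trends and Outlook for 2026

8.1 Key Technology Trends in 2026

According to Forbes China's "2026 Forbes China AI TECH Enterprises TOP 50" report, together with the signals from Google I/O 2026, Baidu Create 2026, and similar conferences, the AI Agent field in 2026 shows the following trends:

| Trend | Description | Representative product |
| --- | --- | --- |
| Agentic AI matures | Evolution from single agents to multi-agent collaboration | Google Gemini Spark, Baidu DuMate |
| Tool ecosystem improves | The MCP protocol drives tool standardization | Anthropic Claude Code |
| Memory systems upgrade | Layered memory + continual learning | LangChain Memory components |
| Enterprise-grade security | PII detection, sandboxed execution, audit logs | Kore Artemis platform |
| Rise of the DAA metric | From tokens to Daily Active Agents | The new yardstick proposed by Baidu |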
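
To make the tool-standardization trend concrete: an MCP tool definition describes a tool with a `name`, a `description`, and an `inputSchema` expressed as JSON Schema. The sketch below builds such a descriptor for a hypothetical `query_order` tool and checks call arguments against it. The `validate_arguments` helper is a hand-rolled illustration covering only required fields and primitive types; it is neither a full JSON Schema validator nor part of any MCP SDK.

```python
from typing import Any, Dict, List

# Hypothetical descriptor for an order-lookup tool, shaped like an MCP tool
# definition (name / description / inputSchema).
QUERY_ORDER_TOOL: Dict[str, Any] = {
    "name": "query_order",
    "description": "Look up the status of an order by its ID.",
    "inputSchema": {
        "type": "object",
        "properties": {"order_id": {"type": "string"}},
        "required": ["order_id"],
    },
}

# Mapping from JSON Schema primitive type names to Python types.
_JSON_TYPES = {"string": str, "integer": int, "number": (int, float), "boolean": bool}


def validate_arguments(tool: Dict[str, Any], arguments: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the call looks well-formed."""
    schema = tool["inputSchema"]
    problems = [
        f"missing required argument: {name}"
        for name in schema.get("required", [])
        if name not in arguments
    ]
    for name, value in arguments.items():
        spec = schema.get("properties", {}).get(name)
        if spec is None:
            problems.append(f"unexpected argument: {name}")
            continue
        expected = _JSON_TYPES.get(spec.get("type"))
        if expected is not None and not isinstance(value, expected):
            problems.append(f"{name} should be of type {spec['type']}")
    return problems
```

Because the schema travels with the tool, an agent can reject a malformed call before it reaches the order service, and the same descriptor can be shown to the model as documentation.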

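8.2 Technology Roadmap

AI Agent development roadmap for 2026:

Q1-Q2:
├── Stronger single-agent capabilities
│   ├── Better reasoning (CoT/ToT)
│   ├── Long-term memory support
│   └── Multimodal tool calling
│
└── Maturing multi-agent collaboration
    ├── Standardized Supervisor pattern
    ├── CrewAI-style orchestration
    └── Cross-agent knowledge sharing

Q3-Q4:
├── Enterprise-grade agent platforms
│   ├── Observability and monitoring
│   ├── Security and compliance
│   └── Large-scale deployment
│
└── Vertical-domain agents
    ├── Finance agents
    ├── Healthcare agents
    ├── Legal agents
    └── R&D agents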
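
The Supervisor pattern named in the roadmap can be sketched in a few lines: one coordinating component decides which worker agent receives each task. The example below is a framework-free illustration. The worker functions, the `Supervisor` class, and the keyword table are all hypothetical, and the keyword match merely stands in for the LLM-based routing a real system would use.

```python
from typing import Callable, Dict

Worker = Callable[[str], str]


def research_agent(task: str) -> str:
    """Hypothetical worker that would gather background information."""
    return f"[research] notes on: {task}"


def coding_agent(task: str) -> str:
    """Hypothetical worker that would produce a code change."""
    return f"[coding] patch for: {task}"


class Supervisor:
    """Routes each task to one worker; a keyword table stands in for an LLM router."""

    def __init__(self, workers: Dict[str, Worker], keywords: Dict[str, str], default: str):
        self.workers = workers
        self.keywords = keywords  # keyword -> worker name
        self.default = default    # worker used when no keyword matches

    def choose(self, task: str) -> str:
        lowered = task.lower()
        for keyword, worker_name in self.keywords.items():
            if keyword in lowered:
                return worker_name
        return self.default

    def run(self, task: str) -> str:
        return self.workers[self.choose(task)](task)


if __name__ == "__main__":
    supervisor = Supervisor(
        workers={"research": research_agent, "coding": coding_agent},
        keywords={"bug": "coding", "refactor": "coding"},
        default="research",
    )
    print(supervisor.run("Fix the login bug"))
    print(supervisor.run("Summarize recent MCP changes"))
```

Keeping the routing decision in one place is what makes the pattern easy to observe and audit, which matters for the enterprise platform items listed under Q3-Q4.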

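8.3 Advice for Developers

For AI developers:

  1. Master the core concepts: the ReAct loop, tool calling, and memory management are the foundation
  2. Choose the right framework: LangGraph suits complex production systems, while CrewAI suits rapid prototyping
  3. Invest in tool design: well-designed tools are key to an agent's capability
  4. Pay attention to security: input filtering, output review, and sandboxed execution are indispensable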
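
As one concrete form of the input filtering mentioned in the last item, the sketch below masks e-mail addresses and 11-digit mainland China mobile numbers before text is passed to a model. The patterns and the `mask_pii` name are illustrative assumptions; they are deliberately simple and would miss many real-world formats, so treat this as a starting point rather than a complete PII detector.

```python
import re

# Deliberately simple patterns; production systems need broader, locale-aware detection.
_EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
# 11-digit mobile number starting with 1[3-9], not embedded in a longer digit run.
_CN_MOBILE = re.compile(r"(?<!\d)1[3-9]\d{9}(?!\d)")


def mask_pii(text: str) -> str:
    """Replace e-mail addresses and mobile numbers with placeholder tokens."""
    text = _EMAIL.sub("[EMAIL]", text)
    return _CN_MOBILE.sub("[PHONE]", text)


if __name__ == "__main__":
    print(mask_pii("Call 13812345678 or mail a.b@example.com"))
```

The digit lookarounds keep longer numeric strings such as order IDs intact, so the downstream handlers can still extract them.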

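For enterprise decision-makers:

  1. Assess the scenario: not every scenario needs an agent; for simple Q&A, a plain LLM is enough
  2. Prepare the data: a high-quality knowledge base and well-built tools are prerequisites for good agent results
  3. Put compliance first: AI regulation is tightening, so compliant deployment is mandatory
  4. Proceed incrementally: start with simple scenarios and expand step by step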

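9. Summary

In 2026, AI Agents have moved from concept to mature commercial use. From Google's "agentic Gemini era" to Baidu's new DAA yardstick, AI is completing a fundamental shift from "able to answer questions" to "able to complete tasks".

This article has examined the technical architecture, core algorithms, and engineering implementation of AI Agents:

  1. Architecture: an agent consists of an interaction layer, a core layer, a tool layer, and a data layer, which together form a complete perceive-plan-act-reflect loop

  2. Core capabilities: the ReAct reasoning loop, Function Calling for tool use, a layered memory system, and a self-reflection mechanism

  3. Engineering: LangChain/LangGraph provide a standardized development framework, and the MCP protocol drives standardization of the tool ecosystem

  4. Trends: multi-agent collaboration, enterprise-grade security, and vertical-domain agents will become the new growth areas

Key takeaways

  • AI Agent = brain (LLM) + memory + tools + planning engine
  • The ReAct loop gives an agent the ability to think before it acts
  • The tool ecosystem and the memory system are the key bottlenecks for agent capability
  • 2026 marks the first year of multi-agent collaboration and enterprise-grade agent platforms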
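
The "think before acting" behavior of the ReAct loop can be illustrated without any framework. The sketch below alternates model output with tool observations until the model emits a final answer. The `Action: tool[input]` and `Final Answer:` text conventions, the `react_loop` function, and the scripted stand-in for the model are all assumptions made for this illustration; production frameworks use their own prompt formats and structured tool calls.

```python
import re
from typing import Callable, Dict, List

ACTION_RE = re.compile(r"Action:\s*(\w+)\[(.*)\]")


def react_loop(
    llm: Callable[[str], str],
    tools: Dict[str, Callable[[str], str]],
    question: str,
    max_steps: int = 5,
) -> str:
    """Alternate model replies (thought/action) with tool observations."""
    transcript = f"Question: {question}\n"
    for _ in range(max_steps):
        reply = llm(transcript)
        transcript += reply + "\n"
        if "Final Answer:" in reply:
            return reply.split("Final Answer:", 1)[1].strip()
        match = ACTION_RE.search(reply)
        if match is None:
            # Feed the failure back so the model can correct itself next turn.
            transcript += "Observation: no valid action found\n"
            continue
        name, arg = match.groups()
        tool = tools.get(name)
        observation = tool(arg) if tool else f"unknown tool {name}"
        transcript += f"Observation: {observation}\n"
    return "Stopped: step limit reached"


def scripted_llm(replies: List[str]) -> Callable[[str], str]:
    """Stand-in for a real model: returns canned replies in order."""
    remaining = iter(replies)
    return lambda _prompt: next(remaining)


if __name__ == "__main__":
    llm = scripted_llm([
        "Thought: I need the order status.\nAction: query_order[A1001]",
        "Thought: I have what I need.\nFinal Answer: Order A1001 is shipped.",
    ])
    tools = {"query_order": lambda order_id: f"{order_id}: shipped"}
    print(react_loop(llm, tools, "Where is order A1001?"))
```

The step limit is the simplest guard against a model that never produces a final answer; the observation lines are what let each new reply build on the result of the previous action.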

References

  1. Google I/O 2026 - Gemini Omni & Agent Announcements (https://io.google/2026)
  2. Baidu Create 2026 - DuMate & DAA Metric (https://create.baidu.com)
  3. Forbes China AI TECH Enterprises TOP 50 (2026)
  4. LangChain Documentation (https://docs.langchain.com)
  5. Anthropic Claude Code (https://claude.ai/code)
  6. ReAct: Synergizing Reasoning and Acting in Language Models (arxiv.org)