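AI Agent Technology: The Paradigm Shift from Answering to Executing
Tags: AI Agent, large language models, agents, LangChain, ReAct, Function Calling
📖 Preface
On May 20, 2026, Google I/O 2026 opened in Mountain View, California. Google CEO Sundar Pichai announced at the conference: "We have officially entered the 'agentic Gemini era.'" On the same day, at Baidu Create 2026, Baidu founder Robin Li (Li Yanhong) proposed a yardstick for the AI era: DAA (Daily Active Agents), marking the industry's shift from a "parameter race" to a "value validation" phase.
From Google's Gemini Spark to Baidu's DuMate, and from Anthropic's Claude Code to OpenAI's GPT-5.5, **AI agents** have moved from concept to mature commercial use and become the hottest technical direction of 2026. This article examines the technical architecture, core algorithms, and engineering implementation of AI agents, and provides code examples throughout to help developers get up to speed quickly.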
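1. The Essence of an AI Agent: A Perceive-Plan-Act-Reflect Loop
1.1 What Is an AI Agent?
An AI agent is an AI system that can autonomously understand a goal, plan a path of action, call external tools, and carry out complex tasks. Unlike traditional question-answering AI, an agent has:
- Autonomous decision-making: it adjusts its execution strategy dynamically based on feedback from the environment
- Tool use: it interacts with external systems through Function Calling or APIs
- Long-term memory: it retains context and what it has learned across sessions
- Self-reflection: it evaluates the outcome of its actions and keeps improving
In one sentence: AI Agent = brain (LLM) + memory + tools + planning engine
1.2 The Paradigm Shift from "Chat" to "Act"
The interaction pattern of a traditional chatbot:
User input → LLM generates an answer → done
The interaction pattern of an AI agent:
User input → understand the goal → decompose the task → call tools → observe results → self-reflect → complete the task
This ReAct (Reasoning + Acting) loop turns the AI from a passive answerer into an active executor. Google describes the shift this way: "In 2026 the pain point has moved from 'hallucination' to 'laziness': users do not want a long summary, they want results."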
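The difference between the two interaction patterns can be sketched in a few lines. This is only an illustration, not any framework's API: `fake_llm`, the `TOOLS` table, and the `tool:` / `answer:` reply convention are all assumptions invented for the example.

```python
from typing import Callable, Dict, List

# Hypothetical tool table; a real agent would wrap search, APIs, code execution, etc.
TOOLS: Dict[str, Callable[[str], str]] = {
    "weather": lambda city: f"{city}: sunny, 25°C",
}


def fake_llm(prompt: str) -> str:
    """Stand-in for a model call: asks for a tool first, then answers."""
    if "Observation:" not in prompt:
        return "tool:weather:Shanghai"
    observation = prompt.rsplit("Observation:", 1)[1].strip()
    return f"answer:{observation}"


def chat(user_input: str) -> str:
    """Chat mode: a single model call, then done."""
    return fake_llm(user_input)


def act(user_input: str, max_steps: int = 5) -> str:
    """Agent mode: decide, call a tool, observe, and repeat until an answer appears."""
    transcript: List[str] = [f"Task: {user_input}"]
    for _ in range(max_steps):
        reply = fake_llm("\n".join(transcript))
        kind, _, rest = reply.partition(":")
        if kind == "answer":
            return rest
        tool_name, _, arg = rest.partition(":")
        transcript.append(f"Observation: {TOOLS[tool_name](arg)}")
    return "stopped: step budget exhausted"
```

In chat mode the tool request is simply returned as text and nothing is executed; in agent mode the same reply is acted on and its result is fed back into the next model call.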
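1.3 The Core Value of an Agent
| Dimension | Traditional AI | AI Agent |
|---|---|---|
| Interaction | Question answering | Task execution |
| Tool use | None | Function Calling / APIs |
| Context | Single session | Long-term memory |
| Error handling | Returns an error | Self-reflects and retries |
| Value delivered | Information | Actionable results |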
2. AI Agent Architecture in Depth
2.1 Architecture Overview
A complete AI agent system consists of four core layers:
┌───────────────────────────────────────────────────────────────────┐
│                         Interaction Layer                         │
│ Chat UI │ API │ Multimodal input │ Webhooks │ Scheduled triggers  │
└───────────────────────────────────────────────────────────────────┘
                                  ↓
┌───────────────────────────────────────────────────────────────────┐
│                         Agent Core Layer                          │
│   ┌───────────┐   ┌───────────┐   ┌───────────┐   ┌───────────┐   │
│   │ Planning  │ → │ Reasoning │ → │ LLM core  │ ← │Tool select│   │
│   └───────────┘   └───────────┘   └───────────┘   └───────────┘   │
│   ┌───────────┐   ┌───────────┐   ┌───────────────────────────┐   │
│   │  Memory   │   │ Execution │   │      Self-reflection      │   │
│   └───────────┘   └───────────┘   └───────────────────────────┘   │
└───────────────────────────────────────────────────────────────────┘
                                  ↓
┌───────────────────────────────────────────────────────────────────┐
│                            Tools Layer                            │
│ Search │ Calculator │ Code exec │ API calls │ Files │ DB │ Email  │
│                   MCP protocol (tool standard)                    │
└───────────────────────────────────────────────────────────────────┘
                                  ↓
┌───────────────────────────────────────────────────────────────────┐
│                      Data & Knowledge Layer                       │
│ Vector DB │ Knowledge graph │ Document store │ Redis cache │ RAG  │
└───────────────────────────────────────────────────────────────────┘
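2.2 Perception Module
The perception module handles multimodal input (text, voice, images, and so on) and converts it into standardized observation data.
# Python implementation: multimodal input perception module
import json
import time  # required by perceive(), which calls time.time()
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional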
class InputType(Enum):
TEXT = "text"
VOICE = "voice"
IMAGE = "image"
FILE = "file"
API_EVENT = "api_event"
SCHEDULED = "scheduled"
@dataclass
class Observation:
"""标准化观测数据"""
input_type: InputType
content: str
raw_data: Any
metadata: Dict[str, Any]
timestamp: float
session_id: str
user_id: Optional[str] = None
class PerceptionModule:
"""
感知模块:多模态信息输入与处理
核心功能:
1. 输入标准化
2. 去噪与整理
3. 高质量预处理
"""
def __init__(self):
self.input_handlers = {
InputType.TEXT: self._handle_text,
InputType.VOICE: self._handle_voice,
InputType.IMAGE: self._handle_image,
InputType.FILE: self._handle_file,
InputType.API_EVENT: self._handle_api_event,
InputType.SCHEDULED: self._handle_scheduled,
}
def perceive(self, raw_input: Any, input_type: InputType,
session_id: str, user_id: Optional[str] = None) -> Observation:
"""
主入口:处理各类输入并返回标准化观测
"""
# 1. 选择对应的处理器
handler = self.input_handlers.get(input_type)
if not handler:
raise ValueError(f"Unsupported input type: {input_type}")
# 2. 调用处理器
content, metadata = handler(raw_input)
# 3. 构建标准化观测
observation = Observation(
input_type=input_type,
content=content,
raw_data=raw_input,
metadata=metadata,
timestamp=time.time(),
session_id=session_id,
user_id=user_id
)
return observation
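    def _handle_text(self, raw_input: str) -> tuple[str, Dict]:
        """Handle text input."""
        # Basic cleanup
        cleaned = raw_input.strip()
        # Lightweight metadata that later intent recognition can use
        metadata = {
            "length": len(cleaned),
            "has_url": "http" in cleaned,
            "has_code": "```" in cleaned,
            # detect_language is not defined in this listing; it is assumed to be
            # a helper supplied elsewhere (for example a language-ID library).
            "language": detect_language(cleaned),
        }
        return cleaned, metadata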
def _handle_voice(self, raw_input: bytes) -> tuple[str, Dict]:
"""处理语音输入 - 需要ASR"""
# 实际项目中调用语音识别服务
# asr_result = asr_service.recognize(raw_input)
asr_result = "语音转文字的识别结果" # 模拟
metadata = {
"duration": len(raw_input), # 模拟
"sample_rate": 16000,
"confidence": 0.95
}
return asr_result, metadata
def _handle_image(self, raw_input: bytes) -> tuple[str, Dict]:
"""处理图像输入"""
# OCR识别
# text = ocr_service.extract(raw_input)
extracted_text = "图像中的文字内容" # 模拟
metadata = {
"image_size": len(raw_input),
"format": "png/jpeg",
"ocr_confidence": 0.92
}
return extracted_text, metadata
    def _handle_file(self, raw_input: Dict) -> tuple[str, Dict]:
        """Handle an uploaded file."""
        file_path = raw_input.get("path")
        file_type = raw_input.get("type")
        # _parse_file is not defined in this listing; it is assumed to return
        # the file's text content for the given path and type.
        content = self._parse_file(file_path, file_type)
        metadata = {
            "file_name": raw_input.get("name"),
            "file_size": raw_input.get("size"),
            "file_type": file_type
        }
        return content, metadata
def _handle_api_event(self, raw_input: Dict) -> tuple[str, Dict]:
"""处理API事件触发"""
event_type = raw_input.get("event_type")
event_data = raw_input.get("data", {})
content = f"Event: {event_type}, Data: {json.dumps(event_data)}"
metadata = {
"event_source": raw_input.get("source"),
"event_id": raw_input.get("id")
}
return content, metadata
def _handle_scheduled(self, raw_input: Dict) -> tuple[str, Dict]:
"""处理定时任务触发"""
task_name = raw_input.get("task_name")
schedule_time = raw_input.get("schedule_time")
content = f"Scheduled task: {task_name}"
metadata = {
"schedule_type": raw_input.get("type"),
"schedule_time": schedule_time
}
return content, metadata
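The text handler above calls `detect_language`, which the listing never defines. A minimal stand-in might look like the sketch below. It is a hypothetical helper that guesses from Unicode script ranges only; a real system would more likely use a dedicated language-identification library.

```python
def detect_language(text: str) -> str:
    """Coarse script-based guess: 'ja', 'ko', 'zh', 'en' or 'unknown'.

    Hypothetical helper. It cannot tell apart languages that share a script.
    """
    def has_char_in(lo: str, hi: str) -> bool:
        return any(lo <= ch <= hi for ch in text)

    # Kana is checked before Han characters because Japanese text mixes both.
    if has_char_in("\u3040", "\u30ff"):   # Hiragana and Katakana
        return "ja"
    if has_char_in("\uac00", "\ud7a3"):   # Hangul syllables
        return "ko"
    if has_char_in("\u4e00", "\u9fff"):   # CJK unified ideographs
        return "zh"
    if any(ch.isascii() and ch.isalpha() for ch in text):
        return "en"
    return "unknown"
```

Any Han character wins over Latin letters here, so mixed input such as "AI Agent技术" is labelled `zh`; that ordering is a design choice for this sketch, not a general rule.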
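2.3 Memory Module
Memory is what lets an agent evolve from a "one-shot tool" into an "assistant that keeps learning". The system uses a layered memory architecture:
# Python implementation: layered memory system
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
import tiktoken  # third-party tokenizer: pip install tiktoken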
@dataclass
class MemoryItem:
"""记忆单元"""
content: str
memory_type: str # 'short_term' | 'long_term' | 'experience'
timestamp: float
importance: float = 0.5 # 0-1,越高越重要
access_count: int = 0
embedding: Optional[List[float]] = None
class ShortTermMemory:
"""
短期记忆:工作记忆,维护当前对话上下文
使用滑动窗口机制,避免上下文溢出
"""
def __init__(self, max_tokens: int = 4000):
self.buffer: List[MemoryItem] = []
self.max_tokens = max_tokens
self.tokenizer = tiktoken.get_encoding("cl100k_base")
def add(self, content: str, memory_type: str = "short_term"):
"""添加记忆"""
item = MemoryItem(
content=content,
memory_type=memory_type,
timestamp=time.time()
)
self.buffer.append(item)
self._trim_by_tokens()
def get_context(self, max_items: Optional[int] = None) -> str:
"""获取上下文"""
items = self.buffer[-max_items:] if max_items else self.buffer
return "\n".join([item.content for item in items])
def get_all(self) -> List[MemoryItem]:
"""获取所有记忆"""
return self.buffer.copy()
def _trim_by_tokens(self):
"""按Token数量裁剪"""
total = sum(
len(self.tokenizer.encode(item.content))
for item in self.buffer
)
while total > self.max_tokens and len(self.buffer) > 2:
removed = self.buffer.pop(0)
total -= len(self.tokenizer.encode(removed.content))
def clear(self):
"""清空短期记忆"""
self.buffer.clear()
class LongTermMemory:
"""
长期记忆:持久化存储关键经验和知识
基于向量数据库实现语义检索
"""
def __init__(self, vector_store, embeddings):
self.store = vector_store # Pinecone/Weaviate客户端
self.embeddings = embeddings # OpenAI embeddings等
async def store_experience(
self,
task: str,
outcome: str,
lesson: str,
metadata: Optional[Dict] = None
):
"""
存储经验到长期记忆
"""
doc_content = f"""
任务: {task}
结果: {outcome}
经验教训: {lesson}
"""
doc = {
"pageContent": doc_content.strip(),
"metadata": {
"type": "experience",
"timestamp": time.time(),
"task_type": metadata.get("task_type") if metadata else None,
**(metadata or {})
}
}
await self.store.add_documents([doc])
# 同步到经验库
await self._sync_to_experience_store(task, outcome, lesson)
async def recall(self, query: str, k: int = 5) -> List[str]:
"""
检索相关记忆
"""
results = await self.store.similarity_search(query, k=k)
return [r.pageContent for r in results]
async def _sync_to_experience_store(self, task: str, outcome: str, lesson: str):
"""同步到结构化经验库"""
# 提取关键模式
pass
async def consolidate(self, session_summary: str):
"""
记忆巩固:将重要短期记忆迁移到长期记忆
"""
# 识别重要信息
important_patterns = await self._extract_important_patterns(session_summary)
for pattern in important_patterns:
await self.store_experience(
task=pattern["task"],
outcome=pattern["outcome"],
lesson=pattern["lesson"]
)
class HierarchicalMemory:
"""
分层记忆管理器
协调短期记忆和长期记忆的交互
"""
def __init__(
self,
vector_store,
embeddings,
short_term_max_tokens: int = 4000
):
self.short_term = ShortTermMemory(max_tokens=short_term_max_tokens)
self.long_term = LongTermMemory(vector_store, embeddings)
self.experience_store = {} # 简化版经验库
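    async def remember(self, query: str, k: int = 3) -> List[str]:
        """
        Unified retrieval: search short-term memory first, then long-term memory.
        """
        # 1. Short-term memory
        short_results = self._search_short_term(query)
        # 2. Long-term memory
        long_results = await self.long_term.recall(query, k=k)
        # 3. Merge and drop duplicates while preserving order
        #    (dict.fromkeys keeps the first occurrence of each string)
        combined = short_results + long_results
        return list(dict.fromkeys(combined))[:k]

    def _search_short_term(self, query: str) -> List[str]:
        """Short-term retrieval by simple keyword overlap.

        Note: splitting on whitespace only works for space-delimited languages;
        Chinese text would need a word segmenter or character n-grams instead.
        """
        results = []
        query_keywords = set(query.lower().split())
        for item in self.short_term.get_all():
            item_keywords = set(item.content.lower().split())
            if query_keywords & item_keywords:  # at least one shared keyword
                results.append(item.content)
        return results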
def update(self, content: str):
"""更新记忆"""
self.short_term.add(content)
async def consolidate_session(self, summary: str):
"""会话结束时巩固记忆"""
await self.long_term.consolidate(summary)
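The sliding-window idea behind `ShortTermMemory` can be shown in isolation. The sketch below is an illustration rather than a replacement: it swaps `tiktoken` for a crude word-count estimate (`approx_tokens`, an assumption) so it runs without dependencies, and it keeps a running total instead of re-encoding the whole buffer on every insert.

```python
from collections import deque
from typing import Callable, Deque, List


def approx_tokens(text: str) -> int:
    """Rough token estimate (assumption): one token per whitespace-separated word."""
    return max(1, len(text.split()))


class SlidingWindow:
    """Keeps the most recent messages whose estimated token total fits a budget."""

    def __init__(self, max_tokens: int, count: Callable[[str], int] = approx_tokens):
        self.max_tokens = max_tokens
        self.count = count
        self.items: Deque[str] = deque()
        self.total = 0

    def add(self, text: str) -> None:
        self.items.append(text)
        self.total += self.count(text)
        # Evict from the oldest end, but always keep the newest message.
        while self.total > self.max_tokens and len(self.items) > 1:
            self.total -= self.count(self.items.popleft())

    def context(self) -> List[str]:
        return list(self.items)
```

Passing a real tokenizer is a matter of supplying a different `count` callable, for example one that returns the length of an encoded token list.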
2.4 Planning Module
The planning module breaks a complex goal down into a sequence of executable subtasks:
# Python implementation: planning module based on CoT/ToT
import json
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, List, Optional


class TaskStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    BLOCKED = "blocked"


@dataclass
class SubTask:
    """A single subtask in a plan."""
    id: str
    description: str
    status: TaskStatus = TaskStatus.PENDING
    # IDs of subtasks that must complete first; default_factory gives every
    # instance its own list instead of relying on a None placeholder
    dependencies: List[str] = field(default_factory=list)
    tool_required: Optional[str] = None
    result: Optional[Any] = None
    error: Optional[str] = None
class PlanningModule:
"""
规划模块:任务分解与执行计划生成
支持多种规划策略:
1. Chain of Thought (CoT) - 链式思维
2. Tree of Thoughts (ToT) - 思维树
3. Subgoal Decomposition - 子目标分解
"""
def __init__(self, llm):
self.llm = llm
async def create_plan(
self,
goal: str,
strategy: str = "cot",
context: Optional[Dict] = None
) -> List[SubTask]:
"""
创建执行计划
"""
if strategy == "cot":
return await self._plan_with_cot(goal, context)
elif strategy == "tot":
return await self._plan_with_tot(goal, context)
elif strategy == "simple":
return await self._plan_simple(goal)
else:
raise ValueError(f"Unknown strategy: {strategy}")
async def _plan_with_cot(
self,
goal: str,
context: Optional[Dict]
) -> List[SubTask]:
"""
Chain of Thought 规划
让LLM展示完整推理过程
"""
prompt = f"""
目标:{goal}
{f"上下文信息:{json.dumps(context, ensure_ascii=False)}" if context else ""}
请按以下步骤思考并输出计划:
1. 分析目标的核心需求
2. 识别完成目标需要的步骤
3. 确定步骤之间的依赖关系
4. 输出结构化的任务列表
输出格式(JSON):
{{
"reasoning": "你的推理过程",
"tasks": [
{{"id": "1", "description": "任务1", "dependencies": []}},
{{"id": "2", "description": "任务2", "dependencies": ["1"]}}
]
}}
"""
response = await self.llm.agenerate([prompt])
result = json.loads(response)
tasks = [
SubTask(
id=t["id"],
description=t["description"],
dependencies=t.get("dependencies", [])
)
for t in result["tasks"]
]
return tasks
async def _plan_with_tot(
self,
goal: str,
context: Optional[Dict]
) -> List[SubTask]:
"""
Tree of Thoughts 规划
探索多条执行路径,选择最优方案
"""
# 第一阶段:生成多个可能的计划
prompt = f"""
目标:{goal}
请生成3种不同的执行方案,每种方案从不同角度思考:
方案A:从XX角度...
方案B:从YY角度...
方案C:从ZZ角度...
对于每种方案,列出关键步骤。
"""
response = await self.llm.agenerate([prompt])
# 第二阶段:评估选择最优方案
evaluation_prompt = f"""
针对目标"{goal}",评估以下方案的优劣:
{response}
选择最佳方案,并转换为任务列表。
输出格式(JSON):
{{
"best_plan": "方案X",
"reasoning": "选择理由",
"tasks": [...]
}}
"""
final_response = await self.llm.agenerate([evaluation_prompt])
result = json.loads(final_response)
return [
SubTask(
id=t["id"],
description=t["description"],
dependencies=t.get("dependencies", [])
)
for t in result["tasks"]
]
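    async def _plan_simple(self, goal: str) -> List[SubTask]:
        """
        Simple decomposition: treat the goal as a comma-separated list of steps.
        """
        import re  # local import keeps this method self-contained

        # Split on both ASCII and full-width (Chinese) commas
        task_descriptions = [d.strip() for d in re.split(r"[,,]", goal) if d.strip()]
        return [
            SubTask(id=str(i + 1), description=desc)
            for i, desc in enumerate(task_descriptions)
        ]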
def get_ready_tasks(
self,
tasks: List[SubTask]
) -> List[SubTask]:
"""
获取所有依赖已满足、可以执行的任务
"""
ready = []
completed_ids = {
t.id for t in tasks
if t.status == TaskStatus.COMPLETED
}
for task in tasks:
if task.status != TaskStatus.PENDING:
continue
# 检查依赖
deps_satisfied = all(
dep_id in completed_ids
for dep_id in task.dependencies
)
if deps_satisfied:
ready.append(task)
return ready
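`get_ready_tasks` answers "what can run right now?" one call at a time. The same dependency information can also be turned into a full execution order up front. The sketch below uses the standard library's `graphlib.TopologicalSorter` (Python 3.9+); the function name `execution_batches` and the plain `{task_id: [dependency_ids]}` input shape are assumptions made for the example.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List


def execution_batches(dependencies: Dict[str, List[str]]) -> List[List[str]]:
    """Group task IDs into batches; every task in a batch has all dependencies met."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises CycleError if the plan contains a dependency cycle
    batches: List[List[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only to make output stable
        batches.append(ready)
        sorter.done(*ready)
    return batches


plan = {"1": [], "2": ["1"], "3": ["1"], "4": ["2", "3"]}
print(execution_batches(plan))
```

Tasks within one batch are independent of each other, so an executor could run them concurrently. Catching `CycleError` at planning time is also a cheap way to reject an LLM-generated plan whose dependencies can never be satisfied.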
2.5 Reasoning Engine
The reasoning engine is the agent's "brain": it ties planning, memory, and tool selection together:
# Python implementation: ReAct-based reasoning engine
import json
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
class ReasoningStep(Enum):
THINK = "think" # 思考
ACT = "act" # 行动
OBSERVE = "observe" # 观察
REFLECT = "reflect" # 反思
FINAL = "final" # 最终答案
@dataclass
class ReasoningTrace:
"""推理轨迹"""
step: ReasoningStep
thought: str
action: Optional[Dict] = None
observation: Optional[str] = None
reflection: Optional[str] = None
class ReasoningEngine:
"""
推理引擎:实现ReAct循环
Think → Act → Observe → Reflect → ...
"""
def __init__(
self,
llm,
planning_module: PlanningModule,
memory_module: HierarchicalMemory,
tool_registry
):
self.llm = llm
self.planning = planning_module
self.memory = memory_module
self.tools = tool_registry
self.max_iterations = 10
    async def think(
        self,
        query: str,
        plan: Optional[List[SubTask]] = None,
        context: Optional[Dict] = None
    ) -> str:
        """
        Core reasoning loop.
        """
        trace: List[ReasoningTrace] = []
        # Build a plan if the caller did not supply one
        if not plan:
            plan = await self.planning.create_plan(query, strategy="cot")
        current_task_idx = self._get_current_task_index(plan)
        # ReAct loop
        for iteration in range(self.max_iterations):
            # 1. Think: analyze the current state
            thought = await self._think_step(
                query, plan, current_task_idx, trace, context
            )
            # 2. Stop if the model reports completion
            if thought.get("is_complete"):
                trace.append(ReasoningTrace(
                    step=ReasoningStep.FINAL,
                    thought=thought.get("final_answer", "")
                ))
                break
            # Record the thought so _format_trace can show it on the next turn
            trace.append(ReasoningTrace(
                step=ReasoningStep.THINK,
                thought=json.dumps(thought, ensure_ascii=False)
            ))
            # 3. Act: decide on an action
            action = await self._act_step(thought, plan, current_task_idx)
            # 4. Execute the action
            result = await self._execute_action(action)
            # 5. Observe the result and record it; without this entry the trace
            #    stays empty and _generate_final_answer has nothing to return
            observation = await self._observe_step(result, action)
            trace.append(ReasoningTrace(
                step=ReasoningStep.OBSERVE,
                thought="",
                action=action,
                observation=observation
            ))
            # 6. Reflect: update plan state and decide whether to continue
            should_continue, plan = await self._reflect_step(
                trace, observation, plan, current_task_idx
            )
            current_task_idx = self._get_current_task_index(plan)
            if not should_continue:
                break
        # Produce the final answer from the trace
        return self._generate_final_answer(query, trace)
async def _think_step(
self,
query: str,
plan: List[SubTask],
current_idx: int,
trace: List[ReasoningTrace],
context: Optional[Dict]
) -> Dict[str, Any]:
"""思考步骤"""
# 检索相关记忆
relevant_memories = await self.memory.remember(query, k=3)
# 构建思考提示
prompt = f"""
当前问题:{query}
执行计划:
{self._format_plan(plan, current_idx)}
历史推理:
{self._format_trace(trace)}
相关记忆:
{chr(10).join(relevant_memories)}
{f"上下文:{json.dumps(context, ensure_ascii=False)}" if context else ""}
请分析:
1. 当前应该关注哪个任务?
2. 需要调用什么工具?
3. 是否可以给出最终答案?
"""
response = await self.llm.agenerate([prompt])
# 解析LLM响应
return self._parse_thought_response(response)
async def _act_step(
self,
thought: Dict[str, Any],
plan: List[SubTask],
current_idx: int
) -> Dict[str, Any]:
"""行动步骤"""
action_type = thought.get("action_type")
if action_type == "use_tool":
# 确定使用哪个工具
tool_name = thought.get("tool_name")
tool_args = thought.get("tool_args", {})
return {
"type": "tool_call",
"tool": tool_name,
"args": tool_args
}
elif action_type == "update_plan":
# 更新计划
return {
"type": "update_plan",
"updates": thought.get("plan_updates", [])
}
elif action_type == "answer":
# 直接回答
return {
"type": "final_answer",
"answer": thought.get("answer")
}
else:
raise ValueError(f"Unknown action type: {action_type}")
async def _execute_action(self, action: Dict) -> Any:
"""执行行动"""
if action["type"] == "tool_call":
tool_name = action["tool"]
tool_args = action["args"]
# 从工具注册表获取工具
tool = self.tools.get(tool_name)
if not tool:
raise ValueError(f"Tool not found: {tool_name}")
# 执行工具
result = await tool.execute(**tool_args)
return result
elif action["type"] == "update_plan":
# 计划更新不需要执行
return None
elif action["type"] == "final_answer":
return action["answer"]
async def _observe_step(
self,
result: Any,
action: Dict
) -> str:
"""观察步骤"""
if result is None:
return "计划已更新,等待下一步执行"
# 将结果格式化为观察
return str(result)
async def _reflect_step(
self,
trace: List[ReasoningTrace],
observation: str,
plan: List[SubTask],
current_idx: int
) -> tuple[bool, List[SubTask]]:
"""反思步骤"""
# 更新计划状态
if current_idx < len(plan):
plan[current_idx].status = TaskStatus.COMPLETED
plan[current_idx].result = observation
# 检查是否还有待执行任务
ready_tasks = self.planning.get_ready_tasks(plan)
should_continue = len(ready_tasks) > 0
return should_continue, plan
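    def _parse_thought_response(self, response: str) -> Dict[str, Any]:
        """Parse the LLM's thought response."""
        # Simplified: production code should request structured output instead
        try:
            return json.loads(response)
        except (json.JSONDecodeError, TypeError):
            # Fallback (a judgment call): treat unparseable text as the final
            # answer rather than requesting a tool named "unknown", which
            # _execute_action would reject with "Tool not found".
            return {
                "is_complete": True,
                "final_answer": str(response)
            }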
def _format_plan(self, plan: List[SubTask], current_idx: int) -> str:
"""格式化计划为文本"""
lines = []
for i, task in enumerate(plan):
status = "✅" if task.status == TaskStatus.COMPLETED else "📋"
prefix = "→" if i == current_idx else " "
lines.append(f"{prefix} {status} [{task.id}] {task.description}")
return "\n".join(lines)
def _format_trace(self, trace: List[ReasoningTrace]) -> str:
"""格式化推理轨迹"""
lines = []
for t in trace:
if t.step == ReasoningStep.THINK:
lines.append(f"思考:{t.thought}")
elif t.step == ReasoningStep.OBSERVE:
lines.append(f"观察:{t.observation}")
return "\n".join(lines) if lines else "(暂无)"
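    def _get_current_task_index(self, plan: List[SubTask]) -> int:
        """Return the index of the next runnable task, or len(plan) if none."""
        # Look dependencies up by ID instead of assuming IDs are "1", "2", ...
        status_by_id = {task.id: task.status for task in plan}
        for i, task in enumerate(plan):
            if task.status != TaskStatus.PENDING:
                continue
            deps_done = all(
                status_by_id.get(dep_id) == TaskStatus.COMPLETED
                for dep_id in task.dependencies
            )
            if deps_done:
                return i
        return len(plan)  # nothing left to run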
def _generate_final_answer(self, query: str, trace: List[ReasoningTrace]) -> str:
"""生成最终答案"""
for t in reversed(trace):
if t.step == ReasoningStep.FINAL:
return t.thought
# 没有明确的最终答案,从观察中提取
observations = [t.observation for t in trace if t.observation]
return observations[-1] if observations else "任务完成"
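`_parse_thought_response` above hands the raw model reply straight to `json.loads`, which fails as soon as the model wraps its JSON in prose or a code fence. One common workaround, sketched here under the assumption that the reply contains at most a few candidate objects, is to scan for the first substring that decodes as a JSON object. The helper name `extract_json` is hypothetical.

```python
import json
from typing import Any, Dict, Optional


def extract_json(text: str) -> Optional[Dict[str, Any]]:
    """Return the first JSON object embedded in `text`, or None if there is none."""
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            # raw_decode parses one JSON value starting at `start` and
            # ignores whatever text follows it.
            obj, _end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            obj = None
        if isinstance(obj, dict):
            return obj
        start = text.find("{", start + 1)
    return None


reply = 'Sure, here is my plan: {"action_type": "answer", "answer": "42"} Hope it helps.'
print(extract_json(reply))
```

This is a tolerance layer, not a guarantee: where the model provider offers structured output or native function calling, that is usually the more reliable option.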
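3. The ReAct Pattern: Reasoning and Acting Together
3.1 How ReAct Works
ReAct (Reasoning + Acting) is a reasoning framework proposed by researchers at Princeton University and Google Research (Yao et al., 2022). Its core idea is to have the LLM interleave "thinking" and "acting" as it reasons, using the observed result of each action to guide the next reasoning step. The loop shown here adds an explicit Reflect step to the original Thought, Action, Observation cycle.
ReAct loop:
┌─────────────┐
│ 1. Thought  │  Analyze the problem and understand the current state
└──────┬──────┘
       ↓
┌─────────────┐
│ 2. Action   │  Select and run a tool
└──────┬──────┘
       ↓
┌─────────────┐
│ 3. Observe  │  Collect the execution result
└──────┬──────┘
       ↓
┌─────────────┐
│ 4. Reflect  │  Evaluate the result and adjust the strategy
└──────┬──────┘
       ↓
       └──→ (repeat until the task is complete)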
3.2 Implementing ReAct in Code
# Python implementation: a complete ReAct agent example
import asyncio
import json
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
class ActionType(Enum):
TOOL_CALL = "tool_call"
FINAL_ANSWER = "final_answer"
ASK_CLARIFICATION = "ask_clarification"
@dataclass
class ToolCall:
"""工具调用"""
name: str
arguments: Dict[str, Any]
result: Optional[Any] = None
error: Optional[str] = None
@dataclass
class ReActStep:
"""ReAct循环的单步"""
step_num: int
thought: str
action: Optional[ToolCall] = None
observation: Optional[str] = None
is_final: bool = False
class ReactAgent:
"""
ReAct Agent 实现
核心流程:
1. 思考(Thought):分析问题,决定下一步行动
2. 行动(Action):调用工具或生成答案
3. 观察(Observation):获取行动结果
4. 判断:是否完成?继续循环还是返回答案
"""
def __init__(
self,
llm,
tools: Dict[str, Callable],
system_prompt: Optional[str] = None
):
self.llm = llm
self.tools = tools
self.system_prompt = system_prompt or self._default_system_prompt()
self.max_steps = 15
def _default_system_prompt(self) -> str:
return """你是一个智能助手,可以通过调用工具来完成复杂任务。
可用工具:
{available_tools}
输出格式(JSON):
{{
"thought": "你的思考过程",
"action": {{
"type": "tool_call" | "final_answer",
"tool": "工具名(仅tool_call时)",
"arguments": {{...}}(仅tool_call时),
"answer": "最终答案(仅final_answer时)"
}}
}}
重要规则:
1. 如果需要外部信息,必须使用工具
2. 工具调用结果会作为observation返回
3. 连续调用同一工具不超过3次
4. 收集足够信息后给出最终答案
"""
async def run(self, query: str, context: Optional[Dict] = None) -> str:
"""
运行ReAct Agent
"""
steps: List[ReActStep] = []
history = []
current_query = query
for step_num in range(self.max_steps):
# 1. 构建提示
prompt = self._build_prompt(
current_query, history, steps
)
# 2. 调用LLM
response_text = await self.llm.agenerate([prompt])
response = json.loads(response_text)
thought = response.get("thought", "")
action = response.get("action", {})
# 3. 执行行动
step = ReActStep(
step_num=step_num + 1,
thought=thought
)
if action["type"] == "tool_call":
tool_name = action.get("tool")
tool_args = action.get("arguments", {})
# 检查工具是否存在
if tool_name not in self.tools:
observation = f"错误:工具 '{tool_name}' 不存在"
step.observation = observation
steps.append(step)
history.append({
"thought": thought,
"action": f"尝试调用工具 {tool_name}",
"observation": observation
})
continue
# 执行工具
try:
tool_func = self.tools[tool_name]
result = await self._execute_tool(tool_func, tool_args)
observation = str(result)
step.action = ToolCall(
name=tool_name,
arguments=tool_args,
result=result
)
step.observation = observation
history.append({
"thought": thought,
"action": f"调用 {tool_name}({tool_args})",
"observation": observation
})
                    # Keep current_query as the user's original question. The
                    # observation is already carried in `history`, so overwriting
                    # the query here would hide the actual task from the LLM.
except Exception as e:
observation = f"工具执行错误:{str(e)}"
step.observation = observation
history.append({
"thought": thought,
"action": f"调用 {tool_name} 失败",
"observation": observation
})
elif action["type"] == "final_answer":
step.is_final = True
step.observation = action.get("answer", "")
steps.append(step)
return action.get("answer", "")
steps.append(step)
# 超过最大步数,返回最后结果
return self._summarize_steps(steps)
def _build_prompt(
self,
query: str,
history: List[Dict],
steps: List[ReActStep]
) -> str:
"""构建LLM提示"""
# 格式化可用工具
tool_descriptions = []
for name, func in self.tools.items():
desc = func.__doc__ or "无描述"
tool_descriptions.append(f"- {name}: {desc.strip().split(chr(10))[0]}")
tools_str = "\n".join(tool_descriptions)
# 格式化历史
history_str = ""
for h in history[-5:]: # 只保留最近5条
history_str += f"\n思考:{h['thought']}\n"
history_str += f"行动:{h['action']}\n"
history_str += f"观察:{h['observation']}\n"
prompt = f"""{self.system_prompt.format(available_tools=tools_str)}
当前问题:{query}
{'-' * 50}
历史执行记录:
{history_str or '(暂无)'}
{'-' * 50}
请分析当前状态,决定下一步行动:
"""
return prompt
async def _execute_tool(
self,
tool_func: Callable,
args: Dict[str, Any]
) -> Any:
"""安全执行工具"""
# 支持同步和异步工具
if asyncio.iscoroutinefunction(tool_func):
return await tool_func(**args)
else:
return tool_func(**args)
def _summarize_steps(self, steps: List[ReActStep]) -> str:
"""总结步骤,返回最后观察"""
for step in reversed(steps):
if step.observation:
return step.observation
return "任务执行超时"
# 使用示例
async def demo():
"""ReAct Agent演示"""
# 定义工具
async def search(query: str) -> str:
"""网络搜索工具"""
return f"搜索结果:关于'{query}'的信息...\n来源:搜索引擎"
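    async def calculator(expression: str) -> str:
        """Calculator tool"""
        # WARNING: eval() runs arbitrary Python. It is tolerable in a local demo
        # driven by a mock LLM, but must never see model- or user-supplied input.
        try:
            result = eval(expression)
            return f"Result: {result}"
        except Exception:
            return f"Calculation error: invalid expression '{expression}'"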
async def get_weather(location: str) -> str:
"""天气查询工具"""
return f"{location}的天气:晴天,25°C,适宜出行"
async def query_database(sql: str) -> str:
"""数据库查询工具"""
return f"查询结果:执行SQL '{sql}' 返回的数据..."
# 创建工具注册表
tools = {
"search": search,
"calculator": calculator,
"get_weather": get_weather,
"query_database": query_database
}
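    # Initialize the LLM (a scripted mock)
    class MockLLM:
        def __init__(self):
            self.calls = 0

        async def agenerate(self, prompts):
            self.calls += 1
            if self.calls == 1:
                # First turn: request the weather tool
                return json.dumps({
                    "thought": "The user wants Shanghai's weather; call the weather tool",
                    "action": {
                        "type": "tool_call",
                        "tool": "get_weather",
                        "arguments": {"location": "上海"}
                    }
                })
            # Later turns: answer, so the loop ends instead of running to max_steps
            return json.dumps({
                "thought": "The tool result is enough to answer",
                "action": {"type": "final_answer", "answer": "Shanghai is sunny today, 25°C"}
            })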
# 创建Agent
agent = ReactAgent(
llm=MockLLM(),
tools=tools
)
# 运行
result = await agent.run("上海今天天气怎么样?")
print(f"最终结果:{result}")
# 运行演示
if __name__ == "__main__":
asyncio.run(demo())
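The calculator tool in the demo above relies on `eval`, which executes whatever the model sends it. For arithmetic, one safer pattern is to parse the expression with the standard `ast` module and evaluate only an allow-list of node types. The sketch below is an illustrative alternative rather than the article's implementation; `safe_eval` and the exponent cap of 100 are assumptions.

```python
import ast
import operator
from typing import Union

Number = Union[int, float]

_BINARY = {
    ast.Add: operator.add, ast.Sub: operator.sub,
    ast.Mult: operator.mul, ast.Div: operator.truediv,
    ast.Mod: operator.mod, ast.Pow: operator.pow,
}
_UNARY = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def safe_eval(expression: str) -> Number:
    """Evaluate +, -, *, /, %, ** and parentheses; reject everything else."""
    def walk(node: ast.AST) -> Number:
        # Only plain int/float literals are accepted (bool and str are rejected).
        if isinstance(node, ast.Constant) and type(node.value) in (int, float):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BINARY:
            left, right = walk(node.left), walk(node.right)
            if isinstance(node.op, ast.Pow) and abs(right) > 100:
                raise ValueError("exponent too large")  # arbitrary cap for the sketch
            return _BINARY[type(node.op)](left, right)
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY:
            return _UNARY[type(node.op)](walk(node.operand))
        raise ValueError(f"unsupported expression element: {type(node).__name__}")

    return walk(ast.parse(expression, mode="eval").body)
```

Function calls, attribute access, and names never reach an evaluator, so an input like `__import__('os')` is rejected with `ValueError`. Malformed input still raises `SyntaxError` from `ast.parse`, which a tool wrapper would want to catch and report back as an observation.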
4. Tool Module and Function Calling
4.1 Tool Registration and Invocation
# Python implementation: tool registration and Function Calling
import inspect
import json
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
@dataclass
class ToolDefinition:
"""工具定义"""
name: str
description: str
parameters: Dict[str, Any] # JSON Schema
func: Callable
class ToolRegistry:
"""
工具注册表
管理所有可用的工具,提供Function Calling接口
"""
def __init__(self):
self._tools: Dict[str, ToolDefinition] = {}
def register(self, func: Callable, name: Optional[str] = None):
"""
注册工具函数
支持装饰器或直接调用
"""
tool_name = name or func.__name__
# 从函数签名提取参数schema
sig = inspect.signature(func)
parameters = self._extract_parameters(sig)
# 获取文档描述
doc = func.__doc__ or ""
description = doc.strip().split("\n")[0] if doc else "无描述"
tool_def = ToolDefinition(
name=tool_name,
description=description,
parameters=parameters,
func=func
)
self._tools[tool_name] = tool_def
return func # 支持装饰器
def _extract_parameters(self, sig: inspect.Signature) -> Dict[str, Any]:
"""从函数签名提取JSON Schema参数"""
properties = {}
required = []
for param_name, param in sig.parameters.items():
# 跳过self参数
if param_name == "self":
continue
# 确定类型
if param.annotation == str:
param_type = "string"
elif param.annotation == int:
param_type = "integer"
elif param.annotation == float:
param_type = "number"
elif param.annotation == bool:
param_type = "boolean"
elif param.annotation == list:
param_type = "array"
elif param.annotation == dict:
param_type = "object"
else:
param_type = "string" # 默认string
properties[param_name] = {
"type": param_type,
"description": f"参数 {param_name}"
}
# 如果有默认值就不是必须
if param.default == inspect.Parameter.empty:
required.append(param_name)
return {
"type": "object",
"properties": properties,
"required": required
}
def get_tool_schemas(self) -> List[Dict[str, Any]]:
"""
获取所有工具的Schema(用于Function Calling)
"""
schemas = []
for tool in self._tools.values():
schemas.append({
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.parameters
}
})
return schemas
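    async def call(self, name: str, arguments: Dict[str, Any]) -> Any:
        """
        Call a registered tool; works for both sync and async tool functions.
        """
        if name not in self._tools:
            raise ValueError(f"Tool not found: {name}")
        tool = self._tools[name]
        # Validate arguments
        self._validate_arguments(tool, arguments)
        # Execute. Async tools (such as search_web below) return an awaitable
        # that must be awaited, otherwise the caller receives a coroutine object.
        result = tool.func(**arguments)
        if inspect.isawaitable(result):
            result = await result
        return result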
def _validate_arguments(self, tool: ToolDefinition, args: Dict):
"""验证参数"""
required = tool.parameters.get("required", [])
for req in required:
if req not in args:
raise ValueError(f"Missing required argument: {req}")
# 使用示例:装饰器注册工具
registry = ToolRegistry()
@registry.register
async def search_web(query: str, max_results: int = 5) -> str:
"""
搜索互联网获取实时信息
适用于查询新闻、天气、股票价格等实时数据
"""
# 实际实现调用搜索API
return f"搜索'{query}'得到{max_results}条结果..."
@registry.register
async def send_email(to: str, subject: str, body: str) -> str:
"""
发送电子邮件
适用于通知、报告、提醒等场景
"""
# 实际实现调用邮件API
return f"邮件已发送至{to},主题:{subject}"
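@registry.register
def calculate(expression: str) -> float:
    """
    Math calculator
    Supports addition, subtraction, multiplication, division, powers, parentheses
    """
    # Caution: emptying __builtins__ narrows what eval() can reach but does not
    # make it a sandbox; treat this as demo code, not a safe evaluator.
    try:
        result = eval(expression, {"__builtins__": {}}, {})
        return result
    except Exception as e:
        raise ValueError(f"Calculation error: {e}") from e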
# Tool definitions in Function Calling format
def get_openai_function_schemas():
    """Return the registered tools in OpenAI Function Calling format."""
    # Reuse the registry's public method rather than reaching into _tools
    return registry.get_tool_schemas()
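The `_extract_parameters` method above recognises only bare `str`, `int`, `float`, `bool`, `list`, and `dict` annotations, so a parameter typed `List[str]` or `Optional[int]` silently falls back to `"string"`. A sketch of a more type-aware mapping, using `typing.get_origin` and `typing.get_args`, follows. The helper names are hypothetical, and only a small subset of JSON Schema is covered (no `int | None` syntax, no nested models).

```python
import inspect
from typing import Any, Callable, Dict, List, Optional, Union, get_args, get_origin

_PRIMITIVES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def annotation_to_schema(annotation: Any) -> Dict[str, Any]:
    """Map a Python type annotation to a JSON Schema fragment (subset only)."""
    if annotation in _PRIMITIVES:
        return {"type": _PRIMITIVES[annotation]}
    origin, args = get_origin(annotation), get_args(annotation)
    if origin is Union:
        non_none = [a for a in args if a is not type(None)]
        if len(non_none) == 1:  # Optional[X] is described by the schema of X
            return annotation_to_schema(non_none[0])
        return {"anyOf": [annotation_to_schema(a) for a in non_none]}
    if origin is list or annotation is list:
        schema: Dict[str, Any] = {"type": "array"}
        if args:
            schema["items"] = annotation_to_schema(args[0])
        return schema
    if origin is dict or annotation is dict:
        return {"type": "object"}
    return {"type": "string"}  # same fallback as the registry above


def parameters_schema(func: Callable) -> Dict[str, Any]:
    """Build the `parameters` object for a function's signature."""
    properties: Dict[str, Any] = {}
    required: List[str] = []
    for name, param in inspect.signature(func).parameters.items():
        properties[name] = annotation_to_schema(param.annotation)
        if param.default is inspect.Parameter.empty:
            required.append(name)
    return {"type": "object", "properties": properties, "required": required}


def create_event(title: str, minutes: int = 30,
                 attendees: Optional[List[str]] = None) -> str:
    return title


print(parameters_schema(create_event))
```

For `create_event`, `attendees` is described as an array of strings rather than a plain string, which gives the model a much better chance of producing arguments the function can accept.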
4.2 A LangChain Agent Implementation
# Python implementation: LangChain-based agent
from langchain.agents import create_agent, AgentExecutor
from langchain.agents.middleware import (
PIIMiddleware,
SummarizationMiddleware,
ToolRetryMiddleware
)
from langchain.tools import tool
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from typing import List, Dict, Any
# 1. 定义工具
@tool
def search_knowledge_base(query: str) -> str:
"""
在企业知识库中搜索相关信息
适用于查询公司政策、产品文档、历史案例等
"""
# 实际实现连接向量数据库
return f"知识库搜索结果:{query}的相关内容..."
@tool
def create_calendar_event(
title: str,
start_time: str,
duration_minutes: int = 30,
attendees: List[str] = None
) -> str:
"""
创建日历事件/会议
适用于安排会议、设置提醒、预约时间等
"""
attendees = attendees or []
return f"已创建会议:{title}\n时间:{start_time}\n时长:{duration_minutes}分钟\n参会人:{', '.join(attendees)}"
@tool
def send_notification(
channel: str,
message: str,
recipients: List[str] = None
) -> str:
"""
发送通知消息
支持邮件、Slack、企业微信等渠道
"""
return f"已通过{channel}发送通知:{message}\n接收人:{', '.join(recipients or ['全员'])}"
@tool
def query_database(sql: str) -> str:
"""
执行SQL查询数据库
适用于获取业务数据、生成报表等
"""
# 实际实现连接数据库
return f"查询结果:执行 {sql} 返回的数据..."
# 2. 初始化LLM
llm = ChatOpenAI(
model="gpt-4o",
temperature=0,
api_key="your-api-key"
)
# 3. 收集所有工具
tools = [
search_knowledge_base,
create_calendar_event,
send_notification,
query_database
]
# 4. 定义系统提示
system_prompt = """你是一个智能办公助手,帮助用户完成日常工作任务。
你的能力:
1. 搜索企业知识库获取相关信息
2. 创建日历事件和会议
3. 发送通知给团队成员
4. 查询数据库获取业务数据
工作原则:
- 在执行操作前,先理解用户的需求
- 如果信息不足,主动询问用户
- 执行后向用户确认结果
- 遇到错误时,说明原因并提供替代方案
"""
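# 5. Create the agent
# In LangChain 1.x, middleware is passed to create_agent(); the legacy
# AgentExecutor does not take a `middleware` argument, so the AgentExecutor
# import above is not needed for this listing. create_agent returns a
# compiled graph that is invoked with a "messages" list.
agent_executor = create_agent(
    model=llm,
    tools=tools,
    system_prompt=system_prompt,
    # 6. Middleware (production-oriented features)
    middleware=[
        # PII detection: redact sensitive data automatically
        PIIMiddleware(
            "email",
            strategy="redact",
            apply_to_input=True
        ),
        # "phone" is assumed not to be a built-in PII type, so a detector
        # pattern is supplied; this regex is illustrative only
        PIIMiddleware(
            "phone",
            detector=r"\+?\d[\d\s-]{7,}\d",
            strategy="mask",
            apply_to_input=True
        ),
        # Summarization: keep context length in check
        # (parameter names have changed between 1.x releases; check yours)
        SummarizationMiddleware(
            model=llm,
            max_tokens_before_summary=4000,
            messages_to_keep=20
        ),
        # Tool retry: retry failed tool calls automatically
        ToolRetryMiddleware(
            max_retries=3,
            backoff_factor=2.0,
            initial_delay=1.0,
            max_delay=60.0,
            jitter=True
        )
    ]
)


def user_message(text: str) -> dict:
    """Wrap user text in the state format the agent expects."""
    return {"messages": [{"role": "user", "content": text}]}


# 7. Run tasks
async def run_demo():
    """Demonstrate the agent executing tasks."""
    # Task 1: schedule a meeting and notify attendees
    result1 = await agent_executor.ainvoke(user_message(
        "Please schedule a product review meeting for 3 pm tomorrow, lasting one hour, "
        "and invite product manager Zhang San and tech lead Li Si"
    ))
    print("Task 1 result:", result1["messages"][-1].content)
    # Task 2: query data and notify conditionally
    result2 = await agent_executor.ainvoke(user_message(
        "Query this month's sales; if they are more than 10% above the same period "
        "last year, send the sales team a congratulatory notification"
    ))
    print("Task 2 result:", result2["messages"][-1].content)


# 8. Streaming execution (show intermediate steps as they happen)
async def run_streaming():
    """Stream execution to show the agent's intermediate messages."""
    async for chunk in agent_executor.astream(
        user_message("Search for the latest AI agent technology trends, then write a report"),
        stream_mode="values"
    ):
        # With stream_mode="values" each chunk is the full state dict
        latest = chunk["messages"][-1]
        print(f"[{latest.type}]", str(latest.content)[:100], "...")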
# LangChain 1.0 新API:简化的Agent创建
def create_simple_agent():
"""使用LangChain 1.0的简化API创建Agent"""
from langchain.agents import create_agent
# 一个函数搞定Agent创建
agent = create_agent(
model="openai:gpt-4o",
tools=[search_knowledge_base, create_calendar_event],
system_prompt="你是一个专业的助手,帮助用户完成知识查询和日程安排。"
)
# 直接调用
result = agent.invoke({
"messages": [{"role": "user", "content": "帮我安排明天的周会"}]
})
return result
4.3 Multi-Agent Collaboration
# Python implementation: multi-agent collaboration system
import asyncio
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional
class AgentRole(Enum):
SUPERVISOR = "supervisor" # 监督者:协调其他Agent
RESEARCHER = "researcher" # 研究者:信息检索
CODER = "coder" # 开发者:代码编写
WRITER = "writer" # 写手:内容创作
ANALYST = "analyst" # 分析师:数据分析
@dataclass
class AgentMessage:
"""Agent间消息"""
from_agent: str
to_agent: str # or "broadcast"
content: Any
msg_type: str # "task" | "result" | "status" | "error"
@dataclass
class AgentConfig:
"""Agent配置"""
name: str
role: AgentRole
system_prompt: str
tools: List[str]
capabilities: List[str]
class MultiAgentSystem:
    """
    Multi-agent collaboration system.
    Supports the Supervisor pattern: a supervisor coordinates several specialist agents.
    """
    def __init__(self, llm_factory):
        self.llm_factory = llm_factory
        self.agents: Dict[str, AgentConfig] = {}
        self.message_queue: asyncio.Queue = asyncio.Queue()

    def register_agent(self, config: AgentConfig):
        """Register an agent."""
        self.agents[config.name] = config

    async def run_supervisor_workflow(
        self,
        task: str,
        agent_configs: List[AgentConfig]
    ) -> str:
        """
        Supervisor workflow:
        1. The supervisor decomposes the task
        2. Subtasks are dispatched to specialist agents
        3. Results are collected and merged into the final output
        """
        # Register all agents
        for config in agent_configs:
            self.register_agent(config)
        # 1. The supervisor decomposes the task.
        # llm_factory and the decompose_task / execute / synthesize methods are
        # assumed to be application-defined; they are not library APIs.
        supervisor_llm = self.llm_factory.create(role="supervisor")
        task_plan = await supervisor_llm.decompose_task(task)
        # 2. Run the subtasks one after another (this loop is sequential)
        results = {}
        for subtask in task_plan["subtasks"]:
            agent_name = subtask["assigned_to"]
            agent_config = self.agents[agent_name]
            # Create a dedicated agent instance
            agent_llm = self.llm_factory.create(
                role=agent_config.role,
                system_prompt=agent_config.system_prompt
            )
            # Execute the subtask
            result = await agent_llm.execute(
                task=subtask["description"],
                tools=agent_config.tools
            )
            results[subtask["id"]] = {
                "agent": agent_name,
                "result": result
            }
        # 3. The supervisor merges the results
        final_result = await supervisor_llm.synthesize(task, results)
        return final_result
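# A CrewAI-style multi-agent implementation
class CrewAIAgent:
    """A role-based agent in the style of CrewAI."""
    def __init__(
        self,
        role: str,
        goal: str,
        backstory: str,
        tools: Optional[List] = None
    ):
        self.role = role
        self.goal = goal
        self.backstory = backstory
        self.tools = tools or []

    async def execute_task(self, task: str, context: Optional[Dict] = None) -> str:
        """Execute a task."""
        context_line = f"Context: {context}\n" if context else ""
        prompt = (
            f"Role: {self.role}\n"
            f"Goal: {self.goal}\n"
            f"Backstory: {self.backstory}\n"
            f"Task: {task}\n"
            f"{context_line}"
            "Complete the task and report the result."
        )
        # Call the LLM
        result = await self._call_llm(prompt)
        return result

    async def _call_llm(self, prompt: str) -> str:
        """Placeholder: connect this to the LLM client of your choice."""
        raise NotImplementedError("LLM call is not implemented in this sketch")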
class CrewAIOrchestrator:
    """
    CrewAI-style orchestrator.
    Coordinates several role-based agents to complete a task.
    """
    def __init__(self, agents: List[CrewAIAgent], verbose: bool = True):
        self.agents = {a.role: a for a in agents}
        self.verbose = verbose

    async def kickoff(self, task: str) -> Dict[str, str]:
        """
        Start the crew.
        Agents run in order and each agent's output is passed to the next.
        Returns a mapping from role to that agent's result.
        """
        current_input = task
        context = {}
        for agent in self.agents.values():
            if self.verbose:
                print(f"[{agent.role}] running...")
            result = await agent.execute_task(current_input, context)
            if self.verbose:
                print(f"[{agent.role}] done")
            # Pass the result on to the next agent
            context[agent.role] = result
            current_input = f"Based on the result from {agent.role}: {result}"
        return context

    async def kickoff_parallel(self, task: str) -> Dict[str, str]:
        """
        Run all agents in parallel,
        then aggregate the results.
        """
        async def run_agent(agent):
            return agent.role, await agent.execute_task(task)
        # Run concurrently
        results = await asyncio.gather(
            *[run_agent(agent) for agent in self.agents.values()]
        )
        return dict(results)
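# Usage example
async def demo_multi_agent():
    """Multi-agent collaboration demo."""
    # Define the specialist agents
    researcher = CrewAIAgent(
        role="Researcher",
        goal="Collect and analyze relevant information",
        backstory="You are a professional researcher skilled at information retrieval and analysis",
        tools=["search"]
    )
    analyst = CrewAIAgent(
        role="Analyst",
        goal="Extract insights from the data",
        backstory="You are a senior data analyst skilled at interpreting data",
        tools=["analyze"]
    )
    writer = CrewAIAgent(
        role="Writer",
        goal="Turn the analysis into a clear report",
        backstory="You are a professional writer skilled at technical reports",
        tools=["write"]
    )
    # Create the crew
    crew = CrewAIOrchestrator(
        agents=[researcher, analyst, writer],
        verbose=True
    )
    # Run the task
    result = await crew.kickoff(
        "Analyze 2026 AI Agent market trends and write a report"
    )
    print("Final report:", result["Writer"])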
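The difference between the sequential `kickoff` and `kickoff_parallel` is easier to see with agents that need no LLM. The following is a minimal sketch under that assumption: `StubAgent`, `run_sequential`, and `run_parallel` are hypothetical names introduced only for illustration, and the "work" each agent does is plain string formatting.

```python
import asyncio
from typing import Dict, List


class StubAgent:
    """Hypothetical stand-in for a role-based agent; no LLM is called."""

    def __init__(self, role: str):
        self.role = role

    async def execute_task(self, task: str) -> str:
        await asyncio.sleep(0)  # yield control, as a real network call would
        return f"{self.role} handled: {task}"


async def run_sequential(agents: List[StubAgent], task: str) -> Dict[str, str]:
    """Chain the agents: each one receives the previous agent's output."""
    outputs: Dict[str, str] = {}
    current = task
    for agent in agents:
        current = await agent.execute_task(current)
        outputs[agent.role] = current
    return outputs


async def run_parallel(agents: List[StubAgent], task: str) -> Dict[str, str]:
    """Fan out: every agent receives the original task concurrently."""
    results = await asyncio.gather(*(a.execute_task(task) for a in agents))
    # asyncio.gather returns results in the order the awaitables were passed in.
    return {a.role: r for a, r in zip(agents, results)}


if __name__ == "__main__":
    team = [StubAgent("researcher"), StubAgent("writer")]
    print(asyncio.run(run_sequential(team, "market report")))
    print(asyncio.run(run_parallel(team, "market report")))
```

In the sequential run the writer sees the researcher's output; in the parallel run both agents see only the original task, so parallel execution suits independent subtasks rather than pipelines.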
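5. Comparison of Mainstream Agent Frameworks in 2026
5.1 LangGraph vs AutoGen vs CrewAI
| Feature | LangGraph | AutoGen | CrewAI |
|---|---|---|---|
| Core idea | Graph-structured workflows | Conversation-driven collaboration | Role-based teams |
| Typical use cases | Complex decision chains, production applications | Code generation, multi-agent conversations | Task chains, content creation |
| Learning curve | Steep (requires programming background) | Moderate | Gentle (friendly to non-engineers) |
| Flow control | Deterministic (flowchart-like) | Highly flexible (LLM decides) | Sequential plus conditional branches |
| Multi-agent support | Strong (graph nodes) | Strong (conversation orchestration) | Strong (role division) |
| State management | Built-in persistence | Must be implemented by the user | Built-in |
| Enterprise adoption | Uber, LinkedIn, Klarna | Microsoft ecosystem | Startups, small and medium businesses |
| Code style | Complex but controllable | Concise but opaque | Intuitive but constrained |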
5.2 Workflow Orchestration with LangGraph
# Python implementation: a LangGraph workflow
from langgraph.graph import StateGraph, END, START
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict, List

# 1. Define the state
class AgentState(TypedDict):
    messages: List[str]
    current_task: str
    task_results: dict
    next_step: str

# 2. Define the node functions.
# search_knowledge, analyze_data, and write_report are assumed to be
# application helpers defined elsewhere; they are not part of LangGraph.
def research_node(state: AgentState) -> AgentState:
    """Research node: search for relevant information."""
    query = state["current_task"]
    results = search_knowledge(query)
    return {
        **state,
        "task_results": {"research": results},
        "next_step": "analyze"
    }

def analyze_node(state: AgentState) -> AgentState:
    """Analysis node: analyze the research results."""
    research_results = state["task_results"]["research"]
    analysis = analyze_data(research_results)
    return {
        **state,
        "task_results": {
            **state["task_results"],
            "analysis": analysis
        },
        "next_step": "write"
    }

def write_node(state: AgentState) -> AgentState:
    """Writing node: write the report."""
    analysis = state["task_results"]["analysis"]
    report = write_report(analysis)
    return {
        **state,
        "task_results": {
            **state["task_results"],
            "report": report
        },
        "messages": state["messages"] + [report],
        "next_step": END
    }
# 3. Routing function for the conditional edge
def should_analyze(state: AgentState) -> str:
    """Decide whether to move on to analysis or retry research."""
    if "error" in state.get("task_results", {}).get("research", ""):
        return "research"  # Retry research (cap retries in production to avoid endless loops)
    return "analyze"

# 4. Build the graph
workflow = StateGraph(AgentState)
# Add nodes
workflow.add_node("research", research_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("write", write_node)
# Wire entry, routing, and exit
workflow.add_edge(START, "research")
# The conditional edge takes the place of a fixed research -> analyze edge;
# keeping both would also trigger "analyze" unconditionally.
workflow.add_conditional_edges(
    "research",
    should_analyze,
    {
        "analyze": "analyze",
        "research": "research"
    }
)
workflow.add_edge("analyze", "write")
workflow.add_edge("write", END)
# 5. Compile with checkpoint support (all edges must be added before compiling)
checkpointer = MemorySaver()
app = workflow.compile(checkpointer=checkpointer)

# 6. Run
async def run_graph():
    initial_state = {
        "messages": [],
        "current_task": "Analyze 2026 AI Agent technology trends",
        "task_results": {},
        "next_step": ""
    }
    # A graph compiled with a checkpointer needs a thread_id on every call
    config = {"configurable": {"thread_id": "123"}}
    # Plain execution (resumable through the checkpoint)
    result = await app.ainvoke(initial_state, config)
    print("Final report:", result["task_results"]["report"])
    # Streamed execution: astream returns an async iterator
    async for chunk in app.astream(initial_state, config):
        print("Step:", chunk)
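The conditional edge above sends the workflow back to the research step whenever the research output contains an error, which can loop forever if the failure is persistent. A retry budget is one common remedy. The sketch below shows the routing logic in plain Python, without LangGraph, so it can be run as-is; `MAX_RETRIES`, the `retries` counter, and the `give_up` branch are assumptions introduced for illustration, not part of the listing above.

```python
from typing import List, TypedDict

MAX_RETRIES = 2  # assumed cap; tune for the workload


class RetryState(TypedDict):
    research: str  # latest output of the research step
    retries: int   # how many retries have been used so far


def route_after_research(state: RetryState) -> str:
    """Return the name of the step that should follow research."""
    failed = "error" in state["research"]
    if failed and state["retries"] < MAX_RETRIES:
        return "research"  # try again
    if failed:
        return "give_up"   # retry budget exhausted
    return "analyze"


def run(research_outputs: List[str]) -> List[str]:
    """Drive the router with a scripted sequence of research outputs."""
    state: RetryState = {"research": "", "retries": 0}
    path: List[str] = []
    for output in research_outputs:
        state["research"] = output
        step = route_after_research(state)
        path.append(step)
        if step != "research":
            break
        state["retries"] += 1
    return path
```

In a graph-based implementation the same function could serve as the routing callback, with the counter stored in the graph state and a terminal node mapped to the `give_up` branch.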
6. Production-Grade Agent System Architecture
6.1 Enterprise Agent Architecture Design
┌──────────────────────────────────────────────────────────────────────────────┐
│  User Interaction Layer                                                      │
│  Web UI │ API Gateway │ Webhook │ Scheduled Jobs                             │
└──────────────────────────────────────────────────────────────────────────────┘
                                       ↓
┌──────────────────────────────────────────────────────────────────────────────┐
│  Security & Gateway Layer                                                    │
│  AuthN/AuthZ │ Rate Limiting/Circuit Breaking │ Input Filtering │ Audit Logs │
└──────────────────────────────────────────────────────────────────────────────┘
                                       ↓
┌──────────────────────────────────────────────────────────────────────────────┐
│  Agent Orchestration Layer                                                   │
│  Frameworks: LangGraph / CrewAI / AutoGen                                    │
│  Agents: Supervisor │ Researcher │ Coder │ Writer │ ...                      │
│  Tools: MCP Server │ Function Calling │ RAG Engine │ Database Access         │
└──────────────────────────────────────────────────────────────────────────────┘
                                       ↓
┌──────────────────────────────────────────────────────────────────────────────┐
│  Memory & Storage Layer                                                      │
│  Redis (cache) │ Pinecone (vector store)                                     │
│  Postgres (structured data) │ S3/MinIO (file storage)                        │
└──────────────────────────────────────────────────────────────────────────────┘
                                       ↓
┌──────────────────────────────────────────────────────────────────────────────┐
│  Infrastructure Layer                                                        │
│  Kubernetes (orchestration) │ GPU Cluster (inference)                        │
│  vLLM/TGI (model serving) │ Monitoring (Prometheus)                          │
└──────────────────────────────────────────────────────────────────────────────┘
6.2 The MCP Protocol: Standardizing Tools
# A simplified sketch of an MCP (Model Context Protocol) style server and client
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
import json

@dataclass
class MCPResource:
    """MCP resource definition."""
    uri: str
    name: str
    mime_type: str
    description: str

@dataclass
class MCPTool:
    """MCP tool definition."""
    name: str
    description: str
    input_schema: Dict[str, Any]
class MCPServer:
    """
    Simplified MCP-style server.
    Exposes tools and resources through a uniform request interface.
    Real MCP messages are JSON-RPC 2.0 envelopes; this sketch returns bare dicts.
    """
    def __init__(self, name: str):
        self.name = name
        self.tools: Dict[str, MCPTool] = {}
        self.resources: Dict[str, MCPResource] = {}
        self._handlers: Dict[str, Any] = {}  # tool name -> async callable

    def register_tool(self, tool: MCPTool, handler=None):
        """Register a tool and, optionally, the async callable that implements it."""
        self.tools[tool.name] = tool
        if handler is not None:
            self._handlers[tool.name] = handler

    def register_resource(self, resource: MCPResource):
        """Register a resource."""
        self.resources[resource.uri] = resource

    async def handle_request(self, request: Dict) -> Dict:
        """Dispatch an incoming request by method name."""
        method = request.get("method")
        if method == "tools/list":
            return self._list_tools()
        elif method == "tools/call":
            return await self._call_tool(request["params"])
        elif method == "resources/list":
            return self._list_resources()
        elif method == "resources/read":
            return self._read_resource(request["params"]["uri"])
        else:
            return {"error": f"Unknown method: {method}"}

    def _list_tools(self) -> Dict:
        """List all registered tools."""
        return {
            "tools": [
                {
                    "name": tool.name,
                    "description": tool.description,
                    "inputSchema": tool.input_schema
                }
                for tool in self.tools.values()
            ]
        }

    async def _call_tool(self, params: Dict) -> Dict:
        """Call a tool."""
        tool_name = params["name"]
        arguments = params.get("arguments", {})
        if tool_name not in self.tools:
            return {"error": f"Tool not found: {tool_name}"}
        # Look up the handler registered for this tool
        handler = self._get_tool_handler(tool_name)
        if handler is None:
            return {"error": f"No handler registered for tool: {tool_name}"}
        try:
            result = await handler(**arguments)
            return {"content": [{"type": "text", "text": str(result)}]}
        except Exception as e:
            return {"error": str(e)}

    def _get_tool_handler(self, name: str):
        """Return the handler registered for a tool, or None if there is none."""
        return self._handlers.get(name)
    def _list_resources(self) -> Dict:
        """List all registered resources."""
        return {
            "resources": [
                {
                    "uri": r.uri,
                    "name": r.name,
                    "mimeType": r.mime_type,
                    "description": r.description
                }
                for r in self.resources.values()
            ]
        }

    def _read_resource(self, uri: str) -> Dict:
        """Read a resource."""
        if uri not in self.resources:
            return {"error": f"Resource not found: {uri}"}
        resource = self.resources[uri]
        content = self._load_resource_content(uri)
        return {
            "contents": [{
                "uri": uri,
                "mimeType": resource.mime_type,
                "text": content
            }]
        }

    def _load_resource_content(self, uri: str) -> str:
        """Load the resource content (placeholder: returns None until implemented)."""
        pass
# MCP client: connects to an MCP server
class MCPClient:
    """MCP client."""
    def __init__(self, server_url: str):
        self.server_url = server_url
        self.session_id = None

    async def connect(self) -> str:
        """Connect to the server."""
        # Placeholder: a real client would establish the connection and obtain a session id
        self.session_id = "session-123"
        return self.session_id

    async def list_tools(self) -> List[MCPTool]:
        """List the available tools."""
        response = await self._send_request({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tools/list",
            "params": {}
        })
        # The wire format uses "inputSchema"; map it onto the dataclass field explicitly
        return [
            MCPTool(
                name=t["name"],
                description=t["description"],
                input_schema=t["inputSchema"]
            )
            for t in response["tools"]
        ]

    async def call_tool(self, name: str, arguments: Dict) -> Any:
        """Call a tool."""
        response = await self._send_request({
            "jsonrpc": "2.0",
            "id": 2,
            "method": "tools/call",
            "params": {
                "name": name,
                "arguments": arguments
            }
        })
        return response.get("content", [{}])[0].get("text")

    async def _send_request(self, request: Dict) -> Dict:
        """Send a request (placeholder: a real client sends it over HTTP or WebSocket)."""
        raise NotImplementedError("Transport is not implemented in this sketch")
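The simplified server above returns bare dictionaries, whereas MCP messages travel as JSON-RPC 2.0 envelopes: every response carries `jsonrpc`, the request `id`, and either a `result` or an `error` object with a numeric `code`. The sketch below illustrates that envelope for `tools/call`. The `echo` tool and the helper names are hypothetical, and the exact result fields (`content`, `isError`) follow my reading of the MCP specification, so they should be checked against the spec version in use.

```python
import json
from typing import Any, Callable, Dict

METHOD_NOT_FOUND = -32601  # JSON-RPC 2.0: method not found
INVALID_PARAMS = -32602    # JSON-RPC 2.0: invalid params


def echo_tool(text: str) -> str:
    """Hypothetical tool used only for this example."""
    return text.upper()


TOOLS: Dict[str, Callable[..., Any]] = {"echo": echo_tool}


def text_result(text: str, is_error: bool = False) -> Dict[str, Any]:
    """Build a tool result holding a single text item."""
    return {"content": [{"type": "text", "text": text}], "isError": is_error}


def handle(raw: str) -> str:
    """Parse one JSON-RPC request and return the serialized response."""
    request = json.loads(raw)
    response: Dict[str, Any] = {"jsonrpc": "2.0", "id": request.get("id")}
    if request.get("method") != "tools/call":
        response["error"] = {"code": METHOD_NOT_FOUND, "message": "Method not found"}
        return json.dumps(response)
    params = request.get("params", {})
    tool = TOOLS.get(params.get("name"))
    if tool is None:
        # Protocol-level failure: reported through the JSON-RPC error object.
        response["error"] = {
            "code": INVALID_PARAMS,
            "message": f"Unknown tool: {params.get('name')}",
        }
        return json.dumps(response)
    try:
        response["result"] = text_result(str(tool(**params.get("arguments", {}))))
    except Exception as exc:
        # Tool-level failure: reported inside the result so the model can see it.
        response["result"] = text_result(str(exc), is_error=True)
    return json.dumps(response)
```

Separating protocol errors from tool execution errors lets a client distinguish "the request was malformed" from "the tool ran and failed", which matters when the failure text is fed back to the model.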
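7. Case Study: A Customer Service Agent System
7.1 Requirements
Build a customer service agent that can:
- Understand user requests (FAQ queries, order handling, technical support)
- Retrieve answers from a knowledge base
- Look up order status
- Create tickets
- Send notifications
7.2 Full Implementation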
# Python implementation: a customer service agent system
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum
import asyncio
import json

class Intent(Enum):
    FAQ_QUERY = "faq_query"
    ORDER_STATUS = "order_status"
    ORDER_CANCEL = "order_cancel"
    COMPLAINT = "complaint"
    TECHNICAL_SUPPORT = "technical_support"
    HUMAN_HANDOVER = "human_handover"
    UNKNOWN = "unknown"

@dataclass
class ConversationContext:
    """Conversation context."""
    user_id: str
    session_id: str
    intent: Intent = Intent.UNKNOWN
    entities: Optional[Dict[str, Any]] = None
    history: Optional[List[Dict]] = None
    state: str = "initial"

    def __post_init__(self):
        # Create fresh containers per instance instead of sharing mutable defaults
        if self.entities is None:
            self.entities = {}
        if self.history is None:
            self.history = []
class CustomerServiceAgent:
    """
    Customer service agent.
    Supports intent recognition, knowledge base Q&A, order handling, and ticket creation.
    """
    def __init__(self, config: Dict):
        self.config = config
        # Fall back to empty dicts so that missing config sections do not raise
        self.llm = self._init_llm(config.get("llm") or {})
        self.tools = self._init_tools(config.get("tools") or {})
        self.rag = self._init_rag(config.get("vector_store") or {})
        self.nlu = IntentClassifier(self.llm)

    def _init_llm(self, config: Dict):
        """Initialize the LLM."""
        return ChatLLM(
            model=config.get("model", "gpt-4o"),
            api_key=config.get("api_key")
        )

    def _init_tools(self, config: Dict) -> Dict:
        """Initialize the tools."""
        tools = {}
        # Knowledge base search
        if "knowledge_base" in config:
            tools["search_knowledge"] = KnowledgeBaseTool(
                config["knowledge_base"]
            )
        # Order service
        if "order_service" in config:
            tools["query_order"] = OrderServiceTool(
                config["order_service"]
            )
            tools["cancel_order"] = OrderServiceTool(
                config["order_service"],
                action="cancel"
            )
        # Ticket system
        if "ticket_system" in config:
            tools["create_ticket"] = TicketTool(
                config["ticket_system"]
            )
        # Notification service
        if "notification" in config:
            tools["send_notification"] = NotificationTool(
                config["notification"]
            )
        return tools

    def _init_rag(self, config: Dict):
        """Initialize RAG."""
        return RAGEngine(
            vector_store=config.get("url"),
            embedding_model=config.get("embedding_model")
        )
    async def handle_message(
        self,
        user_id: str,
        message: str,
        session_id: Optional[str] = None
    ) -> Dict:
        """Handle a user message."""
        # 1. Create or restore the context
        context = await self._get_or_create_context(
            user_id, session_id
        )
        # 2. Intent recognition
        intent = await self.nlu.classify(message, context)
        context.intent = intent
        # 3. Entity extraction
        entities = await self._extract_entities(message, intent)
        context.entities.update(entities)
        # 4. Dispatch on intent
        if intent == Intent.FAQ_QUERY:
            result = await self._handle_faq(message, context)
        elif intent == Intent.ORDER_STATUS:
            result = await self._handle_order_status(context)
        elif intent == Intent.ORDER_CANCEL:
            result = await self._handle_order_cancel(context)
        elif intent == Intent.COMPLAINT:
            result = await self._handle_complaint(message, context)
        elif intent == Intent.TECHNICAL_SUPPORT:
            result = await self._handle_technical_support(message, context)
        elif intent == Intent.HUMAN_HANDOVER:
            result = await self._handover_to_human(context)
        else:
            result = await self._handle_unknown(message, context)
        # 5. Update the context
        context.history.append({
            "user": message,
            "agent": result["message"],
            "intent": intent.value,
            "entities": entities
        })
        await self._save_context(context)
        # 6. Return the result
        return {
            "message": result["message"],
            "intent": intent.value,
            "suggestions": result.get("suggestions", []),
            "requires_handover": result.get("requires_handover", False)
        }
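    async def _handle_faq(
        self,
        query: str,
        context: ConversationContext
    ) -> Dict:
        """Handle FAQ queries."""
        # RAG retrieval
        relevant_docs = await self.rag.search(query, top_k=3)
        if relevant_docs:
            # Compose the answer from the retrieved documents
            knowledge = "\n".join(d["content"] for d in relevant_docs)
            answer = await self.llm.generate(
                prompt=(
                    f"User question: {query}\n"
                    f"Relevant knowledge:\n{knowledge}\n"
                    "Answer the user's question in a friendly tone. "
                    "If the knowledge base information is incomplete, say so explicitly."
                )
            )
            return {
                "message": answer,
                "suggestions": [
                    "Anything else I can help with?",
                    "Check order status",
                    "Contact a human agent"
                ]
            }
        else:
            return {
                "message": "Sorry, I could not find relevant information. Let me transfer you to a human agent.",
                "requires_handover": True
            }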
    async def _handle_order_status(
        self,
        context: ConversationContext
    ) -> Dict:
        """Handle order status queries."""
        order_id = context.entities.get("order_id")
        if not order_id:
            # The user still needs to provide an order number
            return {
                "message": "Please provide your order number so I can look up its status.",
                "suggestions": []
            }
        # Call the order service
        tool = self.tools["query_order"]
        order_info = await tool.execute(order_id=order_id)
        return {
            "message": (
                f"Status of your order {order_id}:\n"
                f"- Order status: {order_info['status']}\n"
                f"- Order placed: {order_info['create_time']}\n"
                f"- Estimated delivery: {order_info.get('delivery_time', 'to be determined')}\n"
                "Is there anything else you would like me to look up?"
            ),
            "suggestions": [
                "Cancel order",
                "Track shipment",
                "Request a refund"
            ]
        }
    async def _handle_order_cancel(
        self,
        context: ConversationContext
    ) -> Dict:
        """Handle order cancellation."""
        order_id = context.entities.get("order_id")
        if not order_id:
            return {
                "message": "Please provide the number of the order you want to cancel.",
                "suggestions": []
            }
        # Ask the order service to cancel
        tool = self.tools["cancel_order"]
        result = await tool.execute(order_id=order_id)
        if result["success"]:
            return {
                "message": f"Order {order_id} has been cancelled. The payment will be refunded within 3-5 business days.",
                "suggestions": [
                    "Browse other products",
                    "Contact a human agent"
                ]
            }
        else:
            return {
                "message": f"Sorry, the order could not be cancelled: {result.get('reason', 'please contact a human agent')}",
                "suggestions": [
                    "Contact a human agent",
                    "View order details"
                ]
            }
    async def _handle_complaint(
        self,
        message: str,
        context: ConversationContext
    ) -> Dict:
        """Handle complaints."""
        # Create a ticket
        tool = self.tools["create_ticket"]
        ticket = await tool.execute(
            title=f"Customer complaint - {context.user_id}",
            description=message,
            category="complaint",
            priority="high"
        )
        return {
            "message": (
                f"I have recorded your feedback and created a ticket (ID: {ticket['id']}).\n"
                "Our support team will contact you within 24 hours.\n"
                "For urgent issues, please call the support hotline: 400-xxx-xxxx"
            ),
            "suggestions": [
                "Ask about something else",
                "Contact a human agent"
            ]
        }
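    async def _handle_technical_support(
        self,
        message: str,
        context: ConversationContext
    ) -> Dict:
        """Handle technical support requests."""
        # Try the knowledge base first
        relevant_docs = await self.rag.search(
            message,
            top_k=3,
            filter={"category": "technical"}
        )
        if relevant_docs:
            # Estimate how complex the question is.
            # _assess_complexity is defined on IntentClassifier, so call it through self.nlu.
            complexity = await self.nlu._assess_complexity(message, relevant_docs)
            if complexity < 0.7:
                # Simple question: answer from the knowledge base
                docs_text = "\n".join(d["content"] for d in relevant_docs)
                answer = await self.llm.generate(
                    prompt=(
                        f"User's technical question: {message}\n"
                        f"Relevant documentation:\n{docs_text}\n"
                        "Give detailed step-by-step instructions to resolve it."
                    )
                )
                return {
                    "message": answer,
                    "suggestions": [
                        "Did that solve the problem?",
                        "Any other questions?"
                    ]
                }
        # Complex question, or nothing found: hand over to a human
        return {
            "message": (
                "This issue needs more specialized technical support.\n"
                "I am transferring you to a technical specialist, please hold..."
            ),
            "requires_handover": True
        }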
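    async def _handover_to_human(self, context: ConversationContext) -> Dict:
        """Hand the conversation over to a human agent."""
        # Record the handover
        await self._save_handover_record(context)
        return {
            # The queue figures are placeholders; a real system would query the queue service
            "message": (
                "Transferring you to a human agent, please hold...\n"
                "People ahead of you: 3, estimated wait: 2 minutes"
            ),
            "requires_handover": True,
            "suggestions": []
        }

    async def _handle_unknown(
        self,
        message: str,
        context: ConversationContext
    ) -> Dict:
        """Handle unknown intents."""
        clarification = await self.llm.generate(
            prompt=(
                f"User message: {message}\n"
                "Determine the user's intent. Possible intents include:\n"
                "- General question (FAQ)\n"
                "- Order inquiry\n"
                "- Complaint or suggestion\n"
                "- Technical support\n"
                "If the intent cannot be determined, politely ask the user what they need."
            )
        )
        return {
            "message": clarification,
            "suggestions": [
                "I can look up your order",
                "I can answer common questions",
                "I can create a ticket for you"
            ]
        }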
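    async def _get_or_create_context(
        self,
        user_id: str,
        session_id: Optional[str]
    ) -> ConversationContext:
        """Get or create the conversation context."""
        # Placeholder: a real implementation would load the context from a cache or database.
        # Returning a fresh context keeps handle_message from failing on None.
        return ConversationContext(
            user_id=user_id,
            session_id=session_id or f"session-{user_id}"
        )

    async def _save_context(self, context: ConversationContext):
        """Save the conversation context."""
        # Placeholder: a real implementation would write to a cache or database
        pass

    async def _extract_entities(self, message: str, intent: Intent) -> Dict[str, Any]:
        """Extract entities such as order numbers (placeholder: returns nothing)."""
        return {}

    async def _save_handover_record(self, context: ConversationContext):
        """Record a handover to a human agent (placeholder)."""
        pass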
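# Intent classifier
class IntentClassifier:
    """LLM-based intent classifier."""
    def __init__(self, llm):
        self.llm = llm
        self.intents = [i.value for i in Intent]

    async def classify(
        self,
        message: str,
        context: ConversationContext
    ) -> Intent:
        """Classify the user's intent."""
        intent_list = "\n".join(self.intents)
        prompt = (
            f"User message: {message}\n"
            f"Available intents:\n{intent_list}\n"
            "Pick the single intent that best matches the user message.\n"
            "Return only the intent name, with nothing else."
        )
        result = await self.llm.generate(prompt)
        try:
            return Intent(result.strip())
        except ValueError:
            # The model returned something that is not a known intent value
            return Intent.UNKNOWN

    async def _assess_complexity(
        self,
        query: str,
        documents: List[Dict]
    ) -> float:
        """Estimate question complexity (0-1)."""
        # Simple heuristic: the lower the average retrieval score, the more complex the question
        if not documents:
            return 1.0
        avg_relevance = sum(d.get("score", 0) for d in documents) / len(documents)
        return 1 - avg_relevance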
# Tool classes (stubs: each execute method must be connected to a real backend)
class KnowledgeBaseTool:
    """Knowledge base search tool."""
    def __init__(self, config: Dict):
        self.endpoint = config["endpoint"]

    async def execute(self, query: str, **kwargs) -> List[Dict]:
        # Call the knowledge base API
        raise NotImplementedError

class OrderServiceTool:
    """Order service tool."""
    def __init__(self, config: Dict, action: str = "query"):
        self.endpoint = config["endpoint"]
        self.action = action

    async def execute(self, **kwargs) -> Dict:
        # Call the order service API
        raise NotImplementedError

class TicketTool:
    """Ticket creation tool."""
    def __init__(self, config: Dict):
        self.endpoint = config["endpoint"]

    async def execute(self, **kwargs) -> Dict:
        # Call the ticket system API
        raise NotImplementedError

class NotificationTool:
    """Notification tool."""
    def __init__(self, config: Dict):
        self.endpoint = config["endpoint"]

    async def execute(self, **kwargs) -> Dict:
        # Call the notification service
        raise NotImplementedError

# RAG engine
class RAGEngine:
    """Retrieval-augmented generation engine."""
    def __init__(self, vector_store, embedding_model):
        self.vector_store = vector_store
        self.embedding_model = embedding_model

    async def search(
        self,
        query: str,
        top_k: int = 5,
        filter: Optional[Dict] = None
    ) -> List[Dict]:
        # Vector search
        raise NotImplementedError

# Chat LLM wrapper
class ChatLLM:
    """Chat LLM wrapper."""
    def __init__(self, model: str, api_key: str):
        self.model = model
        self.api_key = api_key

    async def generate(self, prompt: str) -> str:
        # Call the LLM API
        raise NotImplementedError
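`handle_message` relies on an entity extraction step to fill in `order_id`, but the listing does not show how entities are actually extracted. One low-cost option is a rule-based pass that runs before, or instead of, an LLM call. The sketch below assumes a purely hypothetical order-number format (`ORD-` followed by 6 to 12 digits); both the format and the `extract_entities` name are illustrative assumptions, not something the article specifies.

```python
import re
from typing import Any, Dict

# Assumed format for illustration only: "ORD-" followed by 6 to 12 digits.
# Lookarounds are used instead of \b so that IDs embedded in CJK text still match.
ORDER_ID_PATTERN = re.compile(r"(?<![A-Za-z0-9])ORD-\d{6,12}(?!\d)", re.IGNORECASE)


def extract_entities(message: str) -> Dict[str, Any]:
    """Return the entities found in a user message (currently only order_id)."""
    entities: Dict[str, Any] = {}
    match = ORDER_ID_PATTERN.search(message)
    if match:
        entities["order_id"] = match.group(0).upper()  # normalize case
    return entities
```

A rule like this is cheap and predictable for well-structured identifiers; free-form entities such as product names or dates would still need an LLM or a dedicated NER model.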
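8. AI Agent Technology Trends and Outlook for 2026
8.1 Key Technology Trends in 2026
According to the Forbes China report "2026 Forbes China AI TECH Enterprises TOP 50", together with signals from Google I/O 2026, Baidu Create 2026, and similar events, the AI Agent field in 2026 shows the following trends:
| Trend | Description | Representative products |
|---|---|---|
| Agentic AI matures | Shift from single agents to multi-agent collaboration | Google Gemini Spark, Baidu DuMate |
| Tool ecosystem improves | The MCP protocol drives tool standardization | Anthropic Claude Code |
| Memory systems upgrade | Layered memory plus continual learning | LangChain Memory components |
| Enterprise-grade security | PII detection, sandboxed execution, audit logs | Kore Artemis platform |
| Rise of the DAA metric | From token counts to Daily Active Agents | The new yardstick proposed by Baidu |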
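The article gives DAA only as a name (Daily Active Agents), without a formula. As a rough illustration, it can be computed like a daily-active-users count over an agent activity log. The sketch below assumes that an agent counts as active on a day if it completed at least one task that day; that criterion, the event shape, and the function name are all assumptions made for this example rather than Baidu's definition.

```python
from collections import defaultdict
from datetime import date
from typing import Dict, Iterable, Set, Tuple

# Assumed event shape: (day, agent_id, task_completed)
Event = Tuple[date, str, bool]


def daily_active_agents(events: Iterable[Event]) -> Dict[date, int]:
    """Count distinct agents with at least one completed task per day."""
    active: Dict[date, Set[str]] = defaultdict(set)
    for day, agent_id, completed in events:
        if completed:  # assumption: only completed tasks count as activity
            active[day].add(agent_id)
    return {day: len(ids) for day, ids in sorted(active.items())}
```

Counting completed tasks rather than invocations ties the metric to delivered results, which matches the article's framing of a shift from token counts toward value delivered.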
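8.2 Technology Roadmap
AI Agent development roadmap for 2026:
Q1-Q2:
├── Stronger single-agent capabilities
│ ├── Better reasoning (CoT/ToT)
│ ├── Long-term memory support
│ └── Multimodal tool calling
│
├── Maturing multi-agent collaboration
│ ├── Standardized Supervisor pattern
│ ├── CrewAI-style orchestration
│ └── Knowledge sharing across agents
│
Q3-Q4:
├── Enterprise agent platforms
│ ├── Observability and monitoring
│ ├── Security and compliance
│ └── Deployment at scale
│
└── Vertical-domain agents
├── Finance agents
├── Healthcare agents
├── Legal agents
└── R&D agents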
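8.3 Recommendations
For AI developers:
- Master the core concepts: the ReAct loop, tool calling, and memory management are the foundation
- Choose the right framework: LangGraph suits complex production systems, CrewAI suits rapid prototyping
- Invest in tool design: well-designed tools largely determine what an agent can do
- Take security seriously: input filtering, output review, and sandboxed execution are indispensable
For enterprise decision-makers:
- Assess the use case: not every scenario needs an agent; a plain LLM is enough for simple Q&A
- Prepare the data: a high-quality knowledge base and tools are prerequisites for good agent results
- Put compliance first: AI regulation is tightening, so compliant deployment is mandatory
- Roll out incrementally: start with simple scenarios and expand step by step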
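As a concrete instance of the input filtering mentioned above, the following minimal sketch masks obvious PII before user text is passed to an LLM or written to logs. The patterns are deliberately simple and are assumptions chosen for illustration: an email regex and 11-digit mainland China mobile numbers. Production systems typically rely on a dedicated PII detection service rather than two regular expressions.

```python
import re

# Simplified email pattern; it does not cover every valid address.
EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
# Assumed phone format: 11-digit mainland China mobile numbers starting with 1.
PHONE = re.compile(r"(?<!\d)1[3-9]\d{9}(?!\d)")


def mask_pii(text: str) -> str:
    """Replace emails and phone numbers with placeholder tokens."""
    text = EMAIL.sub("[EMAIL]", text)
    return PHONE.sub("[PHONE]", text)
```

For example, `mask_pii("Contact me at alice@example.com or 13812345678")` returns `"Contact me at [EMAIL] or [PHONE]"`. Masking at the gateway keeps raw identifiers out of prompts, traces, and audit logs alike.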
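9. Summary
In 2026, AI Agents have moved from concept to mature commercial use. From Google's "agentic Gemini era" to Baidu's new DAA yardstick, AI is completing a fundamental shift from "able to answer questions" to "able to complete tasks".
This article examined the technical architecture, core algorithms, and engineering implementation of AI Agents:
- Architecture: an agent consists of an interaction layer, a core layer, a tools layer, and a data layer, which together form a complete perceive-plan-act-reflect loop
- Core capabilities: the ReAct reasoning loop, Function Calling for tool use, a layered memory system, and a self-reflection mechanism
- Engineering: LangChain/LangGraph provide standardized development frameworks, and the MCP protocol drives standardization of the tool ecosystem
- Trends: multi-agent collaboration, enterprise-grade security, and vertical-domain agents are the next growth areas
Key takeaways:
- AI Agent = brain (LLM) + memory + tools + planning engine
- The ReAct loop gives an agent the ability to "think before acting"
- The tool ecosystem and the memory system are the key bottlenecks for agent capability
- 2026 marks the first year of multi-agent collaboration and enterprise agent platforms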
References
- Google I/O 2026 - Gemini Omni & Agent Announcements (https://io.google/2026)
- Baidu Create 2026 - DuMate & DAA Metric (https://create.baidu.com)
- Forbes China AI TECH Enterprises TOP 50 (2026)
- LangChain Documentation (https://docs.langchain.com)
- Anthropic Claude Code (https://claude.ai/code)
- ReAct: Synergizing Reasoning and Acting in Language Models (arXiv:2210.03629)