2026年AI Agent智能体开发实战:从架构设计到生产部署的完整指南

标签:AI Agent、大模型、智能体架构、MCP协议、A2A协议、Python、Go


前言:为什么2026年是AI Agent元年

2026年5月20日,阿里云峰会与Google I/O同天发布重磅Agent战略,英伟达开源Nemotron 3 Nano Omni模型——三大科技巨头在同一天亮出Agent底牌,这绝非巧合。这是AI产业从"对话助手"向"自主执行"跃迁的历史性拐点。

根据Gartner最新报告,到2026年底,40%的企业应用将嵌入AI Agent,而这一数字在2025年还不到5%。更令人震撼的是,AI Agent的成功案例平均ROI达到171%,但与此同时,79%的Agent项目仍在"PPT阶段"无法落地生产。

本文将从技术架构设计核心代码实现协议生态解读三个维度,手把手教你构建一个生产级的AI Agent系统。文章包含超过40%的代码示例,覆盖Python和Go双语言实现,适合想要真正落地Agent技术的开发者阅读。


一、AI Agent系统架构全景图

1.1 六层架构设计理念

现代AI Agent系统采用六层架构设计,每一层都有明确的职责边界:

┌─────────────────────────────────────────────────────────┐
│                     接入层 (API Gateway)                 │
│    负载均衡 | 认证鉴权 | MCP适配器 | A2A适配器 | 限流熔断 │
├─────────────────────────────────────────────────────────┤
│                     Agent核心层                          │
│  意图理解 | 规划引擎 | 推理引擎 | 记忆系统 | 工具调用 | 执行 │
├─────────────────────────────────────────────────────────┤
│                     协作层 (Multi-Agent)                 │
│  注册中心 | 任务调度 | 协作协调 | 消息总线 | 共识协议      │
├─────────────────────────────────────────────────────────┤
│                     工具与数据层                         │
│   API工具 | 数据库 | 文件系统 | 代码执行 | 向量数据库     │
├─────────────────────────────────────────────────────────┤
│                     安全治理层                           │
│   策略引擎 | 权限控制 | 审计追踪 | 成本控制 | 沙箱隔离     │
└─────────────────────────────────────────────────────────┘

1.2 架构设计核心原则

原则一:松耦合高内聚

  • Agent核心逻辑与外部工具完全解耦
  • 通过标准化协议(MCP/A2A)进行通信
  • 每个Agent可独立开发、测试、部署

原则二:可观测性优先

  • 每一步推理可追溯、可审计
  • 完整记录Token消耗、工具调用、成功/失败率
  • 支持实时监控和异常告警

原则三:安全护栏内置

  • 策略即代码(Policy-as-Code)
  • 所有敏感操作需二次确认
  • 成本上限强制执行

二、Agent核心组件的Python实现

2.1 基础Agent框架

首先,我们定义Agent的基础接口和核心类:

# agent_core.py
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Callable
from enum import Enum
import asyncio
import json
import logging
from datetime import datetime

logger = logging.getLogger(__name__)


class AgentStatus(Enum):
    IDLE = "idle"
    THINKING = "thinking"
    EXECUTING = "executing"
    WAITING = "waiting"
    ERROR = "error"
    COMPLETED = "completed"


@dataclass
class Message:
    """对话消息结构"""
    role: str  # "user" | "assistant" | "system" | "tool"
    content: str
    tool_calls: Optional[List[Dict]] = None
    tool_call_id: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)


@dataclass
class Tool:
    """工具定义"""
    name: str
    description: str
    parameters: Dict[str, Any]  # JSON Schema格式
    handler: Callable  # 实际处理函数
    requires_confirmation: bool = False  # 是否需要二次确认
    max_token_budget: int = 1000  # 最大Token预算


@dataclass
class AgentConfig:
    """Agent配置"""
    model_name: str = "gpt-4o"
    max_steps: int = 10  # 最大推理步数
    max_tokens_per_step: int = 4000
    temperature: float = 0.7
    system_prompt: str = ""
    tools: List[Tool] = field(default_factory=list)
    memory_window: int = 10  # 记忆窗口大小
    cost_limit_per_request: float = 1.0  # 每请求成本上限(美元)


@dataclass
class ExecutionContext:
    """执行上下文"""
    agent_id: str
    user_id: str
    session_id: str
    messages: List[Message] = field(default_factory=list)
    tools_results: Dict[str, Any] = field(default_factory=dict)
    current_step: int = 0
    total_tokens: int = 0
    total_cost: float = 0.0
    status: AgentStatus = AgentStatus.IDLE
    metadata: Dict[str, Any] = field(default_factory=dict)


class BaseAgent(ABC):
    """Agent基类"""
    
    def __init__(self, config: AgentConfig):
        self.config = config
        self._register_tools()
        
    def _register_tools(self):
        """注册工具到LLM"""
        self.tool_schemas = [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.parameters
                }
            }
            for tool in self.config.tools
        ]
    
    @abstractmethod
    async def understand_intent(self, user_input: str, context: ExecutionContext) -> Dict:
        """理解用户意图"""
        pass
    
    @abstractmethod
    async def plan(self, intent: Dict, context: ExecutionContext) -> List[Dict]:
        """制定执行计划"""
        pass
    
    @abstractmethod
    async def reason(self, state: Dict, context: ExecutionContext) -> Dict:
        """推理下一步行动"""
        pass
    
    async def execute(self, context: ExecutionContext) -> str:
        """主执行循环"""
        try:
            context.status = AgentStatus.THINKING
            
            # 步骤1:理解意图
            intent = await self.understand_intent(
                context.messages[-1].content if context.messages else "",
                context
            )
            
            # 步骤2:规划执行路径
            plan = await self.plan(intent, context)
            
            # 步骤3:执行循环
            for step in plan:
                if context.current_step >= self.config.max_steps:
                    logger.warning(f"达到最大步数限制: {self.config.max_steps}")
                    break
                    
                if context.total_cost >= self.config.cost_limit_per_request:
                    logger.warning(f"达到成本上限: ${context.total_cost:.2f}")
                    break
                
                context.current_step += 1
                context.status = AgentStatus.EXECUTING
                
                # 执行当前步骤
                result = await self._execute_step(step, context)
                
                if result.get("requires_confirmation"):
                    context.status = AgentStatus.WAITING
                    return json.dumps({
                        "status": "awaiting_confirmation",
                        "action": result["action"],
                        "details": result["details"]
                    })
                
                if result.get("error"):
                    logger.error(f"步骤执行错误: {result['error']}")
                    context.status = AgentStatus.ERROR
                    return json.dumps({"status": "error", "message": result["error"]})
            
            context.status = AgentStatus.COMPLETED
            return context.tools_results.get("final_response", "任务完成")
            
        except Exception as e:
            logger.exception("Agent执行异常")
            context.status = AgentStatus.ERROR
            return json.dumps({"status": "error", "message": str(e)})
    
    async def _execute_step(self, step: Dict, context: ExecutionContext) -> Dict:
        """执行单个步骤"""
        tool_name = step.get("tool")
        params = step.get("parameters", {})
        
        # 查找工具
        tool = next((t for t in self.config.tools if t.name == tool_name), None)
        if not tool:
            return {"error": f"未找到工具: {tool_name}"}
        
        # 检查是否需要确认
        if tool.requires_confirmation:
            return {
                "requires_confirmation": True,
                "action": tool_name,
                "details": params
            }
        
        # 执行工具
        try:
            if asyncio.iscoroutinefunction(tool.handler):
                result = await tool.handler(**params)
            else:
                result = tool.handler(**params)
            
            context.tools_results[tool_name] = result
            return {"success": True, "result": result}
            
        except Exception as e:
            return {"error": f"工具执行失败: {str(e)}"}

2.2 记忆系统实现

记忆系统是Agent的"大脑",负责管理短期记忆、长期记忆和工作记忆:

# memory_system.py
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
import json
import hashlib


@dataclass
class MemoryItem:
    """记忆条目"""
    content: str
    memory_type: str  # "short_term" | "long_term" | "working"
    importance: float = 0.5  # 0-1
    access_count: int = 0
    last_access: datetime = field(default_factory=datetime.now)
    created_at: datetime = field(default_factory=datetime.now)
    metadata: Dict[str, Any] = field(default_factory=dict)
    
    @property
    def memory_id(self) -> str:
        return hashlib.md5(f"{self.content}{self.created_at}".encode()).hexdigest()[:12]


class VectorStore:
    """向量存储接口(支持多种后端)"""
    
    def __init__(self, backend: str = "qdrant"):
        self.backend = backend
        self._clients = {
            "qdrant": self._init_qdrant,
            "milvus": self._init_milvus,
            "chroma": self._init_chroma,
        }
        
    def _init_qdrant(self):
        # 实际使用时导入 qdrant_client
        pass
    
    def _init_milvus(self):
        pass
    
    def _init_chroma(self):
        pass
    
    async def upsert(self, id: str, vector: List[float], payload: Dict):
        """存储向量"""
        pass
    
    async def search(self, query_vector: List[float], top_k: int = 5) -> List[Dict]:
        """向量检索"""
        pass


class KnowledgeGraph:
    """知识图谱"""
    
    def __init__(self):
        self.nodes: Dict[str, Dict] = {}
        self.edges: Dict[str, List[Dict]] = {}
    
    def add_triple(self, subject: str, predicate: str, obj: str, weight: float = 1.0):
        """添加三元组"""
        if subject not in self.nodes:
            self.nodes[subject] = {"id": subject, "type": "entity"}
        if obj not in self.nodes:
            self.nodes[obj] = {"id": obj, "type": "entity"}
        
        edge_key = f"{subject}|{predicate}"
        if edge_key not in self.edges:
            self.edges[edge_key] = []
        self.edges[edge_key].append({
            "subject": subject,
            "predicate": predicate,
            "object": obj,
            "weight": weight
        })
    
    def query(self, subject: str, predicate: Optional[str] = None) -> List[Dict]:
        """查询三元组"""
        if predicate:
            edge_key = f"{subject}|{predicate}"
            return self.edges.get(edge_key, [])
        else:
            # 返回该节点的所有出边
            results = []
            for key, edges in self.edges.items():
                for edge in edges:
                    if edge["subject"] == subject:
                        results.append(edge)
            return results
    
    def get_related_nodes(self, node_id: str, depth: int = 1) -> List[str]:
        """获取相关节点"""
        related = set()
        current_level = {node_id}
        
        for _ in range(depth):
            next_level = set()
            for node in current_level:
                for edge in self.query(node):
                    next_level.add(edge["object"])
                    related.add(edge["object"])
            current_level = next_level
        
        return list(related)


class TemporalGraph:
    """时序图谱 - 管理时间关联记忆"""
    
    def __init__(self):
        self.timeline: Dict[str, List[MemoryItem]] = {}  # date -> memories
        self.index: Dict[str, List[str]] = {}  # memory_id -> dates
    
    def add_memory(self, memory: MemoryItem):
        """添加带时间戳的记忆"""
        date_key = memory.created_at.strftime("%Y-%m-%d")
        
        if date_key not in self.timeline:
            self.timeline[date_key] = []
        self.timeline[date_key].append(memory)
        
        if memory.memory_id not in self.index:
            self.index[memory.memory_id] = []
        self.index[memory.memory_id].append(date_key)
    
    def get_memories_in_range(self, start: datetime, end: datetime) -> List[MemoryItem]:
        """获取时间范围内的记忆"""
        results = []
        current = start
        while current <= end:
            date_key = current.strftime("%Y-%m-%d")
            if date_key in self.timeline:
                results.extend(self.timeline[date_key])
            current += timedelta(days=1)
        return results
    
    def get_temporal_neighbors(self, memory: MemoryItem, hours: int = 24) -> List[MemoryItem]:
        """获取时间相邻的记忆"""
        start = memory.created_at - timedelta(hours=hours)
        end = memory.created_at + timedelta(hours=hours)
        return self.get_memories_in_range(start, end)


class HybridMemory:
    """混合记忆系统 - 向量 + 图 + 时序"""
    
    def __init__(self, config: Dict):
        self.short_term_capacity = config.get("short_term_capacity", 100)
        self.long_term_threshold = config.get("long_term_threshold", 0.7)
        self.consolidation_interval = config.get("consolidation_interval", 100)
        
        # 存储组件
        self.short_term: List[MemoryItem] = []
        self.long_term: List[MemoryItem] = []
        self.vector_store = VectorStore(config.get("vector_backend", "qdrant"))
        self.knowledge_graph = KnowledgeGraph()
        self.temporal_graph = TemporalGraph()
        
        # 访问统计
        self.access_stats: Dict[str, int] = {}
    
    def add(self, content: str, memory_type: str = "short_term", 
            importance: float = 0.5, metadata: Dict = None) -> MemoryItem:
        """添加记忆"""
        memory = MemoryItem(
            content=content,
            memory_type=memory_type,
            importance=importance,
            metadata=metadata or {}
        )
        
        if memory_type == "short_term":
            self.short_term.append(memory)
            # 检查是否需要淘汰到长期记忆
            if len(self.short_term) > self.short_term_capacity:
                self._consolidate_memory()
        else:
            self.long_term.append(memory)
        
        # 存入向量数据库
        vector = self._embed(content)  # 需要接入embedding服务
        self.vector_store.upsert(memory.memory_id, vector, {
            "content": content,
            "memory_type": memory_type,
            "importance": importance
        })
        
        # 存入时序图谱
        self.temporal_graph.add_memory(memory)
        
        # 更新知识图谱(实体抽取)
        self._extract_entities(content, memory)
        
        return memory
    
    def retrieve(self, query: str, top_k: int = 5, 
                 memory_types: List[str] = None) -> List[MemoryItem]:
        """检索记忆"""
        query_vector = self._embed(query)
        vector_results = self.vector_store.search(query_vector, top_k)
        
        results = []
        for r in vector_results:
            memory_id = r["id"]
            # 从short_term或long_term中查找
            memory = self._find_memory_by_id(memory_id)
            if memory and (memory_types is None or memory.memory_type in memory_types):
                memory.access_count += 1
                memory.last_access = datetime.now()
                results.append(memory)
        
        return results
    
    def retrieve_by_context(self, current_memory: MemoryItem, 
                           time_window_hours: int = 24) -> List[MemoryItem]:
        """基于上下文检索(时间邻近 + 语义相似)"""
        # 时间邻近
        temporal_neighbors = self.temporal_graph.get_temporal_neighbors(
            current_memory, time_window_hours
        )
        
        # 语义相似
        semantic_neighbors = self.retrieve(
            current_memory.content, top_k=5
        )
        
        # 合并去重
        seen_ids = set()
        combined = []
        for m in temporal_neighbors + semantic_neighbors:
            if m.memory_id not in seen_ids:
                seen_ids.add(m.memory_id)
                combined.append(m)
        
        return combined[:10]
    
    def _consolidate_memory(self):
        """记忆整合:将短期记忆转化为长期记忆"""
        if not self.short_term:
            return
        
        # 选择最重要且最近访问的记忆
        candidates = sorted(
            self.short_term,
            key=lambda m: m.importance * (1 + 0.1 * m.access_count),
            reverse=True
        )[:10]
        
        for memory in candidates:
            if memory.importance >= self.long_term_threshold:
                memory.memory_type = "long_term"
                self.long_term.append(memory)
                self.short_term.remove(memory)
                
                # 建立关联(与相似记忆建立边)
                similar = self.retrieve(memory.content, top_k=3)
                for s in similar:
                    self.knowledge_graph.add_triple(
                        memory.memory_id, "related_to", s.memory_id,
                        weight=0.8
                    )
    
    def _embed(self, text: str) -> List[float]:
        """文本嵌入(需接入embedding服务)"""
        # 实际实现时调用 OpenAI/Cohere 等embedding API
        import hashlib
        # 简化实现,返回模拟向量
        h = hashlib.sha256(text.encode()).digest()
        return [float(b) / 255.0 for b in h[:32]]
    
    def _find_memory_by_id(self, memory_id: str) -> Optional[MemoryItem]:
        """根据ID查找记忆"""
        for memory in self.short_term + self.long_term:
            if memory.memory_id == memory_id:
                return memory
        return None
    
    def _extract_entities(self, content: str, memory: MemoryItem):
        """提取实体并更新知识图谱"""
        # 简化实现:实际需要NER模型
        words = content.split()
        for word in words:
            if word[0].isupper() and len(word) > 1:
                self.knowledge_graph.add_triple(
                    memory.memory_id, "mentions", word
                )
    
    def get_context_for_llm(self, recent_messages: List[str], 
                           max_memories: int = 5) -> str:
        """生成LLM可用的上下文"""
        memories = []
        for msg in recent_messages:
            retrieved = self.retrieve(msg, top_k=2)
            memories.extend(retrieved)
        
        # 去重并限制数量
        seen = set()
        unique_memories = []
        for m in memories:
            if m.memory_id not in seen:
                seen.add(m.memory_id)
                unique_memories.append(m)
                if len(unique_memories) >= max_memories:
                    break
        
        if not unique_memories:
            return "无相关记忆"
        
        context_parts = []
        for m in unique_memories:
            context_parts.append(f"- [{m.memory_type}] {m.content}")
        
        return "相关记忆:\n" + "\n".join(context_parts)

2.3 工具调用引擎实现

工具调用是Agent与外部世界交互的桥梁:

# tool_engine.py
import asyncio
import json
import re
from typing import Dict, Any, Callable, Optional, List
from dataclasses import dataclass
from abc import ABC, abstractmethod
import aiohttp
import sqlite3
from pathlib import Path


@dataclass
class ToolCall:
    """工具调用请求"""
    tool_name: str
    parameters: Dict[str, Any]
    call_id: str
    timestamp: float


@dataclass
class ToolResult:
    """工具执行结果"""
    call_id: str
    success: bool
    result: Any
    error: Optional[str] = None
    tokens_used: int = 0
    execution_time: float = 0.0


class BaseTool(ABC):
    """工具基类"""
    
    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description
    
    @abstractmethod
    async def execute(self, **kwargs) -> Any:
        """执行工具"""
        pass
    
    @abstractmethod
    def get_schema(self) -> Dict:
        """返回JSON Schema"""
        pass


class DatabaseTool(BaseTool):
    """数据库查询工具"""
    
    def __init__(self, db_path: str = ":memory:"):
        super().__init__(
            name="query_database",
            description="执行SQL查询并返回结果"
        )
        self.db_path = db_path
        self._init_db()
    
    def _init_db(self):
        """初始化示例数据库"""
        self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
        cursor = self.conn.cursor()
        
        # 创建示例表
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                name TEXT,
                email TEXT,
                department TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS orders (
                id INTEGER PRIMARY KEY,
                user_id INTEGER,
                product TEXT,
                amount REAL,
                status TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (user_id) REFERENCES users(id)
            )
        """)
        
        # 插入示例数据
        cursor.execute("DELETE FROM users")
        cursor.execute("DELETE FROM orders")
        
        users = [
            (1, "张三", "zhangsan@company.com", "技术部"),
            (2, "李四", "lisi@company.com", "产品部"),
            (3, "王五", "wangwu@company.com", "销售部"),
        ]
        cursor.executemany("INSERT INTO users VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)", users)
        
        orders = [
            (1, 1, "MacBook Pro", 24999.00, "已完成"),
            (2, 1, "iPhone 15", 8999.00, "已完成"),
            (3, 2, "iPad Pro", 7999.00, "处理中"),
            (4, 3, "AirPods", 1999.00, "已完成"),
        ]
        cursor.executemany(
            "INSERT INTO orders VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)", orders
        )
        
        self.conn.commit()
    
    async def execute(self, sql: str, params: tuple = ()) -> Dict:
        """执行SQL查询"""
        try:
            # 安全检查:只允许SELECT语句
            if not sql.strip().upper().startswith("SELECT"):
                return {"error": "只允许执行SELECT语句"}
            
            cursor = self.conn.cursor()
            cursor.execute(sql, params)
            rows = cursor.fetchall()
            columns = [desc[0] for desc in cursor.description]
            
            return {
                "columns": columns,
                "rows": [dict(zip(columns, row)) for row in rows],
                "row_count": len(rows)
            }
        except Exception as e:
            return {"error": str(e)}
    
    def get_schema(self) -> Dict:
        return {
            "name": self.name,
            "description": self.description,
            "parameters": {
                "type": "object",
                "properties": {
                    "sql": {
                        "type": "string",
                        "description": "SQL查询语句(仅支持SELECT)"
                    },
                    "params": {
                        "type": "array",
                        "description": "查询参数",
                        "items": {"type": "string"}
                    }
                },
                "required": ["sql"]
            }
        }


class HTTPRequestTool(BaseTool):
    """HTTP请求工具"""
    
    def __init__(self, timeout: int = 30):
        super().__init__(
            name="http_request",
            description="发送HTTP请求获取外部数据"
        )
        self.timeout = timeout
        self.session: Optional[aiohttp.ClientSession] = None
    
    async def execute(self, method: str, url: str, 
                     headers: Dict = None, json_body: Dict = None,
                     params: Dict = None) -> Dict:
        """发送HTTP请求"""
        try:
            if not self.session:
                self.session = aiohttp.ClientSession()
            
            async with self.session.request(
                method=method.upper(),
                url=url,
                headers=headers,
                json=json_body,
                params=params,
                timeout=aiohttp.ClientTimeout(total=self.timeout)
            ) as response:
                content = await response.text()
                
                return {
                    "status_code": response.status,
                    "headers": dict(response.headers),
                    "body": content[:10000],  # 限制返回长度
                    "url": str(response.url)
                }
        except aiohttp.ClientError as e:
            return {"error": f"HTTP请求失败: {str(e)}"}
        except asyncio.TimeoutError:
            return {"error": "请求超时"}
    
    def get_schema(self) -> Dict:
        return {
            "name": self.name,
            "description": self.description,
            "parameters": {
                "type": "object",
                "properties": {
                    "method": {
                        "type": "string",
                        "enum": ["GET", "POST", "PUT", "DELETE"],
                        "description": "HTTP方法"
                    },
                    "url": {
                        "type": "string",
                        "description": "请求URL"
                    },
                    "headers": {
                        "type": "object",
                        "description": "请求头"
                    },
                    "json_body": {
                        "type": "object",
                        "description": "JSON请求体"
                    },
                    "params": {
                        "type": "object",
                        "description": "URL查询参数"
                    }
                },
                "required": ["method", "url"]
            }
        }


class CodeExecutionTool(BaseTool):
    """代码执行工具(沙箱环境)"""
    
    def __init__(self, timeout: int = 10):
        super().__init__(
            name="execute_code",
            description="在沙箱环境中执行Python代码"
        )
        self.timeout = timeout
        self.execution_count = 0
    
    async def execute(self, code: str, language: str = "python") -> Dict:
        """执行代码"""
        import traceback
        
        if language != "python":
            return {"error": f"不支持的语言: {language}"}
        
        self.execution_count += 1
        
        # 限制执行次数防止滥用
        if self.execution_count > 100:
            return {"error": "执行次数超限"}
        
        # 捕获输出
        from io import StringIO
        import sys
        
        old_stdout = sys.stdout
        sys.stdout = StringIO()
        
        try:
            # 在异步上下文中执行代码
            result = await asyncio.wait_for(
                asyncio.get_event_loop().run_in_executor(
                    None, exec, compile(code, "<string>", "exec")
                ),
                timeout=self.timeout
            )
            
            output = sys.stdout.getvalue()
            sys.stdout = old_stdout
            
            return {
                "success": True,
                "output": output,
                "execution_id": self.execution_count
            }
            
        except asyncio.TimeoutError:
            sys.stdout = old_stdout
            return {"error": f"代码执行超时({self.timeout}秒)"}
            
        except Exception as e:
            sys.stdout = old_stdout
            return {
                "success": False,
                "error": str(e),
                "traceback": traceback.format_exc()
            }
    
    def get_schema(self) -> Dict:
        return {
            "name": self.name,
            "description": self.description,
            "parameters": {
                "type": "object",
                "properties": {
                    "code": {
                        "type": "string",
                        "description": "要执行的Python代码"
                    },
                    "language": {
                        "type": "string",
                        "enum": ["python"],
                        "default": "python"
                    }
                },
                "required": ["code"]
            }
        }


class FileTool(BaseTool):
    """文件操作工具"""
    
    def __init__(self, base_path: str = "./workspace"):
        super().__init__(
            name="file_operations",
            description="读写文件系统的文件"
        )
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)
        self.allowed_extensions = {".txt", ".md", ".json", ".csv", ".py", ".yaml", ".yml"}
    
    async def execute(self, operation: str, path: str, 
                     content: str = None, encoding: str = "utf-8") -> Dict:
        """文件操作"""
        file_path = self.base_path / path
        
        # 路径安全检查
        try:
            file_path.resolve().relative_to(self.base_path.resolve())
        except ValueError:
            return {"error": "路径不允许在工作目录外"}
        
        # 扩展名检查
        if file_path.suffix and file_path.suffix not in self.allowed_extensions:
            return {"error": f"不支持的文件类型: {file_path.suffix}"}
        
        try:
            if operation == "read":
                if not file_path.exists():
                    return {"error": "文件不存在"}
                return {
                    "content": file_path.read_text(encoding=encoding),
                    "size": file_path.stat().st_size
                }
                
            elif operation == "write":
                file_path.parent.mkdir(parents=True, exist_ok=True)
                file_path.write_text(content or "", encoding=encoding)
                return {
                    "success": True,
                    "path": str(file_path),
                    "size": file_path.stat().st_size
                }
                
            elif operation == "list":
                if not file_path.is_dir():
                    return {"error": "路径不是目录"}
                files = [
                    {"name": f.name, "type": "dir" if f.is_dir() else "file"}
                    for f in file_path.iterdir()
                ]
                return {"files": files}
                
            elif operation == "exists":
                return {"exists": file_path.exists()}
                
            else:
                return {"error": f"未知操作: {operation}"}
                
        except Exception as e:
            return {"error": str(e)}
    
    def get_schema(self) -> Dict:
        return {
            "name": self.name,
            "description": self.description,
            "parameters": {
                "type": "object",
                "properties": {
                    "operation": {
                        "type": "string",
                        "enum": ["read", "write", "list", "exists"],
                        "description": "操作类型"
                    },
                    "path": {
                        "type": "string",
                        "description": "文件路径(相对于工作目录)"
                    },
                    "content": {
                        "type": "string",
                        "description": "文件内容(write操作时使用)"
                    },
                    "encoding": {
                        "type": "string",
                        "default": "utf-8"
                    }
                },
                "required": ["operation", "path"]
            }
        }


class ToolEngine:
    """工具调用引擎"""
    
    def __init__(self):
        self.tools: Dict[str, BaseTool] = {}
        self.execution_log: List[Dict] = []
    
    def register(self, tool: BaseTool):
        """注册工具"""
        self.tools[tool.name] = tool
        print(f"✓ 工具注册成功: {tool.name}")
    
    def get_tool_schemas(self) -> List[Dict]:
        """获取所有工具的Schema"""
        return [tool.get_schema() for tool in self.tools.values()]
    
    async def execute(self, tool_name: str, parameters: Dict,
                     call_id: str = None) -> ToolResult:
        """执行工具调用"""
        import time
        
        start_time = time.time()
        call_id = call_id or f"call_{int(start_time * 1000)}"
        
        tool = self.tools.get(tool_name)
        if not tool:
            return ToolResult(
                call_id=call_id,
                success=False,
                error=f"工具不存在: {tool_name}"
            )
        
        try:
            result = await tool.execute(**parameters)
            
            # 记录执行日志
            log_entry = {
                "call_id": call_id,
                "tool_name": tool_name,
                "parameters": parameters,
                "success": "error" not in result,
                "timestamp": start_time,
                "execution_time": time.time() - start_time
            }
            self.execution_log.append(log_entry)
            
            return ToolResult(
                call_id=call_id,
                success="error" not in result,
                result=result,
                error=result.get("error"),
                execution_time=time.time() - start_time
            )
            
        except Exception as e:
            return ToolResult(
                call_id=call_id,
                success=False,
                error=str(e),
                execution_time=time.time() - start_time
            )
    
    def parse_llm_response(self, response: str) -> List[ToolCall]:
        """解析LLM返回的工具调用"""
        tool_calls = []
        
        # 匹配 ```tool_call ... ``` 格式
        pattern = r'```tool_call\s*\n(.*?)\n```'
        matches = re.findall(pattern, response, re.DOTALL)
        
        for match in matches:
            try:
                data = json.loads(match)
                tool_calls.append(ToolCall(
                    tool_name=data["name"],
                    parameters=data.get("parameters", {}),
                    call_id=f"call_{len(tool_calls)}"
                ))
            except json.JSONDecodeError:
                continue
        
        return tool_calls
    
    def get_statistics(self) -> Dict:
        """获取工具使用统计"""
        stats = {}
        for entry in self.execution_log:
            tool_name = entry["tool_name"]
            if tool_name not in stats:
                stats[tool_name] = {"total": 0, "success": 0, "total_time": 0}
            stats[tool_name]["total"] += 1
            if entry["success"]:
                stats[tool_name]["success"] += 1
            stats[tool_name]["total_time"] += entry["execution_time"]
        
        return stats

三、Multi-Agent协作系统的Go实现

3.1 Agent注册与发现

// agent/registry.go
package agent

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"time"
)

// Capability 定义Agent的能力
type Capability struct {
	Name        string            `json:"name"`
	Description string            `json:"description"`
	InputTypes  []string          `json:"input_types"`
	OutputTypes []string          `json:"output_types"`
	Metadata    map[string]string `json:"metadata"`
}

// AgentInfo Agent注册信息
type AgentInfo struct {
	ID          string       `json:"id"`
	Name        string       `json:"name"`
	Description string       `json:"description"`
	Version     string       `json:"version"`
	Capabilities []Capability `json:"capabilities"`
	Endpoint    string       `json:"endpoint"`
	Status      string       `json:"status"` // "active", "busy", "offline"
	Load        int          `json:"load"`    // 当前任务负载
	MaxLoad     int          `json:"max_load"`
	Heartbeat   time.Time    `json:"heartbeat"`
	CreatedAt   time.Time    `json:"created_at"`
	Metadata    map[string]string `json:"metadata"`
}

// AgentRegistry Agent注册中心
type AgentRegistry struct {
	mu         sync.RWMutex
	agents     map[string]*AgentInfo
	byCapability map[string][]string // capability name -> agent IDs
	byStatus    map[string][]string // status -> agent IDs
	
	// 事件通知
	eventCh chan AgentEvent
	ctx     context.Context
	cancel  context.CancelFunc
}

// AgentEvent Agent事件
type AgentEvent struct {
	Type    string    `json:"type"` // "registered", "unregistered", "status_changed"
	AgentID string    `json:"agent_id"`
	Info    *AgentInfo `json:"info,omitempty"`
	Time    time.Time `json:"time"`
}

// NewAgentRegistry 创建注册中心
func NewAgentRegistry(ctx context.Context) *AgentRegistry {
	ctx, cancel := context.WithCancel(ctx)
	reg := &AgentRegistry{
		agents:         make(map[string]*AgentInfo),
		byCapability:  make(map[string][]string),
		byStatus:       make(map[string][]string),
		eventCh:        make(chan AgentEvent, 100),
		ctx:            ctx,
		cancel:         cancel,
	}
	
	// 启动心跳检测
	go reg.heartbeatChecker()
	
	return reg
}

// Register 注册Agent
func (r *AgentRegistry) Register(info *AgentInfo) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	
	// 检查是否已存在
	if existing, ok := r.agents[info.ID]; ok && existing.Status != "offline" {
		return fmt.Errorf("agent %s already registered and active", info.ID)
	}
	
	info.CreatedAt = time.Now()
	info.Heartbeat = time.Now()
	info.Status = "active"
	
	r.agents[info.ID] = info
	
	// 更新能力索引
	for _, cap := range info.Capabilities {
		r.byCapability[cap.Name] = append(r.byCapability[cap.Name], info.ID)
	}
	
	// 更新状态索引
	r.byStatus["active"] = append(r.byStatus["active"], info.ID)
	
	// 发送事件
	select {
	case r.eventCh <- AgentEvent{Type: "registered", AgentID: info.ID, Info: info, Time: time.Now()}:
	default:
	}
	
	return nil
}

// Unregister 注销Agent
func (r *AgentRegistry) Unregister(agentID string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	
	info, ok := r.agents[agentID]
	if !ok {
		return fmt.Errorf("agent %s not found", agentID)
	}
	
	// 从索引中移除
	r.removeFromIndices(agentID, info)
	
	// 标记为离线而非删除(保留历史)
	info.Status = "offline"
	
	// 发送事件
	select {
	case r.eventCh <- AgentEvent{Type: "unregistered", AgentID: agentID, Time: time.Now()}:
	default:
	}
	
	return nil
}

// UpdateStatus 更新Agent状态
func (r *AgentRegistry) UpdateStatus(agentID, status string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	
	info, ok := r.agents[agentID]
	if !ok {
		return fmt.Errorf("agent %s not found", agentID)
	}
	
	oldStatus := info.Status
	info.Status = status
	
	// 更新状态索引
	r.byStatus[oldStatus] = removeFromSlice(r.byStatus[oldStatus], agentID)
	r.byStatus[status] = append(r.byStatus[status], agentID)
	
	// 发送事件
	select {
	case r.eventCh <- AgentEvent{
		Type: "status_changed",
		AgentID: agentID,
		Info: info,
		Time: time.Now(),
	}:
	default:
	}
	
	return nil
}

// FindByCapability 根据能力查找Agent
func (r *AgentRegistry) FindByCapability(capability string) []*AgentInfo {
	r.mu.RLock()
	defer r.mu.RUnlock()
	
	var result []*AgentInfo
	for _, agentID := range r.byCapability[capability] {
		if agent := r.agents[agentID]; agent != nil && agent.Status == "active" {
			result = append(result, agent)
		}
	}
	
	return result
}

// FindByID 根据ID查找Agent
func (r *AgentRegistry) FindByID(agentID string) *AgentInfo {
	r.mu.RLock()
	defer r.mu.RUnlock()
	
	return r.agents[agentID]
}

// ListAll 获取所有Agent
func (r *AgentRegistry) ListAll() []*AgentInfo {
	r.mu.RLock()
	defer r.mu.RUnlock()
	
	result := make([]*AgentInfo, 0, len(r.agents))
	for _, info := range r.agents {
		result = append(result, info)
	}
	
	return result
}

// FindBestAgent 根据能力找到最佳Agent(负载最低)
func (r *AgentRegistry) FindBestAgent(capability string) *AgentInfo {
	agents := r.FindByCapability(capability)
	if len(agents) == 0 {
		return nil
	}
	
	best := agents[0]
	for _, agent := range agents[1:] {
		if agent.Load < best.Load {
			best = agent
		}
	}
	
	return best
}

// Heartbeat 更新心跳
func (r *AgentRegistry) Heartbeat(agentID string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	
	info, ok := r.agents[agentID]
	if !ok {
		return fmt.Errorf("agent %s not found", agentID)
	}
	
	info.Heartbeat = time.Now()
	return nil
}

// heartbeatChecker 定期检查Agent心跳
func (r *AgentRegistry) heartbeatChecker() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	
	for {
		select {
		case <-r.ctx.Done():
			return
		case <-ticker.C:
			r.checkHeartbeats()
		}
	}
}

func (r *AgentRegistry) checkHeartbeats() {
	r.mu.Lock()
	defer r.mu.Unlock()
	
	threshold := time.Now().Add(-2 * time.Minute)
	
	for id, agent := range r.agents {
		if agent.Status == "active" && agent.Heartbeat.Before(threshold) {
			// 心跳超时,标记为离线
			r.removeFromIndices(id, agent)
			agent.Status = "offline"
		}
	}
}

func (r *AgentRegistry) removeFromIndices(agentID string, info *AgentInfo) {
	// 从能力索引移除
	for _, cap := range info.Capabilities {
		r.byCapability[cap.Name] = removeFromSlice(r.byCapability[cap.Name], agentID)
	}
	
	// 从状态索引移除
	r.byStatus[info.Status] = removeFromSlice(r.byStatus[info.Status], agentID)
}

// EventChannel 获取事件通道
func (r *AgentRegistry) EventChannel() <-chan AgentEvent {
	return r.eventCh
}

// GetStats 获取注册统计
func (r *AgentRegistry) GetStats() map[string]int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	
	stats := map[string]int{
		"total":    len(r.agents),
		"active":   len(r.byStatus["active"]),
		"busy":     len(r.byStatus["busy"]),
		"offline":  len(r.byStatus["offline"]),
	}
	
	for cap, agents := range r.byCapability {
		stats["capability_"+cap] = len(agents)
	}
	
	return stats
}

// 辅助函数:从切片中移除元素
func removeFromSlice(slice []string, item string) []string {
	result := make([]string, 0, len(slice))
	for _, s := range slice {
		if s != item {
			result = append(result, s)
		}
	}
	return result
}

// MarshalJSON 序列化AgentInfo
func (a *AgentInfo) MarshalJSON() ([]byte, error) {
	type Alias AgentInfo
	return json.Marshal((*Alias)(a))
}

3.2 任务调度器实现

// scheduler/dispatcher.go
package scheduler

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"sync"
	"time"
)

// TaskPriority 任务优先级
type TaskPriority int

const (
	PriorityLow TaskPriority = iota
	PriorityNormal
	PriorityHigh
	PriorityUrgent
)

// TaskStatus 任务状态
type TaskStatus string

const (
	TaskStatusPending   TaskStatus = "pending"
	TaskStatusRunning   TaskStatus = "running"
	TaskStatusCompleted TaskStatus = "completed"
	TaskStatusFailed    TaskStatus = "failed"
	TaskStatusCanceled  TaskStatus = "canceled"
)

// Task 任务定义
type Task struct {
	ID          string            `json:"id"`
	Type        string            `json:"type"`
	Priority    TaskPriority      `json:"priority"`
	Status      TaskStatus        `json:"status"`
	Payload     json.RawMessage   `json:"payload"`
	RequiredCaps []string         `json:"required_capabilities"`
	MaxRetries  int               `json:"max_retries"`
	Retries     int               `json:"retries"`
	Timeout     time.Duration     `json:"timeout"`
	AssignedTo  string            `json:"assigned_to,omitempty"`
	CreatedAt   time.Time         `json:"created_at"`
	StartedAt   *time.Time        `json:"started_at,omitempty"`
	CompletedAt *time.Time        `json:"completed_at,omitempty"`
	Result      json.RawMessage   `json:"result,omitempty"`
	Error       string            `json:"error,omitempty"`
	Metadata    map[string]string `json:"metadata"`
}

// TaskEvent 任务事件
type TaskEvent struct {
	Type    string    `json:"type"`
	TaskID  string    `json:"task_id"`
	AgentID string    `json:"agent_id,omitempty"`
	Time    time.Time `json:"time"`
	Data    any       `json:"data,omitempty"`
}

// TaskResult 任务执行结果
type TaskResult struct {
	TaskID  string          `json:"task_id"`
	Success bool            `json:"success"`
	Output  json.RawMessage `json:"output,omitempty"`
	Error   string          `json:"error,omitempty"`
}

// TaskHandler 任务处理器接口
type TaskHandler interface {
	CanHandle(task *Task) bool
	Handle(ctx context.Context, task *Task) (*TaskResult, error)
}

// Dispatcher 任务调度器
type Dispatcher struct {
	// 配置
	maxQueueSize    int
	maxRetries      int
	defaultTimeout  time.Duration
	
	// 组件
	taskQueue   *PriorityQueue
	registry    AgentRegistryInterface
	taskHandlers []TaskHandler
	eventCh     chan TaskEvent
	
	// 状态
	mu         sync.RWMutex
	tasks      map[string]*Task
	running    map[string]context.CancelFunc // taskID -> cancel func
	stats      DispatcherStats
	
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

// AgentRegistryInterface 注册中心接口
type AgentRegistryInterface interface {
	FindBestAgent(capability string) *AgentInfo
	Heartbeat(agentID string) error
	UpdateStatus(agentID, status string) error
}

// DispatcherStats 调度统计
type DispatcherStats struct {
	TotalSubmitted int64 `json:"total_submitted"`
	TotalCompleted int64 `json:"total_completed"`
	TotalFailed    int64 `json:"total_failed"`
	QueueSize      int   `json:"queue_size"`
	RunningTasks   int   `json:"running_tasks"`
}

// NewDispatcher 创建调度器
func NewDispatcher(ctx context.Context, registry AgentRegistryInterface) *Dispatcher {
	ctx, cancel := context.WithCancel(ctx)
	
	d := &Dispatcher{
		maxQueueSize:   10000,
		maxRetries:     3,
		defaultTimeout: 5 * time.Minute,
		taskQueue:      NewPriorityQueue(10000),
		registry:       registry,
		taskHandlers:   make([]TaskHandler, 0),
		eventCh:        make(chan TaskEvent, 1000),
		tasks:          make(map[string]*Task),
		running:        make(map[string]context.CancelFunc),
		ctx:            ctx,
		cancel:         cancel,
	}
	
	// 启动调度循环
	d.wg.Add(1)
	go d.dispatchLoop()
	
	// 启动统计更新
	go d.statsUpdater()
	
	return d
}

// RegisterHandler 注册任务处理器
func (d *Dispatcher) RegisterHandler(handler TaskHandler) {
	d.taskHandlers = append(d.taskHandlers, handler)
}

// Submit 提交任务
func (d *Dispatcher) Submit(task *Task) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	
	// 设置默认值
	if task.ID == "" {
		task.ID = fmt.Sprintf("task_%d", time.Now().UnixNano())
	}
	if task.Timeout == 0 {
		task.Timeout = d.defaultTimeout
	}
	if task.MaxRetries == 0 {
		task.MaxRetries = d.maxRetries
	}
	
	task.Status = TaskStatusPending
	task.CreatedAt = time.Now()
	task.Metadata = make(map[string]string)
	
	d.tasks[task.ID] = task
	d.stats.TotalSubmitted++
	d.stats.QueueSize++
	
	// 加入优先队列
	d.taskQueue.Push(task)
	
	// 发送事件
	d.sendEvent(TaskEvent{
		Type:   "task_submitted",
		TaskID: task.ID,
		Time:   time.Now(),
	})
	
	return nil
}

// Cancel 取消任务
func (d *Dispatcher) Cancel(taskID string) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	
	task, ok := d.tasks[taskID]
	if !ok {
		return fmt.Errorf("task %s not found", taskID)
	}
	
	if task.Status == TaskStatusCompleted || task.Status == TaskStatusCanceled {
		return fmt.Errorf("task %s cannot be canceled (status: %s)", taskID, task.Status)
	}
	
	// 如果正在运行,取消上下文
	if cancel, ok := d.running[taskID]; ok {
		cancel()
		delete(d.running, taskID)
	}
	
	task.Status = TaskStatusCanceled
	task.CompletedAt = timePtr(time.Now())
	
	d.sendEvent(TaskEvent{
		Type:   "task_canceled",
		TaskID: taskID,
		Time:   time.Now(),
	})
	
	return nil
}

// GetTask 获取任务状态
func (d *Dispatcher) GetTask(taskID string) *Task {
	d.mu.RLock()
	defer d.mu.RUnlock()
	return d.tasks[taskID]
}

// dispatchLoop 任务分发循环
func (d *Dispatcher) dispatchLoop() {
	defer d.wg.Done()
	
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	
	for {
		select {
		case <-d.ctx.Done():
			return
		case <-ticker.C:
			d.dispatchReadyTasks()
		}
	}
}

func (d *Dispatcher) dispatchReadyTasks() {
	d.mu.Lock()
	defer d.mu.Unlock()
	
	// 获取可用的Agent
	availableAgents := d.getAvailableAgents()
	if len(availableAgents) == 0 {
		return
	}
	
	// 尝试分配任务
	for _, agent := range availableAgents {
		if d.taskQueue.IsEmpty() {
			break
		}
		
		task := d.taskQueue.Pop()
		if task == nil {
			continue
		}
		
		// 检查Agent是否有任务要求的能力
		if !d.agentHasCapabilities(agent, task.RequiredCaps) {
			// 放回队列
			d.taskQueue.Push(task)
			continue
		}
		
		// 分配任务
		d.assignTask(task, agent)
	}
}

func (d *Dispatcher) getAvailableAgents() []*AgentInfo {
	// 从注册中心获取可用的Agent
	// 简化实现:获取所有active且负载低的Agent
	return nil // 实际需要调用 registry.FindByCapability
}

func (d *Dispatcher) agentHasCapabilities(agent *AgentInfo, required []string) bool {
	agentCaps := make(map[string]bool)
	for _, cap := range agent.Capabilities {
		agentCaps[cap.Name] = true
	}
	
	for _, req := range required {
		if !agentCaps[req] {
			return false
		}
	}
	return true
}

func (d *Dispatcher) assignTask(task *Task, agent *AgentInfo) {
	task.Status = TaskStatusRunning
	task.AssignedTo = agent.ID
	now := time.Now()
	task.StartedAt = &now
	
	d.stats.QueueSize--
	d.stats.RunningTasks++
	
	// 更新Agent状态
	d.registry.UpdateStatus(agent.ID, "busy")
	
	// 创建带超时的上下文
	ctx, cancel := context.WithTimeout(d.ctx, task.Timeout)
	d.running[task.ID] = cancel
	
	// 发送任务分配事件
	d.sendEvent(TaskEvent{
		Type:    "task_assigned",
		TaskID:  task.ID,
		AgentID: agent.ID,
		Time:    time.Now(),
	})
	
	// 异步执行任务
	go d.executeTask(ctx, task, agent)
}

func (d *Dispatcher) executeTask(ctx context.Context, task *Task, agent *AgentInfo) {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("Task %s panic: %v", task.ID, r)
			d.completeTask(task, nil, fmt.Sprintf("panic: %v", r))
		}
	}()
	
	// 找到合适的处理器
	var handler TaskHandler
	for _, h := range d.taskHandlers {
		if h.CanHandle(task) {
			handler = h
			break
		}
	}
	
	if handler == nil {
		d.completeTask(task, nil, "no handler found")
		return
	}
	
	// 执行任务
	result, err := handler.Handle(ctx, task)
	
	if err != nil {
		if ctx.Err() == context.DeadlineExceeded {
			d.completeTask(task, nil, "task timeout")
		} else if task.Retries < task.MaxRetries {
			// 重试
			task.Retries++
			task.Status = TaskStatusPending
			d.taskQueue.Push(task)
		} else {
			d.completeTask(task, result, err.Error())
		}
	} else {
		d.completeTask(task, result, "")
	}
}

func (d *Dispatcher) completeTask(task *Task, result *TaskResult, errMsg string) {
	d.mu.Lock()
	defer d.mu.Unlock()
	
	now := time.Now()
	task.CompletedAt = &now
	
	if cancel, ok := d.running[task.ID]; ok {
		cancel()
		delete(d.running, task.ID)
	}
	
	d.stats.RunningTasks--
	
	if errMsg != "" {
		task.Status = TaskStatusFailed
		task.Error = errMsg
		d.stats.TotalFailed++
	} else {
		task.Status = TaskStatusCompleted
		if result != nil {
			task.Result = result.Output
		}
		d.stats.TotalCompleted++
	}
	
	// 恢复Agent状态
	if task.AssignedTo != "" {
		d.registry.UpdateStatus(task.AssignedTo, "active")
	}
	
	// 发送事件
	d.sendEvent(TaskEvent{
		Type:   "task_completed",
		TaskID: task.ID,
		AgentID: task.AssignedTo,
		Time:   now,
		Data:   task,
	})
}

func (d *Dispatcher) sendEvent(event TaskEvent) {
	select {
	case d.eventCh <- event:
	default:
		log.Printf("Event channel full, dropping event: %s", event.Type)
	}
}

func (d *Dispatcher) statsUpdater() {
	ticker := time.NewTicker(5 * time.Second)
	for {
		select {
		case <-d.ctx.Done():
			return
		case <-ticker.C:
			d.mu.Lock()
			d.stats.QueueSize = d.taskQueue.Size()
			d.mu.Unlock()
		}
	}
}

// EventChannel 获取事件通道
func (d *Dispatcher) EventChannel() <-chan TaskEvent {
	return d.eventCh
}

// GetStats 获取调度统计
func (d *Dispatcher) GetStats() DispatcherStats {
	d.mu.RLock()
	defer d.mu.RUnlock()
	return d.stats
}

// Shutdown 关闭调度器
func (d *Dispatcher) Shutdown() {
	d.cancel()
	d.wg.Wait()
}

// 辅助函数
func timePtr(t time.Time) *time.Time {
	return &t
}

四、MCP与A2A协议深度解读

4.1 MCP协议(Model Context Protocol)

MCP是Anthropic主导的AI模型上下文协议,类似于"AI时代的USB接口":

# mcp_protocol.py
import asyncio
import json
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass, field
from enum import Enum
import aiohttp
import hashlib


class MCPMessageType(Enum):
    INITIALIZE = "initialize"
    INITIALIZED = "initialized"
    TOOLS_LIST = "tools/list"
    TOOLS_CALL = "tools/call"
    TOOLS_RESULT = "tools/result"
    RESOURCES_LIST = "resources/list"
    RESOURCES_READ = "resources/read"
    PROMPTS_LIST = "prompts/list"
    PROMPTS_GET = "prompts/get"


@dataclass
class MCPMessage:
    """MCP消息格式"""
    jsonrpc: str = "2.0"
    id: Optional[str] = None
    method: Optional[str] = None
    params: Optional[Dict] = None
    result: Optional[Any] = None
    error: Optional[Dict] = None
    
    def to_dict(self) -> Dict:
        msg = {"jsonrpc": self.jsonrpc}
        if self.id is not None:
            msg["id"] = self.id
        if self.method is not None:
            msg["method"] = self.method
        if self.params is not None:
            msg["params"] = self.params
        if self.result is not None:
            msg["result"] = self.result
        if self.error is not None:
            msg["error"] = self.error
        return msg
    
    @classmethod
    def from_dict(cls, data: Dict) -> "MCPMessage":
        return cls(
            id=data.get("id"),
            method=data.get("method"),
            params=data.get("params"),
            result=data.get("result"),
            error=data.get("error")
        )


@dataclass
class MCPTool:
    """MCP工具定义"""
    name: str
    description: str
    inputSchema: Dict[str, Any]
    annotations: Optional[Dict] = None


@dataclass 
class MCPResource:
    """MCP资源定义"""
    uri: str
    name: str
    description: Optional[str] = None
    mimeType: Optional[str] = None


class MCPClient:
    """MCP客户端"""
    
    def __init__(self, server_url: str):
        self.server_url = server_url
        self.session_id = None
        self.capabilities = {}
        self.tools: Dict[str, MCPTool] = {}
        self.resources: Dict[str, MCPResource] = {}
        self._ws: Optional[aiohttp.ClientWebSocketResponse] = None
        self._pending_requests: Dict[str, asyncio.Future] = {}
    
    async def connect(self):
        """建立连接"""
        async with aiohttp.ClientSession() as session:
            self._ws = await session.ws_connect(self.server_url)
            
            # 发送初始化请求
            init_msg = MCPMessage(
                id="init_1",
                method="initialize",
                params={
                    "protocolVersion": "2024-11-05",
                    "capabilities": {
                        "roots": {"listChanged": True},
                        "sampling": {}
                    },
                    "clientInfo": {
                        "name": "mcp-python-client",
                        "version": "1.0.0"
                    }
                }
            )
            
            response = await self._send_request(init_msg)
            self.capabilities = response.result.get("capabilities", {})
            self.session_id = response.id
            
            # 通知初始化完成
            await self._send_notification("initialized", {
                "protocolVersion": "2024-11-05"
            })
            
            # 加载工具和资源列表
            await self._load_tools()
            await self._load_resources()
    
    async def _load_tools(self):
        """加载可用工具"""
        msg = MCPMessage(method="tools/list")
        response = await self._send_request(msg)
        
        for tool in response.result.get("tools", []):
            self.tools[tool["name"]] = MCPTool(
                name=tool["name"],
                description=tool.get("description", ""),
                inputSchema=tool.get("inputSchema", {}),
                annotations=tool.get("annotations")
            )
    
    async def _load_resources(self):
        """加载可用资源"""
        msg = MCPMessage(method="resources/list")
        response = await self._send_request(msg)
        
        for resource in response.result.get("resources", []):
            self.resources[resource["uri"]] = MCPResource(
                uri=resource["uri"],
                name=resource.get("name", ""),
                description=resource.get("description"),
                mimeType=resource.get("mimeType")
            )
    
    async def call_tool(self, tool_name: str, arguments: Dict) -> Dict:
        """调用工具"""
        if tool_name not in self.tools:
            raise ValueError(f"Unknown tool: {tool_name}")
        
        msg = MCPMessage(
            id=f"call_{hashlib.md5(str(time.time()).encode()).hexdigest()[:8]}",
            method="tools/call",
            params={
                "name": tool_name,
                "arguments": arguments
            }
        )
        
        response = await self._send_request(msg)
        return response.result
    
    async def read_resource(self, uri: str) -> str:
        """读取资源"""
        msg = MCPMessage(
            id=f"res_{hashlib.md5(uri.encode()).hexdigest()[:8]}",
            method="resources/read",
            params={"uri": uri}
        )
        
        response = await self._send_request(msg)
        contents = response.result.get("contents", [])
        if contents:
            return contents[0].get("text", "")
        return ""
    
    async def _send_request(self, msg: MCPMessage) -> MCPMessage:
        """发送请求并等待响应"""
        future = asyncio.Future()
        self._pending_requests[msg.id] = future
        
        await self._ws.send_json(msg.to_dict())
        
        try:
            response = await future
            return response
        finally:
            del self._pending_requests[msg.id]
    
    async def _send_notification(self, method: str, params: Dict = None):
        """发送通知(不需要响应)"""
        await self._ws.send_json({
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {}
        })
    
    async def _receive_loop(self):
        """接收消息循环"""
        async for msg in self._ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                data = json.loads(msg.data)
                
                # 处理响应
                if "id" in data:
                    msg_id = data["id"]
                    if msg_id in self._pending_requests:
                        future = self._pending_requests[msg_id]
                        if not future.done():
                            future.set_result(MCPMessage.from_dict(data))
                
                # 处理通知
                elif "method" in data:
                    asyncio.create_task(self._handle_notification(data))
    
    async def _handle_notification(self, data: Dict):
        """处理服务器通知"""
        method = data.get("method")
        params = data.get("params", {})
        
        if method == "notifications/tools/list_changed":
            await self._load_tools()
        elif method == "notifications/resources/list_changed":
            await self._load_resources()


class MCPServer:
    """MCP服务器(简化实现)"""
    
    def __init__(self, port: int = 8080):
        self.port = port
        self.tools: Dict[str, Callable] = {}
        self.resources: Dict[str, Dict] = {}
        self.prompts: Dict[str, str] = {}
        self._app: Optional[aiohttp.web.Application] = None
    
    def register_tool(self, name: str, handler: Callable, 
                      description: str = "", input_schema: Dict = None):
        """注册工具"""
        self.tools[name] = handler
        # 存储工具元数据
    
    def register_resource(self, uri: str, name: str, 
                         content_provider: Callable):
        """注册资源"""
        self.resources[uri] = {
            "name": name,
            "provider": content_provider
        }
    
    async def handle_request(self, msg: MCPMessage) -> MCPMessage:
        """处理请求"""
        method = msg.method
        
        if method == "initialize":
            return MCPMessage(
                id=msg.id,
                result={
                    "protocolVersion": "2024-11-05",
                    "capabilities": {
                        "tools": {"listChanged": True},
                        "resources": {"subscribe": True, "listChanged": True}
                    },
                    "serverInfo": {
                        "name": "mcp-server",
                        "version": "1.0.0"
                    }
                }
            )
        
        elif method == "tools/list":
            tools_list = []
            for name, handler in self.tools.items():
                tools_list.append({
                    "name": name,
                    "description": handler.__doc__ or ""
                })
            return MCPMessage(id=msg.id, result={"tools": tools_list})
        
        elif method == "tools/call":
            params = msg.params
            tool_name = params.get("name")
            arguments = params.get("arguments", {})
            
            if tool_name not in self.tools:
                return MCPMessage(
                    id=msg.id,
                    error={"code": -32602, "message": f"Unknown tool: {tool_name}"}
                )
            
            try:
                result = await self.tools[tool_name](**arguments)
                return MCPMessage(
                    id=msg.id,
                    result={
                        "content": [
                            {"type": "text", "text": json.dumps(result)}
                        ]
                    }
                )
            except Exception as e:
                return MCPMessage(
                    id=msg.id,
                    error={"code": -32603, "message": str(e)}
                )
        
        elif method == "resources/list":
            resources_list = [
                {"uri": uri, "name": meta["name"]}
                for uri, meta in self.resources.items()
            ]
            return MCPMessage(id=msg.id, result={"resources": resources_list})
        
        elif method == "resources/read":
            uri = msg.params.get("uri")
            if uri not in self.resources:
                return MCPMessage(
                    id=msg.id,
                    error={"code": -32602, "message": f"Unknown resource: {uri}"}
                )
            
            content = await self.resources[uri]["provider"]()
            return MCPMessage(
                id=msg.id,
                result={
                    "contents": [{"uri": uri, "text": content}]
                }
            )
        
        return MCPMessage(
            id=msg.id,
            error={"code": -32601, "message": f"Method not found: {method}"}
        )

4.2 A2A协议(Agent-to-Agent)

A2A协议让不同厂商的Agent能够互相通信:

# a2a_protocol.py
import asyncio
import json
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
import uuid


class A2AMessageType(Enum):
    TASK_SUBMIT = "task/submit"
    TASK_GET = "task/get"
    TASK_CANCEL = "task/cancel"
    TASK_SUBSCRIBE = "task/subscribe"
    TASK_FEEDBACK = "task/feedback"
    AGENT_DISCOVER = "agent/discover"
    AGENT_CAPABILITIES = "agent/capabilities"


@dataclass
class A2ACapability:
    """Agent能力描述"""
    name: str
    description: str
    input_types: List[str]
    output_types: List[str]
    quality_metrics: Optional[Dict] = None


@dataclass
class A2ATask:
    """A2A任务"""
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    type: str = ""
    priority: int = 0
    input: Dict[str, Any] = field(default_factory=dict)
    output: Optional[Dict] = None
    status: str = "pending"  # pending, running, completed, failed
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)
    error: Optional[str] = None
    artifacts: List[Dict] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class A2AMessage:
    """A2A消息"""
    type: A2AMessageType
    task_id: Optional[str] = None
    sender_id: str = ""
    receiver_id: str = ""
    payload: Dict[str, Any] = field(default_factory=dict)
    correlation_id: Optional[str] = None
    timestamp: datetime = field(default_factory=datetime.now)


class A2AAgent:
    """A2A兼容的Agent"""
    
    def __init__(self, agent_id: str, name: str):
        self.agent_id = agent_id
        self.name = name
        self.capabilities: List[A2ACapability] = []
        self.peer_agents: Dict[str, Dict] = {}
        self.message_queue: asyncio.Queue = asyncio.Queue()
        self._running = False
    
    def add_capability(self, capability: A2ACapability):
        """添加能力"""
        self.capabilities.append(capability)
    
    async def start(self):
        """启动Agent"""
        self._running = True
        asyncio.create_task(self._message_processor())
    
    async def stop(self):
        """停止Agent"""
        self._running = False
    
    async def send_message(self, receiver_id: str, message: A2AMessage) -> Dict:
        """发送消息到其他Agent"""
        # 实际实现需要消息传输层(WebSocket/HTTP/gRPC等)
        message.sender_id = self.agent_id
        message.receiver_id = receiver_id
        message.timestamp = datetime.now()
        
        # 模拟发送
        return {"status": "queued", "message_id": str(uuid.uuid4())}
    
    async def discover_agent(self, agent_id: str) -> Optional[Dict]:
        """发现其他Agent"""
        msg = A2AMessage(
            type=A2AMessageType.AGENT_DISCOVER,
            receiver_id=agent_id
        )
        response = await self.send_message(agent_id, msg)
        return response.get("capabilities")
    
    async def submit_task(self, receiver_id: str, task: A2ATask) -> str:
        """向其他Agent提交任务"""
        msg = A2AMessage(
            type=A2AMessageType.TASK_SUBMIT,
            payload={"task": {
                "id": task.id,
                "type": task.type,
                "priority": task.priority,
                "input": task.input
            }}
        )
        response = await self.send_message(receiver_id, msg)
        return response.get("task_id", task.id)
    
    async def subscribe_task(self, agent_id: str, task_id: str) -> asyncio.Queue:
        """订阅任务状态更新"""
        queue = asyncio.Queue()
        
        msg = A2AMessage(
            type=A2AMessageType.TASK_SUBSCRIBE,
            task_id=task_id,
            receiver_id=agent_id
        )
        await self.send_message(agent_id, msg)
        
        return queue
    
    async def _message_processor(self):
        """消息处理器"""
        while self._running:
            try:
                message = await asyncio.wait_for(
                    self.message_queue.get(),
                    timeout=1.0
                )
                await self._handle_message(message)
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                print(f"Message processor error: {e}")
    
    async def _handle_message(self, message: A2AMessage):
        """处理接收到的消息"""
        if message.type == A2AMessageType.TASK_SUBMIT:
            await self._handle_task_submit(message)
        elif message.type == A2AMessageType.TASK_SUBSCRIBE:
            await self._handle_task_subscribe(message)
        elif message.type == A2AMessageType.AGENT_DISCOVER:
            await self._handle_agent_discover(message)
        elif message.type == A2AMessageType.TASK_GET:
            await self._handle_task_get(message)
        elif message.type == A2AMessageType.TASK_CANCEL:
            await self._handle_task_cancel(message)
    
    async def _handle_task_submit(self, message: A2AMessage):
        """处理任务提交"""
        task_data = message.payload.get("task", {})
        task = A2ATask(**task_data)
        task.status = "pending"
        
        # 执行任务
        result = await self._execute_task(task)
        
        # 发送结果
        response_msg = A2AMessage(
            type=A2AMessageType.TASK_FEEDBACK,
            task_id=task.id,
            receiver_id=message.sender_id,
            payload={"status": task.status, "result": task.output, "error": task.error}
        )
        await self.send_message(message.sender_id, response_msg)
    
    async def _execute_task(self, task: A2ATask) -> A2ATask:
        """执行任务(由子类重写)"""
        task.status = "running"
        task.updated_at = datetime.now()
        
        # 模拟执行
        await asyncio.sleep(0.1)
        
        task.status = "completed"
        task.output = {"result": "Task completed successfully"}
        task.updated_at = datetime.now()
        
        return task
    
    async def _handle_task_subscribe(self, message: A2AMessage):
        """处理任务订阅"""
        task_id = message.task_id
        # 注册订阅者,后续任务更新会通知
        pass
    
    async def _handle_agent_discover(self, message: A2AMessage):
        """处理Agent发现请求"""
        response = A2AMessage(
            type=A2AMessageType.AGENT_CAPABILITIES,
            receiver_id=message.sender_id,
            payload={
                "agent_id": self.agent_id,
                "name": self.name,
                "capabilities": [
                    {
                        "name": cap.name,
                        "description": cap.description,
                        "input_types": cap.input_types,
                        "output_types": cap.output_types
                    }
                    for cap in self.capabilities
                ]
            }
        )
        await self.send_message(message.sender_id, response)
    
    async def _handle_task_get(self, message: A2AMessage):
        """处理获取任务状态请求"""
        task_id = message.payload.get("task_id")
        # 返回任务状态
        pass
    
    async def _handle_task_cancel(self, message: A2AMessage):
        """处理取消任务请求"""
        task_id = message.task_id
        # 取消任务
        pass


class A2ACoordinator:
    """A2A协调器 - 管理多个Agent的协作"""
    
    def __init__(self):
        self.agents: Dict[str, A2AAgent] = {}
        self.task_graph: Dict[str, List[str]] = {}  # task_id -> subtask_ids
    
    def register_agent(self, agent: A2AAgent):
        """注册Agent"""
        self.agents[agent.agent_id] = agent
    
    async def submit_workflow(self, tasks: List[A2ATask], 
                              dependencies: Dict[str, List[str]] = None) -> str:
        """提交工作流"""
        workflow_id = str(uuid.uuid4())
        
        # 记录任务依赖关系
        if dependencies:
            self.task_graph[workflow_id] = []
            for task_id, deps in dependencies.items():
                self.task_graph[workflow_id].extend(deps)
        
        # 按依赖顺序提交任务
        submitted_tasks = {}
        pending_tasks = {t.id: t for t in tasks}
        
        while pending_tasks:
            # 找到可以提交的任务(没有未完成的依赖)
            ready_tasks = []
            for task_id, task in list(pending_tasks.items()):
                deps = dependencies.get(task_id, []) if dependencies else []
                if all(dep in submitted_tasks for dep in deps):
                    ready_tasks.append(task)
            
            if not ready_tasks:
                break
            
            # 提交就绪任务
            for task in ready_tasks:
                # 选择最合适的Agent
                agent = await self._select_agent(task)
                if agent:
                    await agent.submit_task(agent.agent_id, task)
                    submitted_tasks[task.id] = task
                    del pending_tasks[task.id]
        
        return workflow_id
    
    async def _select_agent(self, task: A2ATask) -> Optional[A2AAgent]:
        """选择最适合执行任务的Agent"""
        # 简化实现:随机选择
        if self.agents:
            return list(self.agents.values())[0]
        return None
    
    async def get_workflow_status(self, workflow_id: str) -> Dict:
        """获取工作流状态"""
        # 返回工作流中所有任务的状态汇总
        pass

五、生产环境部署指南

5.1 容器化部署架构

# docker-compose.yml
version: '3.8'

services:
  # Agent核心服务
  agent-core:
    build:
      context: .
      dockerfile: Dockerfile.agent
    ports:
      - "8000:8000"
    environment:
      - MODEL_PROVIDER=openai
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - REDIS_URL=redis://redis:6379
      - DATABASE_URL=postgresql://agent:password@postgres:5432/agent_db
    depends_on:
      - redis
      - postgres
    deploy:
      replicas: 3
      resources:
        limits:
          cpus: '2'
          memory: 4G
        reservations:
          cpus: '1'
          memory: 2G
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 10s
      timeout: 5s
      retries: 3

  # MCP工具服务器
  mcp-server:
    build:
      context: .
      dockerfile: Dockerfile.mcp
    ports:
      - "8080:8080"
    environment:
      - DB_PATH=/data/tools.db
    volumes:
      - tool-data:/data

  # Redis集群
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes
    volumes:
      - redis-data:/data

  # PostgreSQL数据库
  postgres:
    image: postgres:15-alpine
    environment:
      - POSTGRES_DB=agent_db
      - POSTGRES_USER=agent
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres-data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

  # 向量数据库 (Qdrant)
  qdrant:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333"
      - "6334:6334"
    volumes:
      - qdrant-data:/qdrant/storage

  # 负载均衡器
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
    depends_on:
      - agent-core

  # 监控服务
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml:ro

  # 日志聚合
  loki:
    image: grafana/loki:latest
    ports:
      - "3100:3100"

volumes:
  redis-data:
  postgres-data:
  qdrant-data:
  tool-data:

5.2 Kubernetes部署配置

# k8s/agent-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ai-agent-core
  namespace: ai-platform
spec:
  replicas: 5
  selector:
    matchLabels:
      app: ai-agent-core
  template:
    metadata:
      labels:
        app: ai-agent-core
    spec:
      containers:
        - name: agent
          image: registry.example.com/ai-agent:v1.0.0
          ports:
            - containerPort: 8000
          env:
            - name: MODEL_PROVIDER
              value: "openai"
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: ai-agent-secrets
                  key: openai-api-key
          resources:
            requests:
              memory: "2Gi"
              cpu: "1000m"
            limits:
              memory: "4Gi"
              cpu: "2000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 5
          volumeMounts:
            - name: agent-config
              mountPath: /app/config
      volumes:
        - name: agent-config
          configMap:
            name: agent-config
---
apiVersion: v1
kind: Service
metadata:
  name: ai-agent-service
  namespace: ai-platform
spec:
  selector:
    app: ai-agent-core
  ports:
    - port: 80
      targetPort: 8000
  type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ai-agent-hpa
  namespace: ai-platform
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ai-agent-core
  minReplicas: 3
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80

5.3 监控与可观测性

# observability/metrics.py
from prometheus_client import Counter, Histogram, Gauge, CollectorRegistry
from opentelemetry import trace
from opentelemetry.exporter import jaeger
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
import time
import logging

logger = logging.getLogger(__name__)

# Prometheus指标
AGENT_REQUESTS = Counter(
    'agent_requests_total',
    'Total agent requests',
    ['agent_id', 'status']
)

AGENT_LATENCY = Histogram(
    'agent_request_duration_seconds',
    'Agent request latency',
    ['agent_id', 'operation']
)

AGENT_TOKENS = Counter(
    'agent_tokens_total',
    'Total tokens consumed',
    ['agent_id', 'model', 'type']
)

TOOL_CALLS = Counter(
    'tool_calls_total',
    'Total tool calls',
    ['tool_name', 'status']
)

TOOL_LATENCY = Histogram(
    'tool_call_duration_seconds',
    'Tool call latency',
    ['tool_name']
)

ACTIVE_TASKS = Gauge(
    'active_tasks',
    'Number of active tasks',
    ['agent_id', 'priority']
)

QUEUE_SIZE = Gauge(
    'task_queue_size',
    'Task queue size',
    ['queue_name']
)

COST_USD = Counter(
    'agent_cost_usd',
    'Agent cost in USD',
    ['agent_id', 'cost_type']
)


class AgentMetrics:
    """Agent指标收集器"""
    
    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.tracer = trace.get_tracer(f"agent-{agent_id}")
    
    def record_request(self, status: str):
        """记录请求"""
        AGENT_REQUESTS.labels(agent_id=self.agent_id, status=status).inc()
    
    def record_latency(self, operation: str, duration: float):
        """记录延迟"""
        AGENT_LATENCY.labels(
            agent_id=self.agent_id,
            operation=operation
        ).observe(duration)
    
    def record_tokens(self, model: str, token_type: str, count: int):
        """记录Token使用"""
        AGENT_TOKENS.labels(
            agent_id=self.agent_id,
            model=model,
            type=token_type
        ).inc(count)
    
    def record_cost(self, cost_type: str, amount: float):
        """记录成本"""
        COST_USD.labels(
            agent_id=self.agent_id,
            cost_type=cost_type
        ).inc(amount)
    
    def set_active_tasks(self, priority: str, count: int):
        """设置活跃任务数"""
        ACTIVE_TASKS.labels(
            agent_id=self.agent_id,
            priority=priority
        ).set(count)
    
    @contextlib.contextmanager
    def trace_operation(self, operation_name: str):
        """追踪操作"""
        with self.tracer.start_as_current_span(operation_name) as span:
            start_time = time.time()
            try:
                yield span
            except Exception as e:
                span.set_attribute("error", True)
                span.set_attribute("error.message", str(e))
                raise
            finally:
                duration = time.time() - start_time
                span.set_attribute("duration_ms", duration * 1000)
                self.record_latency(operation_name, duration)


# 分布式追踪设置
def setup_tracing(service_name: str, jaeger_endpoint: str):
    """设置分布式追踪"""
    trace.set_tracer_provider(TracerProvider())
    
    exporter = jaeger.JaegerExporter(
        agent_host_name=jaeger_endpoint,
        agent_port=6831,
    )
    
    span_processor = BatchSpanProcessor(exporter)
    trace.get_tracer_provider().add_span_processor(span_processor)

六、避坑指南:88%项目失败的教训

6.1 成本估算的常见错误

# 正确的成本估算示例
class CostEstimator:
    """成本估算器"""
    
    def estimate_request(self, request: Dict) -> float:
        """
        估算单次请求成本
        注意:Agent的成本不是简单的 LLM调用成本!
        """
        total_cost = 0.0
        
        # 1. LLM推理成本
        input_tokens = request.get("input_tokens", 0)
        output_tokens = request.get("output_tokens", 0)
        model = request.get("model", "gpt-4")
        
        # 典型成本(美元/千Token)
        pricing = {
            "gpt-4": {"input": 0.03, "output": 0.06},
            "gpt-4-turbo": {"input": 0.01, "output": 0.03},
            "claude-3-opus": {"input": 0.015, "output": 0.075},
        }
        
        if model in pricing:
            p = pricing[model]
            total_cost += (input_tokens / 1000) * p["input"]
            total_cost += (output_tokens / 1000) * p["output"]
        
        # 2. 工具调用成本(外部API费用)
        for tool_call in request.get("tool_calls", []):
            tool_name = tool_call.get("name")
            # 外部API成本需要单独计算
            total_cost += self._estimate_tool_cost(tool_name, tool_call)
        
        # 3. 重试成本(通常首次失败会重试)
        retry_count = request.get("retry_count", 0)
        total_cost *= (1 + retry_count * 0.5)  # 假设重试成本为50%
        
        # 4. Token放大效应
        # 如果一次请求触发多次LLM调用(规划→执行→验证)
        llm_call_count = request.get("llm_call_count", 1)
        total_cost *= llm_call_count
        
        return total_cost
    
    def _estimate_tool_cost(self, tool_name: str, params: Dict) -> float:
        """估算工具调用成本"""
        costs = {
            "database_query": 0.001,  # 假设每次DB查询$0.001
            "http_request": 0.0001,    # 假设每次HTTP请求$0.0001
            "code_execution": 0.01,    # 沙箱执行$0.01
        }
        return costs.get(tool_name, 0.0)
    
    def estimate_daily_cost(self, requests_per_day: int, 
                           avg_cost_per_request: float,
                           p95_multiplier: float = 1.5) -> Dict:
        """估算日成本(含P95缓冲)"""
        avg_daily = requests_per_day * avg_cost_per_request
        p95_daily = avg_daily * p95_multiplier
        
        return {
            "avg_daily": avg_daily,
            "p95_daily": p95_daily,
            "avg_monthly": avg_daily * 30,
            "p95_monthly": p95_daily * 30
        }

6.2 组合爆炸问题的解决方案

# 组合爆炸控制
class ToolCallOptimizer:
    """工具调用优化器"""
    
    def __init__(self, max_tools: int = 10):
        self.max_tools = max_tools
        self.tool_cache = {}
    
    def optimize_plan(self, plan: List[Dict], 
                     context: ExecutionContext) -> List[Dict]:
        """
        优化执行计划,防止组合爆炸
        
        策略1:工具合并
        策略2:依赖剪枝
        策略3:缓存复用
        """
        optimized = []
        
        for step in plan:
            tool_name = step.get("tool")
            params = step.get("parameters", {})
            
            # 检查缓存
            cache_key = self._get_cache_key(tool_name, params)
            if cache_key in self.tool_cache:
                # 使用缓存结果
                cached_result = self.tool_cache[cache_key]
                context.tools_results[tool_name] = cached_result
                continue
            
            # 检查是否可以合并到上一步
            if optimized and self._can_merge(optimized[-1], step):
                merged = self._merge_steps(optimized[-1], step)
                optimized[-1] = merged
            else:
                optimized.append(step)
        
        # 限制总步数
        if len(optimized) > self.max_tools:
            optimized = self._prune_plan(optimized)
        
        return optimized
    
    def _get_cache_key(self, tool_name: str, params: Dict) -> str:
        """生成缓存键"""
        import hashlib
        import json
        key_data = {"tool": tool_name, "params": params}
        return hashlib.md5(json.dumps(key_data, sort_keys=True).encode()).hexdigest()
    
    def _can_merge(self, step1: Dict, step2: Dict) -> bool:
        """判断两个步骤是否可以合并"""
        # 简单实现:相同工具且无依赖关系可合并
        return (step1.get("tool") == step2.get("tool") and
                step1.get("tool") in ["database_query", "http_request"])
    
    def _merge_steps(self, step1: Dict, step2: Dict) -> Dict:
        """合并两个步骤"""
        # 批量执行
        return {
            "tool": step1["tool"],
            "parameters": {
                "batch": [step1["parameters"], step2["parameters"]]
            }
        }
    
    def _prune_plan(self, plan: List[Dict]) -> List[Dict]:
        """剪枝计划"""
        # 优先保留关键路径上的步骤
        # 使用启发式算法评估每个步骤的重要性
        return plan[:self.max_tools]

七、核心发现与总结

7.1 本文核心发现

发现一:2026年AI Agent的三大技术突破

  1. 长期自主性+记忆系统:Agent可以持续数周工作,关键信息遗忘率降到10%以下,月均性能提升15%
  2. Computer Use成为标配:跨系统操作成功率82%,工具调用成功率从68%提升到89%
  3. Multi-Agent协作架构主导:协作效率提升71%,A2A开放协议打破供应商壁垒

发现二:MCP与A2A协议的双轮驱动

  • MCP协议解决"AI与工具的连接"问题
  • A2A协议解决"Agent与Agent的连接"问题
  • 两个协议结合,AI Agent从"单机游戏"变成"网络游戏"

发现三:容器化成为Agent标准部署方式

Gartner预测到2028年,95%的新AI部署将基于容器化架构。阿里云已展示15000个沙箱/分钟的并发创建能力。

7.2 落地建议

阶段关键动作推荐工具/框架
快速验证最小可行AgentLangChain/Dify
生产部署容器化+可观测性Kubernetes+Prometheus
规模化多Agent协作自研A2A协调器
企业级安全治理+合规Policy-as-Code

7.3 关键数据参考

  • 成功项目平均ROI:171%
  • 失败项目主因:治理问题(58% CTO认为)
  • 推理成本下降:英伟达吞吐量提升9倍
  • Agent任务完成率:头部场景已达92.7%

参考资料

  1. Gartner《2026智能体AI核心指南》
  2. IBM与AWS联合发布的《2026年智能体AI核心指南》
  3. 中国信通院《2026智能体协同矩阵白皮书》
  4. Anthropic MCP Protocol Specification v2024-11-05
  5. Google I/O 2026 技术发布
  6. 阿里云峰会2026 Agentic Cloud发布
  7. OpenAI Agents SDK Documentation

关于作者:本文内容基于2026年5月全球AI Agent领域的最新进展。如有疑问,欢迎联系交流。

版权声明:本文允许转载,转载时请注明出处。