从代码到钢铁:英伟达ENPIRE让AI Agent在物理世界自主科研

8个AI Coding Agent × 8台真实机器人 = 物理世界AutoResearch首次闭环验证

2026年6月17-18日,英伟达GEAR实验室联合CMU、UC Berkeley发布ENPIRE项目,让AI Agent真正走出数字沙盒,自主操控机械臂完成插针、装GPU、剪扎带等高精度任务,最终成功率99%。


一、引言:当AI不再只是敲代码

2024年,Andrej Karpathy开源了autoresearch项目,AI可以自动完成模型训练和实验管理;2025年,AI Scientist已经能自动生成研究方案、运行实验并撰写论文。

但这些系统有一个共同点:它们始终活在数字环境中。代码跑完即出结果,模拟器里的物理是确定的,一次失败可以零成本重来。

现实世界不一样。

机器人碰撞时的摩擦力会变化,物体无法精确复原,光照和传感器噪声始终波动。ENPIRE论文中有一个鲜明的案例:在模拟环境中,三个被测Coding Agent全部成功完成了Push-T任务;但当同一方法部署到真实机器人上时,其中两个Agent直接失败。

这正是ENPIRE(Agentic Robot Policy Self-Improvement in the Real World)存在的意义——让AI科研第一次触及物理世界的非确定性。


二、硬件架构:8个独立"科研工位"

architecture

ENPIRE的物理配置堪称豪华:

  • 8个实验单元,每个独立运行
  • 每个单元配备
    • 6自由度YAM机械臂(协作操作,如一手固定、一手操作)
    • Intel RealSense深度摄像头(视觉感知)
    • RTX 5090工作站(32GB显存)(本地训练与推理)
  • 所有计算本地完成,不依赖共享集群
  • 安全机制:硬件层面——运动极限切断 + 扭矩受限夹爪;软件层面——奖励函数冻结,防止Agent篡改评分
# 实验单元配置示例(Python)
class ExperimentUnit:
    """ENPIRE单个实验单元配置"""
    def __init__(self, unit_id: int, ip: str):
        self.unit_id = unit_id
        self.robot_arms = [
            YAMArm(f"{ip}:50051"),  # 机械臂A
            YAMArm(f"{ip}:50052"),  # 机械臂B
        ]
        self.camera = RealSenseCamera(f"{ip}:50053")
        self.workstation = GPUWorkstation(
            gpu_model="RTX 5090",
            vram_gb=32,
            local_mode=True
        )
        self.safety = SafetyController(
            joint_limit_deg=270,      # 关节角度极限
            torque_limit_nm=5.0,      # 扭矩上限
            reward_frozen=True        # 奖励函数冻结
        )
    
    def reset_scene(self) -> bool:
        """自动场景重置"""
        self.robot_arms[0].move_to_home()
        self.robot_arms[1].move_to_home()
        return self.camera.verify_scene_ready()

三、四大核心模块:EN-PI-R-E闭环

ENPIRE的名字本身就是架构——四个模块首字母拼成"ENPIRE",构成完整的物理科研闭环:

architecture

EN - Environment(环境模块):自动场景重置与验证

核心发现:对许多机器人任务而言,重置环境比完成任务本身更容易

EN模块负责在每次实验后自动复位实验场景,并通过视觉分类器验证场景是否就绪。Agent使用Code-as-Policy方式编写重置程序——很多情况下,重置就是一个复杂的Pick-and-Place任务。

# 环境自动重置示例(Python)
class EnvironmentModule:
    """ENPIRE - Environment Module"""
    def __init__(self, unit: ExperimentUnit):
        self.unit = unit
        self.verifier = VisualVerifier(
            model_path="models/scene_classifier.onnx",
            confidence_threshold=0.95
        )
    
    def auto_reset(self, task_config: dict) -> bool:
        """
        自动场景重置流程
        支持最大重试次数,防止死循环
        """
        max_retries = 3
        for attempt in range(max_retries):
            # 1. 移走所有物体到初始位置
            for obj_pose in task_config["initial_poses"]:
                success = self.unit.robot_arms[0].pick_and_place(
                    target_pos=obj_pose["current"],
                    goal_pos=obj_pose["initial"],
                    gripper_force=2.0  # N, 轻柔抓取
                )
                if not success:
                    self._log_error(f"Object {obj_pose['id']} reset failed")
                    continue
            
            # 2. 机械臂归零位
            self.unit.robot_arms[0].move_to_home()
            self.unit.robot_arms[1].move_to_home()
            
            # 3. 视觉验证
            frame = self.unit.camera.capture_frame()
            verified, confidence = self.verifier.verify(frame)
            
            if verified and confidence > 0.95:
                return True
            
            self._log_warning(f"Reset attempt {attempt+1} failed, confidence={confidence:.3f}")
        
        return False
    
    def _log_error(self, msg: str):
        print(f"[EN:ERROR] {msg}")

PI - Policy Improvement(策略精化模块):持续优化控制算法

PI模块是ENPIRE的"大脑"。Agent在这里尝试不同的算法范式——从启发式规则到行为克隆,再到强化学习。关键创新:Agent不是简单调参,而是从根本上重写算法。

在插针任务中,一个Agent甚至自行编写了接触力安全控制器,效果超过了单纯调节RL参数。

# 策略精化示例(Python + Go混合语义)
class PolicyImprovementModule:
    """ENPIRE - Policy Improvement Module"""
    
    def __init__(self, agent: CodingAgent):
        self.agent = agent
        self.best_policy = None
        self.best_score = -float('inf')
        self.experiment_log = []
    
    def search_and_improve(self, 
                          task_name: str,
                          search_space: dict,
                          max_iterations: int = 10) -> dict:
        """
        策略搜索与精化核心流程
        Agent自主决定探索方向:新算法 / 新奖励 / 新数据管道
        """
        for iteration in range(max_iterations):
            # Agent阅读论文,提出假设
            hypothesis = self.agent.brainstorm(
                task=task_name,
                context=self.experiment_log[-3:],  # 最近3次实验
                search_prompt="Read related papers and propose a new approach"
            )
            
            # Agent自主修改训练代码
            new_policy_code = self.agent.generate_policy(
                hypothesis=hypothesis,
                base_policy=self.best_policy
            )
            
            # 编译/部署策略到机器人
            compiled = self._compile_and_deploy(new_policy_code)
            
            if compiled:
                rollout_results = self._run_rollout(new_policy_code)
                score = rollout_results["success_rate"]
                
                if score > self.best_score:
                    self.best_policy = new_policy_code
                    self.best_score = score
                    self._git_commit(f"Iter {iteration}: Improved to {score:.2%}")
                
                self.experiment_log.append({
                    "iteration": iteration,
                    "hypothesis": hypothesis,
                    "score": score,
                    "code_hash": hash(new_policy_code)
                })
        
        return {"best_policy": self.best_policy, "best_score": self.best_score}
    
    def _compile_and_deploy(self, code: str) -> bool:
        """编译并部署策略代码到机器人控制层"""
        # 伪代码:实际调用Go后端编译
        return True
    
    def _run_rollout(self, policy_code: str) -> dict:
        """执行策略评估"""
        return {"success_rate": 0.85, "avg_time_s": 12.3}
    
    def _git_commit(self, message: str):
        """Git自动提交,供其他Agent共享"""
        import subprocess
        subprocess.run(["git", "add", "."], capture_output=True)
        subprocess.run(["git", "commit", "-m", message], capture_output=True)

R - Rollout(评估模块):多机器人并行策略评估

R模块在真实机器人上并行部署策略,收集统计数据。这里的pass@8标准是:8次尝试中只要有一次成功即算成功——体现了系统对"足够优秀即可"的务实态度。

# 多机器人并行Rollout示例(Python)
import asyncio
from dataclasses import dataclass
from typing import List

@dataclass
class RolloutResult:
    """一次Rollout的结果"""
    unit_id: int
    success: bool
    execution_time_s: float
    failure_reason: str = ""

class RolloutModule:
    """ENPIRE - Rollout Module (多机器人并行评估)"""
    
    def __init__(self, units: List[ExperimentUnit]):
        self.units = units
        self.pass_at_k = 8  # pass@8标准
    
    async def parallel_rollout(self, 
                             policy_code: str,
                             num_episodes: int = 10) -> dict:
        """
        在所有机器人上并行执行策略评估
        使用asyncio实现真正的并行调度
        """
        tasks = []
        for unit in self.units:
            for episode in range(num_episodes // len(self.units)):
                tasks.append(
                    self._run_single_episode(unit, policy_code, episode)
                )
        
        # 并发执行所有任务
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # 统计分析
        successes = [r for r in results if isinstance(r, RolloutResult) and r.success]
        failures = [r for r in results if isinstance(r, RolloutResult) and not r.success]
        
        return {
            "total_episodes": len(results),
            "success_count": len(successes),
            "failure_count": len(failures),
            "success_rate": len(successes) / len(results) * 100,
            "pass_at_8": len(successes) >= self.pass_at_k,
            "avg_execution_time_s": sum(
                r.execution_time_s for r in results 
                if isinstance(r, RolloutResult)
            ) / len(results),
            "failure_analysis": self._analyze_failures(failures)
        }
    
    async def _run_single_episode(self, 
                                 unit: ExperimentUnit,
                                 policy_code: str,
                                 episode_id: int) -> RolloutResult:
        """单次Rollout执行"""
        try:
            # 1. 环境重置
            await asyncio.get_event_loop().run_in_executor(
                None, unit.reset_scene
            )
            
            # 2. 执行策略(调用Go后端高速执行)
            start_time = time.time()
            exec_result = await self._execute_policy(
                unit, policy_code
            )
            elapsed = time.time() - start_time
            
            # 3. 验证结果
            verified = unit.camera.verify_task_complete()
            
            return RolloutResult(
                unit_id=unit.unit_id,
                success=verified,
                execution_time_s=elapsed,
                failure_reason=exec_result.get("error", "") if not verified else ""
            )
        except Exception as e:
            return RolloutResult(
                unit_id=unit.unit_id,
                success=False,
                execution_time_s=0,
                failure_reason=str(e)
            )
    
    async def _execute_policy(self, unit: ExperimentUnit, code: str) -> dict:
        """调用Go后端策略执行引擎"""
        # 实际通过gRPC调用Go推理引擎
        return {"status": "ok"}
    
    def _analyze_failures(self, failures: List[RolloutResult]) -> dict:
        """失败模式分析"""
        reasons = {}
        for f in failures:
            reasons[f.failure_reason] = reasons.get(f.failure_reason, 0) + 1
        return dict(sorted(reasons.items(), key=lambda x: -x[1]))

E - Evolution(进化模块):失败分析驱动代码改进

E模块是ENPIRE最具"人类科研风格"的部分。Agent分析实验日志、联网搜索相关论文、提出新算法假设、修改代码并提交Git。关键约束:Agent不能修改奖励函数(已冻结),只能改进策略本身。

// 进化模块核心 - Go实现
// Go的高并发特性用于并行搜索论文和分析日志

package evolution

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"
)

// ExperimentLog 实验日志结构
type ExperimentLog struct {
	UnitID      int       `json:"unit_id"`
	Timestamp   time.Time `json:"timestamp"`
	Success     bool      `json:"success"`
	ErrorMsg    string    `json:"error_msg,omitempty"`
	RewardValue float64   `json:"reward_value"`
	DurationMs  int64     `json:"duration_ms"`
}

// FailureAnalysis 失败分析器
type FailureAnalyzer struct {
	agent       *CodingAgent
	paperSearch *PaperSearcher
	logStore    []ExperimentLog
	mu          sync.Mutex
}

// AnalyzeAndIterate 分析失败并迭代改进
func (fa *FailureAnalyzer) AnalyzeAndIterate(ctx context.Context, logs []ExperimentLog) (*ImprovementPlan, error) {
	// 1. 并发分析所有失败日志
	failureCh := make(chan FailurePattern, len(logs))
	var wg sync.WaitGroup
	
	for _, log := range logs {
		if !log.Success {
			wg.Add(1)
			go func(l ExperimentLog) {
				defer wg.Done()
				pattern := fa.analyzeSingleFailure(l)
				failureCh <- pattern
			}(log)
		}
	}
	
	go func() {
		wg.Wait()
		close(failureCh)
	}()
	
	// 2. 汇总失败模式
	patterns := make([]FailurePattern, 0)
	for p := range failureCh {
		patterns = append(patterns, p)
	}
	
	// 3. 基于失败模式搜索相关论文(并发)
	paperCh := make(chan PaperResult, len(patterns))
	var searchWg sync.WaitGroup
	
	for _, pattern := range patterns {
		searchWg.Add(1)
		go func(p FailurePattern) {
			defer searchWg.Done()
			papers, err := fa.paperSearch.Search(ctx, p.Keyword, 3)
			if err != nil {
				log.Printf("Paper search failed for %s: %v", p.Keyword, err)
				return
			}
			paperCh <- PaperResult{Pattern: p, Papers: papers}
		}(pattern)
	}
	
	go func() {
		searchWg.Wait()
		close(paperCh)
	}()
	
	// 4. 生成改进计划
	plan := &ImprovementPlan{
		Timestamp:    time.Now(),
		Changes:      make([]CodeChange, 0),
		Hypothesis:   "",
	}
	
	for pr := range paperCh {
		if len(pr.Papers) > 0 {
			change := fa.agent.ProposeChange(pr.Pattern, pr.Papers[0])
			plan.Changes = append(plan.Changes, change)
		}
	}
	
	// 5. 生成总体假设
	plan.Hypothesis = fa.agent.SynthesizeHypothesis(patterns, plan.Changes)
	
	return plan, nil
}

func (fa *FailureAnalyzer) analyzeSingleFailure(log ExperimentLog) FailurePattern {
	// 根据错误信息分类失败模式
	pattern := FailurePattern{
		Frequency: 1,
	}
	
	switch {
	case contains(log.ErrorMsg, "collision"):
		pattern.Type = CollisionFailure
		pattern.Keyword = "robot collision avoidance contact force control"
	case contains(log.ErrorMsg, "precision"):
		pattern.Type = PrecisionFailure
		pattern.Keyword = "high precision robotic insertion impedance control"
	case contains(log.ErrorMsg, "timeout"):
		pattern.Type = TimeoutFailure
		pattern.Keyword = "robot motion planning optimization"
	default:
		pattern.Type = UnknownFailure
		pattern.Keyword = "robotic manipulation failure recovery"
	}
	
	return pattern
}

func contains(s, substr string) bool {
	return len(s) >= len(substr) && 
		searchString(s, substr) // 简化实现
}

// ImprovementPlan 改进计划
type ImprovementPlan struct {
	Timestamp  time.Time    `json:"timestamp"`
	Changes    []CodeChange `json:"changes"`
	Hypothesis string       `json:"hypothesis"`
}

// CodeChange 具体的代码修改
type CodeChange struct {
	File        string `json:"file"`
	Description string `json:"description"`
	Code        string `json:"code"`
}

// ApplyToRepository 将改进应用到代码仓库
func (plan *ImprovementPlan) ApplyToRepository(repo *GitRepository) error {
	for _, change := range plan.Changes {
		if err := repo.ApplyChange(change); err != nil {
			return fmt.Errorf("apply change failed: %w", err)
		}
	}
	return repo.Commit(plan.Hypothesis)
}

四、三种Coding Agent横向对比

ENPIRE同时测试了三款主流Coding Agent:

Agent底层模型模拟器表现真实机器人表现特点
OpenAI CodexGPT-5.5✅ 全部成功✅ 最优达到目标成功率所需时间最短
Anthropic Claude CodeOpus 4.7✅ 全部成功✅ 良好代码质量高,但速度略慢
Kimi CodeKimi K2.6✅ 全部成功⚠️ 部分任务需更多迭代国产代表,架构兼容性好

关键发现:在Push-T基准任务上,三个Agent都没有使用神经网络不依赖训练数据不搞行为克隆——仅凭自己写的规则启发式方法,不到2小时就解决了这个通常需要大量人类示教数据的任务。Agent"动脑子"比"动手"快得多。

// Agent调度与负载均衡 - Go并发模型
package scheduler

import (
	"context"
	"log"
	"sync"
	"time"
)

// AgentTask AI Agent的任务描述
type AgentTask struct {
	ID          string                 `json:"id"`
	AgentType   string                 `json:"agent_type"` // codex / claude / kimi
	UnitID      int                    `json:"unit_id"`
	TaskType    string                 `json:"task_type"`  // push_t / peg_insert / gpu_insert / zip_tie
	Config      map[string]interface{} `json:"config"`
	CreatedAt   time.Time              `json:"created_at"`
	MaxDuration time.Duration          `json:"max_duration"`
}

// Scheduler 多Agent调度器
type Scheduler struct {
	agents    map[string]*AgentHandle
	units     []*ExperimentUnit
	workQueue chan AgentTask
	results   sync.Map
}

// NewScheduler 创建调度器,支持3种Agent类型
func NewScheduler(agentConfigs map[string]AgentConfig, units []*ExperimentUnit) *Scheduler {
	s := &Scheduler{
		agents:    make(map[string]*AgentHandle),
		units:     units,
		workQueue: make(chan AgentTask, 100),
	}

	// 为每种Agent类型创建3个实例(共9个,但仅8台机器人)
	for agentType, config := range agentConfigs {
		for i := 0; i < config.Instances; i++ {
			handle := &AgentHandle{
				ID:        fmt.Sprintf("%s-%d", agentType, i),
				AgentType: agentType,
				Config:    config,
				Busy:      false,
			}
			s.agents[handle.ID] = handle
		}
	}

	return s
}

// Start 启动调度器
func (s *Scheduler) Start(ctx context.Context) {
	var wg sync.WaitGroup

	// 启动工作协程池
	for i := 0; i < len(s.agents); i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case task := <-s.workQueue:
					s.executeTask(ctx, task)
				case <-ctx.Done():
					return
				}
			}
		}()
	}

	wg.Wait()
}

// executeTask 执行单个Agent任务
func (s *Scheduler) executeTask(ctx context.Context, task AgentTask) {
	// 1. 查找空闲Agent
	agent := s.findAvailableAgent(task.AgentType)
	if agent == nil {
		log.Printf("No available agent for task %s", task.ID)
		return
	}

	agent.Busy = true
	defer func() { agent.Busy = false }()

	// 2. 分配实验单元
	unit := s.units[task.UnitID]

	// 3. 记录开始时间
	startTime := time.Now()

	// 4. 执行任务(Agent自主完成)
	result := agent.Run(ctx, unit, task)

	// 5. 记录结果
	s.results.Store(task.ID, TaskResult{
		TaskID:     task.ID,
		AgentID:    agent.ID,
		UnitID:     task.UnitID,
		Success:    result.Success,
		Duration:   time.Since(startTime),
		TokenUsage: result.TokenUsage,
	})

	// 6. Git自动共享成果
	if result.Success {
		gitShare(agent.ID, task.ID, result.PolicyCode)
	}
}

// findAvailableAgent 查找指定类型的空闲Agent
func (s *Scheduler) findAvailableAgent(agentType string) *AgentHandle {
	for _, agent := range s.agents {
		if agent.AgentType == agentType && !agent.Busy {
			return agent
		}
	}
	return nil
}

五、物理Scaling Law:机器人数量成为新scaling资源

ENPIRE最引人注目的发现之一,是物理世界的Scaling Law

实验结果

在插针任务中,不同规模配置的实验时间:

机器人数量完成时间加速比并行机制
1台>1.5小时1.0x (基准)单一路径探索
4台~50分钟~1.8x多路径并行探索
8台~40分钟~2.25x全量并行 + 最优选取

核心机制

多个Coding Agent同时探索不同路线:

  • 有的尝试新的强化学习算法
  • 有的修改奖励函数
  • 有的调整训练基础设施

一旦某个方向被证明有效,其他Agent通过Git自动复制、合并甚至直接复用这些成果;效果不佳的路线被快速淘汰。

瓶颈揭示

但ENPIRE也暴露了一个反直觉的短板:MRU(平均机器人利用率)始终低于50%——机器人有一半时间在发呆,等待Agent想清楚下一步该做什么。

真正的瓶颈不在机器人硬件,而在Agent的思考速度

// 物理Scaling Law仿真 - Go并发模型
package scaling

import (
	"fmt"
	"math"
	"sync"
	"time"
)

// ScalingExperiment 物理Scaling Law实验仿真
type ScalingExperiment struct {
	NumRobots      int           // 机器人数量
	NumAgents      int           // Agent数量
	AgentThinkTime time.Duration // Agent平均思考时间
	RobotExecTime  time.Duration // 机器人执行时间
	CommOverhead   time.Duration // 通信/同步开销
}

// SimulateResult 仿真结果
type SimulateResult struct {
	NumRobots        int
	TotalTime        time.Duration
	Speedup          float64
	MRU              float64 // 机器人利用率
	TokenConsumption int64   // Token消耗
}

// SimulatePhysicalScaling 物理Scaling Law仿真
func SimulatePhysicalScaling(configs []ScalingExperiment) []SimulateResult {
	results := make([]SimulateResult, len(configs))
	var wg sync.WaitGroup

	for i, config := range configs {
		wg.Add(1)
		go func(idx int, cfg ScalingExperiment) {
			defer wg.Done()
			results[idx] = runSimulation(cfg)
		}(i, config)
	}

	wg.Wait()
	return results
}

func runSimulation(cfg ScalingExperiment) SimulateResult {
	// 模型参数(基于ENPIRE论文数据拟合)
	baseTime := 90.0 * float64(time.Minute) // 1台机器人基准时间
	
	// 并行加速公式(考虑通信开销)
	// Speedup = N / (1 + alpha * (N-1))
	// 其中alpha为通信开销系数
	alpha := 0.15 // 基于ENPIRE实验数据拟合
	
	N := float64(cfg.NumRobots)
	
	// 理论加速
	idealSpeedup := N
	// 实际加速(考虑通信开销)
	actualSpeedup := N / (1 + alpha * (N - 1))
	
	totalTime := baseTime / actualSpeedup
	
	// 机器人利用率计算
	// MRU = RobotExecTime / (RobotExecTime + AgentThinkTime + CommOverhead)
	totalCycleTime := float64(cfg.RobotExecTime) + 
		float64(cfg.AgentThinkTime) + 
		float64(cfg.CommOverhead) * (N - 1)
	
	mru := float64(cfg.RobotExecTime) / totalCycleTime
	
	// Token消耗(超线性增长)
	// 每个Agent不仅要自己实验,还要读其他Agent的进展
	tokenBase := int64(100000) // 单Agent基准Token
	tokenConsumption := tokenBase * int64(N) * int64(1 + int(0.3*(N-1)))
	
	return SimulateResult{
		NumRobots:        cfg.NumRobots,
		TotalTime:        time.Duration(totalTime),
		Speedup:          actualSpeedup,
		MRU:              math.Min(mru, 1.0),
		TokenConsumption: tokenConsumption,
	}
}

// RunBenchmark 运行ENPIRE论文基准对比
func RunBenchmark() {
	configs := []ScalingExperiment{
		{NumRobots: 1, NumAgents: 1, AgentThinkTime: 30 * time.Second, 
		 RobotExecTime: 25 * time.Second, CommOverhead: 5 * time.Second},
		{NumRobots: 4, NumAgents: 4, AgentThinkTime: 35 * time.Second, 
		 RobotExecTime: 25 * time.Second, CommOverhead: 8 * time.Second},
		{NumRobots: 8, NumAgents: 8, AgentThinkTime: 40 * time.Second, 
		 RobotExecTime: 25 * time.Second, CommOverhead: 12 * time.Second},
	}
	
	results := SimulatePhysicalScaling(configs)
	
	fmt.Println("=== Physical Scaling Law Benchmark ===")
	fmt.Printf("%-12s %-16s %-12s %-12s %-16s\n", 
		"Robots", "Total Time", "Speedup", "MRU", "Token Cons.")
	for _, r := range results {
		fmt.Printf("%-12d %-16s %-12.2f %-12.2f %-16d\n",
			r.NumRobots,
			r.TotalTime.Round(time.Second),
			r.Speedup,
			r.MRU,
			r.TokenConsumption)
	}
}

六、真实机器人任务与实验结果

四大任务

任务描述精度要求Agent自主策略
Push-T将T形积木推到目标区域~5mm启发式力控策略
插针将大头针插入4mm孔洞4mm接触力安全控制器
GPU插入将GPU插入主板插槽高精度阻抗控制 + 视觉引导
剪扎带用剪刀剪断塑料扎带灵巧操作双机械臂协作

实验结果

  • 最终成功率:所有任务达到99%(pass@8标准)
  • Push-T任务:8个Agent团队仅用2小时将成功率推到99%;单个Agent需接近5小时
  • 插针任务:Agent自主编写的接触力安全控制器效果超过人类调参
  • 知识迁移:Agent将插针任务经验总结为Markdown文档,附加到GPU插入任务说明后,新任务学习速度显著加快
# 机器人控制接口示例(Python)
import numpy as np
from typing import Tuple, Optional

class RobotControlInterface:
    """ENPIRE机器人控制接口 - Agent可直接调用的API"""
    
    def __init__(self, grpc_endpoint: str):
        self.endpoint = grpc_endpoint
        # 通过gRPC连接到Go后端控制引擎
        self._connect()
    
    def _connect(self):
        """建立与机器人控制器的连接"""
        pass
    
    def move_to_pose(self, 
                     x: float, y: float, z: float,
                     roll: float, pitch: float, yaw: float,
                     speed: float = 0.1) -> bool:
        """
        末端执行器移动到指定位姿
        
        Args:
            x, y, z: 目标位置 (m)
            roll, pitch, yaw: 目标欧拉角 (rad)
            speed: 移动速度 (m/s), 默认0.1
        
        Returns:
            是否成功到达
        """
        return self._call_grpc("MoveToPose", {
            "pose": [x, y, z, roll, pitch, yaw],
            "speed": speed
        })
    
    def grasp(self, force: float = 2.0) -> bool:
        """
        执行夹爪抓取
        
        Args:
            force: 抓取力 (N), 范围0.5-5.0
        
        Returns:
            是否成功抓取
        """
        return self._call_grpc("Grasp", {"force": force})
    
    def release(self) -> bool:
        """释放夹爪"""
        return self._call_grpc("Release", {})
    
    def apply_force(self, 
                   force_vector: Tuple[float, float, float],
                   duration: float = 1.0) -> bool:
        """
        施加力控(用于插针等精密操作)
        
        Args:
            force_vector: (Fx, Fy, Fz) 力向量 (N)
            duration: 持续时间 (s)
        """
        return self._call_grpc("ApplyForce", {
            "force": list(force_vector),
            "duration": duration
        })
    
    def get_joint_states(self) -> np.ndarray:
        """获取当前关节角度"""
        response = self._call_grpc("GetJointStates", {})
        return np.array(response["joint_angles"])
    
    def get_ee_pose(self) -> Tuple[np.ndarray, np.ndarray]:
        """获取末端执行器位姿"""
        response = self._call_grpc("GetEEPose", {})
        pos = np.array(response["position"])
        orient = np.array(response["orientation"])
        return pos, orient
    
    def _call_grpc(self, method: str, params: dict) -> dict:
        """调用Go后端gRPC服务"""
        # 实际实现:发送gRPC请求
        return {"status": "ok", "data": {}}

七、安全机制:物理世界科研的底线

物理世界搞自动化研究,翻一次车就可能损坏硬件甚至伤人。ENPIRE上了两层硬件兜底 + 一层软冻结

硬件安全层

  1. 硬运动极限切断:机械臂超出预设安全范围直接断电
  2. 扭矩受限夹爪:机械力天花板卡死在安全阈值内

软件安全层

  • 奖励函数冻结:通过视觉分类器离线固定并冻结,杜绝Agent篡改评分函数"自己给自己打高分"的作弊路径
# 安全控制器示例
class SafetyController:
    """ENPIRE双重安全控制系统"""
    
    def __init__(self, 
                 joint_soft_limit_deg: float = 270.0,
                 joint_hard_limit_deg: float = 300.0,
                 torque_limit_nm: float = 5.0,
                 force_limit_n: float = 20.0):
        self.soft_limit = joint_soft_limit_deg
        self.hard_limit = joint_hard_limit_deg
        self.torque_limit = torque_limit_nm
        self.force_limit = force_limit_n
        self.emergency_stopped = False
    
    def check_motion_safety(self, target_joints: np.ndarray) -> bool:
        """运动安全预检"""
        if self.emergency_stopped:
            return False
        
        # 检查每个关节是否在安全范围内
        for i, angle in enumerate(np.degrees(target_joints)):
            if abs(angle) > self.soft_limit:
                if abs(angle) > self.hard_limit:
                    self.emergency_stop(f"Joint {i} exceeded hard limit: {angle:.1f}deg")
                    return False
                self._log_warning(f"Joint {i} close to limit: {angle:.1f}deg")
        
        return True
    
    def emergency_stop(self, reason: str):
        """紧急停止"""
        self.emergency_stopped = True
        # 硬件级断电
        self._cut_power()
        print(f"[SAFETY] EMERGENCY STOP: {reason}")
    
    def _cut_power(self):
        """切断动力电源(硬件级操作)"""
        pass

八、开源生态与未来展望

开源计划

ENPIRE的全部代码和系统将在未来开源。基于LeRobot SO-101套件 + NVIDIA Jetson Thor的"平民版"也在路上,让普通开发者也能在家里搭建类似的自主机器人研究系统。

Jim Fan的评价

“AutoResearch(自动科研)进入物理世界的一次尝试。我们所做的只是为Codex提供了一个通往原子世界的API,其余的一切都是涌现。”

“目标是让全组去休假,连黄仁勋都察觉不到实验室还在跑。”

未来方向

  1. Agent思考加速:当前机器人利用率<50%,提升Agent推理速度是直接瓶颈
  2. 更大规模验证:16台、32台甚至更多机器人的Scaling Law边界在哪里?
  3. 多任务迁移:知识迁移已在插针→GPU插入任务中验证,能否扩展到完全不同的任务域?
  4. 人类角色转变:从"操作员"退到"看报告的人",人类研究者的工作变为设计Agent能做实验的环境

九、总结与技术洞察

ENPIRE的核心贡献不是单个算法突破,而是一个让AI Agent在物理世界自主科研的基础设施

  1. 架构创新:EN-PI-R-E四模块闭环,将真实世界的机器人学习转化为Agent管理的可控优化过程
  2. 物理Scaling Law:首次在机器人领域验证了"增加并行实验单元可加速科研"的scaling效应
  3. Agent自主性:从读论文、提假设、写代码、做实验到分析结果,全链路自主
  4. 代码即策略:Agent用编程语言而非配置文件驱动实验,极大扩展了探索空间
  5. 两层安全兜底:为物理世界AutoResearch确立了安全基线

机器人研究的下半场,人类从"操作员"退到"看报告的人"——ENPIRE用8个AI Agent + 8台机器人 + 两重安全锁,把"自动化科研"从代码沙盒搬进了物理世界。


参考文献:NVIDIA GEAR Lab, “ENPIRE: Agentic Robot Policy Self-Improvement in the Real World”, 2026. 项目主页:https://research.nvidia.com/labs/gear/enpire/