Google I/O 2026: Agentic Era - Multi-Agent System Architecture and Self-Evolution Technology
I. Event Overview and Technical Background
1.1 A Historic Moment: Google I/O 2026
From May 19-20, 2026, Google held its annual developer conference Google I/O 2026 at the Shoreline Amphitheater in Mountain View, California. This event was not only the most prolific I/O in Google’s history (with 100 announcements), but also marked a pivotal transition for the AI industry—from “AI as an assistant tool” to “AI as an autonomous agent.”
Google CEO Sundar Pichai declared in his opening keynote: “The era of AI as a tool is over. What has arrived is AI that doesn’t just respond—it acts.” This announcement signifies a fundamental shift in how the entire tech industry perceives the boundaries of AI capabilities.
According to “Eight Stories That Defined the AI Week of May 18-25” (Digital Applied, 2026-05-25), Google I/O 2026’s core theme was defined as “Agentic Era,” encompassing complete technology stack reconstruction from underlying models to upper-layer applications.
1.2 Why Multi-Agent Systems as This Article’s Theme
This article selects Multi-Agent System and Self-Evolution Technology as its core theme for the following reasons:
- Technical Completeness: Multi-agent systems involve the complete technology stack including model layer, orchestration layer, tool layer, and data layer
- Business Urgency: Products like Microsoft Copilot Studio and Cursor Composer 2.5 were released within 72 hours of each other
- Engineering Innovation: The feat of 93 coordinated sub-agents building a complete operating system in 12 hours
- Academic Frontier: Fujitsu’s self-evolving multi-agent technology and joint research with Carnegie Mellon University
1.3 Key AI Industry Data This Week
| Metric | Data | Source |
|---|---|---|
| Gemini Monthly Token Processing | 3.2 quadrillion (7x YoY growth) | Google I/O 2026 |
| AI Mode Monthly Active Users | 1 billion | Google I/O 2026 |
| Gemini 3.5 Flash Pricing | $1.50 input / $9.00 output per M tokens | Google I/O 2026 |
| Antigravity 2.0 API Calls | 2.6 billion tokens, cost <$1,000 | Google I/O 2026 |
| Copilot Studio Billing | $0.04/step (standard model) | Microsoft TechCommunity |
II. Core Technology Analysis: Gemini 3.5 Flash and Agentic Architecture
2.1 Gemini 3.5 Flash: Foundation of the Speed Revolution
2.1.1 Technical Specifications and Performance Breakthrough
Gemini 3.5 Flash is Google’s high-speed model specifically optimized for persistent agent tasks, with its core design philosophy being “unification of economy and performance.”
Core Parameters:
- Inference Speed: 4x faster than GPT-5.5 and Opus 4.7
- Price Point: $1.50 input / $9.00 output per million tokens
- Cost Advantage: ~25% cheaper than Gemini 3.1 Pro, 3.3x cheaper input than GPT-5.5
- Context Window: Supports processing up to 1 million tokens of context
- Default Deployment: Default model for Gemini App and AI Mode globally
Google CEO Pichai revealed in the keynote:
“Using Antigravity and Gemini 3.5 Flash, we asked our agents to build a working operating system from scratch. 93 sub-agents, working in parallel, made over 15,000 model requests and processed 2.6 billion tokens—and consumed less than $1,000 of API credits.”
Significance of this number: $1,000 for 2.6 billion tokens means long-duration autonomous AI work is now economically viable.
2.1.2 Enterprise Application Scenarios
According to Digital Applied’s analysis, large enterprises can save over $1 billion per year by shifting 80% of AI workloads to Gemini 3.5 Flash. This pricing strategy transforms AI from “experimental projects” to “production-grade infrastructure.”
Typical Application Scenarios:
# Gemini 3.5 Flash Enterprise Application Example
# Source: Google Cloud Documentation & Antigravity 2.0 SDK
from google import genai
from google.genai import types
# Initialize client
client = genai.Client(
api_key=os.environ["GEMINI_API_KEY"],
http_options=types.HTTPOptions(api_version="v1alpha")
)
# Create high-speed agent task
def create_long_running_agent_task(prompt: str, max_steps: int = 100):
"""
Create a long-duration agent task
Suitable for: code generation, multi-step reasoning, cross-system automation
"""
response = client.models.generate_content(
model="gemini-3.5-flash",
contents=prompt,
config=types.GenerateContentConfig(
# Enable agent mode
automatic_function_calling=types.AutomaticFunctionCalling(
maximum_calls=max_steps
),
# Tool configuration
tools=[
types.Tool(code_execution=types.CodeExecutionConfig()),
types.Tool(file_utils=types.FileUtilsConfig()),
types.Tool(google_search=types.GoogleSearchConfig()),
],
# Thinking budget (balance speed and quality)
thinking_config=types.ThinkingConfig(
thinking_budget=2048 # 3.5 Flash optimized thinking budget
)
)
)
return response
# Example: Batch code refactoring task
def batch_code_refactoring(repo_paths: list[str], target_style: str):
"""
Batch code refactoring task
Typical scenario: 93 agents processing 2.6 billion tokens in parallel
"""
tasks = [
create_long_running_agent_task(
prompt=f"Please refactor all Python files in {repo_path}, "
f"applying Google style guide, target: {target_style}"
)
for repo_path in repo_paths
]
# Execute in parallel
results = client.models.generate_content_stream(
model="gemini-3.5-flash",
contents=tasks, # Batch processing
config=types.GenerateContentConfig(
automatic_function_calling=types.AutomaticFunctionCalling(
max_calls=50
),
thinking_config=types.ThinkingConfig(thinking_budget=1024)
)
)
return results
2.2 Gemini Omni: Ambitions of a World Model
2.2.1 From Multimodal to World Model
Gemini Omni is described by Google as a “World Model”—a definition that transcends traditional multimodal AI capabilities.
Technical Definition Differences:
| Model Type | Capability Description | Typical Representatives |
|---|---|---|
| Multimodal Model | Process and generate across text, image, audio, video formats | GPT-4V, Gemini Pro |
| World Model | Develop internal representations of physical world behavior, understand physics | Gemini Omni |
Google DeepMind CEO Demis Hassabis stated:
“Last year, I outlined our vision of extending Gemini’s incredible multimodal capabilities to become a world model AI that can understand and simulate the world. This is a crucial step toward achieving AGI—a step change in simulating kinetic energy and gravity.”
2.2.2 World Model Engineering Implementation
# Gemini Omni World Model API Example
# Source: Google DeepMind Gemini Omni Technical Documentation
from google.deepmind import omnimodel
import numpy as np
# Initialize world model
world_model = omnimodel.WorldModel.from_pretrained(
model_name="gemini-omni-1.0",
weights_path="gs://bucket/gemini-omni-weights"
)
class PhysicalSimulation:
"""Simulate physical environments using world models"""
def __init__(self, model: omnimodel.WorldModel):
self.model = model
self.physics_rules = self._load_physics_knowledge()
def predict_trajectory(
self,
initial_state: dict,
forces: list[dict],
time_steps: int
) -> list[dict]:
"""
Predict object trajectory under multi-step force applications
World model understands physics: kinetic energy, potential energy, gravity
"""
state = initial_state.copy()
trajectory = [state]
for t in range(time_steps):
# Build physics simulation prompt
prompt = self._build_physics_prompt(state, forces[t])
# Call world model to predict next state
prediction = self.model.predict_next_state(
current_state=state,
applied_forces=forces[t],
physics_constraints=self.physics_rules
)
state = prediction.next_state
trajectory.append(state)
return trajectory
def simulate_robot_planning(
self,
robot_description: dict,
task: str,
environment: str
) -> dict:
"""
Robot task planning
World model understands physical constraints and action consequences
"""
planning_prompt = f"""
Robot configuration: {robot_description}
Task objective: {task}
Environment description: {environment}
Please plan the robot's action sequence, ensuring:
1. Understanding of physical constraints (gravity, friction, inertia)
2. Prediction of consequences for each action
3. Optimization of energy consumption
"""
return self.model.generate_action_plan(
prompt=planning_prompt,
mode="physical_reasoning"
)
# Example: Predict projectile motion
simulation = PhysicalSimulation(world_model)
initial_state = {
"position": {"x": 0, "y": 0, "z": 100},
"velocity": {"x": 20, "y": 0, "z": 0},
"mass": 1.0,
"shape": "sphere"
}
gravity_force = {"type": "gravity", "magnitude": 9.8, "direction": (0, 0, -1)}
trajectory = simulation.predict_trajectory(
initial_state=initial_state,
forces=[gravity_force] * 100,
time_steps=100
)
III. Deep Dive: Agentic AI System Architecture
3.1 Architecture Design Principles
The Agentic AI system showcased at Google I/O 2026 follows these core architectural principles:
Three-Layer Core Architecture:
- Model Layer: Gemini 3.5 Flash, Gemini Omni provide reasoning capabilities
- Orchestration Layer: Antigravity 2.0 provides agent orchestration capabilities
- Application Layer: Search Agents, Gemini Spark provide user interaction
Four Key Characteristics of Agent Systems:
- Persistence: Transcends single conversations, running 24/7 in the background
- Multi-Step Reasoning: Plans and executes complex sequential tasks
- Autonomous Execution: Completes work rather than suggesting next steps
- Tool Calling: Invokes external tools and APIs
3.2 Supervisor Agent Architecture
# Supervisor Agent Core Implementation
# Source: Agent orchestration architecture based on Antigravity 2.0 SDK
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Awaitable
import asyncio
class TaskComplexity(Enum):
"""Task complexity classification"""
SIMPLE = 1 # Single-turn response
MODERATE = 2 # 2-5 step multi-step reasoning
COMPLEX = 3 # 5-20 step complex tasks
SUBAGENT = 4 # Requires sub-agent coordination
@dataclass
class AgentCapability:
"""Agent capability description"""
name: str
description: str
supported_complexity: list[TaskComplexity]
tool_definitions: list[dict]
estimated_cost_per_step: float
@dataclass
class ExecutionPlan:
"""Execution plan"""
plan_id: str
complexity: TaskComplexity
assigned_agents: list[str]
tool_sequence: list[dict]
estimated_steps: int
fallback_strategy: str
class SupervisorAgent:
"""
Supervisor Agent: Responsible for task planning and routing
Core orchestration component of Agentic AI systems
"""
def __init__(
self,
model_client,
agent_registry: dict[str, AgentCapability],
tool_registry: list[dict]
):
self.model = model_client
self.agents = agent_registry
self.tools = tool_registry
self.execution_history = []
async def plan_and_route(
self,
user_request: str,
context: dict = None
) -> ExecutionPlan:
"""
Core function: Analyze requests and plan execution routes
"""
# Step 1: Complexity analysis
complexity_prompt = f"""
Analyze the complexity level of the following request:
Request: {user_request}
Context: {context}
Evaluation criteria:
- How many reasoning steps are needed?
- Are external tool calls required?
- Is multi-agent coordination needed?
- Is long-term memory retrieval required?
"""
complexity_response = await self.model.generate(
prompt=complexity_prompt,
output_schema=TaskComplexity
)
complexity = complexity_response.parsed
# Step 2: Agent selection
eligible_agents = [
name for name, cap in self.agents.items()
if complexity in cap.supported_complexity
]
# Step 3: Tool planning
tools_prompt = f"""
Based on request "{user_request}", plan the required tool call sequence.
Available tools: {self.tools}
"""
tools_response = await self.model.generate(
prompt=tools_prompt,
output_schema=list[dict]
)
tool_sequence = tools_response.parsed
# Step 4: Generate execution plan
plan = ExecutionPlan(
plan_id=self._generate_plan_id(),
complexity=complexity,
assigned_agents=eligible_agents,
tool_sequence=tool_sequence,
estimated_steps=len(tool_sequence) + 1,
fallback_strategy=self._generate_fallback(complexity)
)
return plan
async def execute_with_subagents(
self,
plan: ExecutionPlan,
user_request: str
) -> dict:
"""
Execute complex tasks using sub-agents
"""
results = {}
if plan.complexity >= TaskComplexity.SUBAGENT:
# Launch multiple sub-agents in parallel
async def run_agent(agent_name: str):
agent_cap = self.agents[agent_name]
return await self._run_single_agent(
agent_name,
user_request,
plan.tool_sequence
)
# Parallel execution
agent_tasks = [
run_agent(name)
for name in plan.assigned_agents
]
agent_results = await asyncio.gather(*agent_tasks)
# Aggregate results
results["subagent_outputs"] = dict(
zip(plan.assigned_agents, agent_results)
)
results["final"] = await self._synthesize_outputs(
results["subagent_outputs"]
)
else:
# Single agent execution
primary_agent = plan.assigned_agents[0]
results["final"] = await self._run_single_agent(
primary_agent,
user_request,
plan.tool_sequence
)
return results
3.3 Specialized Agent Pool Implementation
3.3.1 Search Agent (Gemini Spark)
# Search Agent - Gemini Spark Implementation
# Source: Google Search Agents Technical Documentation
class SearchAgent:
"""
24/7 Persistent Search Agent
Core capabilities: Information monitoring, multi-source aggregation, proactive notification
"""
def __init__(
self,
search_api_client,
notification_service,
memory_store
):
self.search = search_api_client
self.notify = notification_service
self.memory = memory_store
async def create_persistent_monitor(
self,
topic: str,
criteria: dict,
notify_channels: list[str]
) -> str:
"""
Create a persistent monitoring task
Example: Monitor news updates for specific stocks
"""
monitor_id = f"monitor_{uuid.uuid4().hex[:8]}"
# Initialize monitoring configuration
monitor_config = {
"id": monitor_id,
"topic": topic,
"criteria": criteria,
"channels": notify_channels,
"frequency": "realtime", # realtime/hourly/daily
"last_check": None,
"state": "active"
}
await self.memory.save("monitors", monitor_config)
# Start background monitoring loop
asyncio.create_task(self._monitoring_loop(monitor_id))
return monitor_id
async def _monitoring_loop(self, monitor_id: str):
"""Background monitoring loop"""
while True:
config = await self.memory.load("monitors", monitor_id)
if config["state"] != "active":
break
# Execute search
results = await self.search.query(
query=config["topic"],
filters=config["criteria"],
since=config["last_check"]
)
# Check for new results
new_results = self._filter_new_results(
results,
config["last_check"]
)
if new_results:
# Send notification
await self.notify.send(
channels=config["channels"],
message=self._format_results(new_results),
priority="high" if self._is_urgent(new_results) else "normal"
)
# Update memory
config["last_check"] = datetime.now()
await self.memory.save("monitors", config)
# Wait for next check
await asyncio.sleep(self._get_interval(config["frequency"]))
async def execute_research_task(
self,
objective: str,
depth: str = "standard" # quick/standard/deep
) -> dict:
"""
Execute deep research task
Multi-step cross-source information aggregation
"""
research_phases = {
"quick": ["search", "summarize"],
"standard": ["search", "explore", "synthesize"],
"deep": ["search", "explore", "verify", "analyze", "synthesize"]
}
phases = research_phases[depth]
context = {}
for phase in phases:
if phase == "search":
# Multi-source search
context["search_results"] = await self._multi_source_search(objective)
elif phase == "explore":
# Deep exploration of related links
context["deep_dives"] = await self._explore_sources(
context["search_results"]
)
elif phase == "verify":
# Cross-validation of information
context["verified"] = await self._verify_claims(
context["deep_dives"]
)
elif phase == "analyze":
# Deep analysis
context["analysis"] = await self._analyze_data(context["verified"])
elif phase == "synthesize":
# Comprehensive output
context["final_report"] = await self._synthesize_report(context)
elif phase == "summarize":
# Quick summary
context["summary"] = await self._quick_summary(
context["search_results"]
)
return context.get("final_report") or context.get("summary")
3.3.2 Coding Agent (Antigravity Agent)
# Coding Agent - Antigravity 2.0 Implementation
# Source: Google Antigravity SDK Documentation
from google.antigravity import Agent, ToolRegistry, ProjectContext
from dataclasses import dataclass
@dataclass
class CodeTask:
"""Code task description"""
type: str # generate/refactor/test/analyze/migrate
language: str
scope: str # file/module/project
requirements: dict
constraints: dict
class AntigravityCodingAgent:
"""
Google Antigravity 2.0 Coding Agent
Core capabilities: Code generation, refactoring, testing, cross-file refactoring
"""
def __init__(
self,
model_client,
tool_registry: ToolRegistry,
project_context: ProjectContext
):
self.model = model_client
self.tools = tool_registry
self.context = project_context
async def generate_project_from_spec(
self,
spec: dict,
output_dir: str
) -> dict:
"""
Generate complete project from specification
This is the core capability showcase of Antigravity 2.0
"""
# Phase 1: Project structure planning
structure_plan = await self._plan_project_structure(spec)
# Phase 2: Dependency analysis
dependencies = await self._analyze_dependencies(structure_plan)
# Phase 3: Code generation (multi-agent parallel)
generated_files = await self._parallel_generate(structure_plan)
# Phase 4: Integration testing
test_results = await self._run_integration_tests(generated_files)
# Phase 5: Fix and optimize
if test_results["failures"]:
fixed_files = await self._fix_failures(
test_results["failures"],
generated_files
)
generated_files.update(fixed_files)
return {
"project_path": output_dir,
"files": list(generated_files.keys()),
"test_results": test_results,
"stats": self._calculate_stats(generated_files)
}
async def _parallel_generate(
self,
structure_plan: dict
) -> dict[str, str]:
"""
Multi-agent parallel code generation
Each sub-agent responsible for one module
"""
# Create sub-agent pool
agents = []
for module_spec in structure_plan["modules"]:
agent = AntigravitySubAgent(
module_name=module_spec["name"],
specifications=module_spec,
context=self.context,
tools=self.tools
)
agents.append(agent)
# Parallel generation
generation_tasks = [
agent.generate_code()
for agent in agents
]
results = await asyncio.gather(*generation_tasks)
return dict(zip(
[a.module_name for a in agents],
results
))
async def refactor_large_scale(
self,
scope: str, # "repo" for entire repository
target_style: str,
dry_run: bool = True
) -> dict:
"""
Large-scale code refactoring
Process codebases containing millions of tokens
"""
# Get codebase overview
codebase_summary = await self.context.get_summary(scope)
# Analyze files needing refactoring
files_to_refactor = await self._analyze_refactoring_targets(
codebase_summary,
target_style
)
# Batch processing (avoid token overflow)
batch_size = 50 # 50 files per batch
batches = self._chunk_files(files_to_refactor, batch_size)
results = []
for batch_num, batch in enumerate(batches):
# Create specialized agent for each batch
batch_agent = RefactoringBatchAgent(
files=batch,
style_guide=target_style,
context=self.context
)
batch_result = await batch_agent.execute()
results.append(batch_result)
# Progress report
progress = (batch_num + 1) / len(batches) * 100
print(f"Progress: {progress:.1f}% ({len(batch)} files)")
return self._aggregate_results(results)
IV. Self-Evolving Multi-Agent Technology: Fujitsu Takane Case Study
4.1 Technical Background and Innovations
On May 25, 2026, Fujitsu announced the development of self-evolving multi-agent technology, enabling multiple AI agents to perform tasks as a team, continuously and safely learning from daily execution results, human feedback, policy revisions, and specification changes.
Core Technical Breakthroughs:
- Automatic Prompt Adjustment: Agents autonomously identify reasons for success and failure, extracting actionable improvement suggestions
- No Human Intervention: Prompts, search methods, and evaluation criteria that previously required continuous expert adjustment are now completed autonomously by AI
- Domain Adaptation: Achieved an average accuracy improvement of 28 percentage points across multiple domains including healthcare, finance, and public administration
4.2 Self-Evolution Architecture Implementation
# Self-Evolving Multi-Agent System Core Implementation
# Source: Technical architecture based on Fujitsu EVE-Agent (arXiv:2605.22905)
from dataclasses import dataclass
from typing import Optional
from enum import Enum
import numpy as np
class ExperienceType(Enum):
"""Experience types"""
SUCCESS = "success"
FAILURE = "failure"
HUMAN_CORRECTION = "correction"
POLICY_CHANGE = "policy_change"
SPEC_UPDATE = "spec_update"
@dataclass
class Experience:
"""Execution experience record"""
experience_id: str
task_type: str
outcome: ExperienceType
input_data: dict
output_data: dict
human_feedback: Optional[dict]
timestamp: datetime
verified: bool = False
@dataclass
class LearnedInsight:
"""Learned insight"""
insight_id: str
pattern: str
trigger_conditions: dict
recommended_action: dict
confidence_score: float
source_experiences: list[str]
class SelfEvolvingAgent:
"""
Self-Evolving Agent Core Class
Key technologies: Continuous learning, safety verification, incremental improvement
"""
def __init__(
self,
base_model,
knowledge_base,
verification_system,
config: dict
):
self.model = base_model
self.knowledge = knowledge_base
self.verify = verification_system
self.config = config
# Learning state
self.experience_buffer = []
self.insights = []
self.improvement_proposals = []
async def execute_task_with_learning(
self,
task: dict,
context: dict = None
) -> dict:
"""
Execute task with continuous learning
"""
# Step 1: Retrieve relevant experience
relevant_insights = await self.knowledge.retrieve(
query=task,
top_k=5
)
# Step 2: Generate enhanced prompt
enhanced_prompt = self._enhance_prompt_with_insights(
base_prompt=task,
insights=relevant_insights
)
# Step 3: Execute task
result = await self.model.generate(enhanced_prompt)
# Step 4: Verify result
verification = await self.verify.check(
input_task=task,
output=result
)
# Step 5: Record experience
experience = Experience(
experience_id=str(uuid.uuid4()),
task_type=self._classify_task(task),
outcome=ExperienceType.SUCCESS if verification.valid
else ExperienceType.FAILURE,
input_data=task,
output_data=result,
human_feedback=None,
timestamp=datetime.now()
)
await self._record_experience(experience)
# Step 6: Generate improvement suggestions (async)
if not verification.valid:
asyncio.create_task(
self._generate_improvement_proposal(experience)
)
return result
async def _generate_improvement_proposal(
self,
failed_experience: Experience
) -> LearnedInsight:
"""
Generate improvement suggestions from failed experience
Core innovation: Analyze failure causes and propose actionable improvements
"""
# Build analysis prompt
analysis_prompt = f"""
Analyze the following failed experience and extract actionable improvement suggestions:
Failed task: {failed_experience.input_data}
Failed output: {failed_experience.output_data}
Verification result: {failed_experience}
Please analyze:
1. What is the root cause of the failure?
2. Which prompts/parameters need adjustment?
3. What are the new trigger conditions?
4. What specific actions are recommended?
"""
analysis = await self.model.generate(analysis_prompt)
# Extract insight
insight = LearnedInsight(
insight_id=str(uuid.uuid4()),
pattern=analysis["pattern"],
trigger_conditions=analysis["conditions"],
recommended_action=analysis["action"],
confidence_score=analysis["confidence"],
source_experiences=[failed_experience.experience_id]
)
# Safety verification (do not apply directly)
safety_check = await self.verify.safety_validation(insight)
if safety_check.approved:
# Only add to knowledge base after safety verification passes
await self.knowledge.add_insight(insight)
self.insights.append(insight)
return insight
async def process_human_feedback(
self,
experience_id: str,
feedback: dict
) -> None:
"""
Process human feedback and update knowledge
"""
experience = await self.knowledge.get_experience(experience_id)
experience.human_feedback = feedback
experience.outcome = ExperienceType.HUMAN_CORRECTION
# Learn from human correction
insight = await self._learn_from_correction(experience, feedback)
if insight:
await self.knowledge.add_insight(insight)
class MultiAgentCollaboration:
"""
Multi-Agent Collaboration Framework
Core capability of Fujitsu Takane
"""
def __init__(self, agents: list[SelfEvolvingAgent]):
self.agents = {agent.id: agent for agent in agents}
self.shared_knowledge = SharedKnowledgeStore()
self.coordination = AgentCoordinator()
async def execute_complex_task(
self,
task: dict,
agent_roles: dict
) -> dict:
"""
Coordinate multi-agents to execute complex tasks
"""
# Step 1: Task decomposition
subtasks = await self._decompose_task(task, agent_roles)
# Step 2: Assign agents
assignments = self._assign_agents(subtasks, agent_roles)
# Step 3: Execute in parallel
results = await asyncio.gather(*[
self._execute_subtask(agent_id, subtask)
for agent_id, subtask in assignments.items()
])
# Step 4: Aggregate results
aggregated = await self._aggregate_results(results)
# Step 5: Cross-agent knowledge sharing
await self._share_knowledge(results)
return aggregated
async def _share_knowledge(
self,
results: list[dict]
) -> None:
"""
Cross-agent knowledge sharing mechanism
One agent's experience benefits other agents
"""
for result in results:
if "learned_insights" in result:
for insight in result["learned_insights"]:
# Broadcast to all agents
for agent in self.agents.values():
if insight.is_relevant_to(agent.specialization):
await agent.knowledge.add_insight(insight)
4.3 Takane LLM Domain Adaptation Case
# Takane LLM Domain Adaptation Implementation
# Source: Fujitsu Kozuchi AI Platform
class TakaneDomainAdapter:
"""
Takane Domain Adapter
Core technology: Continuous learning from business execution results
"""
def __init__(self, base_model, domain: str):
self.base_model = base_model
self.domain = domain
self.performance_history = []
self.optimization_cycles = 0
async def auto_enhance(
self,
training_data: list[dict],
evaluation_criteria: dict,
target_improvement: float = 0.1
) -> dict:
"""
Auto-enhance domain-specific model
Multi-agents collaborate on data selection, training condition adjustment, evaluation, improvement
"""
print(f"Starting auto-enhancement for {self.domain}")
best_model = self.base_model
best_accuracy = 0.0
iteration = 0
max_iterations = 20
while iteration < max_iterations:
# Data selection agent
selected_data = await self._select_training_data(
training_data,
current_model=best_model
)
# Training condition adjustment agent
training_config = await self._optimize_training_config(
current_accuracy=best_accuracy,
target=target_improvement
)
# Train model
candidate_model = await self._train_model(
data=selected_data,
config=training_config
)
# Evaluation agent
evaluation = await self._evaluate_model(
candidate_model,
criteria=evaluation_criteria
)
if evaluation["accuracy"] > best_accuracy:
best_model = candidate_model
best_accuracy = evaluation["accuracy"]
print(f"Iteration {iteration}: New best accuracy = {best_accuracy:.2%}")
else:
# Analyze failure
failure_analysis = await self._analyze_failure(
candidate_model,
evaluation
)
await self._generate_improvement(failure_analysis)
# Check if target reached
if best_accuracy >= target_improvement:
break
iteration += 1
return {
"final_model": best_model,
"final_accuracy": best_accuracy,
"iterations": iteration,
"improvement": best_accuracy - 0.0 # Relative to baseline
}
async def apply_to_medical_records(
self,
unstructured_text: str
) -> dict:
"""
Medical record structured extraction example
Extract diagnostic names, progression stages, treatment strategies from unstructured medical records
"""
extraction_prompt = f"""
Extract structured information from the following medical record:
Raw text:
{unstructured_text}
Please extract:
1. Diagnostic name (ICD-10 code)
2. Disease progression stage (if applicable)
3. Treatment strategy
4. Medication regimen
5. Follow-up plan
"""
result = await self.base_model.generate(extraction_prompt)
return self._parse_structured_output(result)
# Practical Application Example
async def demonstrate_takane_enhancement():
"""
Demonstrate Takane auto-enhancement process
"""
# Initialize
base_model = load_base_model("gemini-pro")
adapter = TakaneDomainAdapter(
base_model=base_model,
domain="healthcare"
)
# Load medical data
medical_records = load_medical_dataset("hospital_records.jsonl")
# Define evaluation criteria
evaluation_criteria = {
"diagnostic_accuracy": 0.95,
"completeness": 0.90,
"format_correctness": 0.98
}
# Execute auto-enhancement
result = await adapter.auto_enhance(
training_data=medical_records,
evaluation_criteria=evaluation_criteria,
target_improvement=0.28 # 28 percentage points improvement reported by Fujitsu
)
print(f"Final accuracy: {result['final_accuracy']:.2%}")
print(f"Total iterations: {result['iterations']}")
V. Agentic AI Technology Standards and Ecosystem
5.1 Linux Foundation AI Agent Foundation (AAIF)
In May 2026, the Linux Foundation announced the formation of the AI Agent Foundation (AAIF), a consortium of over 30 tech giants aimed at creating open, universal standards for AI systems.
Founding Member Contributions:
| Company | Contribution | Technical Positioning |
|---|---|---|
| Anthropic | MCP (Model Context Protocol) | “USB-C” interface for AI system interoperability |
| OpenAI | Agents.md | AI tool description standardization |
| Block | Goose | Privacy-preserving offline agent |
| Agent tool integration | Cross-platform support |
5.2 MCP Protocol Implementation
# MCP (Model Context Protocol) Client Implementation
# Source: Anthropic MCP Documentation
from typing import Any
import json
class MCPClient:
"""
Model Context Protocol client
Implements universal interoperability between AI systems
"""
def __init__(self, server_url: str, auth_token: str):
self.server_url = server_url
self.auth_token = auth_token
self.tools = []
self.context_cache = {}
async def connect(self) -> None:
"""Connect to MCP server"""
async with aiohttp.ClientSession() as session:
# Get available tools list
response = await session.get(
f"{self.server_url}/tools",
headers={"Authorization": f"Bearer {self.auth_token}"}
)
self.tools = await response.json()
async def call_tool(
self,
tool_name: str,
arguments: dict
) -> Any:
"""
Cross-AI system tool calling
Standardized interface, different AIs can interoperate
"""
# Normalize request
mcp_request = {
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": tool_name,
"arguments": arguments
},
"id": str(uuid.uuid4())
}
# Send request
async with aiohttp.ClientSession() as session:
response = await session.post(
f"{self.server_url}/rpc",
json=mcp_request,
headers={"Authorization": f"Bearer {self.auth_token}"}
)
result = await response.json()
return result["result"]
async def share_context(
self,
context_type: str,
data: Any
) -> None:
"""
Cross-AI system context sharing
Foundation for multi-agent collaboration
"""
self.context_cache[context_type] = {
"data": data,
"timestamp": datetime.now(),
"source": "mcp_client"
}
# Broadcast context update
await self._broadcast_context_update(context_type)
# Cross-platform tool registration example
async def register_cross_platform_tools():
"""Register tools shareable across multiple AI systems"""
mcp_client = MCPClient(
server_url="https://mcp.anthropic.com",
auth_token=os.environ["MCP_TOKEN"]
)
await mcp_client.connect()
# Register tool definitions (Agents.md format)
tools = [
{
"name": "search_web",
"description": "Search the web for information",
"input_schema": {
"type": "object",
"properties": {
"query": {"type": "string"},
"max_results": {"type": "integer"}
}
}
},
{
"name": "execute_code",
"description": "Execute Python code in sandbox",
"input_schema": {
"type": "object",
"properties": {
"code": {"type": "string"},
"language": {"type": "string"}
}
}
},
{
"name": "read_file",
"description": "Read a file from filesystem",
"input_schema": {
"type": "object",
"properties": {
"path": {"type": "string"},
"encoding": {"type": "string"}
}
}
}
]
for tool in tools:
await mcp_client.register_tool(tool)
VI. Computer Use Agent Technology
6.1 Microsoft Copilot Studio Computer Use
# Microsoft Copilot Studio Computer Use SDK
# Source: Microsoft TechCommunity (GA: 2026-05-13)
from microsoft.copilot import ComputerUseAgent
from azure.identity import DefaultAzureCredential
from microsoft.purview import PurviewClient
class EnterpriseComputerAgent:
"""
Enterprise-grade Computer Use agent
Features: Purview logging, Key Vault credentials, Step-by-step billing
"""
def __init__(
self,
credentials: DefaultAzureCredential,
purview_client: PurviewClient
):
self.agent = ComputerUseAgent(
credential=credentials,
billing_model="per_step"
)
self.purview = purview_client
self.session_config = {
"log_to_purview": True,
"credential_source": "key_vault"
}
async def execute_automated_workflow(
self,
workflow_definition: dict,
initial_data: dict = None
) -> dict:
"""
Execute automated workflow
Typical scenario: Complete business process across multiple systems
"""
# Create session
session = await self.agent.create_session(
config=self.session_config
)
try:
# Execute workflow steps
results = []
current_state = initial_data or {}
for step in workflow_definition["steps"]:
# Execute single step
step_result = await session.execute_step(
action=step["action"],
target=step["target"],
parameters=step.get("params", {}),
context=current_state
)
results.append({
"step": step["name"],
"status": step_result.status,
"output": step_result.output
})
# Update state
current_state.update(step_result.state_updates)
# Approval check (configurable human review)
if step.get("require_human_approval"):
approval = await self._wait_for_approval(
session.id,
step_result
)
if not approval.approved:
return {"status": "rejected", "step": step["name"]}
return {
"status": "completed",
"results": results,
"final_state": current_state
}
finally:
# Close session (Purview has recorded complete logs)
await session.close()
async def fill_business_form(
self,
form_template: dict,
data_source: str
) -> dict:
"""
Fill business form example
Extract information from data source and fill into target system
"""
workflow = {
"name": "form_filling_workflow",
"steps": [
{
"name": "extract_data",
"action": "read_document",
"target": data_source
},
{
"name": "parse_information",
"action": "extract_fields",
"target": "document"
},
{
"name": "navigate_to_form",
"action": "open_url",
"target": form_template["url"]
},
{
"name": "fill_fields",
"action": "fill_form",
"target": "form",
"params": {"fields": form_template["fields"]}
},
{
"name": "submit",
"action": "click_button",
"target": "submit_button"
}
]
}
return await self.execute_automated_workflow(workflow)
# Billing example
def calculate_computer_use_cost(steps_count: int) -> float:
"""
Calculate Computer Use cost
Microsoft standard model: 5 Copilot Credits/step
Approximately $0.04/step (prepaid)
"""
credits_per_step = 5
cents_per_credit = 0.008 # $0.008/credit (prepaid plan)
total_cost_cents = steps_count * credits_per_step * cents_per_credit
return total_cost_cents / 100 # Convert to USD
# Example: 4-step form filling task cost
example_cost = calculate_computer_use_cost(steps_count=4)
print(f"4-step form filling task cost: ${example_cost:.4f}") # ~$0.0016
6.2 Claude Computer Use with Project Glasswing
# Anthropic Claude Computer Use Implementation
# Source: Anthropic Project Glasswing
from anthropic import Anthropic
from tools import bash, read_file, write_file, browser
class ClaudeComputerAgent:
"""
Claude Computer Use agent
Supports browser, filesystem operations
Secure enterprise deployment with MCP
"""
def __init__(self, api_key: str):
self.client = Anthropic(api_key=api_key)
self.available_tools = {
"bash": bash,
"read_file": read_file,
"write_file": write_file,
"browser": browser
}
async def execute_with_tools(
self,
task: str,
tool_permissions: list[str] = None
) -> dict:
"""
Claude agent task with tool execution
"""
# Restrict available tools (security policy)
allowed_tools = tool_permissions or list(self.available_tools.keys())
tools = {
name: self.available_tools[name]
for name in allowed_tools
}
# Execute conversation
message = await self.client.messages.create(
model="claude-sonnet-4.6",
max_tokens=4096,
messages=[
{
"role": "user",
"content": task
}
],
tools=[
self._format_tool_def(name)
for name in allowed_tools
]
)
# Process tool calls
while message.stop_reason == "tool_use":
tool_results = []
for tool_use in message.tool_calls:
tool_name = tool_use.name
tool_args = tool_use.input
# Execute tool
result = await self._execute_tool(tool_name, tool_args)
tool_results.append({
"type": "tool_result",
"tool_use_id": tool_use.id,
"content": result
})
# Continue conversation
message = await self.client.messages.create(
model="claude-sonnet-4.6",
max_tokens=4096,
messages=[
{"role": "user", "content": task},
message.to_dict(),
{"role": "user", "content": tool_results}
],
tools=[self._format_tool_def(name) for name in allowed_tools]
)
return {
"response": message.content,
"tool_calls_count": message.usage.total_tokens
}
VII. Industry Impact and Future Outlook
7.1 Industry Impact of Agentic AI
Google I/O 2026 and other releases this week mark the AI industry entering a new development phase:
Short-term Impact (0-6 months):
- Enterprise AI workflow automation costs drop significantly
- Multi-agent systems move from research to production
- Computer Use agents become standard for enterprise digital transformation
Medium-term Impact (6-18 months):
- AI Agent ecosystem standardization (MCP/Agents.md)
- Fundamental changes in search experience
- AI programming enters mainstream development process
Long-term Impact (18+ months):
- AI evolves from “assistant tool” to “digital colleague”
- World models drive physical AI development (robotics, autonomous driving)
- Self-evolving AI solves professional talent shortage problems
7.2 Technical Challenges and Risks
Technical challenges requiring attention:
- Reliability of multi-agent systems: Is the success of 93-agent collaboration reproducible?
- Security boundaries: Permission control for Computer Use agents
- Auditability: Enterprise compliance requirements
- Safety of self-evolving AI: Autonomous learning may cause unexpected behaviors
7.3 Developer Action Recommendations
Immediate Actions (This week):
- Evaluate Gemini 3.5 Flash migration possibilities
- Try Antigravity 2.0 CLI
- Understand MCP protocol ecosystem
Short-term Planning (30 days):
- Design multi-agent system architecture patterns
- Establish AI agent monitoring and logging mechanisms
- Evaluate Copilot Studio enterprise deployment plans
Medium-term Preparation (90 days):
- Reassess SEO and content strategies (AI search impact)
- Plan AI Agent product integration
- Participate in AAIF standard discussions
VIII. Conclusion
Google I/O 2026 showed us the next frontier of AI technology: the Agentic AI Era. In this era, AI is no longer a passive tool for answering questions, but an intelligent agent capable of proactive planning, collaborative execution, and continuous evolution.
Key Technical Takeaways:
- Gemini 3.5 Flash: Speed revolution, $1,000 for 2.6 billion tokens
- Antigravity 2.0: Multi-agent orchestration platform, 93 agents working in coordination
- Gemini Omni: World model, understanding physical laws
- Fujitsu Self-Evolution Technology: AI autonomous learning, 28 percentage point accuracy improvement
- Computer Use: Cross-system automation, $0.04/step enterprise-grade solution
Ecosystem Building Highlights:
- MCP Protocol: “USB-C” for AI interoperability
- Agents.md: Standardization of tool descriptions
- AAIF: Open standards led by Linux Foundation
The AI Agentic Era has arrived. Developers need to shift from “calling AI APIs” to “designing AI workflows,” from “single interactions” to “continuous collaboration.” This transformation will reshape software engineering practices and redefine the relationship between humans and AI.
References
- Google I/O 2026 Official Announcements - https://io.google/2026
- Digital Applied - “Eight Stories That Defined the AI Week of May 18-25” (2026-05-25)
- Fujitsu - “Self-evolving multi-AI agent technology” (2026-05-25)
- Microsoft TechCommunity - “Copilot Studio Computer Use GA” (2026-05-13)
- RankMeTop - “Google I/O 2026: The Rise of AI Agents” (2026-05-25)
- arXiv - EVE-Agent: Evidence-Verifiable Self-Evolving Agents (2605.22905)
- Linux Foundation AI Agent Foundation - https://linuxfoundation.org