Exploring AI Agent Frameworks: Deconstructing OpenHands (9) --- AgentController
0x00 Overview
A mature agent system must provide external control interfaces without interfering with the agent’s autonomous decision-making. For example, users may need to pause tasks to adjust parameters, terminate execution upon detecting errors, or even switch agent roles mid-task. This full lifecycle controllability requires the workflow layer to design flexible state machines and event-triggered mechanisms to find a perfect balance between autonomy and controllability.
AgentController It is the core controller in the OpenHands framework that manages the event handling and state transition of intelligent agents. It is responsible for receiving event stream callbacks, forwarding events to child agents (delegates), processing core events such as actions and observations, and deciding whether to trigger the next operation of the intelligent agent. It is a key component for coordinating the rhythm of intelligent agent operation.
Because this series draws on a large number of articles, there may be some articles missing from the references. If so, please point them out.
0x01 Why is an AgentController needed?
Let’s look at why we need an AgentController.
1.1 Problem
1.1.1 Probability Bias
Agents tend to be obedient and compliant in controlled environments, but they often err once deployed to a real production line. Specifically, these deviations can be broadly categorized into the following basic types:
- Loss of focus – the conversation goes astray, the replies are lengthy, and the user’s real needs are drowned out.
- Misaligned scope – failing to distinguish which tasks belong to oneself, taking on tasks against one’s will, resulting in a mess.
- Illusion—fabricating facts out of thin air and packaging misinformation as if it were true.
- Compliance failure – the most dangerous one, touching the red line of business or the high-pressure line of law, such as giving financial or medical advice without authorization.
- Inaccurate instructions – the system ignores the “ironclad rules” written by the developers in the system prompts, and the critical logic fails instantly.
This exposes a fundamental shortcoming of current development methods—we are trying to use a soft, natural language prompt to strangle a black box driven by probability. The underlying reason is that the current mainstream “large language model agent” essentially deals with probability: the same question can elicit two completely different answers. As a result, the agent’s final behavior is very much like opening a blind box—before deployment, no one can predict which side it will get, making the production environment fraught with danger.
Some researchers have proposed that the operation of an agent is essentially a probability chain.
Since the essence is probability, the design work is no longer “teaching the model to speak,” but “manipulating probability.” Whether the agent is writing code, acting as customer service, or playing a game, its ultimate goal is the same: to maximize the probability of the “correct action sequence” occurring given the context.
We are not managing entities with rigorous logical reasoning, but rather a probabilistic device that operates on pattern matching. Therefore, attempting to eradicate these biases with an ever-expanding “gentleman’s agreement” of natural language has proven inefficient and unreliable. Only by breaking down the probability chain into measurable, calibrable, and verifiable links can agents truly be made to obey in the production line.
1.1.2 The Heartbeat and Survival of Long Missions
Unlike the traditional “request-response” model, AI agents often need to handle long-term tasks that last for minutes or even hours. The special feature of these tasks is that they need to cope with unexpected situations such as network fluctuations and resource scheduling during execution, while maintaining state continuity. This requires the underlying workflow architecture to have strong state persistence capabilities, accurately record every detail of the task’s progress, and ensure seamless recovery from the breakpoint even if an interruption occurs. This places stringent requirements on data storage and process design.
In intelligent systems supporting long-cycle tasks, the AgentController plays a core role as both the “heart” and the “command center.” It neither directly writes task code (that’s the Agent’s responsibility) nor builds the runtime environment (that’s the Runtime’s function). Instead, it focuses on ensuring the smooth progress of the task throughout its entire lifecycle, specifically addressing the engineering challenges faced by long-cycle tasks, including maintaining state continuity, lifecycle management, and ensuring system observability. Its core value lies in decoupling “scheduling logic” from “decision-making logic”: allowing the Agent to focus on the task’s own thinking and execution, while the AgentController controls the overall process, making the system more stable and controllable.
1.1.3 Is the model the same as the intelligent agent? Logical closed loop ≠ System closed loop
There’s a prevailing view in the industry that “every large language model (LLM) will become an intelligent agent in the future.” Judging from the surface of technological development, this assessment seems reasonable. Whether it’s Claude, ChatGPT, or Gemini, these mainstream large language models have already embedded capabilities such as tool calls, function execution, and even file processing, seemingly possessing the basic conditions for “thinking + acting.”
However, a deeper analysis reveals that these capabilities merely constitute a “logical closed loop,” far from reaching the level of a “system closed loop.” Based on this, I am more optimistic about the development potential of “agent runtime layers” like n8n, for three main reasons:
- Not all teams have the engineering capabilities to build complex systems.
- Many companies are unwilling to entrust their core data to closed platforms due to data security concerns.
- Intelligent agents inevitably encounter various problems during cloud execution, and not all of them can achieve error-free operation.
The most crucial difference lies in this: while models can “call” tools, they cannot truly “execute” system operations. Take Claude’s code interpreter and GPT’s function call functionality as examples; they essentially only return a call description, and the actual execution still relies on an external system like the AgentController. This is like the model being the “brain” of an intelligent agent, capable of making precise decisions, but lacking the “body” to implement those decisions; while the AgentController is the “execution layer” that transforms abstract instructions into concrete actions. Especially when application scenarios are highly customized and frequently used, the optimal solution is not to send requests to a large language model every time, but rather to build a complete system that can be repeatedly run, monitored in real-time, and maintained long-term.
Even the smartest models will hit a wall after the Nth call—the cost wall, the compliance wall, the illusion wall. The future competition won’t be about whose LLM is better at “thinking,” but about who can encapsulate that thinking into a long-lifecycle system that is “repeatable, verifiable, and auditable.” The key to success for agents isn’t an arms race in terms of parameters, but the engineering endurance to “keep the heartbeat, keep the process running smoothly, and stay within budget.” The brain can iterate, but the body must be resilient; those who control this “body” truly hold the ticket to the era of intelligent agents.
1.2 Solution Approach
An effective solution is to shift from “expecting the model to develop its own target” to “proactively building a framework for guidance.” Specifically, this involves using a well-designed external framework to proactively and systematically guide the model’s behavior, while simultaneously constraining and validating it.
The most crucial understanding in engineering practice is that what you need to do is not simply “call the model once,” but design a closed-loop system that can continuously interact in multiple rounds. The evolution from “large models” to “Agentic AI” is essentially a shift from simply “piling up computing power and intelligence” to the more complex “construction of systems engineering.” Its core principle is to design the agent as a “complete system,” rather than merely as “the superposition of model capabilities.”
Based on this principle, there are two key directions that need to be grasped:
- Equal importance should be given to model selection, tool design, process orchestration strategies, and governance during the operational phase.
- Tools should be viewed as units of capability with strict contracts, and should not be designed as “universal functions” with ambiguous functions.
The purpose of this is to ensure that the powerful capabilities of Large Language Models (LLMs) always function within a pre-defined safety framework. At the heart of this approach is the shift in AI Agent development from an experience-based “alchemy experiment” to a systematic engineering approach, and from an “artistic creation” that relies on experience and luck to a “rigorous engineering practice” that is predictable, manageable, and verifiable.
1.3 Anthropic Blog
Anthropic outlined the failure modes and solutions for long-running agents in his blog, as follows:
| question | Initialize Agent behavior | Encoding Agent Behavior |
|---|---|---|
| Claude prematurely declared the entire project a success. | Create a feature list file: Based on the input specifications, generate a structured JSON file that lists the end-to-end feature descriptions. | At the start of the session, the feature list file is read, and only one feature is selected to begin implementation. |
| Claude leaves behind environments with bugs or undocumented progress. | Initialize the Git repository and write a progress note file. | At the start of the session, read progress notes and Git commit logs, and run basic tests on the development server to catch undocumented defects; at the end of the session, commit Git and update progress. |
| Claude prematurely marked the feature as complete. | Create a feature list file. | Self-validate all features and only mark them as “pass” after careful testing. |
| Claude needs to spend time figuring out how to run the application. | Write an init.sh script that can start the development server. | The init.sh file is read at the start of the session. |
0x02 AgentController
Current AI agent systems generally follow a common architecture, mainly consisting of three components:
- The LLM backend responsible for “thinking”.
- The tool framework responsible for “execution”.
- Control loop responsible for coordination.
The AgentController class corresponds to the third part, which is the main entry point and coordinator for user queries. It is responsible for managing the entire event loop, receiving events generated by the execution logic, collaborating with other components to process and submit event operations, and forwarding the processed events upstream (such as the UI). Essentially, it drives the dialogue round by round based on the generated events.
2.1 Definition
The main code of AgentController is as follows:
class AgentController:
# 控制器唯一标识ID
id: str
# 被控制的Agent实例(核心决策组件)
agent: Agent
# Agent执行的最大迭代次数(防止无限循环)
max_iterations: int
# 事件流实例(组件间通信的核心枢纽)
event_stream: EventStream
# 当前系统状态(包含完整上下文信息)
state: State
# 动作确认模式开关(开启时需确认后才执行Agent动作)
confirmation_mode: bool
# Agent名称到LLM配置的映射(用于委托代理场景)
agent_to_llm_config: dict[str, LLMConfig]
# Agent名称到Agent配置的映射(用于委托代理场景)
agent_configs: dict[str, AgentConfig]
# 父控制器实例(存在层级委托时非空)
parent: 'AgentController | None' = None
# 委托的子控制器实例(当前控制器委托任务时非空)
delegate: 'AgentController | None' = None
# 待处理的动作信息:元组包含动作对象和时间戳(记录动作创建时间)
_pending_action_info: tuple[Action, float] | None = None
# 控制器关闭状态标记(True表示已关闭,不再处理任务)
_closed: bool = False
# 缓存的第一条用户消息(用于初始化上下文等场景)
_cached_first_user_message: MessageAction | None = None
def __init__(
self,
agent: Agent,
event_stream: EventStream,
conversation_stats: ConversationStats,
iteration_delta: int,
budget_per_task_delta: float | None = None,
agent_to_llm_config: dict[str, LLMConfig] | None = None,
agent_configs: dict[str, AgentConfig] | None = None,
sid: str | None = None,
file_store: FileStore | None = None,
user_id: str | None = None,
confirmation_mode: bool = False,
initial_state: State | None = None,
is_delegate: bool = False,
headless_mode: bool = True,
status_callback: Callable | None = None,
replay_events: list[Event] | None = None,
security_analyzer: 'SecurityAnalyzer | None' = None,
):
"""初始化AgentController类的新实例。
参数:
agent: 被控制的Agent实例。
event_stream: 用于发布事件的事件流实例。
conversation_stats: 对话统计信息实例(记录交互指标等)。
iteration_delta: Agent可执行的最大迭代次数。
budget_per_task_delta: 每个任务允许的最大预算(单位:美元),超出则停止Agent。
agent_to_llm_config: Agent名称到LLM配置的映射字典(用于委托给其他Agent时)。
agent_configs: Agent名称到Agent配置的映射字典(用于委托给其他Agent时)。
sid: Agent的会话ID。
file_store: 文件存储实例(用于状态持久化等)。
user_id: 用户唯一标识。
confirmation_mode: 是否启用Agent动作的确认模式。
initial_state: 控制器的初始状态。
is_delegate: 该控制器是否为委托控制器(子控制器)。
headless_mode: Agent是否以无头模式运行(无GUI交互)。
status_callback: 处理状态更新的可选回调函数。
replay_events: 用于回放的事件日志列表。
security_analyzer: 安全分析器实例(用于动作安全校验)。
"""
# 初始化控制器ID:优先使用传入的sid,否则使用事件流的sid
self.id = sid or event_stream.sid
# 记录用户ID
self.user_id = user_id
# 记录文件存储实例
self.file_store = file_store
# 绑定被控制的Agent
self.agent = agent
# 记录无头模式状态
self.headless_mode = headless_mode
# 标记当前控制器是否为委托控制器
self.is_delegate = is_delegate
# 绑定对话统计实例
self.conversation_stats = conversation_stats
# 先设置事件流,后续可能需要订阅事件
self.event_stream = event_stream
# 非委托控制器需要订阅事件流,以接收并处理系统事件
if not self.is_delegate:
self.event_stream.subscribe(
EventStreamSubscriber.AGENT_CONTROLLER, # 订阅者类型(标识为Agent控制器)
self.on_event, # 事件回调处理函数
self.id # 订阅者ID(当前控制器ID)
)
# 初始化状态跟踪器:负责状态的管理、持久化与恢复
self.state_tracker = StateTracker(sid, file_store, user_id)
# 设置初始状态:支持从历史会话状态、父Agent状态或全新状态初始化
self.set_initial_state(
state=initial_state, # 传入的初始状态(可能为None)
conversation_stats=conversation_stats, # 对话统计信息
max_iterations=iteration_delta, # 最大迭代次数
max_budget_per_task=budget_per_task_delta, # 任务最大预算
confirmation_mode=confirmation_mode, # 动作确认模式
)
# 将状态跟踪器中的状态赋值给控制器的state属性
# 注意:此处为了向后兼容暂时共享状态,后续应将状态逻辑统一迁移到状态管理器
self.state = self.state_tracker.state
# 初始化Agent到LLM配置的映射:无传入配置则设为空字典
self.agent_to_llm_config = agent_to_llm_config if agent_to_llm_config else {}
# 初始化Agent配置映射:无传入配置则设为空字典
self.agent_configs = agent_configs if agent_configs else {}
# 记录初始的最大迭代次数(用于后续重置等场景)
self._initial_max_iterations = iteration_delta
# 记录初始的任务最大预算(用于后续重置等场景)
self._initial_max_budget_per_task = budget_per_task_delta
# 初始化卡顿检测器:用于识别Agent是否陷入执行卡顿
self._stuck_detector = StuckDetector(self.state)
# 绑定状态回调函数(用于对外通知状态更新)
self.status_callback = status_callback
# 初始化回放管理器:用于处理事件回放场景
self._replay_manager = ReplayManager(replay_events)
# 记录动作确认模式状态
self.confirmation_mode = confirmation_mode
# 绑定安全分析器实例(用于动作的安全校验)
self.security_analyzer = security_analyzer
# 向事件流中添加系统消息(初始化Agent的系统上下文等)
self._add_system_message()
2.2 Core Responsibilities
The core responsibilities of AgentController can be summarized into three pillars:
- It listens to the event stream. As the core subscriber to the system’s event stream, it captures all critical changes—whether user commands, agent decision outputs, or environmental feedback. This capability makes it the system’s “sensory center,” ensuring comprehensive control over global dynamics.
- The management state machine is responsible for maintaining the current state of tasks and triggering precise state transitions based on received events. For example, when user confirmation is required during Agent execution, the AgentController will proactively pause system operation, wait for user input, and then resume the process, ensuring the continuity of task execution.
- State management: Maintain the agent’s running status (RUNNING, STOPPED, ERROR, FINISHED, etc.).
- Initialization control (creating and configuring agent instances).
- Operational control: Controls the execution steps and iterations of the agent.
- Shutdown process: Gracefully shut down the proxy and clean up resources.
- It drives the Agent to run. It propels the system forward iteratively through the
step()method. This method is not an infinite loop, but rather adopts an “event-triggered” mechanism—it is only activated after receiving a specific observation, prompting the Agent to consider the next action in response to new situations. This ensures both the efficiency of task progress and avoids unnecessary waste.
The architectural and deterministic approach introduced by AgentController is not intended to limit the potential of large language models. On the contrary, this approach builds a “container” for these potentials, allowing them to safely unleash their value in production environments. Its core function is to encapsulate probabilistic models with uncertain behavior into software components with relatively controllable behavior and predictable risks. For developers and decision-makers who hope to truly implement AI technology in mission-critical businesses, the core competitiveness in the future lies not only in the ability to use the most advanced models, but also in mastering mature methodologies and robust frameworks for managing these models. Giving AI agents the reins of “determinism,” making their behavior controllable and risks manageable, is the necessary path for them to move from “technical showmanship” to “actual value creation.”
In other words, the agent framework should do things that are orthogonal to the improvement of model capabilities, because as long as the model improves, the engineering work will be wasted.
2.3 Specific Functions
The specific functions of AgentController are as follows:
-
Agent lifecycle management
State Management: Maintains the agent’s running state (RUNNING, STOPPED, ERROR, FINISHED, etc.).
Initialization Control: Creates and configures agent instances.
Runtime Control: Controls the agent’s execution steps and iterations.
Shutdown Processing: Gracefully shuts down the agent and cleans up resources. -
Event Handling and Dispatch
Event Subscription: Subscribe to events in the EventStream.
Event Routing: Dispatch events to the appropriate processing logic.
Action Handling: Handle various actions generated by the delegate.
Observation Handling: Handle observations returned by the environment. -
Agent Execution Control
Step Execution: Controls the agent’s execution step by step using the
_step()method.
Iteration Limit: Controls the maximum number of execution steps for the agent.
Budget Management: Manages the budget limits for tasks (based on cost).
Die-out Detection: Detects and handles situations where the agent gets stuck in a loop. -
Delegation Mechanism Management
Sub-Agent Creation: Supports creating delegated agents to handle sub-tasks.
Hierarchical Management: Manages the hierarchical relationships between agents.
Result Aggregation: Collects and processes the execution results of delegated agents. -
Security and Confirmation Mechanisms
Security Analysis: Integrates a security analyzer to assess the risk of actions.
Confirmation Mode: Requests user confirmation before high-risk operations.
Access Control: Controls the types of operations that the agent can perform. -
State Tracking and Persistence
State Tracking: Tracks agent state via StateTracker.
History: Maintains agent execution history.
State Saving: Persists agent state for recovery. -
Error Handling and Recovery
Exception Handling: Capturing and handling various runtime exceptions.
Error States: Placing the agent in an appropriate error state.
Recovery Mechanism: Providing the ability to recover from error states. -
Metrics Monitoring
Cost Tracking: Track API call costs.
Token Usage: Monitor the use of prompts and completion tokens.
Performance Metrics: Collect execution performance data. -
Replay and Debugging
Event Replay: Supports replaying historical events for debugging.
Track Recording: Records the agent’s execution trajectory. -
Multi-agent coordination
Parent-child agent coordination: Manages message passing between parent and child agents.
Resource sharing: Shares resources and state among agents.
2.4 Organizational Structure
The architecture of AgentController is as follows.
OpenHands-AgentController-System

2.5 Multiple Instances
As can be seen from the code, AgentController is not a singleton. Multiple instances can be created.
The AgentController class has a standard init method that allows the creation of multiple instances, each with a unique id attribute (usually a session ID):
- Delegation pattern: The code explicitly supports parent-delegate relationships, where one AgentController can create another AgentController as a delegate.
- One instance per session: Each user session or task typically creates a separate AgentController instance, distinguished by the sid (session ID) parameter.
- Instance properties: Each instance has its own independent properties such as state, event stream, and agent.
Therefore, AgentController is a regular class, not a singleton, and multiple instances can be created to manage different agent sessions. The number of AgentControllers in a project is dynamic and depends on:
- Number of concurrent sessions (one main controller per session).
- Number of delegated tasks per session (one delegated controller per delegated task).
The system dynamically creates and destroys AgentControllers based on actual usage. Each session has one main controller.
- Whenever a user starts a new session or task, a main AgentController instance is created to delegate the agent controller.
- When the main agent needs to delegate a subtask to another agent, a new AgentController instance is created for each delegated task. The delegate controller instance is created in the start_delegate method.
2.6 Workflow
The main workflow of AgentController is to initialize the agent, manage its state, and drive the main loop, gradually propelling the agent forward. Specifically:
- Initialization: Create a proxy instance and set its initial state.
- Event listeners: Subscribe to the event stream and process incoming events.
- Decision execution: Decide whether to execute the delegated steps based on the event.
- Action generation: Allows the agent to generate the next action.
- Results processing: Processing the observation results after the actions are performed.
- Status Update: Update the agent status and decide on the next action.
OpenHands-AgentController-Workflow

0x03 Key Features
3.1 Agent Routing
In multi-agent systems, “routing” essentially means ensuring that tasks are accurately routed to the appropriate processing unit. In engineering, this step is usually implemented through tool invocation mechanisms. Specifically, when the system triggers a tool invocation, it first parses the AgentDelegateAction instruction—this instruction AgentController contains specialized processing logic. Once a match is found, a brand-new delegate agent is automatically launched. Subsequently, all related event flows are forwarded to this delegate agent, which then handles the subsequent processing.
OpenHands’ routing solution is better suited for simple, unidirectional task flows. For multi-level routing, nesting is necessary AgentController. However, in practical engineering, multi-agent routing is not recommended. The core reason is that this approach makes the system architecture bloated, increasing development and debugging complexity and introducing many uncontrollable factors—such as communication latency between agents and state synchronization errors.
In many cases, a single agent paired with multiple tools is sufficient to meet the needs, and there is absolutely no need to use a multi-agent approach. If you encounter a scenario where a single agent struggles to handle it, you can first try MicroAgent this type of prompt enhancement solution. By optimizing the instruction logic, you can improve the processing capabilities of the single agent and thus avoid the complex design brought about by a multi-agent approach.
3.1.1 Process
Action distribution and delegated agent management are the core logic of task flow and multi-agent collaboration in OpenHands multi-agent systems. Main functions include:
- Action type routing: Based on the type of the input action (state change, message, delegate start, task completion/rejection), it is dispatched to the corresponding processing method to achieve logical decoupling.
- Message processing: Distinguish between messages from users and agents, and perform log recording, dynamic recall strategies (first message recall context, non-first message recall knowledge base), and agent state switching.
- Delegated agent startup: Supports subtask splitting, allowing the creation of delegated agents to handle subdivided tasks, inheriting the parent agent’s configuration (iteration limits, budget, metrics), while maintaining independent session identifiers and event logging scope.
- State and metric management: Ensures synchronization of parent and child agent states and accumulation of global metrics, and supports state marking and result saving after task completion/rejection.
openhands-8-1

3.1.2 Code
Main features
- Modular design: Action processing is split by type, with each type corresponding to independent logic, which is easy to extend and maintain (for example, adding a new action type only requires adding branch judgment).
- Multi-agent collaboration: Through delegation hierarchical labeling, shared event streams, and index reuse, efficient collaboration between parent and child agents is achieved, supporting the splitting and parallel processing of complex tasks.
- Dynamic adaptation capability: During message processing, the recall strategy is automatically switched based on whether it is the first user interaction, thereby improving the accuracy of information retrieval.
- Robustness guarantee: By using assertion verification, default configuration as a fallback, and state inheritance mechanism, uncontrollable factors in multi-agent collaboration are reduced, ensuring system stability.
- Traceability: The design of delegated agent session ID, event start ID, parent indicator snapshot, etc., facilitates task flow and problem troubleshooting.
The code is as follows.
async def _handle_action(self, action: Action) -> None:
"""处理来自智能体或委托智能体的动作。
根据动作的不同类型,分发到对应的处理逻辑,实现状态变更、消息处理、委托启动等核心功能。
"""
# 处理"更改智能体状态"动作:直接更新智能体状态
if isinstance(action, ChangeAgentStateAction):
await self.set_agent_state_to(action.agent_state) # type: ignore
# 处理"消息"动作:转发到专门的消息处理方法
elif isinstance(action, MessageAction):
await self._handle_message_action(action)
# 处理"启动委托智能体"动作:初始化并启动子智能体处理子任务
elif isinstance(action, AgentDelegateAction):
await self.start_delegate(action)
# 断言委托智能体已成功创建(确保后续逻辑安全执行)
assert self.delegate is not None
# 如果动作中包含"task"参数,向事件流添加任务消息,通知委托智能体
if 'task' in action.inputs:
self.event_stream.add_event(
MessageAction(content='TASK: ' + action.inputs['task']),
EventSource.USER,
)
# 将委托智能体状态设置为运行中,开始处理子任务
await self.delegate.set_agent_state_to(AgentState.RUNNING)
return # 委托启动后,当前动作处理结束
# 处理"智能体完成任务"动作:保存输出结果并标记状态为完成
elif isinstance(action, AgentFinishAction):
self.state.outputs = action.outputs
await self.set_agent_state_to(AgentState.FINISHED)
# 处理"智能体拒绝任务"动作:保存输出结果并标记状态为拒绝
elif isinstance(action, AgentRejectAction):
self.state.outputs = action.outputs
await self.set_agent_state_to(AgentState.REJECTED)
async def _handle_message_action(self, action: MessageAction) -> None:
"""处理事件流中的消息动作。
区分用户来源和智能体来源的消息,分别执行日志记录、信息召回、状态更新等逻辑。
参数:
action (MessageAction):待处理的消息动作对象,包含消息内容、来源等信息
"""
# 处理用户来源的消息
if action.source == EventSource.USER:
# 日志级别控制:如果开启了"记录所有事件",则用info级别,否则用debug级别
log_level = (
'info' if os.getenv('LOG_ALL_EVENTS') in ('true', '1') else 'debug'
)
# 输出日志,附加消息类型和事件来源元数据
self.log(
log_level,
str(action),
extra={'msg_type': 'ACTION', 'event_source': EventSource.USER},
)
# 判断当前消息是否为该智能体接收的第一条用户消息(影响后续召回策略)
first_user_message = self._first_user_message()
is_first_user_message = (
action.id == first_user_message.id if first_user_message else False
)
# 首次消息:召回工作空间上下文;非首次:召回知识库内容
recall_type = (
RecallType.WORKSPACE_CONTEXT
if is_first_user_message
else RecallType.KNOWLEDGE
)
# 创建召回动作,用于检索相关信息
recall_action = RecallAction(query=action.content, recall_type=recall_type)
# 记录待处理的召回动作
self._pending_action = recall_action
# 添加召回动作到事件流(来源标记为用户,因用户消息是召回触发源)
self.event_stream.add_event(recall_action, EventSource.USER)
# 如果智能体当前未处于运行状态,将其切换为运行状态
if self.get_agent_state() != AgentState.RUNNING:
await self.set_agent_state_to(AgentState.RUNNING)
# 处理智能体来源的消息
elif action.source == EventSource.AGENT:
# 如果智能体标记需要等待用户响应,将状态切换为"等待用户输入"
if action.wait_for_response:
await self.set_agent_state_to(AgentState.AWAITING_USER_INPUT)
async def start_delegate(self, action: AgentDelegateAction) -> None:
"""启动委托智能体以处理子任务。
OpenHands 是多智能体系统:
- 「任务(task)」:系统与用户之间的完整对话,始于用户初始输入(通常是任务描述),
终于智能体发起的完成动作、用户停止操作或错误触发。
- 「子任务(subtask)」:智能体与用户或其他智能体之间的对话。
若单个智能体即可完成任务,则任务与子任务合一;否则任务由多个子任务组成,每个子任务由独立智能体处理。
参数:
action (AgentDelegateAction):包含待启动委托智能体信息的动作对象
"""
# 根据动作中指定的智能体名称,获取对应的智能体类
agent_cls: Type[Agent] = Agent.get_cls(action.agent)
# 获取智能体配置:优先使用动作指定的配置,未指定则复用当前智能体的配置
agent_config = self.agent_configs.get(action.agent, self.agent.config)
# 创建委托智能体实例(确保父子智能体共享LLM注册信息)
# 注:父子智能体共享指标,实现全局指标累积
delegate_agent = agent_cls(
config=agent_config, llm_registry=self.agent.llm_registry
)
# 启动委托智能体前,创建初始状态(继承父智能体关键配置)
state = State(
session_id=self.id.removesuffix('-delegate'), # 会话ID:移除父智能体的委托后缀
user_id=self.user_id, # 继承用户ID,保持用户关联
inputs=action.inputs or {}, # 子任务输入参数(默认为空字典)
iteration_flag=self.state.iteration_flag, # 继承迭代控制标志(限制迭代次数)
budget_flag=self.state.budget_flag, # 继承预算控制标志(限制资源使用)
delegate_level=self.state.delegate_level + 1, # 委托层级+1(标识子智能体层级)
metrics=self.state.metrics, # 共享全局指标(父子智能体指标统一累积)
start_id=self.event_stream.get_latest_event_id() + 1, # 事件起始ID:从最新事件后开始记录
parent_metrics_snapshot=self.state_tracker.get_metrics_snapshot(), # 父智能体指标快照(用于后续对比)
parent_iteration=self.state.iteration_flag.current_value, # 父智能体当前迭代次数
)
# 输出调试日志:记录委托智能体启动信息
self.log(
'debug',
f'start delegate, creating agent {delegate_agent.name}',
)
# 创建委托智能体的控制器(核心:标记is_delegate=True,避免直接订阅事件流)
self.delegate = AgentController(
sid=self.id + '-delegate', # 会话ID:在父ID后添加委托后缀,唯一标识
file_store=self.file_store, # 继承文件存储对象(用于状态持久化)
user_id=self.user_id, # 继承用户ID
agent=delegate_agent, # 待管理的委托智能体实例
event_stream=self.event_stream, # 共享事件流(父子智能体事件互通)
conversation_stats=self.conversation_stats, # 继承对话统计信息
iteration_delta=self._initial_max_iterations, # 迭代次数增量(子任务的最大迭代限制)
budget_per_task_delta=self._initial_max_budget_per_task, # 单任务预算增量(子任务的资源限制)
agent_to_llm_config=self.agent_to_llm_config, # 继承LLM配置映射
agent_configs=self.agent_configs, # 继承智能体配置字典
initial_state=state, # 初始状态(继承父智能体配置后的状态)
is_delegate=True, # 标记为委托智能体(关键:避免重复订阅事件流)
headless_mode=self.headless_mode, # 继承无头模式(无交互界面)配置
security_analyzer=self.security_analyzer, # 继承安全分析器(用于安全校验)
)
3.2 Agent Lifecycle Management
For the Agent Controller, agent state management is a crucial task, such as maintaining the agent’s running state. This is what set_agent_state_to does. set_agent_state_to is called in various functions, such as: [function name missing], [function name missing], [function name missing] _react_to_exception, _handle_action, _handle_observation, _handle_message_action [function name missing].
3.2.1 Process
This method is the core interface for state management of OpenHands agents. It is responsible for uniformly handling the entire process of agent state changes, maintaining the agent’s running state (RUNNING, STOPPED, ERROR, FINISHED, etc.) to ensure state consistency, traceability, and system stability. Its main functions include:
- Status change verification: Avoids repeatedly setting the same status and reduces invalid operations.
- Related logic triggers: When the state switches to Stop/Error, a reset (relevant resources) is executed. When recovering from an error, the runtime adjusts control limits (such as the maximum number of iterations).
- Action confirmation processing: After the user confirms/rejects, the confirmation status of the pending action is updated and dispatched to the event stream, completing the action loop.
- Event dispatch:
AgentStateChangedObservationAn event is generated after a state change, carrying the error reason (in the case of an error state), so that other modules can subscribe to the response. - State persistence: Any state change is forcibly saved to prevent state loss due to crashes or accidents.
Main features
- Atomicity design: The state update process is completed in one go, updating the state first and then handling side effects, ensuring that subsequent logic is based on the latest state.
- Side effect closed loop: The logic related to state change, such as reset, limit adjustment, action handling, and event distribution, is uniformly encapsulated to avoid dispersion and redundancy.
- Traceability: State change logs, error reasons carried by events, and persistent storage form a complete state traceability chain.
- Robustness guarantees: duplicate state interception, crash protection (forced saving), and secure access to attributes (
hasattrverification) improve system stability. - Highly scalable: When adding a new state, it can be added to the enumeration without significant modification to the core process, adapting to different business scenarios.
openhands-8-2

3.2.2 Code
async def set_agent_state_to(self, new_state: AgentState) -> None:
"""更新智能体状态并处理副作用。
核心职责:同步状态变更、触发关联逻辑(重置、限制调整、事件分发)、持久化状态,
确保状态变更的一致性和可追溯性。
参数:
new_state (AgentState):智能体要切换到的新状态
"""
# 输出状态变更日志:包含智能体名称、旧状态、新状态
self.log(
'info',
f'Setting agent({self.agent.name}) state from {self.state.agent_state} to {new_state}',
)
# 状态未变更(新状态与当前状态一致):直接返回,避免重复处理
if new_state == self.state.agent_state:
return
# 保存旧状态:用于后续控制限制校验(如错误恢复后的限制调整)
old_state = self.state.agent_state
# 先更新状态:确保后续 _reset() 等方法能获取到最新状态
self.state.agent_state = new_state
# 状态切换为停止/错误时:执行重置逻辑(清空临时数据、释放资源)
if new_state in (AgentState.STOPPED, AgentState.ERROR):
self._reset()
# 错误状态恢复为运行状态时:尝试调整控制标志限制(如迭代次数上限、预算)
if old_state == AgentState.ERROR and new_state == AgentState.RUNNING:
self.state_tracker.maybe_increase_control_flags_limits(self.headless_mode)
# 待处理动作存在,且新状态为用户确认/拒绝:更新动作确认状态并分发事件
if self._pending_action is not None and (
new_state in (AgentState.USER_CONFIRMED, AgentState.USER_REJECTED)
):
# 清空动作的思考过程(若有该属性)
if hasattr(self._pending_action, 'thought'):
self._pending_action.thought = '' # type: ignore[union-attr]
# 根据新状态设置动作确认状态
confirmation_state = (
ActionConfirmationStatus.CONFIRMED
if new_state == AgentState.USER_CONFIRMED
else ActionConfirmationStatus.REJECTED
)
self._pending_action.confirmation_state = confirmation_state
self._pending_action._id = None # type: ignore[attr-defined] # 清空动作ID(避免重复)
# 将更新后的动作添加到事件流,供其他模块处理
self.event_stream.add_event(self._pending_action, EventSource.AGENT)
# 构建状态变更观察事件:错误状态需携带错误原因
reason = self.state.last_error if new_state == AgentState.ERROR else ""
self.event_stream.add_event(
AgentStateChangedObservation('', self.state.agent_state, reason),
EventSource.ENVIRONMENT, # 状态变更由环境触发
)
# 状态变更时强制保存状态:防止崩溃或意外情况导致状态丢失
self.save_state()
3.3 Proxy Execution Control
OpenHands uses the _step() method to control the step-by-step execution of the agent, the maximum number of execution steps of the agent, manage the budget limit of the task (based on cost), and detect and handle situations where the agent gets stuck in a loop.
3.3.1 Process
This code contains the core single-step execution logic of the OpenHands agent, encapsulating the complete single-step lifecycle of the agent and serving as the core entry point for system task execution. Its main functions include:
- Precondition interception: Verify the agent’s state (only the RUNNING state is executable) and pending actions (only continue if there are no pending actions) to avoid parallel execution conflicts.
- Core controllability verification: Synchronize budget and global indicators to ensure resources are not over-specified, detect whether the agent is trapped in a loop, and verify control flags such as iteration count/budget to prevent the system from going out of control.
- Action generation and playback adaptation: Supports playback mode (directly obtains actions from playback trajectory) and normal mode (calls the agent to generate actions), adapting to different use cases.
- Fine-grained exception handling: For errors such as incorrect action format, LLM response exceptions, and function call exceptions, add error events to the event stream; for context window overflows, support historical truncation or throwing a unified exception, and be compatible with error message formats of different LLMs.
- High-risk action security verification: For risky actions such as command execution, file operation, and interactive browsing, the security analyzer marks the risk level and triggers user confirmation in confirmation mode to ensure execution security.
- State and event management: Actions awaiting confirmation automatically switch the agent’s state to “waiting for user confirmation”. Non-empty actions are synchronized to the event stream for other modules to subscribe to, while front-end display metrics are prepared to meet both system collaboration and visualization needs.
openhands-8-3

3.3.2 Code
async def _step(self) -> None:
"""执行父智能体或委托智能体的单步逻辑。
核心职责:检查智能体运行状态、拦截阻塞条件、执行控制标志校验、生成/回放动作、处理安全确认。
同时处理上下文溢出、动作异常等场景,确保单步执行的稳定性和安全性。
"""
# 检查智能体状态:非运行状态则跳过执行
if self.get_agent_state() != AgentState.RUNNING:
self.log(
'debug',
f'Agent not stepping because state is {self.get_agent_state()} (not RUNNING)',
extra={'msg_type': 'STEP_BLOCKED_STATE'},
)
return
# 检查是否有待处理动作:存在则跳过(避免并行执行冲突)
if self._pending_action:
action_id = getattr(self._pending_action, 'id', 'unknown')
action_type = type(self._pending_action).__name__
self.log(
'debug',
f'Agent not stepping because of pending action: {action_type} (id={action_id})',
extra={'msg_type': 'STEP_BLOCKED_PENDING_ACTION'},
)
return
# 输出步骤日志:包含委托层级、本地步骤数、全局迭代数(便于调试追踪)
self.log(
'debug',
f'LEVEL {self.state.delegate_level} LOCAL STEP {self.state.get_local_step()} GLOBAL STEP {self.state.iteration_flag.current_value}',
extra={'msg_type': 'STEP'},
)
# 1. 同步预算与指标:确保所有 LLM 服务的消耗不超过预算限制
self.state_tracker.sync_budget_flag_with_metrics()
# 2. 检查智能体是否陷入循环:是则抛出异常并处理
if self._is_stuck():
await self._react_to_exception(
AgentStuckInLoopError('Agent got stuck in a loop')
)
return
# 3. 执行控制标志校验:检查迭代次数、预算等限制是否超限
try:
self.state_tracker.run_control_flags()
except Exception as e:
logger.warning('Control flag limits hit')
await self._react_to_exception(e)
return
# 初始化动作:默认为空动作
action: Action = NullAction()
# 4. 处理回放模式:不执行智能体逻辑,直接从回放轨迹中获取动作
if self._replay_manager.should_replay():
action = self._replay_manager.step()
else:
# 非回放模式:调用智能体生成动作
try:
action = self.agent.step(self.state)
# 检查动作是否为空:空动作则抛出异常
if action is None:
raise LLMNoActionError('No action was returned')
# 标记动作来源为智能体
action._source = EventSource.AGENT
except (
LLMMalformedActionError,
LLMNoActionError,
LLMResponseError,
FunctionCallValidationError,
FunctionCallNotExistsError,
) as e:
# 处理动作生成相关异常:添加错误观察事件到事件流,直接返回
self.event_stream.add_event(
ErrorObservation(
content=str(e),
),
EventSource.AGENT,
)
return
except (ContextWindowExceededError, BadRequestError, OpenAIError) as e:
# 处理上下文窗口溢出相关异常(兼容不同 LLM 的错误提示格式)
error_str = str(e).lower()
# 通过关键词匹配判断是否为上下文溢出(因部分 LLM 未统一异常类型)
if (
'contextwindowexceedederror' in error_str
or 'prompt is too long' in error_str
or 'input length and `max_tokens` exceed context limit' in error_str
or 'please reduce the length of either one' in error_str
or 'the request exceeds the available context size' in error_str
or 'context length exceeded' in error_str
# OpenRouter 上下文溢出错误关键词
or (
'sambanovaexception' in error_str
and 'maximum context length' in error_str
)
# SambaNova 上下文溢出错误(需同时匹配两个关键词)
or isinstance(e, ContextWindowExceededError)
):
# 启用上下文截断:添加压缩请求动作,触发历史上下文截断
if self.agent.config.enable_history_truncation:
self.event_stream.add_event(
CondensationRequestAction(), EventSource.AGENT
)
return
else:
# 未启用截断:抛出系统统一的上下文溢出异常
raise LLMContextWindowExceedError()
else:
# 非上下文溢出错误:向上抛出原始异常
raise e
# 5. 处理可执行动作的安全确认逻辑
if action.runnable:
# 筛选需要安全确认的动作类型(命令执行、文件操作、交互式浏览等)
if self.state.confirmation_mode and (
type(action) is CmdRunAction
or type(action) is IPythonRunCellAction
or type(action) is BrowseInteractiveAction
or type(action) is FileEditAction
or type(action) is FileReadAction
):
# 调用安全分析器检测动作风险
await self._handle_security_analyzer(action)
# 获取动作的安全风险等级(由 LLM 或安全分析器标记)
security_risk = getattr(
action, 'security_risk', ActionSecurityRisk.UNKNOWN
)
# 定义需要用户确认的场景:高风险 或 未知风险且无安全分析器
is_high_security_risk = security_risk == ActionSecurityRisk.HIGH
is_ask_for_every_action = (
security_risk == ActionSecurityRisk.UNKNOWN
and not self.security_analyzer
)
# 命令行模式:强制标记为待确认状态(CLI 自行处理确认逻辑)
if self.agent.config.cli_mode:
action.confirmation_state = (
ActionConfirmationStatus.AWAITING_CONFIRMATION
)
# 非命令行模式:高风险/未知风险动作需用户确认
elif (
is_high_security_risk or is_ask_for_every_action
) and self.confirmation_mode:
logger.debug(
f'[non-CLI mode] Detected HIGH security risk in action: {action}. Ask for confirmation'
)
action.confirmation_state = (
ActionConfirmationStatus.AWAITING_CONFIRMATION
)
# 将可执行动作标记为待处理(避免重复执行)
self._pending_action = action
# 6. 非空动作的后续处理:状态更新、指标准备、事件分发
if not isinstance(action, NullAction):
# 待确认动作:将智能体状态切换为"等待用户确认"
if (
hasattr(action, 'confirmation_state')
and action.confirmation_state
== ActionConfirmationStatus.AWAITING_CONFIRMATION
):
await self.set_agent_state_to(AgentState.AWAITING_USER_CONFIRMATION)
# 准备前端展示的指标数据(如动作类型、执行时间等)
self._prepare_metrics_for_frontend(action)
# 将动作添加到事件流(供其他模块订阅处理)
self.event_stream.add_event(action, action._source)
# 输出动作日志:根据全局配置决定日志级别
log_level = 'info' if LOG_ALL_EVENTS else 'debug'
self.log(log_level, str(action), extra={'msg_type': 'ACTION'})
3.4 Callback
The AgentController registers an event callback function on_event, which sets an identifier _pending_action and polls the task _start_step_loop to call the Agent’s step method, which is used to predict the next action.
3.4.1 Core Features
- Event forwarding mechanism: When there is an active sub-agent, the event is automatically forwarded to the sub-agent for processing, ensuring the continuity of hierarchical agent collaboration.
- State judgment logic: By checking the state of the child agent (complete, error, rejection, etc.), it is determined whether to terminate the child agent and resume the parent agent’s processing flow.
- Event classification and handling: Differentiate between action events and observation events, and call the corresponding handling methods for each to ensure targeted event handling.
- Step trigger control: The
should_stepmethod determines whether to trigger the next action of the agent, and special logging is performed in combination with user messages and other scenarios to enhance the observability of the process.
3.4.2 Process
8-5

3.4.3 Code
def on_event(self, event: Event) -> None:
"""事件流的回调方法,通知控制器有新事件传入。
参数:
event (Event): 待处理的传入事件。
"""
# 若存在子智能体且未完成/未出错,将事件转发给子智能体
if self.delegate is not None:
# 获取子智能体当前状态
delegate_state = self.delegate.get_agent_state()
# 判断子智能体是否仍活跃(未完成、未出错、未被拒绝),或因超迭代/超预算报错
if (
delegate_state
not in (
AgentState.FINISHED, # 已完成
AgentState.ERROR, # 错误
AgentState.REJECTED, # 被拒绝
)
or 'RuntimeError: Agent reached maximum iteration.'
in self.delegate.state.last_error # 达到最大迭代次数
or 'RuntimeError:Agent reached maximum budget for conversation'
in self.delegate.state.last_error # 达到对话最大预算
):
# 将事件转发给子智能体,跳过父智能体处理
asyncio.get_event_loop().run_until_complete(
self.delegate._on_event(event)
)
return
else:
# 子智能体已完成或出错,终止子智能体流程
self.end_delegate()
return
# 仅当无活跃子智能体时,继续父智能体的事件处理
asyncio.get_event_loop().run_until_complete(self._on_event(event))
async def _on_event(self, event: Event) -> None:
"""父智能体内部的事件处理方法(无活跃子智能体时调用)。"""
# 若事件为隐藏类型,则忽略处理
if hasattr(event, 'hidden') and event.hidden:
return
# 将事件添加到状态追踪器的历史记录中
self.state_tracker.add_history(event)
# 根据事件类型分发处理
if isinstance(event, Action):
# 处理动作类型事件
await self._handle_action(event)
elif isinstance(event, Observation):
# 处理观察结果类型事件
await self._handle_observation(event)
# 判断是否需要触发智能体下一步操作
should_step = self.should_step(event)
if should_step:
self.log(
'debug',
f'Stepping agent after event: {type(event).__name__}',
extra={'msg_type': 'STEPPING_AGENT'},
)
# 带异常处理的下一步操作
await self._step_with_exception_handling()
elif isinstance(event, MessageAction) and event.source == EventSource.USER:
# 若收到用户消息但未触发下一步,记录警告日志
self.log(
'warning',
f'Not stepping agent after user message. Current state: {self.get_agent_state()}',
extra={'msg_type': 'NOT_STEPPING_AFTER_USER_MESSAGE'},
)
3.5 End-to-end observability
State Its function is to store all events, execution status, task plans, and other information generated during the entire agent’s workflow.
At any point during task execution, users or developers need to have a clear understanding of the Agent’s running status: Is it in the decision-making stage, or is it executing specific commands? Is it waiting for external resource responses, or is it stuck due to parameter errors? This observability requires the workflow architecture to log, mark status, and provide real-time feedback for each step, building a transparent monitoring system to support troubleshooting and performance optimization.
OpenHands handles monitoring in start_delegate, _step, and end_delegate.
async def start_delegate(self, action: AgentDelegateAction) -> None:
# 创建委托智能体实例(确保父子智能体共享LLM注册信息)
# 注:父子智能体共享指标,实现全局指标累积
delegate_agent = agent_cls(
config=agent_config, llm_registry=self.agent.llm_registry
)
# 启动委托智能体前,创建初始状态(继承父智能体关键配置)
state = State(
session_id=self.id.removesuffix('-delegate'), # 会话ID:移除父智能体的委托后缀
user_id=self.user_id, # 继承用户ID,保持用户关联
inputs=action.inputs or {}, # 子任务输入参数(默认为空字典)
iteration_flag=self.state.iteration_flag, # 继承迭代控制标志(限制迭代次数)
budget_flag=self.state.budget_flag, # 继承预算控制标志(限制资源使用)
delegate_level=self.state.delegate_level + 1, # 委托层级+1(标识子智能体层级)
metrics=self.state.metrics, # 共享全局指标(父子智能体指标统一累积)
start_id=self.event_stream.get_latest_event_id() + 1, # 事件起始ID:从最新事件后开始记录
parent_metrics_snapshot=self.state_tracker.get_metrics_snapshot(), # 父智能体指标快照(用于后续对比)
parent_iteration=self.state.iteration_flag.current_value, # 父智能体当前迭代次数
)
def end_delegate(self) -> None:
# Calculate delegate-specific metrics before closing the delegate
delegate_metrics = self.state.get_local_metrics()
logger.info(f'Local metrics for delegate: {delegate_metrics}')
async def _step(self) -> None:
# Create and log metrics for frontend display
self._prepare_metrics_for_frontend(action)
def _prepare_metrics_for_frontend(self, action: Action) -> None:
"""Create a minimal metrics object for frontend display and log it.
To avoid performance issues with long conversations, we only keep:
- accumulated_cost: The current total cost
- accumulated_token_usage: Accumulated token statistics across all API calls
- max_budget_per_task: The maximum budget allowed for the task
This includes metrics from both the agent's LLM and the condenser's LLM if it exists.
Args:
action: The action to attach metrics to
"""
# Get metrics from agent LLM
metrics = self.conversation_stats.get_combined_metrics()
# Create a clean copy with only the fields we want to keep
clean_metrics = Metrics()
clean_metrics.accumulated_cost = metrics.accumulated_cost
clean_metrics._accumulated_token_usage = copy.deepcopy(
metrics.accumulated_token_usage
)
# Add max_budget_per_task to metrics
if self.state.budget_flag:
clean_metrics.max_budget_per_task = self.state.budget_flag.max_value
action.llm_metrics = clean_metrics
# Log the metrics information for debugging
# Get the latest usage directly from the agent's metrics
latest_usage = None
if self.state.metrics.token_usages:
latest_usage = self.state.metrics.token_usages[-1]
accumulated_usage = self.state.metrics.accumulated_token_usage
3.6 Taming the “Uncertainty” of Decision Making
The generation characteristics of LLMs inherently introduce randomness into their output. When an agent executes a task, it may suddenly perform invalid operations—such as calling a non-existent tool or repeating the same steps in a loop. If this non-determinism is not effectively controlled, it will directly lead to task failure. Therefore, the system must build an intelligent error correction mechanism capable of detecting abnormal behavior in real time and correcting the path through retries, rollbacks, or replanning. This requires a deep understanding of the task logic and model behavior.
3.6.1 Process
The OpenHands agent’s anomaly detection and unified handling mechanism is a key module for ensuring system stability and fault tolerance. Its main functions include:
- Exception wrapping:
_step_with_exception_handlingWrap the core business logic with the method_stepto capture all exceptions during execution and prevent system crashes. - Anomaly Classification and Handling: In
_react_to_exceptionthis system, error states are further subdivided according to the anomaly type (such as LLM authentication failure, service unavailability, budget exhaustion, etc.), providing accurate basis for error localization. - State and Error Storage: After an exception is captured, the agent’s state (e.g. ERROR, RATE_LIMITED) is updated, and error details are stored for subsequent troubleshooting.
- External notification:
status_callbackError status is synchronized to external systems via callback functions, supporting monitoring and alerts. - Fault tolerance optimization: Unknown exceptions are packaged and returned with user-friendly prompts; rate limit exceptions are distinguished into “retry exhaustion” and “retryable” scenarios to improve system flexibility.
The main features are:
- Comprehensive exception coverage: Captures all exception types to prevent system crashes caused by unhandled exceptions and ensure operational stability.
- Refined error classification: Further subdivide error statuses for common LLM-related anomalies (authentication, service, budget, content policy, etc.) to facilitate problem localization and monitoring.
- User-friendly experience: Unknown exceptions are automatically packaged into easy-to-understand prompts to reduce user confusion; rate limiting scenarios are handled differently and subsequent retries are supported.
- Highly scalable: Error notification logic is decoupled through callback functions, allowing for flexible integration with external systems (such as monitoring alarms, log reporting, etc.).
- Complete error tracing: Logs include session ID, exception stack, and error type, which, combined with error information stored in the status, facilitates problem investigation.
open-hands-8-4

3.6.2 Code
async def _step(self) -> None:
"""智能体核心执行步骤(实际业务逻辑,如处理动作、调用 LLM 等)"""
pass
async def _react_to_exception(
self,
e: Exception,
) -> None:
"""处理异常:将智能体状态设置为错误,并发送状态消息。
根据异常类型细分错误状态,便于外部监控和问题定位。
参数:
e (Exception):捕获到的异常实例
"""
# 在设置智能体状态前,先存储错误原因(异常类型 + 异常信息)
self.state.last_error = f'{type(e).__name__}: {str(e)}'
# 如果存在状态回调函数,触发回调通知外部系统错误状态
if self.status_callback is not None:
# 默认错误状态为通用错误
runtime_status = RuntimeStatus.ERROR
# 根据异常类型细分错误状态
if isinstance(e, AuthenticationError):
# LLM 认证失败(如密钥无效、权限不足)
runtime_status = RuntimeStatus.ERROR_LLM_AUTHENTICATION
self.state.last_error = runtime_status.value # 更新错误信息为标准化描述
elif isinstance(
e,
(
ServiceUnavailableError,
APIConnectionError,
APIError,
),
):
# LLM 服务不可用(如服务宕机、网络连接失败)
runtime_status = RuntimeStatus.ERROR_LLM_SERVICE_UNAVAILABLE
self.state.last_error = runtime_status.value
elif isinstance(e, InternalServerError):
# LLM 内部服务器错误
runtime_status = RuntimeStatus.ERROR_LLM_INTERNAL_SERVER_ERROR
self.state.last_error = runtime_status.value
elif isinstance(e, BadRequestError) and 'ExceededBudget' in str(e):
# LLM 预算耗尽(通过错误信息中的关键词识别)
runtime_status = RuntimeStatus.ERROR_LLM_OUT_OF_CREDITS
self.state.last_error = runtime_status.value
elif isinstance(e, ContentPolicyViolationError) or (
isinstance(e, BadRequestError)
and 'ContentPolicyViolationError' in str(e)
):
# 内容违反 LLM 政策(直接匹配异常类型或错误信息关键词)
runtime_status = RuntimeStatus.ERROR_LLM_CONTENT_POLICY_VIOLATION
self.state.last_error = runtime_status.value
elif isinstance(e, RateLimitError):
# 速率限制异常:判断是否已耗尽所有重试次数
if (
hasattr(e, 'retry_attempt')
and hasattr(e, 'max_retries')
and e.retry_attempt >= e.max_retries
):
# 所有重试均失败,设置为最终错误状态并更新错误信息
self.state.last_error = (
RuntimeStatus.AGENT_RATE_LIMITED_STOPPED_MESSAGE.value
)
await self.set_agent_state_to(AgentState.ERROR)
else:
# 仍有重试次数,设置为速率限制状态(后续可自动重试)
await self.set_agent_state_to(AgentState.RATE_LIMITED)
return # 速率限制异常处理完毕,直接返回
# 触发回调函数,将错误状态和错误信息通知外部
self.status_callback('error', runtime_status, self.state.last_error)
# 无论是否有回调,最终将智能体状态设置为错误
await self.set_agent_state_to(AgentState.ERROR)
async def _step_with_exception_handling(self) -> None:
"""带异常处理的智能体核心步骤执行方法。
包裹 _step 方法(核心业务逻辑),捕获所有异常并统一处理,确保系统稳定性。
"""
try:
# 执行智能体核心业务逻辑(如处理动作、调用 LLM、任务执行等)
await self._step()
except Exception as e:
# 构建对外上报的异常实例:优先使用已知异常类型,未知类型包装为通用 RuntimeError
if (
isinstance(e, Timeout)
or isinstance(e, APIError)
or isinstance(e, BadRequestError)
or isinstance(e, NotFoundError)
or isinstance(e, InternalServerError)
or isinstance(e, AuthenticationError)
or isinstance(e, RateLimitError)
or isinstance(e, ContentPolicyViolationError)
or isinstance(e, LLMContextWindowExceedError)
):
# 已知异常类型:直接上报原始异常
reported = e
else:
# 未知异常类型:输出警告日志,并包装为用户友好的 RuntimeError
reported = RuntimeError(f'Unhandled exception in _step: {type(e).__name__}: {e}')
# 调用异常响应方法,统一处理异常(设置状态、通知外部等)
await self._react_to_exception(reported)
0xFF Reference
https://docs.all-hands.dev/openhands/usage/architecture/backend
As AI agents evolve from “toys” to “tools,” what should we focus on? Openhands Architecture Analysis [Part 2: Core Concepts Related to Agents] by Kerry
As AI agents evolve from “toys” to “tools,” what should we focus on? Openhands Architecture Analysis [Part 1: Series Introduction] by Kerry
Coding Agent Openhands Analysis (with code) Arrow
OpenHands Source Code Analysis by Yi Lihui
After dissecting over a dozen agent platforms, I discovered that the one most practically applicable isn’t LangChain, Manus, or Coze, but rather n8n Bayesian Analyzer.
AI Programmer’s Guide to OpenDevin Source Code Analysis (goofy)
Your Agent May Be Designed Wrong: UIUC, Stanford, and Others Jointly Publish Paper on Reconstructing Agents to Adapt to 2x2
In-depth analysis of the open-source agent framework Parlant: Giving “deterministic reins” to runaway AI
Effective harnesses for long-running agents