OHMind uses a shared state object (AgentState) that flows through the agent graph. This state maintains conversation history, routing information, tool results, and task planning data. The state is managed by LangGraph and persisted using checkpointing.
Key principles:
- Immutable updates: State is copied and modified, not mutated in place
- Message accumulation: Messages are appended using LangGraph’s add_messages reducer
- Centralized tracking: All agents read from and write to the same state structure
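The first two principles can be illustrated with a small standalone sketch. It is not LangGraph's implementation: `DemoState`, `append_messages`, `apply_update`, and the agent name `agent_a` are hypothetical stand-ins, and the reducer is a simplified substitute for add_messages. The sketch only shows the general idea that a field annotated with a reducer is merged, while a plain field is overwritten, and that the previous state is left untouched.

```python
from typing import Annotated, Any, Dict, List, TypedDict, get_type_hints


def append_messages(existing: List[str], new: List[str]) -> List[str]:
    """Simplified stand-in for add_messages: build a new list, leave inputs alone."""
    return [*existing, *new]


class DemoState(TypedDict):
    # Reducer-annotated field: updates are merged via append_messages.
    messages: Annotated[List[str], append_messages]
    # Plain field: updates overwrite the previous value.
    next: str


def apply_update(state: DemoState, update: Dict[str, Any]) -> Dict[str, Any]:
    """Merge a partial update into a copy of the state (hypothetical helper)."""
    hints = get_type_hints(DemoState, include_extras=True)
    merged: Dict[str, Any] = dict(state)  # shallow copy; `state` itself is not modified
    for key, value in update.items():
        # Annotated[...] hints expose their extra arguments via __metadata__.
        metadata = getattr(hints[key], "__metadata__", ())
        if metadata:
            merged[key] = metadata[0](merged[key], value)
        else:
            merged[key] = value
    return merged


before: DemoState = {"messages": ["hello"], "next": "supervisor"}
after = apply_update(before, {"messages": ["routing to agent"], "next": "agent_a"})
```

After this runs, `before` still holds its single message, while `after` holds both messages and the new routing target.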
AgentState Schema
The AgentState is defined as a TypedDict with annotated fields:
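A minimal sketch of such a schema, inferred only from the fields that the helper functions in this document read and write. The field types, the use of `total=False`, and the fallback reducer are assumptions, and the real schema also carries task planning data that is not shown here.

```python
from typing import Annotated, Any, Dict, Optional, TypedDict

try:
    from langgraph.graph.message import add_messages
except ImportError:
    # Simplified stand-in so the sketch runs without LangGraph installed.
    # The real reducer does more, such as replacing messages that share an ID.
    def add_messages(left: list, right: list) -> list:
        return [*left, *right]


class AgentState(TypedDict, total=False):
    # Conversation history; the reducer appends instead of overwriting.
    messages: Annotated[list, add_messages]
    # Routing: name of the next agent, or "FINISH".
    next: str
    # Human validation fields (types assumed from how the helpers use them).
    validation_required: bool
    validation_approved: Optional[bool]  # None while awaiting a decision
    validation_message: str
    operation_metadata: Dict[str, Any]
    # Tool results keyed by tool name, and UI artifacts keyed by artifact ID.
    mcp_results: Dict[str, Any]
    artifacts: Dict[str, Any]
```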
Validation Helpers

    def should_continue_validation(state: AgentState) -> bool:
        """Check if validation is required and not yet approved/rejected."""
        return state["validation_required"] and state["validation_approved"] is None


    def mark_validation_required(
        state: AgentState, operation_description: str, metadata: Dict[str, Any]
    ) -> AgentState:
        """Mark that an operation requires human validation."""
        updated = state.copy()
        updated["validation_required"] = True
        updated["validation_approved"] = None
        updated["validation_message"] = operation_description
        updated["operation_metadata"] = metadata
        updated["next"] = "FINISH"  # Wait for user approval
        return updated


    def mark_validation_complete(
        state: AgentState, approved: bool, next_agent: str
    ) -> AgentState:
        """Mark validation as complete with user's decision."""
        updated = state.copy()
        updated["validation_required"] = False
        updated["validation_approved"] = approved
        updated["next"] = next_agent if approved else "supervisor"
        return updated
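These helpers treat validation_approved as a three-valued field: None while a decision is pending, then True or False once the user has answered. The sketch below makes that explicit with a hypothetical `validation_status` function (not part of OHMind) applied to plain dictionaries. The sample field values mirror what the helpers above would produce at each stage; the initial "idle" values and the agent name `agent_a` are assumptions.

```python
from typing import Any, Dict


def validation_status(state: Dict[str, Any]) -> str:
    """Hypothetical helper: summarize the validation fields as one label."""
    approved = state.get("validation_approved")
    if state.get("validation_required") and approved is None:
        return "pending"  # waiting for the user; routing is parked at FINISH
    if approved is True:
        return "approved"
    if approved is False:
        return "rejected"
    return "not_required"


# Assumed starting point: no validation has been requested yet.
idle = {"validation_required": False, "validation_approved": None, "next": "supervisor"}
# After mark_validation_required: flagged, undecided, routed to FINISH.
waiting = {"validation_required": True, "validation_approved": None, "next": "FINISH"}
# After mark_validation_complete(approved=True, next_agent="agent_a").
accepted = {"validation_required": False, "validation_approved": True, "next": "agent_a"}
# After mark_validation_complete(approved=False, ...): control returns to the supervisor.
declined = {"validation_required": False, "validation_approved": False, "next": "supervisor"}

statuses = [validation_status(s) for s in (idle, waiting, accepted, declined)]
```

Because should_continue_validation tests `is None` rather than truthiness, a rejection (False) is not mistaken for a pending decision.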
MCP Result Helpers
    def add_mcp_result(state: AgentState, tool_name: str, result: Any) -> AgentState:
        """Add result from an MCP tool call."""
        updated = state.copy()
        existing_results = updated.get("mcp_results", {})
        mcp_results = existing_results.copy() if existing_results else {}
        mcp_results[tool_name] = result
        updated["mcp_results"] = mcp_results
        return updated
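The inner copy in add_mcp_result matters because `dict.copy()` is shallow: the copied state would otherwise share its nested mcp_results dictionary with the original. The sketch below demonstrates the difference with plain dictionaries; the tool names `tool_a` and `tool_b` are placeholders.

```python
from typing import Any, Dict

original: Dict[str, Any] = {"next": "supervisor", "mcp_results": {"tool_a": "first"}}

# Naive update: the shallow copy shares the inner dict with `original`.
naive = original.copy()
naive["mcp_results"]["tool_b"] = "second"
leaked = "tool_b" in original["mcp_results"]  # the original was mutated too

# Pattern used by add_mcp_result: copy the inner dict before writing to it.
original = {"next": "supervisor", "mcp_results": {"tool_a": "first"}}
safe = original.copy()
inner = dict(safe.get("mcp_results") or {})
inner["tool_b"] = "second"
safe["mcp_results"] = inner
isolated = "tool_b" not in original["mcp_results"]  # the original is unchanged
```

Note that the inner copy is itself shallow, so the stored result objects are still shared between the old and new state; only the mapping from tool name to result is isolated.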
Artifact Helpers
    def add_artifact(state: AgentState, artifact_id: str, artifact_data: Any) -> AgentState:
        """Add artifact for UI rendering."""
        updated = state.copy()
        existing_artifacts = updated.get("artifacts", {})
        artifacts = existing_artifacts.copy() if existing_artifacts else {}
        artifacts[artifact_id] = artifact_data
        updated["artifacts"] = artifacts
        return updated
State Persistence
Checkpointing
LangGraph’s MemorySaver provides state persistence: