
eko-core

A pure state-graph execution engine for building AI agents in Rust.


eko-core does one thing and does it well: drive state through a graph of nodes and edges you define. It contains no LLM calls, no tool implementations, no preset patterns — just the core execution engine.

Features

  • Pure State Graph – Define nodes, edges, and channels — the engine handles the rest
  • Deterministic Execution – Superstep model guarantees consistent results regardless of parallel task ordering
  • Channels – Pluggable merge strategies: last-write-wins, custom reducers, ephemeral, queues
  • Interrupt & Resume – Human-in-the-loop: pause execution, collect feedback, resume seamlessly
  • Checkpoints – Save and restore graph state at any point for fault tolerance
  • Fan-out & Join – Parallel task dispatch with barrier-based synchronization
  • Subgraphs – Compose graphs — embed a compiled graph as a node in another graph
  • Retry & Timeout – Per-node retry policies with exponential backoff; per-node timeouts
  • Derive Macro – #[derive(State)] auto-generates channel mappings from struct definitions
  • Async Native – Built on Tokio, fully async from top to bottom

Install

Add to your Cargo.toml:

[dependencies]
eko-core = "0.1"

One-minute Overview

State  = a set of Channel values (the "shared blackboard")
Node   = reads State → does work → writes back to State
Edge   = decides which Node runs next
Graph  = Nodes + Edges + Channels
#![allow(unused)]
fn main() {
use eko_core::*;
use std::collections::HashMap;
use serde_json::json;

// 1. Define channels (merge strategies)
let mut channels = HashMap::new();
channels.insert("msg".into(), Box::new(LastValue::new()) as Box<dyn AnyChannel>);

// 2. Build the graph
let mut g = StateGraph::new(channels);
g.add_node_fn("greet", |mut s, _| async move {
    s.insert("msg".into(), json!("hello world"));
    Ok(NodeOutput::Update(s))
});
g.add_edge(START, "greet");
g.add_edge("greet", END);

// 3. Compile & execute
let compiled = g.compile(CompileConfig::default())?;
let result = compiled.invoke(HashMap::new(), &GraphConfig::default()).await?;
// result = GraphResult::Done { values: {"msg": "hello world"}, .. }
}

What’s Next

Quick Start

Lifecycle: Build → Compile → Execute

Every graph follows three phases:

#![allow(unused)]
fn main() {
// 1. Build (mutable)
let mut graph = StateGraph::new(channels);
graph.add_node_fn("a", ...);
graph.add_edge(START, "a");
graph.add_edge("a", END);

// 2. Compile (immutable) — validates topology
let compiled = graph.compile(CompileConfig {
    checkpointer: Some(saver),
    interrupt_before: vec!["tools".into()],
    retry_policy: Some(RetryPolicy::default()),
    ..Default::default()
})?;

// 3. Execute
let result = compiled.invoke(input, &GraphConfig {
    thread_id: Some("conversation-1".into()),
    step_timeout: Some(Duration::from_secs(30)),
    ..Default::default()
}).await?;
}

Example: Simple Linear Graph

#![allow(unused)]
fn main() {
use eko_core::*;
use std::collections::HashMap;
use serde_json::json;

let mut channels = HashMap::new();
channels.insert("msg".into(), Box::new(LastValue::new()) as Box<dyn AnyChannel>);

let mut g = StateGraph::new(channels);
g.add_node_fn("greet", |mut s, _| async move {
    s.insert("msg".into(), json!("hello world"));
    Ok(NodeOutput::Update(s))
});
g.add_edge(START, "greet");
g.add_edge("greet", END);

let compiled = g.compile(CompileConfig::default())?;
let result = compiled.invoke(HashMap::new(), &GraphConfig::default()).await?;
// result = GraphResult::Done { values: {"msg": "hello world"}, .. }
}

Example: Conditional Loop

#![allow(unused)]
fn main() {
g.add_node_fn("increment", |mut s, _| async move {
    let v = s.get("count").and_then(|v| v.as_i64()).unwrap_or(0);
    s.insert("count".into(), json!(v + 1));
    Ok(NodeOutput::Update(s))
});
g.add_edge(START, "increment");
g.add_conditional_edge("increment", |state| {
    let count = state.get("count").and_then(|v| v.as_i64()).unwrap_or(0);
    if count >= 5 {
        RouteDecision::End
    } else {
        RouteDecision::Next("increment".into())
    }
});
}

Example: Parallel Fan-out + Join

#![allow(unused)]
fn main() {
g.add_node_fn("dispatcher", |_s, _| async move {
    Ok(NodeOutput::Update(HashMap::new()))
});
g.add_node_fn("worker_a", |_s, _| async move { /* ... */ Ok(NodeOutput::Update(HashMap::new())) });
g.add_node_fn("worker_b", |_s, _| async move { /* ... */ Ok(NodeOutput::Update(HashMap::new())) });
g.add_node_fn("merger",   |_s, _| async move { /* ... */ Ok(NodeOutput::Update(HashMap::new())) });

g.add_edge(START, "dispatcher");
g.add_conditional_edge("dispatcher", |_| RouteDecision::Send(vec![
    Send::new("worker_a", json!({})),
    Send::new("worker_b", json!({})),
]));
g.add_edge_join(&["worker_a", "worker_b"], "merger");
g.add_edge("merger", END);
}

Example: Sequence Helper

#![allow(unused)]
fn main() {
g.add_sequence(vec![
    Box::new(FnNode::new("step1", |s, _| async move { /* ... */ Ok(NodeOutput::Update(s)) })),
    Box::new(FnNode::new("step2", |s, _| async move { /* ... */ Ok(NodeOutput::Update(s)) })),
    Box::new(FnNode::new("step3", |s, _| async move { /* ... */ Ok(NodeOutput::Update(s)) })),
]);
g.add_edge(START, "step1");
g.add_edge("step3", END);
}

State

State is the shared blackboard of the graph: a HashMap<String, serde_json::Value> where each key maps to a Channel. Nodes read from and write to this state; the engine merges updates according to each channel’s strategy.

Nodes receive a snapshot of all channel values when they run. When a node returns NodeOutput::Update, those updates are merged back through each channel’s merge strategy.

Example: reading and writing a counter:

#![allow(unused)]
fn main() {
// Read state
let count = state.get("counter").and_then(|v| v.as_i64()).unwrap_or(0);

// Write back
state.insert("counter".into(), json!(count + 1));
Ok(NodeOutput::Update(state))
}

Channels

Channels define how values merge, not what is stored. When parallel nodes write to the same key simultaneously, the channel’s merge strategy determines the final value.

Channel Type              Merge Strategy                    Typical Use
LastValue                 Last write wins                   Counters, current step
BinaryOperatorAggregate   Custom reducer                    Message lists (append), log collection
EphemeralValue            Not persisted, consumed on read   One-time intermediate values
Topic                     Append to queue, drain clears     Event queues, pending task lists
NamedBarrierValue         Track multi-source completion     Used internally by add_edge_join
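
The append/drain behavior of Topic can be sketched with plain std types. This is a conceptual stand-in, not the crate's AnyChannel implementation; TopicSketch and its method names are invented for illustration:

```rust
use std::collections::VecDeque;

// Illustrative stand-in for Topic: writes append, reads drain the queue.
struct TopicSketch<T> {
    queue: VecDeque<T>,
}

impl<T> TopicSketch<T> {
    fn new() -> Self {
        TopicSketch { queue: VecDeque::new() }
    }

    // Merge strategy: every write is appended, never overwritten.
    fn write(&mut self, value: T) {
        self.queue.push_back(value);
    }

    // Reading drains the queue, so each value is consumed exactly once.
    fn drain(&mut self) -> Vec<T> {
        self.queue.drain(..).collect()
    }
}

fn main() {
    let mut topic = TopicSketch::new();
    topic.write("task-1");
    topic.write("task-2");
    assert_eq!(topic.drain(), vec!["task-1", "task-2"]);
    assert!(topic.drain().is_empty()); // drained: the queue is cleared
}
```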

Why channels?

Parallel nodes may write to the same key at the same time. Without a merge strategy, updates would conflict. Channels provide deterministic, configurable resolution.

Reducer channel example

#![allow(unused)]
fn main() {
use serde_json::Value;

let mut channels = HashMap::new();
channels.insert("messages".into(), Box::new(
    BinaryOperatorAggregate::new(|old, new| {
        match old {
            Value::Array(mut arr) => { arr.push(new); Ok(Value::Array(arr)) }
            _ => Ok(Value::Array(vec![old, new])),
        }
    })
) as Box<dyn AnyChannel>);
}

Nodes

Nodes implement the GraphNode trait or use FnNode / add_node_fn for closure-based nodes. Each node receives a snapshot of all channel values and a context providing stream_tx, cancel signal, and interrupt handling.

#![allow(unused)]
fn main() {
graph.add_node_fn("my_node", |state, ctx| async move {
    // state: snapshot of all channel values
    // ctx: provides stream_tx, cancel signal, interrupt

    let mut out = HashMap::new();
    out.insert("result".into(), json!("done"));
    Ok(NodeOutput::Update(out))
});
}

NodeOutput variants

  • Update(values) — Pure state update. The engine uses edges for routing; the next node is determined by the graph topology.
  • Command { goto, update } — The node decides where to go next. Bypasses edge routing; goto specifies the next node directly.
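
The routing difference can be sketched with a std-only toy; the Output enum and next_node function below are illustrative, not eko-core types:

```rust
use std::collections::HashMap;

// Toy mirror of the two NodeOutput variants described above.
enum Output {
    Update,                   // rely on graph topology for routing
    Command { goto: String }, // node names its own successor
}

// Pick the next node: Command bypasses the static edge table entirely.
fn next_node(from: &str, out: &Output, edges: &HashMap<&str, &str>) -> Option<String> {
    match out {
        Output::Command { goto } => Some(goto.clone()),
        Output::Update => edges.get(from).map(|n| n.to_string()),
    }
}

fn main() {
    let mut edges = HashMap::new();
    edges.insert("a", "b");

    // Update follows the declared edge a -> b.
    assert_eq!(next_node("a", &Output::Update, &edges), Some("b".to_string()));
    // Command jumps straight to "c", ignoring the edge table.
    let cmd = Output::Command { goto: "c".into() };
    assert_eq!(next_node("a", &cmd, &edges), Some("c".to_string()));
}
```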

Edges

Edges define control flow between nodes. Eko supports three types:

Static edges

Simple unconditional transitions:

#![allow(unused)]
fn main() {
graph.add_edge("a", "b");
}

Conditional edges

Route based on state at runtime:

#![allow(unused)]
fn main() {
graph.add_conditional_edge("router", |state| {
    if state.get("done").and_then(|v| v.as_bool()) == Some(true) {
        RouteDecision::End
    } else {
        RouteDecision::Next("worker".into())
    }
});
}

Conditional edges with path map

Use a path map when the router returns keys that map to node names:

#![allow(unused)]
fn main() {
graph.add_conditional_edge_with_map("classifier", router_fn, path_map);
}
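
router_fn and path_map above are placeholders. The idea can be sketched std-only (all names and types here are illustrative assumptions, not eko-core's actual signatures): the router returns a symbolic key, and the path map resolves that key to a node name.

```rust
use std::collections::HashMap;

fn main() {
    // The router inspects state and returns a symbolic key...
    let route = |sentiment: &str| -> &'static str {
        if sentiment == "negative" { "escalate" } else { "ok" }
    };

    // ...and the path map translates keys into concrete node names.
    let mut path_map: HashMap<&str, &str> = HashMap::new();
    path_map.insert("escalate", "human_review");
    path_map.insert("ok", "auto_reply");

    let key = route("negative");
    assert_eq!(path_map.get(key), Some(&"human_review"));
}
```

Keeping routing keys separate from node names lets you rename or swap target nodes without touching the router logic.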

Superstep Execution Model

Eko executes graphs in supersteps. Each superstep runs a batch of tasks in parallel, then merges results and advances to the next batch.

Superstep Loop

Superstep 1: [task_A, task_B]  ← parallel execution
    ↓ merge writes into channels
    ↓ save checkpoint
    ↓ resolve edges → collect next batch of tasks
Superstep 2: [task_C]
    ↓ ...
Done (no more tasks)

Key Properties

  • Parallel within superstep: All tasks in a superstep execute concurrently.
  • Batch merge: Writes to channels are merged after the superstep completes.
  • Determinism: Results are independent of task completion order within a superstep.
  • Checkpoint per superstep: State is persisted after each superstep for fault tolerance.

Flow

  1. Execute – Run all tasks in the current batch in parallel.
  2. Merge – Apply channel writes (reducers, last-write-wins, etc.).
  3. Checkpoint – Persist graph state.
  4. Resolve edges – Evaluate conditional edges and collect the next task batch.
  5. Repeat – If tasks remain, start the next superstep; otherwise finish.
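
The five steps can be sketched as a std-only loop. Checkpointing and edge resolution are elided to a single batch; the point is the snapshot-then-batch-merge discipline that makes results independent of completion order:

```rust
use std::collections::HashMap;

type State = HashMap<String, i64>;
// A task reads a snapshot and returns its writes.
type Task = fn(&State) -> Vec<(String, i64)>;

fn run(mut batch: Vec<Task>, mut state: State) -> State {
    while !batch.is_empty() {
        // 1. Execute: every task in the superstep sees the same snapshot.
        let writes: Vec<_> = batch.iter().map(|t| t(&state)).collect();
        // 2. Merge: apply writes in fixed task order (last write wins here).
        for task_writes in writes {
            for (k, v) in task_writes {
                state.insert(k, v);
            }
        }
        // 3. A checkpoint would be saved here.
        // 4./5. Resolve edges into the next batch (none in this sketch).
        batch = Vec::new();
    }
    state
}

fn main() {
    let a: Task = |_| vec![("x".into(), 1)];
    let b: Task = |s| vec![("y".into(), s.get("x").copied().unwrap_or(0) + 10)];
    // b reads the snapshot, not a's write: x is unset there, so y = 10.
    let out = run(vec![a, b], HashMap::new());
    assert_eq!(out["x"], 1);
    assert_eq!(out["y"], 10);
}
```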

Fan-out & Join

Fan-out

Fan-out means one node triggers multiple parallel tasks. Each target becomes a PUSH task in the same superstep.

Method 1: Conditional Edge Returns Send

#![allow(unused)]
fn main() {
graph.add_conditional_edge("dispatcher", |_state| {
    RouteDecision::Send(vec![
        Send::new("worker", json!({"item": "alpha"})),
        Send::new("worker", json!({"item": "beta"})),
    ])
});
}

Method 2: Command.goto Inside Node

#![allow(unused)]
fn main() {
Ok(NodeOutput::command_with_sends(None, vec![
    Send::new("worker", json!({"id": 1})),
    Send::new("worker", json!({"id": 2})),
]))
}

Each Send creates a PUSH task. All PUSH tasks run in parallel in the same superstep.

Join

Join waits for multiple nodes to finish before continuing:

#![allow(unused)]
fn main() {
graph.add_edge_join(&["a", "b"], "merge");
}

This uses NamedBarrierValue internally. The “merge” node runs only after both “a” and “b” have completed.
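
A std-only sketch of the barrier concept (not the crate's NamedBarrierValue, just the completion-tracking idea behind it):

```rust
use std::collections::HashSet;

// Track which upstream nodes have reported in; fire once all have.
struct Barrier {
    expected: HashSet<String>,
    seen: HashSet<String>,
}

impl Barrier {
    fn new(sources: &[&str]) -> Self {
        Barrier {
            expected: sources.iter().map(|s| s.to_string()).collect(),
            seen: HashSet::new(),
        }
    }

    // Returns true once every expected source has completed.
    fn complete(&mut self, source: &str) -> bool {
        if self.expected.contains(source) {
            self.seen.insert(source.to_string());
        }
        self.seen == self.expected
    }
}

fn main() {
    let mut barrier = Barrier::new(&["a", "b"]);
    assert!(!barrier.complete("a")); // "merge" not ready yet
    assert!(barrier.complete("b"));  // both done: "merge" can run
}
```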

Subgraphs

A subgraph is a compiled graph used as a node in another graph. Use this for modular, reusable workflows.

Embedding a Subgraph

#![allow(unused)]
fn main() {
let inner = inner_graph.compile(CompileConfig::default())?;
outer_graph.add_subgraph("inner", inner);
}

The outer graph invokes the inner graph as a single node. When the outer graph reaches “inner”, it runs the full inner graph to completion.

Namespace Isolation

Subgraphs use independent checkpoint namespaces. Their state is stored separately from the parent graph, which avoids key collisions and simplifies debugging.

Event Bubbling

Internal events from the subgraph surface to the parent via SubgraphEvent. The parent can observe or react to subgraph activity without coupling to its internals.

Interrupt & Resume

Eko supports three ways to interrupt execution and resume later.

1. Interrupt Before Node (Compile-time)

#![allow(unused)]
fn main() {
CompileConfig {
    interrupt_before: vec!["dangerous_node".into()],
    ..
}
}

Execution stops before entering the specified node. Resume to continue.

2. Interrupt After Node (Compile-time)

#![allow(unused)]
fn main() {
CompileConfig {
    interrupt_after: vec!["review_node".into()],
    ..
}
}

Execution stops after the node completes. Useful for review gates.

3. Runtime Interrupt Inside Node

Most flexible: interrupt from within node logic and pass data to the caller.

#![allow(unused)]
fn main() {
graph.add_node_fn("planner", |mut state, ctx| async move {
    let plan = create_plan(&state);
    let feedback = ctx.interrupt(json!({"plan": plan, "question": "Confirm?"}))?;
    execute_plan(&plan, &feedback);
    Ok(NodeOutput::Update(state))
});
}

Resuming

#![allow(unused)]
fn main() {
compiled.resume(&config, Some(Command {
    resume: Some(json!("approved")),
    update: None,
})).await?;
}

The resume value is passed to the node that called ctx.interrupt(), allowing human-in-the-loop flows.

Checkpoints

Checkpoints persist graph state for fault tolerance and inspection.

BaseCheckpointSaver Trait

Implement BaseCheckpointSaver to store and load checkpoints. The engine calls it after each superstep.

MemorySaver

For tests and prototyping, use MemorySaver—an in-memory implementation that does not persist across process restarts.

Auto-save Behavior

The engine automatically saves after each superstep. No extra configuration is required once a saver is provided.

Benefits

  • Fault tolerance: Restart from the last checkpoint after crashes.
  • State inspection: Load checkpoints to debug or analyze graph state.
  • Resume: Combine with interrupt/resume for human-in-the-loop workflows.

Retry & Timeout

Graph-level Retry Policy

#![allow(unused)]
fn main() {
CompileConfig {
    retry_policy: Some(RetryPolicy {
        max_attempts: 3,
        initial_interval_ms: 500,
        backoff_factor: 2.0,
        max_interval_ms: 128_000,
        jitter: false,
    }),
    ..Default::default()
}
}
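
With these settings the delay before each retry grows geometrically from initial_interval_ms, capped at max_interval_ms. A std-only sketch of that schedule (the exact formula is inferred from the field names, so treat it as an illustration rather than the engine's code):

```rust
// Delay before retry attempt `n` (1-based), per the config fields above.
fn backoff_ms(n: u32, initial: f64, factor: f64, max: f64) -> u64 {
    (initial * factor.powi(n as i32 - 1)).min(max) as u64
}

fn main() {
    // max_attempts: 3 means at most two retries after the first failure.
    let delays: Vec<u64> = (1..=2)
        .map(|n| backoff_ms(n, 500.0, 2.0, 128_000.0))
        .collect();
    assert_eq!(delays, vec![500, 1000]);
    // Later attempts would saturate at the cap.
    assert_eq!(backoff_ms(10, 500.0, 2.0, 128_000.0), 128_000);
}
```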

Node-level Override

Implement GraphNode and override retry_policy:

#![allow(unused)]
fn main() {
fn retry_policy(&self) -> Option<&RetryPolicy> {
    Some(&self.custom_policy)
}
}

Node-level policy overrides the graph-level policy for that node.

Retryable Errors

Only AgentError::Llm and AgentError::Other are retried. Other error variants fail immediately.

Timeout

Graph-level

#![allow(unused)]
fn main() {
GraphConfig {
    step_timeout: Some(Duration::from_secs(30)),
    ..Default::default()
}
}

Node-level

#![allow(unused)]
fn main() {
fn timeout(&self) -> Option<Duration> {
    Some(Duration::from_secs(60))
}
}

Node timeout overrides graph timeout for that node.

Derive Macro

The #[derive(State)] macro generates channel mappings from struct fields.

Basic Usage

#![allow(unused)]
fn main() {
use eko_core::{State, AgentError};
use serde::{Serialize, Deserialize};
use serde_json::Value;

#[derive(State, Debug, Clone, Serialize, Deserialize)]
struct MyState {
    // No attribute → LastValue (last-write-wins)
    counter: i64,

    // Custom reducer
    #[state(reducer = "append_messages")]
    messages: Vec<String>,

    // Not persisted
    #[state(ephemeral)]
    temp_data: Option<String>,
}

fn append_messages(old: Value, new: Value) -> Result<Value, AgentError> {
    // merge logic
}
}

Field Attributes

Attribute                        Behavior
(none)                           LastValue – last write wins
#[state(reducer = "fn_name")]    Custom reducer function
#[state(ephemeral)]              Not persisted in checkpoints

Reducer Signature

#![allow(unused)]
fn main() {
fn append_messages(old: Value, new: Value) -> Result<Value, AgentError>
}

The reducer receives the current channel value and the incoming write, and returns the merged value.

Architecture

Module Layout

eko-core/src/
├── channels/          # State channel system
│   ├── base.rs        #   AnyChannel trait
│   ├── last_value.rs  #   Last-write-wins
│   ├── binop.rs       #   Custom reducer
│   ├── ephemeral.rs   #   Temporary values (not persisted)
│   ├── topic.rs       #   Append queue
│   └── barrier.rs     #   Multi-source sync barrier
├── checkpoint/        # Checkpoint abstraction
│   ├── saver.rs       #   BaseCheckpointSaver trait
│   ├── memory.rs      #   In-memory implementation (for testing)
│   └── types.rs       #   Checkpoint / CheckpointTuple
├── core/              # Foundational contracts
│   ├── error.rs       #   AgentError / InterruptData
│   ├── llm.rs         #   LlmClient trait
│   ├── stream.rs      #   StreamEvent
│   ├── message.rs     #   Message / ToolCall
│   └── tool.rs        #   ToolDefinition trait
└── graph/             # State graph engine
    ├── builder.rs     #   StateGraph (build phase)
    ├── compiled.rs    #   CompiledGraph (execution phase)
    ├── node.rs        #   GraphNode trait / FnNode / NodeContext
    ├── command.rs     #   NodeOutput / GotoTarget / CommandGraph
    ├── edge.rs        #   Edge / RouteDecision / START / END
    ├── send.rs        #   Send (fan-out directive)
    ├── task.rs        #   Task (PULL / PUSH)
    ├── subgraph.rs    #   SubgraphNode
    └── types.rs       #   GraphResult / RetryPolicy / Command / ...

Design Philosophy

eko-core defines contracts (traits) — you provide the implementations:

eko-core (this crate)        Your implementations            Your application
─────────────────────        ────────────────────            ────────────────
StateGraph                   ReAct / Supervisor / Swarm      Assemble & run
CompiledGraph                LlmClient impl (OpenAI, etc.)
AnyChannel                   CheckpointSaver impl (SQLite)
GraphNode trait              Tool definitions & execution
LlmClient trait              ToolExecutor
BaseCheckpointSaver trait    Session management

The crate ships with:

  • MemorySaver — an in-memory checkpoint implementation for testing
  • #[derive(State)] — a proc-macro to auto-generate channel mappings

Everything else — LLM providers, persistent storage, tool systems — lives in your code or companion crates. This keeps eko-core focused, dependency-light, and easy to embed in any project.