Autonomy is designed on principles from the actor model, a powerful pattern for building concurrent and distributed systems. Before we dive into agents, it’s helpful to understand this foundation. Think of actors as independent members of a team. Each has their own workspace (state), and they communicate only by passing notes (messages) to each other. No one shares their workspace; this prevents contention and makes applications naturally parallelizable and scalable.
Actors are lightweight, stateful objects that communicate using messages. Each actor has:
  • A unique address that other actors use to send it messages.
  • A mailbox where incoming messages queue up and wait to be processed.
  • Internal state that only this actor can access or modify.
  • Behavior that defines how this actor reacts to each message it receives.
When an actor receives a message, it can (see the sketch after this list):
  1. Send messages to other actors (asynchronously, without waiting for responses).
  2. Create new actors and delegate work to them.
  3. Update its internal state, which may affect how future messages are handled.
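A rough sketch of these three capabilities, using the worker API introduced below (Coordinator and Helper are illustrative names, and we assume the node handle is captured when the worker is constructed):
class Helper:
    async def handle_message(self, context, message):
        await context.reply(f"handled: {message}")


class Coordinator:
    def __init__(self, node):
        self.node = node  # assumed: node handle captured at construction
        self.seen = 0     # private internal state

    async def handle_message(self, context, message):
        self.seen += 1                                # 3. update internal state
        name = f"helper-{self.seen}"
        await self.node.start_worker(name, Helper())  # 2. create a new actor
        await self.node.send_message(name, message)   # 1. send, without waiting

# Usage: await node.start_worker("coordinator", Coordinator(node))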

No Shared State - Each actor’s state is completely private. Other actors cannot directly access or modify it. This eliminates entire classes of concurrency bugs, like race conditions and deadlocks. Actors can only influence each other by sending messages.
Message Processing - While many actors run in parallel across the system, each individual actor processes only one message at a time. Messages wait in the actor’s mailbox and are handled sequentially. This means you never need to worry about two threads modifying an actor’s state simultaneously; it simply can’t happen.
Asynchronous Message Passing - When Actor A sends a message to Actor B, it doesn’t have to wait for B to process it or respond. The message is queued in B’s mailbox, and A can continue its work immediately. This non-blocking communication allows actors to work concurrently without waiting on each other.
Location Transparency - Sending a message to, or receiving one from, another actor looks the same whether that actor is running on the same machine or across the network. This makes it natural to distribute work and scale horizontally.
In Autonomy:
  • Simple actors are called workers.
  • Agents are intelligent autonomous actors that use large language models.
  • Both follow the actor model.
When an actor is idle and there are no messages in its mailbox, it consumes no CPU. This design makes it easy to run thousands of concurrent stateful actors that make optimal use of available CPU cores. Agents, for example, spend most of their lifetime waiting for calls to language models or tools to finish. While one agent is waiting, the actor runtime automatically gives the CPU core to a different actor that has a new message to process. This enables highly efficient and horizontally scalable applications.
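As an illustration, a thousand workers that each spend a second waiting can finish in roughly one second of wall-clock time rather than a thousand. This is a sketch, assuming send_and_receive can be awaited concurrently from main:
import asyncio
from autonomy import Node


class Sleeper:
    async def handle_message(self, context, message):
        await asyncio.sleep(1)  # stands in for a slow LLM or tool call
        await context.reply(f"done: {message}")


async def main(node):
    # Idle workers cost nothing; only those with queued messages run
    for i in range(1000):
        await node.start_worker(f"sleeper-{i}", Sleeper())

    # While each worker awaits, the runtime hands the CPU to others
    replies = await asyncio.gather(*(
        node.send_and_receive(f"sleeper-{i}", str(i), timeout=60)
        for i in range(1000)
    ))
    print(f"Received {len(replies)} replies")


Node.start(main)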

Workers

Workers are Autonomy’s implementation of actors. Autonomy’s actor runtime is implemented in Rust and exposed to Python code using the Node class. All nodes in a zone can create encrypted, mutually authenticated, secure communication channels with other nodes.
Here’s a simple worker that replies to each message with a greeting:
from autonomy import Node


class Greeter:
    async def handle_message(self, context, message):
        await context.reply(f"Hello, {message}!")


async def main(node):
    # Start the worker
    await node.start_worker("greeter", Greeter())
    
    # Send a message and wait for reply
    reply = await node.send_and_receive("greeter", "Alice", timeout=10)
    print(reply)  # "Hello, Alice!"


Node.start(main)
What’s happening:
  1. Define a worker class with handle_message().
  2. When Node.start(main) is called, it turns the main function itself into a worker that can send and receive messages like other workers. This is why main is able to communicate with greeter.
  3. Start the Greeter worker with a unique name (greeter).
  4. Send a message from main to the greeter worker.
  5. The greeter worker processes the message and replies.
  6. The main worker receives the reply.
Worker Lifecycle - Workers can be started and stopped dynamically:
# Start a worker
await node.start_worker("worker_name", WorkerClass())

# List all workers
workers = await node.list_workers()

# Stop a worker
await node.stop_worker("worker_name")

Messages

Message Types - Messages must be strings. This is because the messaging layer transmits messages across the network, and strings are simple, universal, and work everywhere.
# Correct - string message
await node.send_message("worker", "hello")

# Wrong - will raise TypeError
await node.send_message("worker", {"data": 123})  # ❌
Structured Data with JSON - For structured data, use JSON serialization:
import json

class Calculator:
    async def handle_message(self, context, message):
        # Parse incoming JSON
        data = json.loads(message)
        result = data["x"] + data["y"]
        
        # Reply with JSON
        await context.reply(json.dumps({"result": result}))


async def main(node):
    await node.start_worker("calc", Calculator())
    
    # Send structured data as JSON
    data = json.dumps({"x": 10, "y": 32})
    reply = await node.send_and_receive("calc", data, timeout=10)
    
    # Parse the reply
    result = json.loads(reply)
    print(f"Result: {result['result']}")  # 42
Message Flow - Messages flow asynchronously between workers (a sketch follows this list):
  1. Sender sends message to worker by name
  2. Message is queued if worker is busy
  3. Worker processes messages one at a time
  4. Worker can optionally reply
  5. Sender receives reply (if waiting)
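A minimal sketch of this queueing behavior: three fire-and-forget sends return immediately, while the worker drains its mailbox one message per second:
import asyncio


class Slow:
    async def handle_message(self, context, message):
        await asyncio.sleep(1)  # simulate slow processing
        print(f"processed: {message}")


async def main(node):
    await node.start_worker("slow", Slow())
    # All three sends return right away; the messages queue in the
    # worker's mailbox and are processed sequentially
    for i in range(3):
        await node.send_message("slow", f"task-{i}")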

State

Stateful Workers - Workers can maintain state across messages. This is safe because workers process one message at a time:
class Counter:
    def __init__(self):
        self.count = 0
    
    async def handle_message(self, context, message):
        if message == "increment":
            self.count += 1
        await context.reply(f"Count: {self.count}")


async def main(node):
    await node.start_worker("counter", Counter())
    
    # Each message sees the updated state
    print(await node.send_and_receive("counter", "increment", timeout=10))  # Count: 1
    print(await node.send_and_receive("counter", "increment", timeout=10))  # Count: 2
    print(await node.send_and_receive("counter", "increment", timeout=10))  # Count: 3
Use cases for stateful workers:
  • Session management
  • Connection pooling
  • Rate limiting (sketched after this list)
  • Caching
  • Accumulating results
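For example, rate limiting might look like this. This is a hypothetical sliding-window limiter; the message shape and field names are illustrative:
import json
import time


class RateLimiter:
    def __init__(self, limit=5, window=60.0):
        self.limit = limit    # max requests per client per window
        self.window = window  # window length in seconds
        self.history = {}     # client_id -> recent request timestamps

    async def handle_message(self, context, message):
        client_id = json.loads(message)["client_id"]
        now = time.monotonic()
        # Keep only timestamps still inside the window
        recent = [t for t in self.history.get(client_id, [])
                  if now - t < self.window]
        allowed = len(recent) < self.limit
        if allowed:
            recent.append(now)
        self.history[client_id] = recent
        await context.reply(json.dumps({"allowed": allowed}))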
State Isolation - No shared state between workers; each worker is completely isolated:
# Each worker has its own state
await node.start_worker("counter-1", Counter())
await node.start_worker("counter-2", Counter())
# These two counters are completely independent
Workers are single-threaded - state is safe within a worker:
class Safe:
    def __init__(self):
        self.count = 0  # Safe - only one message at a time
    
    async def handle_message(self, context, message):
        self.count += 1  # No race conditions
Initialization - Use __init__() for one-time setup:
class Database:
    def __init__(self):
        # One-time setup; create_connection() is a placeholder for your
        # own connection logic
        self.connection = create_connection()
    
    async def handle_message(self, context, message):
        result = self.connection.query(message)
        await context.reply(str(result))  # replies must be strings

Communication

Message Patterns

Fire and Forget - Send a message without waiting for a reply:
# Just send it and move on
await node.send_message("logger", "Application started")
Request-Reply - Send a message and wait for a response:
# Wait for the worker to respond (timeout in seconds)
reply = await node.send_and_receive("calculator", "2+2", timeout=10)
print(reply)  # "4"
Important: Always specify a timeout (in seconds) to prevent hanging forever if the worker doesn’t respond.
Timeout Handling - Handle timeouts gracefully:
try:
    reply = await node.send_and_receive("worker", message, timeout=10)
    print(f"Got reply: {reply}")
except RuntimeError as e:
    if "timeout" in str(e).lower():
        print("Worker didn't respond in time")
    else:
        raise

Error Handling

Timeout Errors - Symptoms: RuntimeError with “timeout” in the message. Possible causes:
  • Worker doing heavy computation (increase timeout)
  • Worker crashed (check logs)
  • Network issues (for distributed workers)
  • Worker stuck on previous message
# Use longer timeout for complex operations (timeout in seconds)
try:
    reply = await node.send_and_receive("worker", msg, timeout=30)
except RuntimeError as e:
    if "timeout" in str(e).lower():
        print("Operation timed out after 30 seconds")
    else:
        raise
Recovery Strategies - Implement fallback strategies for resilience:
try:
    reply = await node.send_and_receive("primary", msg, timeout=5)
except RuntimeError as e:
    if "timeout" in str(e).lower():
        # Have a fallback strategy
        reply = await node.send_and_receive("backup", msg, timeout=5)
    else:
        raise

Distribution

Architecture

Nodes and Pods - One of Autonomy’s most powerful features: workers can run on different machines and communicate seamlessly.
Main Pod (coordinator)
├── Node running main.py
└── Starts workers on remote nodes

Runner Pods (3 clones = 3 machines)
├── Runner 1: Workers processing tasks
├── Runner 2: Workers processing tasks
└── Runner 3: Workers processing tasks
Clones Configuration - Use clones to create multiple machines running the same container:
autonomy.yaml
name: distribute
pods:
  - name: main-pod
    public: true
    containers:
      - name: main
        image: main
  
  - name: runner-pod
    clones: 3  # Creates 3 separate machines
    containers:
      - name: runner
        image: runner

Implementation

Node Discovery - Discover and connect to remote nodes:
from autonomy import Node, Zone

async def main(node):
    # Discover all runner nodes
    runners = await Zone.nodes(node, filter="runner")
    print(f"Found {len(runners)} runner nodes")
    
    # Get all nodes (no filter)
    all_nodes = await Zone.nodes(node)
    print(f"Total nodes: {len(all_nodes)}")
Remote Worker Management - Start and manage workers on remote nodes:
images/main/main.py
from autonomy import Node, Zone


class DataProcessor:
    async def handle_message(self, context, message):
        # Process data here
        result = f"Processed: {message}"
        await context.reply(result)


async def main(node):
    # Discover all runner nodes
    runners = await Zone.nodes(node, filter="runner")
    print(f"Found {len(runners)} runner nodes")
    
    # Start a worker on each runner
    for i, runner in enumerate(runners):
        await runner.start_worker(f"processor-{i}", DataProcessor())
    
    # Send work to each worker
    for i in range(len(runners)):
        result = await node.send_and_receive(f"processor-{i}", f"task-{i}", timeout=10)
        print(result)


Node.start(main)
What’s happening:
  1. Zone.nodes() discovers all nodes matching a filter
  2. Start workers on each remote node
  3. Send messages to workers by name—they run on different machines!
  4. Messages are automatically routed across the network

Complete Multi-Node Example

Here’s a complete working example that demonstrates communication between nodes running on different pods.
Architecture:
Main Pod (client)
└── Sends messages to runner

Runner Pod (runner)
└── Runs greeter worker that responds to messages
Configuration:
autonomy.yaml
name: multinode
pods:
  - name: main-pod
    size: small
    public: true
    containers:
      - name: client
        image: client

  - name: runner-pod
    size: small
    containers:
      - name: runner
        image: runner
Runner Node:
images/runner/main.py
from autonomy import Node, info

class Greeter:
    async def handle_message(self, context, message):
        info(f"Greeter received: {message}")
        reply = f"Hello! You said: {message}"
        await context.reply(reply)

async def main(node):
    info("Starting runner node...")
    await node.start_worker("greeter", Greeter())
    info("Runner is ready with greeter worker")

Node.start(main)
Client Node:
images/client/main.py
from autonomy import Node, Zone, info
import asyncio

async def main(node):
    info("Client node started")

    # Wait for runner to be ready with retry loop
    info("Waiting for runner to be ready...")
    runners = []
    max_retries = 10
    for attempt in range(max_retries):
        runners = await Zone.nodes(node, filter="runner")
        if runners:
            break
        info(f"No runner nodes found yet, retrying... (attempt {attempt + 1}/{max_retries})")
        await asyncio.sleep(2)

    if not runners:
        info("No runner nodes found after waiting!")
        return

    runner = runners[0]
    info(f"Connected to runner: {runner.name}")

    # Send messages to the greeter worker
    messages = [
        "Hello from client",
        "How are you?",
        "This is a test message",
        "Goodbye!"
    ]

    for i, message in enumerate(messages, 1):
        info(f"Sending message {i}: {message}")

        try:
            reply = await runner.send_and_receive("greeter", message, timeout=30)
            info(f"Received reply {i}: {reply}")
        except Exception as e:
            info(f"Error sending/receiving message {i}: {e}")

        # Small delay between messages
        await asyncio.sleep(1)

    info("Client completed all messages")

Node.start(main)
Dockerfiles:
images/runner/Dockerfile
FROM ghcr.io/build-trust/autonomy-python
COPY . .
ENTRYPOINT ["python", "main.py"]
images/client/Dockerfile
FROM ghcr.io/build-trust/autonomy-python
COPY . .
ENTRYPOINT ["python", "main.py"]
Deploy and Test:
# Deploy the zone
autonomy zone deploy

# View logs to see the communication
autonomy zone inlet --to logs > /tmp/logs.log 2>&1 &
sleep 5

# Open logs in browser
open http://127.0.0.1:32101
Expected Output - Client logs show:
INFO node Waiting for runner to be ready...
INFO node Connected to runner: a9eb812238f753132652ae09963a05e9-multinode-runner
INFO node Sending message 1: Hello from client
INFO node Received reply 1: Hello! You said: Hello from client
INFO node Sending message 2: How are you?
INFO node Received reply 2: Hello! You said: How are you?
...
Runner logs show:
INFO node Starting runner node...
INFO node Runner is ready with greeter worker
INFO node Greeter received: Hello from client
INFO node Greeter received: How are you?
...
Key Points:
  • Use Zone.nodes(node, filter="runner") to discover nodes by pod name prefix
  • Implement a retry loop to wait for nodes to become available
  • Messages are automatically routed across pods and machines
  • Workers on remote nodes are accessed the same way as local workers
  • The filter parameter matches against pod names (e.g., “runner” matches “runner-pod”)

Patterns

Echo Worker - Simple message echo:
class Echo:
    async def handle_message(self, context, message):
        await context.reply(message)

await node.start_worker("echo", Echo())
reply = await node.send_and_receive("echo", "hello", timeout=10)
Stateful Counter - Maintains count across messages:
class Counter:
    def __init__(self):
        self.count = 0
    
    async def handle_message(self, context, message):
        if message == "increment":
            self.count += 1
        elif message == "decrement":
            self.count -= 1
        elif message == "reset":
            self.count = 0
        await context.reply(str(self.count))
JSON Message Handler - Structured message processing:
class CommandHandler:
    async def handle_message(self, context, message):
        command = json.loads(message)
        action = command.get("action")
        
        if action == "process":
            result = await self.process(command.get("data"))
        elif action == "validate":
            result = await self.validate(command.get("data"))
        else:
            result = {"error": "Unknown action"}
        
        await context.reply(json.dumps(result))
In-Memory Store - Key-value storage pattern:
class InMemoryStore:
    def __init__(self):
        self.store = {}
    
    async def handle_message(self, context, message):
        # Parse command: {"action": "get|set", "key": "...", "value": "..."}
        command = json.loads(message)
        action = command.get("action")
        key = command.get("key")
        
        if action == "set":
            self.store[key] = command.get("value")
            await context.reply(json.dumps({"status": "ok"}))
        elif action == "get":
            value = self.store.get(key)
            await context.reply(json.dumps({"value": value}))

# Usage
await node.start_worker("store", InMemoryStore())

# Set a value
await node.send_and_receive("store", json.dumps({
    "action": "set",
    "key": "name",
    "value": "Alice"
}), timeout=10)

# Get a value
reply = await node.send_and_receive("store", json.dumps({
    "action": "get",
    "key": "name"
}), timeout=10)

result = json.loads(reply)
print(result["value"])  # "Alice"
Distributed Processing - Spread work across multiple machines:
async def distributed_process(node, data_chunks):
    # Discover runner nodes
    runners = await Zone.nodes(node, filter="runner")
    
    # Start processor on each runner
    for i, runner in enumerate(runners):
        await runner.start_worker(f"processor-{i}", DataProcessor())
    
    # Distribute work; each request awaits its reply before the next is sent
    results = []
    for i, chunk in enumerate(data_chunks):
        worker_id = i % len(runners)  # Round-robin distribution
        result = await node.send_and_receive(
            f"processor-{worker_id}", 
            json.dumps(chunk), 
            timeout=30
        )
        results.append(json.loads(result))
    
    return results
Fire-and-Forget Logger - No-reply logging pattern:
from datetime import datetime


class Logger:
    def __init__(self):
        self.log_file = open("app.log", "a")
    
    async def handle_message(self, context, message):
        timestamp = datetime.now().isoformat()
        self.log_file.write(f"[{timestamp}] {message}\n")
        self.log_file.flush()
        # No reply needed

# Usage
await node.start_worker("logger", Logger())
await node.send_message("logger", "Application started")
await node.send_message("logger", "User logged in")
Error Handling with Cleanup - Always clean up resources:
async def process_with_cleanup(node, message):
    runners = await Zone.nodes(node, filter="runner")
    workers = []
    
    try:
        # Create workers (Processor is a worker class you define)
        for i, runner in enumerate(runners):
            worker_name = f"worker-{i}"
            await runner.start_worker(worker_name, Processor())
            workers.append((runner, worker_name))
        
        # Process with error handling
        results = []
        for runner, worker_name in workers:
            try:
                reply = await node.send_and_receive(worker_name, message, timeout=10)
                results.append(reply)
            except RuntimeError as e:
                if "timeout" in str(e).lower():
                    print(f"Worker {worker_name} timed out")
                    continue
                raise
        
        return results
    
    finally:
        # Always cleanup
        for runner, worker_name in workers:
            try:
                await runner.stop_worker(worker_name)
            except Exception as e:
                print(f"Cleanup warning: {e}")
Worker Monitoring - Get a real-time view of all workers:
from autonomy import Node, Zone


async def list_all_workers(node):
    all_nodes = await Zone.nodes(node)
    
    for n in all_nodes:
        workers = await n.list_workers()
        print(f"Node {n.name}: {len(workers)} workers")
        for worker in workers:
            print(f"  - {worker}")
Resource Management - Manage limited resources across workers:
class ResourcePool:
    def __init__(self, max_resources=10):
        self.available = max_resources
        self.in_use = {}
    
    async def handle_message(self, context, message):
        command = json.loads(message)
        action = command.get("action")
        client_id = command.get("client_id")
        
        if action == "acquire":
            if self.available > 0:
                self.available -= 1
                self.in_use[client_id] = 1
                await context.reply(json.dumps({"status": "granted"}))
            else:
                await context.reply(json.dumps({"status": "denied"}))
        
        elif action == "release":
            if client_id in self.in_use:
                self.available += self.in_use.pop(client_id)
                await context.reply(json.dumps({"status": "released"}))

Operations

Monitoring

Listing Workers - Monitor active workers across your system:
# List workers on current node
local_workers = await node.list_workers()
print(f"Local workers: {local_workers}")

# List workers on all nodes
all_nodes = await Zone.nodes(node)
for n in all_nodes:
    workers = await n.list_workers()
    print(f"{n.name}: {workers}")
Tracking Messages - Add logging to track message flow:
from datetime import datetime


class TrackedWorker:
    async def handle_message(self, context, message):
        print(f"[{datetime.now()}] Received: {message[:100]}")
        
        # Process message (self.process is a method you define)
        result = await self.process(message)
        
        print(f"[{datetime.now()}] Replying: {result[:100]}")
        await context.reply(result)
System Health - Monitor system health and performance:
async def health_check(node):
    health_status = {}
    
    # Check all nodes are reachable
    try:
        nodes = await Zone.nodes(node)
        health_status["nodes"] = len(nodes)
    except Exception as e:
        health_status["nodes_error"] = str(e)
    
    # Check critical workers
    critical_workers = ["database", "cache", "processor"]
    for worker in critical_workers:
        try:
            await node.send_and_receive(worker, "ping", timeout=5)
            health_status[worker] = "healthy"
        except Exception:
            health_status[worker] = "unhealthy"
    
    return health_status

Troubleshooting

Worker Not Responding - Symptoms: send_and_receive() times out. Solutions:
  1. Check the worker was started successfully: await node.list_workers() (see the sketch after this list)
  2. Increase timeout for complex operations
  3. Check logs for errors in worker’s handle_message()
  4. Verify message format (must be a string)
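A quick diagnostic sketch for the first check (assuming list_workers() returns worker names):
# Confirm the worker exists before blaming the timeout
workers = await node.list_workers()
if "worker" not in workers:
    print("Worker was never started, or has been stopped")
else:
    # Worker exists; retry with a longer timeout
    reply = await node.send_and_receive("worker", "ping", timeout=30)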
Timeout Issues - Debug timeout problems:
import time


class DebugWorker:
    async def handle_message(self, context, message):
        print(f"Started processing: {message}")
        start_time = time.time()
        
        # Your processing here (self.process is a method you define)
        result = await self.process(message)
        
        elapsed = time.time() - start_time
        print(f"Completed in {elapsed:.2f} seconds")
        
        await context.reply(result)
State Problems - Debug state issues:
class DebugState:
    def __init__(self):
        self.count = 0
        self.history = []
    
    async def handle_message(self, context, message):
        self.count += 1
        self.history.append(message)
        
        # Log current state
        print(f"Message #{self.count}: {message}")
        print(f"History length: {len(self.history)}")
        
        await context.reply(f"Processed message #{self.count}")
Serialization Errors - Symptom: TypeError when sending messages. Cause: trying to send non-string messages. Solution:
# Wrong
await node.send_and_receive("worker", {"data": 123})  # ❌

# Right
await node.send_and_receive("worker", json.dumps({"data": 123}))  # ✅