
A guide for coding agents to use workers and messaging

Keywords: workers, messaging, distributed computing, send_and_receive, handle_message, Zone.nodes
Key Terms: See the definitions section of the main guide.
This guide shows how to create workers that process messages asynchronously, send messages between workers, and build distributed applications that scale across multiple pods.

Understanding Workers

Workers are message-processing components that follow the actor model. Each worker:
  • Has a unique name within its node
  • Receives messages asynchronously
  • Processes one message at a time
  • Can reply to the sender
  • Can run on the local node or on remote nodes in other containers and pods
Workers enable:
  • Asynchronous processing: Handle long-running tasks without blocking
  • Parallelism: Process multiple requests concurrently across workers
  • Distribution: Spread workload across multiple machines
  • Isolation: Each worker maintains its own state independently
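The actor-model behaviour above can be seen in a few lines of plain asyncio. The sketch below is not Autonomy's implementation: `MiniHost` and `_Context` are hypothetical stand-ins that only borrow this guide's method names. Each worker gets an `asyncio.Queue` mailbox that is drained one message at a time, so a stateful worker never sees overlapping messages even when callers send concurrently.

```python
import asyncio


class _Context:
    """Hypothetical stand-in for the context passed to handle_message()."""

    def __init__(self, reply_future):
        self._reply_future = reply_future

    async def reply(self, message):
        # Fire-and-forget messages carry no future, so their replies are dropped.
        if self._reply_future is not None and not self._reply_future.done():
            self._reply_future.set_result(message)


class MiniHost:
    """Hypothetical in-process host: one asyncio.Queue mailbox per worker."""

    def __init__(self):
        self._mailboxes = {}
        self._tasks = {}

    async def start_worker(self, name, worker):
        if name in self._mailboxes:
            raise ValueError(f"worker {name!r} already exists")
        self._mailboxes[name] = asyncio.Queue()
        self._tasks[name] = asyncio.create_task(self._run(worker, self._mailboxes[name]))

    async def _run(self, worker, mailbox):
        # Drain the mailbox one message at a time: the next message is not
        # taken until handle_message() has returned for the current one.
        while True:
            message, reply_future = await mailbox.get()
            await worker.handle_message(_Context(reply_future), message)

    async def send_message(self, name, message):
        await self._mailboxes[name].put((message, None))

    async def send_and_receive(self, name, message, timeout=1000):
        # timeout is in milliseconds, mirroring the examples in this guide
        reply_future = asyncio.get_running_loop().create_future()
        await self._mailboxes[name].put((message, reply_future))
        return await asyncio.wait_for(reply_future, timeout / 1000)

    async def stop_worker(self, name):
        del self._mailboxes[name]
        task = self._tasks.pop(name)
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass


class Counter:
    def __init__(self):
        self.count = 0

    async def handle_message(self, context, message):
        current = self.count
        await asyncio.sleep(0.01)  # yield mid-update; safe because messages never overlap
        self.count = current + 1
        await context.reply(str(self.count))


async def demo():
    host = MiniHost()
    await host.start_worker("counter", Counter())
    # Three concurrent requests still produce three distinct counts
    replies = await asyncio.gather(
        *(host.send_and_receive("counter", "increment") for _ in range(3))
    )
    await host.stop_worker("counter")
    return replies


replies = asyncio.run(demo())
print(sorted(replies))  # ['1', '2', '3']
```

Because `Counter` awaits in the middle of its update, handling the three messages concurrently without a mailbox would let each read `count == 0` and lose updates; the queue serializes them.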

Nodes and Containers

Important: Not all containers in a zone are Autonomy Nodes.
  • Autonomy Node containers: Containers that run Node.start() - these can run workers and exchange messages
  • Non-Node containers: Containers that don’t run Autonomy Nodes, such as MCP servers, databases, or other services
In the examples below:
  • The main and runner containers are Autonomy Nodes (they call Node.start())
  • You could also have containers running MCP servers or other services that don’t call Node.start()
  • Only containers that are Autonomy Nodes can run workers and participate in messaging

⚠️ Message Serialization

CRITICAL: Worker messages MUST be strings. For structured data, use JSON encoding.
CORRECT:
import json
from autonomy import Node

class Calculator:
    async def handle_message(self, context, message: str):
        data = json.loads(message)  # Parse JSON string
        result = data["x"] + data["y"]
        await context.reply(json.dumps({"result": result}))  # Reply with JSON string

async def main(node):
    await node.start_worker("calc", Calculator())
    message = json.dumps({"x": 10, "y": 32})  # Convert dict to JSON string
    reply = await node.send_and_receive("calc", message)
    result = json.loads(reply)  # Parse JSON string back to dict
    print(f"Result: {result['result']}")  # 42


Node.start(main)
WRONG (will crash with TypeError):
# This will fail!
message = {"x": 10, "y": 32}  # Dict, not string
reply = await node.send_and_receive("calc", message)  # TypeError!

# This will also fail!
await context.reply({"result": 42})  # Dict, not string - TypeError!
Why: The messaging layer requires strings for network transmission. Always use json.dumps() to send and json.loads() to receive structured data.
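To keep the encode and decode steps in one place, you might wrap them in small helpers. `encode_message` and `decode_message` below are hypothetical conveniences, not part of the `autonomy` package. The round trip also shows two JSON type changes to expect on the receiving side: tuples arrive as lists, and non-string dict keys arrive as strings.

```python
import json


def encode_message(payload):
    """Serialize a JSON-compatible value into the string a worker message needs."""
    return json.dumps(payload)


def decode_message(message):
    """Parse a worker message, failing loudly if it is not a JSON string."""
    if not isinstance(message, str):
        raise TypeError(f"expected str, got {type(message).__name__}")
    try:
        return json.loads(message)
    except json.JSONDecodeError as exc:
        raise ValueError(f"message is not valid JSON: {exc}") from exc


# Round trip: the tuple comes back as a list and the int key as a string
sent = encode_message({"repos": [("pallets", "flask")], 1: "one"})
received = decode_message(sent)
print(received)  # {'repos': [['pallets', 'flask']], '1': 'one'}
```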

Create a Basic Worker

Step 1: Define a Worker Class

Create a worker class with a handle_message method:
from autonomy import Node


class Echoer:
    async def handle_message(self, context, message):
        print(f"`echoer` received: {message}\n")
        await context.reply(message)


async def main(node):
    await node.start_worker("echoer", Echoer())
    reply = await node.send_and_receive("echoer", "hello")
    print(f"`main` received: {reply}\n")


Node.start(main)
How This Works:
  1. Worker Class: Echoer defines async def handle_message(self, context, message)
  2. Start Worker: await node.start_worker("echoer", Echoer()) creates a worker named “echoer”
  3. Send Message: await node.send_and_receive("echoer", "hello") sends “hello” and waits for reply
  4. Reply: await context.reply(message) sends the response back to the caller
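Because `handle_message` is an ordinary coroutine, you can exercise a worker's logic locally before deploying. This sketch assumes the handler uses nothing from `context` except `reply()`, as `Echoer` does; `RecordingContext` is a hypothetical test double, not an Autonomy class.

```python
import asyncio


class RecordingContext:
    """Hypothetical test double: records replies instead of sending them."""

    def __init__(self):
        self.replies = []

    async def reply(self, message):
        self.replies.append(message)


class Echoer:
    async def handle_message(self, context, message):
        print(f"`echoer` received: {message}\n")
        await context.reply(message)


async def check_echoer():
    context = RecordingContext()
    await Echoer().handle_message(context, "hello")
    return context.replies


replies = asyncio.run(check_echoer())
print(replies)  # ['hello']
```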

Step 2: Deploy and Test

Create the complete file structure:
hello/
├── autonomy.yaml
└── images/main/
    ├── Dockerfile
    └── main.py
autonomy.yaml:
name: hello
pods:
  - name: main-pod
    public: true
    containers:
      - name: main
        image: main
images/main/Dockerfile:
FROM ghcr.io/build-trust/autonomy-python
COPY . .
ENTRYPOINT ["python", "main.py"]
images/main/main.py: use the Echoer example above.
Deploy the zone:
autonomy zone deploy
Check the logs to see the messages. Follow the logging instructions. You should see output like:
`echoer` received: hello
`main` received: hello

Worker Message Patterns

This example demonstrates send-and-forget, timeouts, stateful workers, and cleanup:
import asyncio
from autonomy import Node


class Printer:
    async def handle_message(self, context, message):
        print(f"LOG: {message}")
        # No reply sent - fire and forget


class Counter:
    def __init__(self):
        self.count = 0

    async def handle_message(self, context, message):
        if message == "increment":
            self.count += 1
        await context.reply(f"Count: {self.count}")


async def main(node):
    # Start workers
    await node.start_worker("printer", Printer())
    await node.start_worker("counter", Counter())

    # Send and forget (no reply expected)
    await node.send_message("printer", "Application started")

    # Stateful worker with multiple messages
    await node.send_and_receive("counter", "increment", timeout=1000)  # Count: 1
    await node.send_and_receive("counter", "increment", timeout=1000)  # Count: 2
    reply = await node.send_and_receive("counter", "get", timeout=1000)
    print(reply)  # Count: 2

    # Cleanup workers when done
    await node.stop_worker("printer")
    await node.stop_worker("counter")


Node.start(main)
Key Patterns:
  • Send-and-forget: send_message() for logging, notifications (no reply)
  • Timeouts: Always pass a timeout to send_and_receive(); it is in milliseconds, so timeout=1000 waits one second
  • State: Workers maintain state across messages (Counter keeps count)
  • Cleanup: Call stop_worker() when done
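The timeout passed to `send_and_receive()` is in milliseconds, while `asyncio.wait_for()` takes seconds, so passing 1000 to the latter waits about 17 minutes rather than one second. A minimal sketch of a hypothetical helper that accepts milliseconds and converts them; `slow_reply` stands in for a worker round trip.

```python
import asyncio


async def call_with_timeout(coro, timeout_ms):
    """Await coro for at most timeout_ms milliseconds; return None on timeout.

    asyncio.wait_for() expects seconds, so the millisecond value is converted.
    """
    try:
        return await asyncio.wait_for(coro, timeout_ms / 1000)
    except asyncio.TimeoutError:
        return None


async def slow_reply(delay_s, value):
    # Hypothetical stand-in for a send_and_receive() round trip
    await asyncio.sleep(delay_s)
    return value


async def demo():
    fast = await call_with_timeout(slow_reply(0.01, "fast"), timeout_ms=1000)
    slow = await call_with_timeout(slow_reply(2.0, "slow"), timeout_ms=50)
    return fast, slow


fast, slow = asyncio.run(demo())
print(fast, slow)  # fast None
```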

Error Handling and Cleanup

Production-ready pattern for distributed worker management:
import asyncio
import json
from autonomy import Node, Zone


class DataProcessor:
    async def handle_message(self, context, message):
        data = json.loads(message)
        result = {"processed": len(data)}
        await context.reply(json.dumps(result))


async def process_with_cleanup(node):
    """Production-ready worker management with timeouts and cleanup"""
    runners = await Zone.nodes(node, filter="runner")
    workers = []

    try:
        # Create workers
        for i, runner in enumerate(runners):
            worker_name = f"worker_{i}"
            await runner.start_worker(worker_name, DataProcessor())
            workers.append((runner, worker_name))

        # Process items with error handling
        results = []
        for runner, worker_name in workers:
            try:
                message = json.dumps({"task": "process"})
                reply = await asyncio.wait_for(
                    runner.send_and_receive(worker_name, message),
                    timeout=10.0  # seconds; allow 10-30 s for a cold start, 1-5 s once warm
                )
                results.append(json.loads(reply))
            except asyncio.TimeoutError:
                print(f"Worker {worker_name} timed out")
            except Exception as e:
                print(f"Worker {worker_name} failed: {e}")
                continue  # Process other workers

        return results

    finally:
        # ALWAYS cleanup - runs even if exception occurred
        for runner, worker_name in workers:
            try:
                await asyncio.wait_for(
                    runner.stop_worker(worker_name),
                    timeout=5.0
                )
            except Exception as e:
                print(f"Warning: Could not stop {worker_name}: {e}")


async def main(node):
    results = await process_with_cleanup(node)
    print(f"Processed {len(results)} items")


Node.start(main)
Key Patterns:
  • asyncio.wait_for() prevents indefinite waiting
  • try/except handles individual failures gracefully
  • try/finally ensures cleanup always happens
  • Continue processing other workers on failure
  • Return partial results rather than failing completely
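The loop above awaits each worker in turn, so total latency is the sum of every call. A concurrent variant can keep the same partial-results behaviour with `asyncio.gather(..., return_exceptions=True)` and a per-call `asyncio.wait_for()`. This is a sketch under assumptions: `gather_partial` and `fake_worker` are hypothetical, and in a real zone the calls would be the `runner.send_and_receive(...)` coroutines.

```python
import asyncio


async def gather_partial(calls, timeout_s):
    """Run all calls concurrently, each with its own timeout in seconds.

    Returns (results, failures): successful values in input order, and
    (index, exception) pairs for calls that timed out or raised.
    """
    outcomes = await asyncio.gather(
        *(asyncio.wait_for(call, timeout_s) for call in calls),
        return_exceptions=True,
    )
    results, failures = [], []
    for index, outcome in enumerate(outcomes):
        if isinstance(outcome, BaseException):
            failures.append((index, outcome))
        else:
            results.append(outcome)
    return results, failures


async def fake_worker(delay_s, value, fail=False):
    # Hypothetical stand-in for a remote worker call
    await asyncio.sleep(delay_s)
    if fail:
        raise RuntimeError(f"worker {value} failed")
    return value


async def demo():
    calls = [
        fake_worker(0.01, "a"),
        fake_worker(1.0, "b"),              # exceeds the timeout
        fake_worker(0.01, "c", fail=True),  # raises
        fake_worker(0.02, "d"),
    ]
    return await gather_partial(calls, timeout_s=0.2)


results, failures = asyncio.run(demo())
print(results)  # ['a', 'd']
print([index for index, _ in failures])  # [1, 2]
```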

Distributed Workers Across Multiple Machines

Workers can run on different machines in your zone. Because each machine runs its own Node, independent tasks can be processed in parallel across machines.

Step 1: Configure Multiple Pods

Create an autonomy.yaml with multiple pods:
name: distribute
pods:
  - name: main-pod
    size: big
    public: true
    containers:
      - name: main
        image: main

  - name: runner-pod
    size: big
    clones: 5  # Creates 5 identical pods
    containers:
      - name: runner
        image: runner
Key Concepts:
  • main-pod: The primary pod that coordinates work
  • runner-pod: Worker pods that process tasks
  • clones: 5: Creates 5 separate machines running the runner container
  • Each clone is a separate machine with its own Node
  • Both main and runner containers are Autonomy Nodes (they call Node.start())
  • You could also add non-Node containers in a Zone (like MCP servers) that don’t call Node.start()
IMPORTANT: If your distributed workers need MCP tools, each pod must have its own MCP server container since agents and workers access MCP servers over localhost. See the tools guide section on multi-pod deployments for complete examples.

Step 2: Discover Nodes in a Zone

Use Zone.nodes() to find nodes running in other containers:
from autonomy import Node, Zone


async def main(node):
    # Get all nodes, in this node's zone, matching the filter
    runners = await Zone.nodes(node, filter="runner")

    print(f"Found {len(runners)} runner nodes")
    for runner in runners:
        print(f"  - {runner.name}")


Node.start(main)
How This Works:
  1. Zone.nodes(node, filter="runner"): Returns a list of Node objects from pods containing "runner" in their name
  2. Each Node object represents a remote node running in a different pod
  3. You can interact with remote nodes the same way as the local node

Step 3: Start Workers on Remote Nodes

Start workers on remote nodes just like local workers:
from autonomy import Node, Zone
import secrets


class Worker:
    async def handle_message(self, context, message):
        await context.reply(f"Processed: {message}")


async def main(node):
    # Get remote nodes
    runners = await Zone.nodes(node, filter="runner")

    # Start a worker on each remote node
    for runner in runners:
        worker_name = secrets.token_hex(3)
        await runner.start_worker(worker_name, Worker())

        # Send message to worker on remote node
        reply = await runner.send_and_receive(
            worker_name,
            f"hello from {runner.name}",
            timeout=1000
        )
        print(f"Got reply: {reply}")


Node.start(main)
Important:
  • Use runner.start_worker() instead of node.start_worker() to start on remote node
  • Use runner.send_and_receive() to send messages to workers on that remote node
  • Each runner node operates independently
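The examples name workers with `secrets.token_hex(3)`, which returns 6 hex characters (about 16.7 million values). Collisions are unlikely but not impossible, so a hypothetical helper can retry against the names you already track; where `existing` comes from (names you started yourself, or names reported by `list_workers()`) is an assumption left to you.

```python
import secrets


def unique_worker_name(existing, nbytes=3):
    """Return a random hex name that is not already in `existing`.

    secrets.token_hex(nbytes) yields 2 * nbytes hex characters.
    """
    while True:
        name = secrets.token_hex(nbytes)
        if name not in existing:
            return name


taken = set()
for _ in range(5):
    taken.add(unique_worker_name(taken))
print(len(taken))  # 5
```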

Step 4: Distribute Work Across Nodes

Process work in parallel across multiple machines:
from autonomy import Node, Zone
import asyncio
import secrets
import json


class DataProcessor:
    async def handle_message(self, context, message):
        data = json.loads(message)
        # Process data (expensive computation)
        result = {"processed": len(data), "items": data}
        await context.reply(json.dumps(result))


async def process_on_runner(runner, data):
    # Start worker on this runner
    worker_name = secrets.token_hex(3)
    await runner.start_worker(worker_name, DataProcessor())

    try:
        # Send data and get result (timeout is in milliseconds)
        result_json = await runner.send_and_receive(
            worker_name,
            json.dumps(data),
            timeout=10000
        )
    finally:
        # Stop the worker even if the call above raised or timed out
        await runner.stop_worker(worker_name)

    return json.loads(result_json)


async def main(node):
    # Data to process
    all_data = [
        ["item1", "item2", "item3"],
        ["item4", "item5", "item6"],
        ["item7", "item8", "item9"],
        ["item10", "item11", "item12"],
    ]

    # Get runner nodes
    runners = await Zone.nodes(node, filter="runner")
    if not runners:
        raise RuntimeError("No runner nodes found in this zone")

    # Distribute work across runners
    futures = []
    for i, data in enumerate(all_data):
        runner = runners[i % len(runners)]  # Round-robin distribution
        future = process_on_runner(runner, data)
        futures.append(future)

    # Wait for all to complete
    results = await asyncio.gather(*futures)

    print(f"Processed {len(results)} chunks across {len(runners)} runners")
    for result in results:
        print(f"  - {result}")


Node.start(main)
How This Works:
  1. Round-robin: runners[i % len(runners)] distributes work evenly
  2. Parallel execution: asyncio.gather(*futures) runs all workers simultaneously
  3. Auto-cleanup: Each worker is stopped after processing
  4. With the five runner clones configured above and four chunks of data, the work runs on four machines in parallel and one runner stays idle
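You can check a round-robin plan on plain lists before touching the zone. In this sketch, `assign_round_robin` and the runner names are hypothetical; it also rejects an empty runner list, which would otherwise make `i % len(runners)` raise `ZeroDivisionError`.

```python
def assign_round_robin(chunks, runner_names):
    """Map each runner name to the indexes of the chunks it would process."""
    if not runner_names:
        raise ValueError("no runner nodes found")
    plan = {name: [] for name in runner_names}
    for i in range(len(chunks)):
        plan[runner_names[i % len(runner_names)]].append(i)
    return plan


# Hypothetical names for five runner clones; four chunks as in the example above
runners = ["runner-0", "runner-1", "runner-2", "runner-3", "runner-4"]
chunks = [["item1", "item2", "item3"], ["item4", "item5", "item6"],
          ["item7", "item8", "item9"], ["item10", "item11", "item12"]]
plan = assign_round_robin(chunks, runners)
idle = [name for name, indexes in plan.items() if not indexes]
print(plan)
print(idle)  # ['runner-4']
```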

Complete Example: Distributed Code Analyzer

Analyze GitHub repositories in parallel across multiple runner pods.
File Structure:
analyzer/
├── autonomy.yaml
└── images/
    ├── main/
    │   ├── Dockerfile
    │   └── main.py
    └── runner/
        ├── Dockerfile
        └── main.py
autonomy.yaml:
name: analyzer
pods:
  - name: main-pod
    size: big
    public: true
    containers:
      - name: main
        image: main
  - name: runner-pod
    size: big
    clones: 3
    containers:
      - name: runner
        image: runner
images/runner/main.py:
from autonomy import Node
Node.start()  # Wait for work from main
images/main/main.py:
from autonomy import Agent, Model, Node, Zone
import asyncio
import secrets
import json


class CodeAnalyzer:
    def __init__(self, node):
        self.node = node

    async def handle_message(self, context, message):
        repos = json.loads(message)
        results = []

        for org, repo in repos:
            agent = await Agent.start(
                node=self.node,
                instructions="Analyze the repository and summarize its purpose.",
                model=Model("nova-micro-v1"),
            )
            receiver = await agent.send_message(f"Analyze {org}/{repo}")
            analysis = await receiver.receive_message(timeout=10000)
            await Agent.stop(self.node, agent.name)

            results.append({"repo": f"{org}/{repo}", "summary": str(analysis[0].content)})

        await context.reply(json.dumps(results))


async def main(node):
    repos = [("pallets", "flask"), ("psf", "requests"), ("pandas-dev", "pandas")]
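    runners = await Zone.nodes(node, filter="runner")
    if not runners:
        raise RuntimeError("No runner nodes found in this zone")

    # Split repos across runners: strided slices keep chunk sizes within one of
    # each other (3 repos on 3 runners -> one repo each); drop empty chunks
    chunks = [repos[i::len(runners)] for i in range(len(runners))]
    chunks = [chunk for chunk in chunks if chunk]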

    # Process in parallel
    futures = []
    workers = []

    try:
        for runner, chunk in zip(runners, chunks):
            worker_name = secrets.token_hex(3)
            await runner.start_worker(worker_name, CodeAnalyzer(runner))
            workers.append((runner, worker_name))

            future = runner.send_and_receive(worker_name, json.dumps(chunk), timeout=60000)
            futures.append(future)

        results = await asyncio.gather(*futures)

        print(f"Analyzed {len(repos)} repositories:")
        for result_json in results:
            for result in json.loads(result_json):
                print(f"  - {result['repo']}: {result['summary'][:80]}...")

    finally:
        for runner, worker_name in workers:
            try:
                await runner.stop_worker(worker_name)
            except Exception as e:
                print(f"Warning: Could not stop {worker_name}: {e}")


Node.start(main)
Dockerfiles (both images/main/Dockerfile and images/runner/Dockerfile):
FROM ghcr.io/build-trust/autonomy-python
COPY . .
ENTRYPOINT ["python", "main.py"]
Deploy:
autonomy zone deploy
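To split work into contiguous chunks whose sizes differ by at most one, so that no runner sits idle while another holds extra items, a `divmod`-based split is one option. `split_balanced` is a hypothetical helper, shown as a sketch; it never returns empty chunks, so zipping its output with the runner list only starts workers that have something to do.

```python
def split_balanced(items, n_parts):
    """Split items into at most n_parts contiguous, non-empty chunks.

    Chunk sizes differ by at most one; the first len(items) % n_parts
    chunks receive the extra item.
    """
    if n_parts <= 0:
        raise ValueError("n_parts must be positive")
    base, extra = divmod(len(items), n_parts)
    chunks, start = [], 0
    for part in range(n_parts):
        size = base + (1 if part < extra else 0)
        if size == 0:
            break  # fewer items than parts: the remaining chunks would be empty
        chunks.append(items[start:start + size])
        start += size
    return chunks


repos = [("pallets", "flask"), ("psf", "requests"), ("pandas-dev", "pandas")]
print(split_balanced(repos, 3))           # one repository per runner
print(split_balanced(list(range(7)), 3))  # [[0, 1, 2], [3, 4], [5, 6]]
```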

List Workers on Nodes

Check which workers are running:
from autonomy import Node, Zone
import asyncio


async def main(node):
    # List workers on local node
    local_workers = await node.list_workers()
    print(f"Local workers: {[w['name'] for w in local_workers]}")

    # List workers on all runner nodes
    runners = await Zone.nodes(node, filter="runner")
    futures = [runner.list_workers() for runner in runners]
    workers_per_runner = await asyncio.gather(*futures)

    for runner, workers in zip(runners, workers_per_runner):
        print(f"{runner.name}: {[w['name'] for w in workers]}")


Node.start(main)
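After a run that should have cleaned up, the same data can be used to spot leaked workers. This sketch assumes each `list_workers()` entry is a dict with a `"name"` key, as the example above indexes it, and that any workers you expect to persist are passed in `expected`; `find_leftover_workers` and the sample data are hypothetical.

```python
def find_leftover_workers(runner_names, workers_per_runner, expected=frozenset()):
    """Map each runner name to the sorted worker names not listed in `expected`."""
    leftovers = {}
    for runner_name, workers in zip(runner_names, workers_per_runner):
        names = sorted(w["name"] for w in workers if w["name"] not in expected)
        if names:
            leftovers[runner_name] = names
    return leftovers


# Hypothetical data shaped like the list_workers() results gathered above
runner_names = ["runner-a", "runner-b"]
workers_per_runner = [[{"name": "3fa2c1"}], []]
print(find_leftover_workers(runner_names, workers_per_runner))  # {'runner-a': ['3fa2c1']}
```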

Best Practices

Do:
  • Always use JSON: Use json.dumps() to send, json.loads() to receive structured data
  • Use timeouts: Pass a timeout to send_and_receive() to prevent hanging. It is in milliseconds, whereas asyncio.wait_for() takes seconds; allow 10-30 s for a cold start and 1-5 s once warm
  • Use try/finally: Always clean up workers, even when errors occur
  • Handle errors: Use asyncio.wait_for() and try/except for graceful degradation
  • Unique names: Use secrets.token_hex(3) for worker names
  • Test progressively: Start simple (nodes → workers → messages) before full distribution
  • Distribute work: Use multiple runners with asyncio.gather() for parallelism
Don’t:
  • Send dicts/objects: Never send {"data": 123} - use json.dumps({"data": 123}) instead
  • Skip cleanup: Always stop workers in a finally block to prevent resource leaks
  • Omit error handling: Always use timeouts and exception handling on async operations
  • Test immediately: After deploying, wait 2-3 minutes before testing
  • Use same name twice: Each worker needs a unique name on its node
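  • Assume completion order: Workers on different nodes run concurrently, so any of them may finish first (asyncio.gather() still returns results in the order the calls were passed)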
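The ordering point can be seen with plain asyncio: `asyncio.gather()` returns results in the order the awaitables were passed, while `asyncio.as_completed()` yields them as they finish. `job` is a hypothetical stand-in for a worker call.

```python
import asyncio


async def job(name, delay_s):
    # Hypothetical stand-in for a remote worker call
    await asyncio.sleep(delay_s)
    return name


async def demo():
    # gather(): results come back in the order the awaitables were passed
    gathered = await asyncio.gather(job("slow", 0.05), job("fast", 0.01))

    # as_completed(): results arrive in the order the jobs finish
    finished = []
    for next_done in asyncio.as_completed([job("slow", 0.05), job("fast", 0.01)]):
        finished.append(await next_done)
    return gathered, finished


gathered, finished = asyncio.run(demo())
print(gathered)  # ['slow', 'fast']
print(finished)  # ['fast', 'slow']
```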

Key Concepts Summary

Workers run on Nodes:
  • Each pod has one or more containers
  • Containers that call Node.start() are Autonomy Nodes
  • Not all containers are Nodes (e.g., MCP servers, databases)
  • Each Autonomy Node can run multiple workers
  • Workers have unique names within their Node
Message Flow:
  • Caller sends message to worker
  • Worker processes message asynchronously
  • Worker optionally replies
  • Caller receives reply
Distribution:
  • Use clones in autonomy.yaml to create multiple pods
  • Use Zone.nodes() to discover nodes in the zone
  • Start workers on remote nodes with remote_node.start_worker()
  • Work executes in parallel across all nodes