Code Execution
The CodeModeExecutor is the heart of MCP Codemode, running Python code that composes MCP tools.
Overview
Instead of making individual tool calls through LLM inference, agents write Python code that:
- Calls multiple tools in sequence or in parallel
- Uses loops and conditionals for complex logic
- Handles errors with try/except
- Stores intermediate results in variables
Important Architecture Note: The tools you call in your code (e.g., from generated.servers.example_mcp import tool) are actual MCP tools and communicate through the MCP protocol. The CodemodeToolset itself is a Pydantic AI toolset (not an MCP server), but it enables you to write Python code that orchestrates MCP tool calls. This is by design: MCP servers remain MCP servers, and codemode provides a code-first interface for composing them programmatically.
Basic Execution
from mcp_codemode import CodeModeExecutor, ToolRegistry
# Set up registry (see Tool Discovery)
registry = ToolRegistry()
# ... add servers ...
await registry.discover_all()
# Create and set up executor
executor = CodeModeExecutor(registry)
await executor.setup()
# Execute code
result = await executor.execute("""
from generated.servers.filesystem import read_file
content = await read_file({"path": "/tmp/data.txt"})
print(f"Read {len(content)} characters")
""")
print(result.stdout) # "Read 42 characters"
print(result.stderr) # ""
print(result.text) # Main result (if any)
print(result.success) # True
Execution Result
The execute() method returns an Execution object:
result = await executor.execute(code)
# Check success
if result.success:
    print("Code executed successfully")
else:
    print(f"Error: {result.error}")
# Get output (stdout/stderr from the code)
print(result.stdout)
print(result.stderr)
# Get main result text
print(result.text)
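The checks above can be folded into a small helper. The sketch below is only an illustration: summarize is a hypothetical function, and the SimpleNamespace objects are stand-ins that mimic the success, error, stdout, stderr, and text attributes described in this section, so the example runs without the library installed.

```python
from types import SimpleNamespace


def summarize(result) -> str:
    """Return a one-line summary using only the attributes documented above."""
    if result.success:
        lines = result.stdout.strip().splitlines()
        first_line = lines[0] if lines else ""
        return f"ok: {first_line}"
    return f"failed: {result.error}"


# Stand-in objects (hypothetical), shaped like the documented result.
ok = SimpleNamespace(success=True, error=None,
                     stdout="Read 42 characters\n", stderr="", text=None)
bad = SimpleNamespace(success=False, error="FileNotFoundError: /tmp/data.txt",
                      stdout="", stderr="", text=None)

print(summarize(ok))
print(summarize(bad))
```

In real use, the object returned by executor.execute(code) would presumably be passed to such a helper in place of the stand-ins.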
Execution Options
Timeout
Set a maximum execution time:
result = await executor.execute(
    code,
    timeout=30.0  # 30 seconds
)
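To illustrate what a timeout means for awaited work in general, here is a minimal sketch built on asyncio.wait_for from the standard library. It does not show how the executor enforces its timeout internally; run_with_timeout and slow_tool are hypothetical names used only for this example.

```python
import asyncio


async def run_with_timeout(coro, timeout: float):
    """Await `coro`; return (True, value), or (False, message) on timeout."""
    try:
        return True, await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        return False, f"timed out after {timeout}s"


async def slow_tool() -> str:
    """Hypothetical stand-in for a tool call that takes 0.2 seconds."""
    await asyncio.sleep(0.2)
    return "done"


async def main():
    fast = await run_with_timeout(slow_tool(), timeout=1.0)   # finishes in time
    slow = await run_with_timeout(slow_tool(), timeout=0.05)  # gets cancelled
    return fast, slow


fast, slow = asyncio.run(main())
print(fast)
print(slow)
```

When the deadline passes, asyncio.wait_for cancels the pending coroutine, so a timed-out call does not keep running in the background.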
Working Directory
Specify where the code runs:
result = await executor.execute(
    code,
    working_dir="/workspace/project"
)
Environment Variables
Pass environment variables:
result = await executor.execute(
    code,
    env={"API_KEY": "secret", "DEBUG": "true"}
)
Tool Composition Patterns
Sequential Execution
Call tools one after another:
result = await executor.execute("""
from generated.servers.filesystem import read_file, write_file
# Read, transform, write
content = await read_file({"path": "/input.txt"})
transformed = content.upper()
await write_file({"path": "/output.txt", "content": transformed})
""")
Parallel Execution
Use asyncio.gather for concurrent tool calls:
result = await executor.execute("""
import asyncio
from generated.servers.filesystem import read_file
files = ["a.txt", "b.txt", "c.txt", "d.txt"]
# Read all files in parallel
contents = await asyncio.gather(*[
    read_file({"path": f"/data/{f}"}) for f in files
])
total = sum(len(c) for c in contents)
print(f"Total: {total} characters from {len(files)} files")
""")
Conditional Logic
Use try/except or if/else to choose between alternatives:
result = await executor.execute("""
from generated.servers.filesystem import read_file, write_file
from generated.servers.web import fetch_url
# Try local file first, fall back to web
try:
    content = await read_file({"path": "/cache/data.json"})
    print("Using cached data")
except Exception:
    content = await fetch_url({"url": "https://api.example.com/data"})
    await write_file({"path": "/cache/data.json", "content": content})
    print("Fetched fresh data")
""")
Loops
Process collections of items:
result = await executor.execute("""
from generated.servers.filesystem import list_directory, read_file
entries = await list_directory({"path": "/documents"})
for entry in entries["entries"]:
    if entry.endswith(".md"):
        content = await read_file({"path": f"/documents/{entry}"})
        word_count = len(content.split())
        print(f"{entry}: {word_count} words")
""")
Error Handling
Use try/except for robust execution:
result = await executor.execute("""
from generated.servers.filesystem import read_file, write_file
files = ["important.txt", "optional.txt", "extra.txt"]
results = []
for f in files:
    try:
        content = await read_file({"path": f"/data/{f}"})
        results.append({"file": f, "status": "success", "size": len(content)})
    except Exception as e:
        results.append({"file": f, "status": "failed", "error": str(e)})
successful = [r for r in results if r["status"] == "success"]
print(f"Processed {len(successful)}/{len(files)} files")
""")
Sandbox Environment
Code executes in an isolated sandbox for safety.
Tool Call Architecture: When your code calls await generate_random_text(...) (a tool from generated.servers.example_mcp), it:
- Executes in the sandbox environment
- Makes an actual MCP protocol request to the MCP server
- Returns the result to your code
This means you get the safety and isolation of MCP servers while writing natural Python code. The sandbox handles the MCP protocol communication transparently.
Sandbox Configuration
from mcp_codemode import CodeModeConfig, CodeModeExecutor
config = CodeModeConfig(
sandbox_variant="local", # "local" or "datalayer-runtime"
workspace_path="/workspace", # Working directory
generated_path="/tmp/generated", # Where to put generated bindings
default_timeout=30.0, # Default execution timeout
)
executor = CodeModeExecutor(registry, config=config)
Sandbox Variants
- local: Runs in a subprocess on the same machine
- datalayer-runtime: Runs in a cloud-based Datalayer Runtime
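As a rough mental model of the local variant, the sketch below runs a snippet in a separate Python subprocess with its own environment variables, working directory, and timeout. It uses only the standard library and is an assumption about the general approach, not the library's actual implementation; run_in_subprocess is a hypothetical helper.

```python
import os
import subprocess
import sys
import tempfile


def run_in_subprocess(code: str, env: dict, working_dir: str, timeout: float):
    """Run `code` in a fresh Python subprocess and capture its output."""
    completed = subprocess.run(
        [sys.executable, "-c", code],
        env={**os.environ, **env},  # extend the parent environment, not replace it
        cwd=working_dir,            # directory the child process starts in
        capture_output=True,
        text=True,
        timeout=timeout,            # raises subprocess.TimeoutExpired if exceeded
    )
    return completed.returncode, completed.stdout, completed.stderr


with tempfile.TemporaryDirectory() as workdir:
    returncode, stdout, stderr = run_in_subprocess(
        "import os; print(os.environ['DEBUG'])",
        env={"DEBUG": "true"},
        working_dir=workdir,
        timeout=30.0,
    )

print(returncode, stdout.strip())
```

A subprocess gives process-level separation only. Stronger isolation, such as the remote variant listed above, needs more than this sketch shows.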
Available in Sandbox
The sandbox has access to:
- Standard Python libraries
- asyncio for async operations
- Generated tool bindings (from generated.servers.X import Y)
- Skills (from skills.name import func)
Tool Call Tracking
Track which tools were called during execution:
result = await executor.execute(code)
# Get tool call history
for call in result.tool_calls:
    print(f"Called: {call.tool_name}")
    print(f" Args: {call.arguments}")
    print(f" Result: {call.result}")
    print(f" Duration: {call.duration_ms}ms")
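One way such a history could be collected is with a decorator that wraps each tool function and records its name, arguments, result, and duration. The sketch below is an assumption for illustration, not the library's mechanism: the ToolCall dataclass mirrors the field names used above, while tracked, tool_calls, and the read_file stub are hypothetical.

```python
import asyncio
import functools
import time
from dataclasses import dataclass
from typing import Any


@dataclass
class ToolCall:
    """Record of one call, with the same field names as the loop above."""
    tool_name: str
    arguments: dict
    result: Any
    duration_ms: float


tool_calls: list[ToolCall] = []  # hypothetical in-memory history


def tracked(func):
    """Wrap an async tool so every call is appended to `tool_calls`."""
    @functools.wraps(func)
    async def wrapper(arguments: dict):
        start = time.perf_counter()
        result = await func(arguments)
        duration_ms = (time.perf_counter() - start) * 1000
        tool_calls.append(ToolCall(func.__name__, arguments, result, duration_ms))
        return result
    return wrapper


@tracked
async def read_file(arguments: dict) -> str:
    """Hypothetical stand-in for a generated tool binding."""
    await asyncio.sleep(0.01)
    return f"contents of {arguments['path']}"


asyncio.run(read_file({"path": "/tmp/data.txt"}))
for call in tool_calls:
    print(f"Called: {call.tool_name} ({call.duration_ms:.1f}ms)")
```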
State Persistence
Variables persist across executions within the same executor:
# First execution
await executor.execute("""
data = {"count": 0}
""")
# Second execution - uses state from first
await executor.execute("""
data["count"] += 1
print(f"Count is now {data['count']}") # Prints: Count is now 1
""")
Clearing State
# Clear all state
executor.clear_state()
# Or create a fresh executor
executor = CodeModeExecutor(registry)
await executor.setup()
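Conceptually, this kind of persistence resembles running each snippet against one shared namespace. The following minimal sketch uses the built-in exec with a plain dictionary; it is an assumption about the general idea rather than the executor's real mechanism, it does not support top-level await, and execute and namespace are hypothetical names.

```python
# Shared namespace that survives across calls (hypothetical stand-in for executor state).
namespace: dict = {}


def execute(code: str) -> None:
    """Run `code` with the shared namespace as its global scope."""
    exec(code, namespace)


execute('data = {"count": 0}')
execute('data["count"] += 1')
count_after_second_run = namespace["data"]["count"]
print(count_after_second_run)

# Clearing the namespace discards every variable defined so far.
namespace.clear()
print("data" in namespace)
```

After the namespace is cleared, a snippet that refers to data raises NameError. That is the behavior to expect from any fresh state.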
Best Practices
1. Keep Code Focused
Write small, focused code blocks rather than one large script:
# Good: Focused on one task
result = await executor.execute("""
from generated.servers.filesystem import read_file
content = await read_file({"path": "/data.txt"})
lines = content.strip().split("\\n")
print(f"File has {len(lines)} lines")
""")
# Avoid: Too much in one execution
2. Use Parallel Execution for I/O
MCP tools are I/O-bound. Use asyncio.gather for parallelism:
# Good: Parallel reads
contents = await asyncio.gather(*[read_file({"path": f}) for f in files])
# Slow: Sequential reads
contents = []
for f in files:
    contents.append(await read_file({"path": f}))
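The difference between the two approaches can be measured with a self-contained sketch. Here read_file is a hypothetical stub that simulates I/O latency with asyncio.sleep, so the timings only illustrate the pattern and say nothing about real MCP servers.

```python
import asyncio
import time


async def read_file(args: dict) -> str:
    """Hypothetical stand-in for a tool binding; simulates 50 ms of I/O latency."""
    await asyncio.sleep(0.05)
    return args["path"]


async def sequential(files):
    # Each call waits for the previous one to finish.
    return [await read_file({"path": f}) for f in files]


async def parallel(files):
    # All calls are started together and awaited as a group.
    return await asyncio.gather(*[read_file({"path": f}) for f in files])


async def timed(coro):
    """Await `coro` and return (value, elapsed_seconds)."""
    start = time.perf_counter()
    value = await coro
    return value, time.perf_counter() - start


async def main():
    files = ["a.txt", "b.txt", "c.txt", "d.txt"]
    seq, seq_s = await timed(sequential(files))
    par, par_s = await timed(parallel(files))
    return seq, seq_s, par, par_s


seq, seq_s, par, par_s = asyncio.run(main())
print(f"sequential: {seq_s:.2f}s, parallel: {par_s:.2f}s")
```

With four simulated reads, the sequential version takes roughly the sum of the delays, while the gathered version takes roughly one delay. asyncio.gather returns results in the order of its inputs.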
3. Handle Errors Gracefully
Always wrap tool calls that might fail:
try:
    result = await risky_tool({"param": value})
except Exception as e:
    print(f"Tool failed: {e}")
    result = default_value
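When several calls that might fail run concurrently, asyncio.gather accepts return_exceptions=True, which returns exceptions as ordinary values instead of raising the first one. The sketch below shows that pattern; read_file is a hypothetical stub that fails for one path, and the file names are made up for the example.

```python
import asyncio


async def read_file(args: dict) -> str:
    """Hypothetical stand-in for a tool binding; fails for one specific path."""
    await asyncio.sleep(0.01)
    if args["path"].endswith("optional.txt"):
        raise FileNotFoundError(args["path"])
    return "x" * 10


async def main():
    files = ["important.txt", "optional.txt", "extra.txt"]
    # Exceptions are returned in place of results rather than raised.
    outcomes = await asyncio.gather(
        *[read_file({"path": f"/data/{f}"}) for f in files],
        return_exceptions=True,
    )
    results = []
    for f, outcome in zip(files, outcomes):
        if isinstance(outcome, Exception):
            results.append({"file": f, "status": "failed", "error": str(outcome)})
        else:
            results.append({"file": f, "status": "success", "size": len(outcome)})
    return results


results = asyncio.run(main())
successful = [r for r in results if r["status"] == "success"]
print(f"Processed {len(successful)}/{len(results)} files")
```

Because the outcomes come back in input order, each result or exception can be matched to the file that produced it.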
4. Log Progress
Use print statements for visibility:
print(f"Processing {len(items)} items...")
for i, item in enumerate(items):
    if i % 10 == 0:
        print(f"Progress: {i}/{len(items)}")
    await process(item)
print("Done!")