Write your own LLM client
A custom LLM-side client comes in two flavors. Style A is what most clients should do: call dispatch tools, get JSON back. Style B is for clients that need to observe bus traffic the server didn’t originate on their behalf, such as progress events, dispatches from other LLMs, and event broadcasts.
Pick a style
| Need | Style A (direct calls) | Style B (custom subscription) |
|---|---|---|
| Read scene info, run code, download assets | Yes | Yes |
| Listen for progress on long-running bakes | No | Yes |
| Observe dispatches you didn’t originate | No | Yes |
| Approval-workflow inbox | No | Yes |
| Multi-LLM coordination via broadcasts | No | Yes |
| Boilerplate to write | Low (just JWT + client) | High (handler + JobWaiter + filter) |
Start with Style A. Only fall back to Style B when the pattern requires it.
Style A: direct dispatch tool calls
The minimal happy path takes three things: a JWT, a transport, and a tool call.
Stage 1: Get a JWT
```python
import requests

def login(base: str, username: str, password: str) -> dict:
    r = requests.post(
        f"{base}/auth/login",
        json={"username": username, "password": password},
        timeout=10,
    )
    r.raise_for_status()
    return r.json()  # {access_token, refresh_token, token_type, expires_in, user}

auth = login("http://localhost:8000", "demo", "demo")
token = auth["access_token"]
```
Stage 2: Open an authenticated FastMCP transport
```python
from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport

transport = StreamableHttpTransport(
    url="http://localhost:8000/mcp",
    headers={"Authorization": f"Bearer {token}"},
)
```
Stage 3: Call dispatch tools
```python
import asyncio
import json

async def main():
    async with Client(transport) as client:
        # Read the active scene. Returns a JSON string; parse it.
        raw = await client.call_tool("blender_get_scene_info", {})
        result = json.loads(raw.content[0].text)
        print(result)
        # {"status": "completed", "command": "get_scene_info",
        #  "target_uuid": "blender-demo01", "job_id": "j-...",
        #  "result": "{...scene info as JSON string...}", "error": ""}

        # Run arbitrary Python in Blender.
        raw = await client.call_tool("blender_execute_code", {
            "code": "import bpy; print(len(bpy.data.objects))"
        })
        print(json.loads(raw.content[0].text)["result"])  # "3\n"

        # Download a PolyHaven HDRI. Status probe first.
        status = json.loads(
            (await client.call_tool("blender_get_polyhaven_status", {})).content[0].text
        )
        if status["status"] == "completed":
            await client.call_tool("blender_download_polyhaven_asset", {
                "asset_id": "kloofendal_43d_clear_puresky",
                "asset_type": "hdris",
                "resolution": "2k",
            })

asyncio.run(main())
```
That’s a complete Style A client. No registration, no from_uuid, no job_id. The server’s BlenderDispatchComponent handles all of it.
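Because the envelope's `result` field is itself a string, a JSON-returning tool needs two decodes per call. The helper below is a minimal sketch that hides this, assuming the envelope fields shown in the comment above (`status`, `result`, `error`) plus the `hint` field of the error envelopes. The names `unwrap` and `DispatchError` are illustrative and not part of the server API.

```python
import json


class DispatchError(RuntimeError):
    """Raised for any envelope whose status is not 'completed' (hypothetical helper)."""


def unwrap(envelope_text: str):
    """Decode a dispatch envelope and return its inner result.

    Assumes the envelope shape shown above: status, result, error.
    """
    envelope = json.loads(envelope_text)
    if envelope.get("status") != "completed":
        reason = envelope.get("error") or envelope.get("hint") or str(envelope.get("status"))
        raise DispatchError(reason)

    inner = envelope.get("result", "")
    try:
        decoded = json.loads(inner)
    except (TypeError, ValueError):
        return inner  # plain text, e.g. captured stdout
    # Only hand back structured data; keep scalar-looking text such as "3\n" as text.
    return decoded if isinstance(decoded, (dict, list)) else inner


scene = unwrap(json.dumps({"status": "completed", "result": '{"objects": 3}', "error": ""}))
stdout = unwrap(json.dumps({"status": "completed", "result": "3\n", "error": ""}))
print(scene, repr(stdout))  # {'objects': 3} '3\n'
```

With a real client you would pass `raw.content[0].text` to `unwrap`. Returning scalars as text is a deliberate choice here: `json.loads("3\n")` would otherwise turn printed output into an integer.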
When target_uuid matters
Auto-pick works when exactly one Blender client is connected. With zero or more than one, dispatch tools return structured errors:
```json
{
  "status": "no_client",
  "command": "get_scene_info",
  "hint": "No Blender client connected to your bus. Open Blender, enable the BlenderMCP addon, click Login then Connect."
}
```
```json
{
  "status": "ambiguous_target",
  "command": "get_scene_info",
  "candidates": ["blender-abc", "blender-def"],
  "hint": "Multiple Blender clients connected; pass target_uuid=<one of candidates> to disambiguate."
}
```
Pass target_uuid in the arguments dict to pick one:
```python
await client.call_tool("blender_get_scene_info", {"target_uuid": "blender-abc"})
```
Tuning timeouts
Every dispatch tool accepts _timeout (the underscore prefix avoids collision with handler kwargs):
```python
await client.call_tool("blender_download_polyhaven_asset", {
    "asset_id": "...",
    "asset_type": "models",
    "_timeout": 600.0,  # default for downloads is 180s; bump for big models
})
```
Per-tool defaults: status probes 15s, most reads 30s, code exec / Rodin jobs 60s, downloads 180s.
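The `ambiguous_target` error and the `_timeout` argument can be handled in one small wrapper. The following is a sketch under stated assumptions: `call_dispatch`, `choose`, and the injected `call` function are hypothetical names, and the fake server below only imitates the two envelopes documented above so the example runs without Blender.

```python
import asyncio
import json
from typing import Awaitable, Callable, Optional

# The function that performs the real call and returns the envelope text.
CallFn = Callable[[str, dict], Awaitable[str]]


async def call_dispatch(call: CallFn, tool: str, args: Optional[dict] = None, *,
                        timeout: Optional[float] = None,
                        choose: Callable[[list], str] = lambda c: c[0]) -> dict:
    """Call a dispatch tool; on ambiguous_target, retry once with a chosen target_uuid."""
    payload = dict(args or {})
    if timeout is not None:
        payload["_timeout"] = timeout  # overrides the per-tool default
    envelope = json.loads(await call(tool, payload))
    if envelope.get("status") == "ambiguous_target":
        payload["target_uuid"] = choose(envelope["candidates"])
        envelope = json.loads(await call(tool, payload))
    return envelope


seen_args = []

async def _fake_call(tool, args):
    # Stand-in for the server: ambiguous until a target_uuid is supplied.
    seen_args.append(dict(args))
    if "target_uuid" not in args:
        return json.dumps({"status": "ambiguous_target", "command": "get_scene_info",
                           "candidates": ["blender-abc", "blender-def"]})
    return json.dumps({"status": "completed", "target_uuid": args["target_uuid"]})


result = asyncio.run(call_dispatch(_fake_call, "blender_get_scene_info", timeout=60.0))
print(result["status"], result["target_uuid"])  # completed blender-abc
```

Against a real server, `call` would be a short coroutine that awaits `client.call_tool(tool, args)` and returns `raw.content[0].text`. The default `choose` silently picks the first candidate; an interactive client may prefer to ask the user instead.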
Style B: Custom _message_bus subscription
Use this for long-running progress streams, observing other LLMs’ dispatches, or receiving broadcasts. It takes five things: a JWT, a transport, a message handler, a client-side JobWaiter, and registration on the bus.
Stage 1: Get a JWT
Same as Style A.
Stage 2: Open an authenticated FastMCP transport
Same as Style A.
Stage 3: A message handler that filters bus traffic
```python
import json

MESSAGE_BUS_LOGGER = "_message_bus"

async def on_message(message):
    # Some client versions wrap the notification in a .root attribute.
    inner = getattr(message, "root", message)
    if getattr(inner, "method", None) != "notifications/message":
        return
    params = getattr(inner, "params", None)
    if params is None:
        return

    # params may be an object or a plain dict; support both.
    logger = getattr(params, "logger", None) or (
        params.get("logger") if isinstance(params, dict) else None
    )
    if logger != MESSAGE_BUS_LOGGER:
        return

    data = getattr(params, "data", None) or (
        params.get("data") if isinstance(params, dict) else None
    )
    if isinstance(data, str):
        data = json.loads(data)

    # data shape:
    # {user_id, from_uuid, target_uuid, routing, payload,
    #  job_id, message_id, priority, timestamp}
    handle_bus_record(data)  # your own callback; not defined here
```
The same shape arrives at every bus subscriber. The filter on logger == "_message_bus" is what keeps other MCP log traffic from being misinterpreted.
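To see the filter in isolation, the same checks can be written as a plain function and fed hand-built messages. This is a sketch, not the library's behavior: `extract_bus_record` is a hypothetical name, and the `SimpleNamespace` objects are stand-ins for the notification objects the transport delivers, which may carry more fields.

```python
import json
from types import SimpleNamespace

MESSAGE_BUS_LOGGER = "_message_bus"


def extract_bus_record(message):
    """Return the decoded bus record, or None if the message is not bus traffic."""
    inner = getattr(message, "root", message)
    if getattr(inner, "method", None) != "notifications/message":
        return None
    params = getattr(inner, "params", None)
    if params is None:
        return None
    if isinstance(params, dict):
        logger, data = params.get("logger"), params.get("data")
    else:
        logger, data = getattr(params, "logger", None), getattr(params, "data", None)
    if logger != MESSAGE_BUS_LOGGER:
        return None
    return json.loads(data) if isinstance(data, str) else data


# Hand-built stand-ins: one bus record, one unrelated log notification.
bus_msg = SimpleNamespace(
    method="notifications/message",
    params=SimpleNamespace(
        logger="_message_bus",
        data='{"from_uuid": "blender-demo01", "payload": {"kind": "job_update"}}',
    ),
)
other_msg = SimpleNamespace(
    method="notifications/message",
    params=SimpleNamespace(logger="uvicorn", data="unrelated log line"),
)

print(extract_bus_record(bus_msg)["payload"]["kind"])  # job_update
print(extract_bus_record(other_msg))                   # None
```

Returning the record instead of calling a callback keeps the filter easy to unit-test; an async `on_message` can then call it and pass any non-None result on.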
Stage 4: Client-side JobWaiter
```python
import asyncio

class JobWaiter:
    def __init__(self):
        self._futures: dict[str, asyncio.Future] = {}

    def register(self, job_id: str) -> asyncio.Future:
        # Must be called from inside a running event loop.
        loop = asyncio.get_running_loop()
        fut = loop.create_future()
        self._futures[job_id] = fut
        return fut

    def resolve(self, data: dict):
        # Called from on_message when a job_update arrives.
        payload = data.get("payload", {})
        if payload.get("kind") != "job_update":
            return
        # Prefer the payload's job_id; fall back to the record's top-level one.
        job_id = payload.get("job_id") or data.get("job_id")
        fut = self._futures.pop(job_id, None)
        if fut and not fut.done():
            fut.set_result(payload)
```
Wire waiter.resolve(data) into your on_message.
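The wiring can be exercised without a server by simulating the reply. The sketch below repeats the waiter in compact form so it runs on its own, and adds two things that are not in the original: a `discard` method for timed-out jobs and a `uuid`-based job id. Both are illustrative assumptions, as is the shape of the simulated record.

```python
import asyncio
import uuid


class JobWaiter:
    def __init__(self):
        self._futures = {}

    def register(self, job_id):
        fut = asyncio.get_running_loop().create_future()
        self._futures[job_id] = fut
        return fut

    def resolve(self, data):
        payload = data.get("payload", {})
        if payload.get("kind") != "job_update":
            return
        fut = self._futures.pop(payload.get("job_id"), None)
        if fut and not fut.done():
            fut.set_result(payload)

    def discard(self, job_id):
        # Hypothetical addition: forget a timed-out job so the dict does not grow.
        self._futures.pop(job_id, None)


async def demo():
    waiter = JobWaiter()
    job_id = f"job-{uuid.uuid4().hex[:8]}"  # unique per dispatch
    fut = waiter.register(job_id)

    # Simulate the addon's reply reaching on_message a moment later.
    record = {"payload": {"kind": "job_update", "job_id": job_id, "status": "completed"}}
    asyncio.get_running_loop().call_later(0.01, waiter.resolve, record)

    try:
        return await asyncio.wait_for(fut, timeout=1.0)
    except asyncio.TimeoutError:
        waiter.discard(job_id)
        raise


result = asyncio.run(demo())
print(result["status"])  # completed
```

In a real handler, the `call_later` line is replaced by calling `waiter.resolve(data)` with each decoded bus record.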
Stage 5: Register, dispatch, await
```python
async def main():
    waiter = JobWaiter()

    async def handler(message):
        # reuse on_message from stage 3, plus:
        #   waiter.resolve(decoded_data)
        ...

    async with Client(transport, message_handler=handler) as client:
        # Subscribe to every priority level (default is "warning").
        await client.set_logging_level("debug")

        # Join the bus.
        my_uuid = "llm-myclient-001"
        await client.call_tool("blender_register_client", {
            "client_uuid": my_uuid,
            "client_type": "llm",
            "is_persistent": False,
            "capabilities": ["chat"],
        })

        # Dispatch a script.
        job_id = "job-abc123"
        fut = waiter.register(job_id)

        await client.call_tool("blender_send_message", {
            "target_uuid": "blender-demo01",
            "from_uuid": my_uuid,
            "priority": "info",
            "payload": {
                "message_type": "job_dispatch",
                "job_id": job_id,
                "script": "import bpy\nprint(len(bpy.data.objects))",
            },
        })

        # Wait for the addon's job_update reply.
        result = await asyncio.wait_for(fut, timeout=30)
        print(result)
        # {kind:'job_update', status:'completed', result:'3\n', ...}

asyncio.run(main())
```
Cleanup
```python
await client.call_tool("blender_unregister_client", {"client_uuid": my_uuid})
```
Ephemeral clients (is_persistent=False) age out of the bus when the session drops, but an explicit unregister keeps the client list clean and saves the next list_available_clients consumer from seeing stale entries.
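One way to make sure the unregister call runs even when the session body raises is to wrap registration in an async context manager. This is a sketch: `registered` is a hypothetical helper built on the two tool names used above, and `_FakeClient` only records calls so the example runs without a server.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def registered(client, client_uuid, capabilities=("chat",)):
    """Register on the bus, then always unregister on the way out."""
    await client.call_tool("blender_register_client", {
        "client_uuid": client_uuid,
        "client_type": "llm",
        "is_persistent": False,
        "capabilities": list(capabilities),
    })
    try:
        yield client_uuid
    finally:
        await client.call_tool("blender_unregister_client", {"client_uuid": client_uuid})


class _FakeClient:
    """Stand-in that records tool calls instead of talking to a server."""

    def __init__(self):
        self.calls = []

    async def call_tool(self, name, args):
        self.calls.append(name)


async def demo():
    client = _FakeClient()
    try:
        async with registered(client, "llm-myclient-001"):
            raise RuntimeError("simulated failure mid-session")
    except RuntimeError:
        pass
    return client.calls


calls = asyncio.run(demo())
print(calls)  # ['blender_register_client', 'blender_unregister_client']
```

With a real `Client`, the `async with registered(client, my_uuid):` block would sit inside the `async with Client(...)` block, so the unregister call still has a live session to use.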
Mixing styles
Nothing stops you from doing both. A Style A client that occasionally needs to observe broadcasts can wire a Style B message_handler onto its Client and keep using dispatch tools for the request/response paths. The server doesn’t care which style produced a given tool call.
Reference
- Dispatch tools: all 24 signatures, return shapes, timeouts
- Bus tools: register_client, send_message, job_update, list_available_clients, unregister_client
- Priority levels: what priority="info" actually means
- Routing modes: direct, group, type-filter, broadcast
- ClientInfo: what register_client writes
- Command dispatch vs script dispatch: the underlying design