
Write your own LLM client

A custom LLM-side client comes in two flavors. Style A is what most clients should do: call dispatch tools, get JSON back. Style B is for clients that need to observe bus traffic the server didn’t originate on their behalf — progress events, dispatches from other LLMs, event broadcasts.

| Need | Style A (direct calls) | Style B (custom subscription) |
|---|---|---|
| Read scene info, run code, download assets | Yes | Yes |
| Listen for progress on long-running bakes | No | Yes |
| Observe dispatches you didn’t originate | No | Yes |
| Approval-workflow inbox | No | Yes |
| Multi-LLM coordination via broadcasts | No | Yes |
| Boilerplate to write | Low (just JWT + client) | High (handler + JobWaiter + filter) |

Start with Style A. Only fall back to Style B when the pattern requires it.

Style A is the minimal happy path. It takes three things: a JWT, a transport, and a tool call. First, log in to get a JWT:

import requests

def login(base: str, username: str, password: str) -> dict:
    r = requests.post(
        f"{base}/auth/login",
        json={"username": username, "password": password},
        timeout=10,
    )
    r.raise_for_status()
    return r.json()  # {access_token, refresh_token, token_type, expires_in, user}

auth = login("http://localhost:8000", "demo", "demo")
token = auth["access_token"]
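
The login response carries `expires_in`, so a long-lived client may want to know when its token is about to lapse. The following is a minimal sketch, assuming `expires_in` is a lifetime in seconds counted from the moment of login; the helper names `expires_at` and `needs_refresh` are hypothetical, and how to actually refresh the token is not covered here.

```python
import time


def expires_at(auth: dict, issued_at: float) -> float:
    """Absolute expiry time, assuming `expires_in` is a lifetime in seconds."""
    return issued_at + float(auth["expires_in"])


def needs_refresh(auth: dict, issued_at: float, now: float, skew: float = 30.0) -> bool:
    """True once the token is within `skew` seconds of expiring."""
    return now >= expires_at(auth, issued_at) - skew


# Record the clock right after login() returns, then check before each call.
issued_at = time.monotonic()
sample = {"access_token": "abc", "expires_in": 900}  # illustrative values only
print(needs_refresh(sample, issued_at, now=issued_at + 10))   # False
print(needs_refresh(sample, issued_at, now=issued_at + 880))  # True
```

A monotonic clock is used so that wall-clock adjustments do not shift the deadline.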

Stage 2 — Open an authenticated FastMCP transport

from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport

transport = StreamableHttpTransport(
    url="http://localhost:8000/mcp",
    headers={"Authorization": f"Bearer {token}"},
)

import asyncio
import json

async def main():
    async with Client(transport) as client:
        # Read the active scene. Returns a JSON string; parse it.
        raw = await client.call_tool("blender_get_scene_info", {})
        result = json.loads(raw.content[0].text)
        print(result)
        # {"status": "completed", "command": "get_scene_info",
        #  "target_uuid": "blender-demo01", "job_id": "j-...",
        #  "result": "{...scene info as JSON string...}", "error": ""}

        # Run arbitrary Python in Blender.
        raw = await client.call_tool("blender_execute_code", {
            "code": "import bpy; print(len(bpy.data.objects))"
        })
        print(json.loads(raw.content[0].text)["result"])  # "3\n"

        # Download a PolyHaven HDRI. Status probe first.
        status = json.loads(
            (await client.call_tool("blender_get_polyhaven_status", {})).content[0].text
        )
        if status["status"] == "completed":
            await client.call_tool("blender_download_polyhaven_asset", {
                "asset_id": "kloofendal_43d_clear_puresky",
                "asset_type": "hdris",
                "resolution": "2k",
            })

asyncio.run(main())

That’s a complete Style A client. No registration, no from_uuid, no job_id. The server’s BlenderDispatchComponent handles all of it.
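
Because the envelope's `result` field is itself a JSON string for some tools and plain text for others, a small decoding helper can save repeated `json.loads` calls. This is a sketch only: `decode_envelope` is a hypothetical helper, and the `object_count` field in the sample is made up for illustration.

```python
import json


def decode_envelope(text: str) -> dict:
    """Parse a dispatch-tool reply and, where possible, its nested `result`."""
    envelope = json.loads(text)
    result = envelope.get("result")
    if isinstance(result, str):
        try:
            parsed = json.loads(result)
        except json.JSONDecodeError:
            parsed = None
        # Only unwrap objects and arrays, so captured stdout such as "3\n"
        # stays a string instead of turning into the integer 3.
        if isinstance(parsed, (dict, list)):
            envelope["result"] = parsed
    return envelope


sample = json.dumps({
    "status": "completed",
    "command": "get_scene_info",
    "result": json.dumps({"object_count": 3}),  # hypothetical field
    "error": "",
})
print(decode_envelope(sample)["result"]["object_count"])  # 3
```

With a real client, the input would be `raw.content[0].text` from a `call_tool` reply.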

Target auto-pick works when exactly one Blender client is connected. With zero or more than one, dispatch tools return structured errors:

{"status": "no_client", "command": "get_scene_info",
 "hint": "No Blender client connected to your bus. Open Blender, enable the BlenderMCP addon, click Login then Connect."}

{"status": "ambiguous_target", "command": "get_scene_info",
 "candidates": ["blender-abc", "blender-def"],
 "hint": "Multiple Blender clients connected; pass target_uuid=<one of candidates> to disambiguate."}

Pass target_uuid in the arguments dict to pick one:

await client.call_tool("blender_get_scene_info", {"target_uuid": "blender-abc"})
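
One way to act on these structured errors is to turn the envelope into either an exception or a set of retry arguments. The sketch below assumes the envelope has already been parsed into a dict; `NoBlenderClient` and `retry_arguments` are hypothetical names, and refusing to guess among candidates is a design choice of this sketch, not server behavior.

```python
from typing import Optional


class NoBlenderClient(RuntimeError):
    """Raised when the server reports status 'no_client'."""


def retry_arguments(envelope: dict, arguments: dict,
                    preferred: Optional[str] = None) -> Optional[dict]:
    """Return arguments for a retry, or None when no retry is needed."""
    status = envelope.get("status")
    if status == "no_client":
        raise NoBlenderClient(envelope.get("hint", "no Blender client connected"))
    if status != "ambiguous_target":
        return None
    candidates = envelope.get("candidates", [])
    if preferred not in candidates:
        # Do not guess: the caller has to choose one of the candidates.
        raise LookupError(f"pick one of {candidates}")
    return {**arguments, "target_uuid": preferred}


ambiguous = {"status": "ambiguous_target", "command": "get_scene_info",
             "candidates": ["blender-abc", "blender-def"]}
print(retry_arguments(ambiguous, {}, preferred="blender-abc"))
# {'target_uuid': 'blender-abc'}
```

The returned dict can be passed straight back to `client.call_tool` with the same tool name.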

Every dispatch tool accepts _timeout (underscore prefix avoids collision with handler kwargs):

await client.call_tool("blender_download_polyhaven_asset", {
    "asset_id": "...", "asset_type": "models",
    "_timeout": 600.0,  # default for downloads is 180s; bump for big models
})

Per-tool defaults: status probes 15s, most reads 30s, code exec / Rodin jobs 60s, downloads 180s.
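
If several call sites need to scale these defaults, the numbers can be kept in one place on the client. This is a sketch under assumptions: the category names (`status_probe`, `read`, `exec`, `download`) are hypothetical labels for the groups listed above, and the server does not know about them; only the `_timeout` argument reaches it.

```python
# Defaults quoted in the text above; the category names are hypothetical labels.
DEFAULT_TIMEOUTS = {
    "status_probe": 15.0,
    "read": 30.0,
    "exec": 60.0,       # code exec / Rodin jobs
    "download": 180.0,
}


def with_timeout(arguments: dict, kind: str, factor: float = 1.0) -> dict:
    """Return a copy of `arguments` with `_timeout` set to a scaled default."""
    return {**arguments, "_timeout": DEFAULT_TIMEOUTS[kind] * factor}


args = with_timeout(
    {"asset_id": "kloofendal_43d_clear_puresky", "asset_type": "hdris", "resolution": "2k"},
    kind="download",
    factor=2.0,
)
print(args["_timeout"])  # 360.0
```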

Style B — Custom _message_bus subscription


Use Style B for long-running progress streams, for observing other LLMs’ dispatches, or for receiving broadcasts. It takes five things: a JWT, a transport, a message handler, a client-side JobWaiter, and registration on the bus.

Stage 1, getting a JWT, is the same as in Style A.

Stage 2 — Open an authenticated FastMCP transport


Same as Style A.

Stage 3 — A message handler that filters bus traffic

import json

MESSAGE_BUS_LOGGER = "_message_bus"

async def on_message(message):
    inner = getattr(message, "root", message)
    if getattr(inner, "method", None) != "notifications/message":
        return
    params = getattr(inner, "params", None)
    if params is None:
        return
    logger = getattr(params, "logger", None) or (
        params.get("logger") if isinstance(params, dict) else None
    )
    if logger != MESSAGE_BUS_LOGGER:
        return
    data = getattr(params, "data", None) or (
        params.get("data") if isinstance(params, dict) else None
    )
    if isinstance(data, str):
        data = json.loads(data)
    # data shape:
    #   {user_id, from_uuid, target_uuid, routing, payload,
    #    job_id, message_id, priority, timestamp}
    handle_bus_record(data)  # your own callback; not defined here

The same shape arrives at every bus subscriber. The filter on logger == "_message_bus" is what keeps other MCP log traffic from being misinterpreted.

import asyncio

class JobWaiter:
    def __init__(self):
        self._futures: dict[str, asyncio.Future] = {}

    def register(self, job_id: str) -> asyncio.Future:
        # Must be called from inside the running event loop.
        loop = asyncio.get_running_loop()
        fut = loop.create_future()
        self._futures[job_id] = fut
        return fut

    def resolve(self, data: dict):
        # called from on_message when a job_update arrives
        payload = data.get("payload", {})
        if payload.get("kind") != "job_update":
            return
        # .get avoids a KeyError if an update arrives without a job_id
        fut = self._futures.pop(payload.get("job_id"), None)
        if fut and not fut.done():
            fut.set_result(payload)
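
A waiter of this kind keeps a future in its dict until a matching job_update arrives, so a job that never replies leaves an entry behind. The following variant is a sketch of one way to clean up on timeout; `TimedJobWaiter`, its `wait` method, and the `pending` property are hypothetical and not part of the server or of FastMCP.

```python
import asyncio


class TimedJobWaiter:
    """Hypothetical JobWaiter variant that forgets jobs once waited on."""

    def __init__(self) -> None:
        self._futures: dict[str, asyncio.Future] = {}

    def register(self, job_id: str) -> None:
        # Register before dispatching, so an early reply is not missed.
        self._futures[job_id] = asyncio.get_running_loop().create_future()

    def resolve(self, data: dict) -> None:
        payload = data.get("payload", {})
        if payload.get("kind") != "job_update":
            return
        fut = self._futures.get(payload.get("job_id"))
        if fut is not None and not fut.done():
            fut.set_result(payload)

    async def wait(self, job_id: str, timeout: float) -> dict:
        try:
            return await asyncio.wait_for(self._futures[job_id], timeout)
        finally:
            # Runs on success, timeout, and cancellation alike.
            self._futures.pop(job_id, None)

    @property
    def pending(self) -> int:
        return len(self._futures)


async def demo() -> int:
    waiter = TimedJobWaiter()
    waiter.register("j-1")
    try:
        await waiter.wait("j-1", timeout=0.05)  # no reply ever arrives
    except asyncio.TimeoutError:
        pass
    return waiter.pending


print(asyncio.run(demo()))  # 0
```

Here `resolve` leaves the entry in place and `wait` removes it, so a reply that arrives between `register` and `wait` is still delivered.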

Wire waiter.resolve(data) into your on_message.
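
The wiring can be sketched as a decode step that returns the bus record instead of handling it, plus a small factory that forwards each record to the waiter. The names `decode_bus_record`, `make_handler`, and `PrintingWaiter` are hypothetical; the filtering logic mirrors the stage 3 handler, and the fake notification at the end only imitates the attributes that handler reads.

```python
import asyncio
import json
from types import SimpleNamespace

MESSAGE_BUS_LOGGER = "_message_bus"


def _field(obj, name):
    """Read `name` from either a dict or an attribute-style object."""
    if isinstance(obj, dict):
        return obj.get(name)
    return getattr(obj, name, None)


def decode_bus_record(message):
    """Return the bus record as a dict, or None for unrelated traffic."""
    inner = getattr(message, "root", message)
    if _field(inner, "method") != "notifications/message":
        return None
    params = _field(inner, "params")
    if params is None or _field(params, "logger") != MESSAGE_BUS_LOGGER:
        return None
    data = _field(params, "data")
    if isinstance(data, str):
        data = json.loads(data)
    return data if isinstance(data, dict) else None


def make_handler(waiter):
    """Build a message handler that forwards bus records to waiter.resolve."""
    async def handler(message):
        data = decode_bus_record(message)
        if data is not None:
            waiter.resolve(data)
    return handler


# Stand-ins for a real notification and a real JobWaiter, for a dry run.
class PrintingWaiter:
    def resolve(self, data):
        print(data["payload"]["status"])


fake = SimpleNamespace(
    method="notifications/message",
    params=SimpleNamespace(
        logger=MESSAGE_BUS_LOGGER,
        data=json.dumps({"payload": {"kind": "job_update", "job_id": "j-1",
                                     "status": "completed"}}),
    ),
)
asyncio.run(make_handler(PrintingWaiter())(fake))  # prints: completed
```

The handler returned by `make_handler(waiter)` is what would be passed as `message_handler` when constructing the `Client`.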

async def main():
    waiter = JobWaiter()

    async def handler(message):
        # reuse on_message from stage 3, plus:
        #     waiter.resolve(decoded_data)
        ...

    async with Client(transport, message_handler=handler) as client:
        # Subscribe to every priority level (default is "warning").
        await client.set_logging_level("debug")

        # Join the bus.
        my_uuid = "llm-myclient-001"
        await client.call_tool("blender_register_client", {
            "client_uuid": my_uuid,
            "client_type": "llm",
            "is_persistent": False,
            "capabilities": ["chat"],
        })

        # Dispatch a script.
        job_id = "job-abc123"
        fut = waiter.register(job_id)
        await client.call_tool("blender_send_message", {
            "target_uuid": "blender-demo01",
            "from_uuid": my_uuid,
            "priority": "info",
            "payload": {
                "message_type": "job_dispatch",
                "job_id": job_id,
                "script": "import bpy\nprint(len(bpy.data.objects))",
            },
        })

        # Wait for the addon's job_update reply.
        result = await asyncio.wait_for(fut, timeout=30)
        print(result)  # {kind:'job_update', status:'completed', result:'3\n', ...}

        # Unregister while the session is still open.
        await client.call_tool("blender_unregister_client", {"client_uuid": my_uuid})

asyncio.run(main())

Ephemeral clients (is_persistent=False) age out of the bus when the session drops, but an explicit unregister keeps the client list clean and saves the next list_available_clients consumer from seeing stale entries.
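
To make the explicit unregister happen even when a dispatch fails or times out, the register and unregister calls can be paired in an async context manager. This is a sketch, not part of the server or FastMCP: `bus_registration` and `RecordingClient` are hypothetical, and the stub client only records tool names so the pairing can be checked without a server.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def bus_registration(client, client_uuid: str):
    """Register on entry and always unregister on exit."""
    await client.call_tool("blender_register_client", {
        "client_uuid": client_uuid,
        "client_type": "llm",
        "is_persistent": False,
        "capabilities": ["chat"],
    })
    try:
        yield client_uuid
    finally:
        await client.call_tool("blender_unregister_client",
                               {"client_uuid": client_uuid})


class RecordingClient:
    """Stub with the same call_tool shape, used only for this dry run."""

    def __init__(self):
        self.calls = []

    async def call_tool(self, name, arguments):
        self.calls.append(name)


async def demo():
    client = RecordingClient()
    try:
        async with bus_registration(client, "llm-myclient-001"):
            raise RuntimeError("simulated failure mid-session")
    except RuntimeError:
        pass
    return client.calls


print(asyncio.run(demo()))
# ['blender_register_client', 'blender_unregister_client']
```

With a real `Client`, the `async with bus_registration(...)` block would sit inside the `async with Client(...)` block so that the session is still open when the unregister call runs.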

Nothing stops you from doing both. A Style A client that occasionally needs to observe broadcasts can wire a Style B message_handler onto its Client and keep using dispatch tools for the request/response paths. The server doesn’t care which style produced a given tool call.