
Non-blocking async operations and automatic resource cleanup with context managers.
Every synchronous SDK method has an async counterpart prefixed with `a` (for example, `search` becomes `asearch`). Use async methods
inside any asyncio-based application — FastAPI endpoints, async workers, or concurrent scripts.
import asyncio
from guidedmind import Client, SearchMethod
async def main():
    """Walk through the async API: RAG search, short/long memory, document upload.

    The client is opened with ``async with`` so that it is closed when the
    block exits, including when one of the awaited calls raises.
    """
    async with Client() as client:
        # Async RAG search
        response = await client.asearch(
            query="How does authentication work?",
            limit=10,
            search_method=SearchMethod.HYBRID,
        )

        # Async short memory
        await client.memory.short.aadd_record(
            session_id="s-123",
            role="user",
            content="Explain rate limiting",
        )
        records = await client.memory.short.aget_records(session_id="s-123")

        # Async long memory
        await client.memory.long.astore_record(
            user_id="user-456",
            role="user",
            content="I prefer concise answers",
        )
        memories = await client.memory.long.asearch(
            query="user preferences",
            limit=5,
        )

        # Async document upload
        result = await client.documents.aupload_and_process(file_path="spec.pdf")
        print(f"Indexed {result.chunks_created} chunks")
asyncio.run(main())

The Client supports both synchronous and asynchronous context managers, ensuring
HTTP connections are always properly closed — recommended in long-running applications
and serverless functions.
# Synchronous context manager
with Client() as client:
    results = client.search(query="What is RAG?")
    # Indexing [0] on an empty result list would raise IndexError, so only
    # print the top hit when there is one.
    if results.results:
        print(results.results[0].content)
# HTTP connection pool closed automatically here
# Asynchronous context manager
async def handle_request(query: str):
async with Client() as client:
results = await client.asearch(query=query)
return results.resultsUse asyncio.gather to fire multiple independent requests concurrently:
import asyncio
from guidedmind import Client
async def parallel_search(queries: list[str]) -> None:
    """Run one search per query concurrently and print the scored hits.

    Args:
        queries: Search strings; each is sent as its own request with
            ``limit=3``.
    """
    async with Client() as client:
        # The asearch calls are not awaited here; gather awaits them together
        # and returns the responses in the same order as ``queries``, which is
        # what makes the zip below line up.
        tasks = [client.asearch(query=q, limit=3) for q in queries]
        responses = await asyncio.gather(*tasks)

        for query, response in zip(queries, responses):
            print(f"\n{query}")
            for r in response.results:
                print(f" [{r.score:.2f}] {r.content[:80]}")
asyncio.run(parallel_search([
    "What is our refund policy?",
    "How do I upgrade my plan?",
    "Where can I find API docs?",
]))


from contextlib import asynccontextmanager
from fastapi import FastAPI
from guidedmind import Client
# Shared client; stays None until the lifespan handler has started.
client: Client | None = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the shared client on startup and close it on shutdown."""
    global client
    client = Client()  # initialise on startup
    try:
        yield
    finally:
        # finally ensures the close still runs if an exception is thrown
        # into this generator at the yield.
        await client.aclose()  # clean up on shutdown


app = FastAPI(lifespan=lifespan)
@app.get("/search")
async def search(q: str):
response = await client.asearch(query=q, limit=5)
return [{"score": r.score, "content": r.content} for r in response.results]Instantiate a single Client at application startup and reuse it for all requests. Creating a new client per request wastes connection pool resources.
| Sync method | Async counterpart |
|---|---|
| `client.search(...)` | `await client.asearch(...)` |
| `client.memory.short.add_record(...)` | `await client.memory.short.aadd_record(...)` |
| `client.memory.short.get_records(...)` | `await client.memory.short.aget_records(...)` |
| `client.memory.long.store_record(...)` | `await client.memory.long.astore_record(...)` |
| `client.memory.long.search(...)` | `await client.memory.long.asearch(...)` |
| `client.documents.upload(...)` | `await client.documents.aupload(...)` |
| `client.documents.upload_and_process(...)` | `await client.documents.aupload_and_process(...)` |
| `client.close()` | `await client.aclose()` |