Installation
pip install "mengram-ai[async]"
This installs httpx for non-blocking HTTP.
Initialize
from mengram import AsyncMengram
m = AsyncMengram(api_key="om-your-key")
# Or use environment variable
m = AsyncMengram()
Context Manager
async with AsyncMengram() as m:
    results = await m.search("deployment")
    profile = await m.get_profile()
# Client automatically closed
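For readers who cannot use `async with` (for example, when the client outlives a single function), the context-manager form above can be written out by hand with `try`/`finally`. The sketch below is only an illustration: `StubClient` is a hypothetical stand-in so the example runs offline, and it assumes nothing about the real client beyond the `search()` and `close()` methods shown on this page.

```python
import asyncio


class StubClient:
    """Hypothetical offline stand-in for AsyncMengram (not the real client)."""

    def __init__(self):
        self.closed = False

    async def search(self, query):
        return [f"result for {query}"]

    async def close(self):
        self.closed = True

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Leaving the `async with` block closes the client, even on error.
        await self.close()


async def search_then_close(client, query):
    # Manual equivalent of `async with`: close() runs even if search() raises.
    try:
        return await client.search(query)
    finally:
        await client.close()


async def search_with_context_manager(query):
    async with StubClient() as client:
        results = await client.search(query)
    return results, client.closed
```

Both helpers leave the client closed once the search has finished, which is the property the context manager guarantees.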
All Methods
Every method is async and must be awaited:
import asyncio
from mengram import AsyncMengram
async def main():
    m = AsyncMengram()
    # Add memories
    result = await m.add([
        {"role": "user", "content": "Deployed on Railway with PostgreSQL"},
    ])
    # Upload a file (PDF, DOCX, TXT, MD)
    await m.add_file("meeting-notes.pdf")
    # Search
    results = await m.search("deployment")
    # Unified search
    all_results = await m.search_all("issues")
    # Profile
    profile = await m.get_profile()
    # Episodes & procedures
    events = await m.episodes(query="deployment")
    procs = await m.procedures(query="deploy")
    # Feedback (placeholder ID; use the ID of one of your own procedures)
    proc_id = "proc-abc"
    await m.procedure_feedback(proc_id, success=True)
    # Graph & timeline
    graph = await m.graph()
    facts = await m.timeline(after="2026-01-01")
    # Agents & insights
    await m.run_agents()
    await m.reflect()
    insights = await m.insights()
    # Memory management
    await m.dedup()
    await m.merge("source", "target")
    await m.archive_fact("Entity", "old fact")
    # Triggers
    triggers = await m.get_triggers()
    # Webhooks
    await m.create_webhook(url="https://example.com/hook")
    hooks = await m.get_webhooks()
    # Jobs
    status = await m.job_status("job-abc")
    result = await m.wait_for_job("job-abc")
    # Close when done
    await m.close()
asyncio.run(main())
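The walkthrough above awaits each call one after another. Because every method is a coroutine, independent calls can also be issued concurrently with `asyncio.gather`. The following is a minimal sketch of that pattern; `StubClient` and `search_many` are hypothetical names used only so the example runs without network access, and the real client's return shape may differ.

```python
import asyncio


class StubClient:
    """Hypothetical offline stand-in exposing an async search() method."""

    async def search(self, query):
        await asyncio.sleep(0.01)  # simulate network latency
        return [f"hit for {query}"]


async def search_many(client, queries):
    # gather() starts all searches at once and returns the results
    # in the same order as the input queries.
    return await asyncio.gather(*(client.search(q) for q in queries))


async def demo():
    client = StubClient()
    return await search_many(client, ["deployment", "database"])
```

With a real client, the total wait is then roughly that of the slowest request rather than the sum of all of them.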
Available Methods
| Method | Description |
|---|---|
| add(messages, ...) | Add memories from conversation |
| add_text(text, ...) | Add memories from plain text |
| add_file(file_path, ...) | Upload file (PDF, DOCX, TXT, MD) |
| search(query, ...) | Semantic search |
| search_all(query, ...) | Unified search (all 3 types) |
| get_all(...) | List all entities |
| get(name, ...) | Get specific entity |
| delete(name, ...) | Delete entity |
| stats(...) | Usage statistics |
| get_profile(...) | Cognitive Profile |
| episodes(...) | Episodic memories |
| procedures(...) | Procedural memories |
| procedure_feedback(id, ...) | Record success/failure |
| graph(...) | Knowledge graph |
| timeline(...) | Temporal search |
| run_agents(...) | Run memory agents |
| reflect(...) | Trigger reflection |
| insights(...) | Get AI insights |
| dedup(...) | Deduplicate entities |
| merge(src, target, ...) | Merge entities |
| archive_fact(entity, fact, ...) | Archive a fact |
| get_triggers(...) | Get smart triggers |
| create_webhook(url, ...) | Create webhook |
| get_webhooks() | List webhooks |
| job_status(id) | Check job status |
| wait_for_job(id, ...) | Poll until job completes |
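To make "poll until job completes" concrete, here is a rough sketch of a polling loop built on `job_status`. It is not the library's implementation of `wait_for_job`. The response shape (a dict with a `"status"` key), the terminal state names, and the `interval` and `timeout` parameters are all assumptions made for illustration; `StubClient` is a hypothetical stand-in so the code runs offline.

```python
import asyncio


class StubClient:
    """Hypothetical stand-in: reports 'pending' twice, then 'completed'."""

    def __init__(self):
        self.calls = 0

    async def job_status(self, job_id):
        self.calls += 1
        state = "completed" if self.calls >= 3 else "pending"
        return {"id": job_id, "status": state}  # assumed response shape


async def poll_job(client, job_id, interval=0.5, timeout=30.0,
                   done_states=("completed", "failed")):
    # Check the job repeatedly until it reaches a terminal state
    # or the next wait would pass the deadline.
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        status = await client.job_status(job_id)
        if status.get("status") in done_states:
            return status
        if loop.time() + interval > deadline:
            raise TimeoutError(f"job {job_id} still running after {timeout}s")
        await asyncio.sleep(interval)
```

In practice, calling `wait_for_job` directly is simpler; a hand-written loop like this is mainly useful when progress needs to be logged between checks.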
Retry & Error Handling
The async client automatically retries transient errors (429, 502, 503, 504) and network failures, using exponential backoff for up to 3 attempts. A 402 (quota exceeded) response is never retried; it raises QuotaExceededError immediately.
from mengram import AsyncMengram
from mengram.cloud.async_client import QuotaExceededError
async def safe_add(m, messages):
    try:
        return await m.add(messages)
    except QuotaExceededError as e:
        print(f"Quota hit: {e.action} {e.current}/{e.limit} ({e.plan})")
        # e.action = "add" | "search" | "add_file"
        # e.plan = "free" | "starter" | "pro"
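The retry policy described in this section can be sketched in a few lines. This is an illustration of the general technique only, not the client's internal code: `HttpError`, `with_retries`, and the `base_delay` value are hypothetical, and the actual delays used by the client are not documented here.

```python
import asyncio

# Status codes this page lists as transient.
RETRYABLE = {429, 502, 503, 504}


class HttpError(Exception):
    """Hypothetical error type carrying an HTTP status code."""

    def __init__(self, status):
        super().__init__(f"HTTP {status}")
        self.status = status


async def with_retries(call, max_attempts=3, base_delay=0.5):
    # Retry transient failures, doubling the delay after each attempt.
    # Anything else (for example 402) is re-raised on the first failure.
    for attempt in range(1, max_attempts + 1):
        try:
            return await call()
        except (HttpError, ConnectionError) as err:
            retryable = (isinstance(err, ConnectionError)
                         or err.status in RETRYABLE)
            if not retryable or attempt == max_attempts:
                raise
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
```

Since the client already retries internally, application code usually only needs to handle the errors that survive those attempts, such as QuotaExceededError.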