Skip to main content

Installation

pip install mengram-ai[async]
This installs httpx for non-blocking HTTP.

Initialize

from mengram import AsyncMengram

m = AsyncMengram(api_key="om-your-key")

# Or use environment variable
m = AsyncMengram()

Context Manager

async with AsyncMengram() as m:
    results = await m.search("deployment")
    profile = await m.get_profile()
# Client automatically closed

All Methods

Every method is async and must be await-ed:
import asyncio
from mengram import AsyncMengram

async def main():
    m = AsyncMengram()

    # Add memories
    result = await m.add([
        {"role": "user", "content": "Deployed on Railway with PostgreSQL"},
    ])

    # Upload a file (PDF, DOCX, TXT, MD)
    await m.add_file("meeting-notes.pdf")

    # Search
    results = await m.search("deployment")

    # Unified search
    all_results = await m.search_all("issues")

    # Profile
    profile = await m.get_profile()

    # Episodes & procedures
    events = await m.episodes(query="deployment")
    procs = await m.procedures(query="deploy")

    # Feedback
    await m.procedure_feedback(proc_id, success=True)

    # Graph & timeline
    graph = await m.graph()
    facts = await m.timeline(after="2026-01-01")

    # Agents & insights
    await m.run_agents()
    await m.reflect()
    insights = await m.insights()

    # Memory management
    await m.dedup()
    await m.merge("source", "target")
    await m.archive_fact("Entity", "old fact")

    # Triggers
    triggers = await m.get_triggers()

    # Webhooks
    await m.create_webhook(url="https://example.com/hook")
    hooks = await m.get_webhooks()

    # Jobs
    status = await m.job_status("job-abc")
    result = await m.wait_for_job("job-abc")

    # Close when done
    await m.close()

asyncio.run(main())

Available Methods

MethodDescription
add(messages, ...)Add memories from conversation
add_text(text, ...)Add memories from plain text
add_file(file_path, ...)Upload file (PDF, DOCX, TXT, MD)
search(query, ...)Semantic search
search_all(query, ...)Unified search (all 3 types)
get_all(...)List all entities
get(name, ...)Get specific entity
delete(name, ...)Delete entity
stats(...)Usage statistics
get_profile(...)Cognitive Profile
episodes(...)Episodic memories
procedures(...)Procedural memories
procedure_feedback(id, ...)Record success/failure
graph(...)Knowledge graph
timeline(...)Temporal search
run_agents(...)Run memory agents
reflect(...)Trigger reflection
insights(...)Get AI insights
dedup(...)Deduplicate entities
merge(src, target, ...)Merge entities
archive_fact(entity, fact, ...)Archive a fact
get_triggers(...)Get smart triggers
create_webhook(url, ...)Create webhook
get_webhooks()List webhooks
job_status(id)Check job status
wait_for_job(id, ...)Poll until job completes

Retry & Error Handling

The async client automatically retries on transient errors (429, 502, 503, 504) and network failures, with exponential backoff up to 3 attempts. 402 (quota exceeded) is never retried — it raises QuotaExceededError immediately.
from mengram import AsyncMengram
from mengram.cloud.async_client import QuotaExceededError

async def safe_add(m, messages):
    try:
        return await m.add(messages)
    except QuotaExceededError as e:
        print(f"Quota hit: {e.action} {e.current}/{e.limit} ({e.plan})")
        # e.action = "add" | "search" | "add_file"
        # e.plan = "free" | "starter" | "pro"