Installation
pip install mengram-ai
For async support:
pip install mengram-ai[async]
Initialize
from mengram import Mengram
# Pass API key directly
m = Mengram(api_key="om-your-key")
# Or use environment variable
# export MENGRAM_API_KEY=om-your-key
m = Mengram()
Core Methods
add(messages, …)
Add memories from a conversation. Automatically extracts entities, facts, episodes, and procedures.
result = m.add([
    {"role": "user", "content": "We fixed the OOM with Redis cache"},
    {"role": "assistant", "content": "Noted the Redis cache fix."},
])
# Returns: {"status": "accepted", "job_id": "job-..."}
| Parameter | Type | Default | Description |
|---|---|---|---|
| messages | list[dict] | required | Chat messages with role and content |
| user_id | str | "default" | User identifier for multi-user isolation |
| agent_id | str | None | Agent identifier |
| run_id | str | None | Session/run identifier |
| app_id | str | None | Application identifier |
| expiration_date | str | None | ISO datetime; facts auto-expire |
| source | str | None | Provenance source (e.g. "discord", "slack", "email") |
| metadata | dict | None | Arbitrary provenance metadata |
| agent_mode | bool | False | Extract from all speakers (user + assistant). Auto-enabled when agent_id is set. |
# With provenance tracking
m.add(messages, source="slack", metadata={"channel": "#engineering"})
# Agent mode — extract from both user and assistant messages
m.add(messages, agent_mode=True)
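The expiration_date parameter takes an ISO datetime string. A minimal sketch of building one with the standard library is shown here. The helper name expires_in is hypothetical, and using UTC is an assumption, since the text above does not say which timezone the API expects.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def expires_in(days: float, now: Optional[datetime] = None) -> str:
    """Return an ISO 8601 timestamp `days` after `now` (UTC is an assumption)."""
    base = now if now is not None else datetime.now(timezone.utc)
    return (base + timedelta(days=days)).isoformat()


# Hypothetical usage with the client from the examples above:
# m.add(messages, expiration_date=expires_in(30))
print(expires_in(30, now=datetime(2026, 1, 1, tzinfo=timezone.utc)))
# 2026-01-31T00:00:00+00:00
```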
add_text(text, …)
Add memories from plain text instead of chat messages.
m.add_text("Meeting notes: decided to migrate to PostgreSQL 16")
# With provenance
m.add_text("Bug fix deployed", source="discord", metadata={"server": "dev-team"})
Takes the same parameters as add(), with text in place of messages.
add_file(file_path, …)
Upload a file and extract structured memories. Supports PDF (vision AI extraction), DOCX, TXT, and MD. Each page or chunk counts as one add against your quota. The call returns immediately; processing happens in the background.
result = m.add_file("meeting-notes.pdf")
# {"status": "accepted", "job_id": "job-...", "file_type": "pdf", "page_count": 12, "quota_used": 12}
# Poll for completion
m.wait_for_job(result["job_id"])
| Parameter | Type | Default | Description |
|---|---|---|---|
| file_path | str | required | Path to file (.pdf, .docx, .txt, .md) |
| user_id | str | "default" | User identifier |
| agent_id | str | None | Agent identifier |
| run_id | str | None | Session/run identifier |
| app_id | str | None | Application identifier |
File size limits: Free 10MB, Pro 50MB, Business 100MB.
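Because each upload consumes quota, it can help to reject unsupported or oversized files locally before calling add_file. The following is a sketch only: check_upload is a hypothetical helper, the plan keys are lowercase versions of the names above, and treating 1 MB as 1024 * 1024 bytes is an assumption about how the service measures size.

```python
import os

# Limits quoted in the text above. The byte size of "1 MB" is an assumption.
PLAN_LIMITS_MB = {"free": 10, "pro": 50, "business": 100}
SUPPORTED_EXTENSIONS = {".pdf", ".docx", ".txt", ".md"}


def check_upload(file_path: str, plan: str = "free") -> None:
    """Raise ValueError if the file type or size would likely be rejected."""
    ext = os.path.splitext(file_path)[1].lower()
    if ext not in SUPPORTED_EXTENSIONS:
        raise ValueError(f"Unsupported file type: {ext or '(none)'}")
    limit_bytes = PLAN_LIMITS_MB[plan.lower()] * 1024 * 1024
    size = os.path.getsize(file_path)
    if size > limit_bytes:
        raise ValueError(
            f"{file_path} is {size} bytes; the {plan} limit is {limit_bytes} bytes"
        )


# Hypothetical usage:
# check_upload("meeting-notes.pdf", plan="free")
# result = m.add_file("meeting-notes.pdf")
```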
search(query, …)
Semantic search across the knowledge graph with re-ranking.
results = m.search("database preferences", limit=10)
for r in results:
    print(f"{r['entity']} (score: {r['score']:.2f})")
    for fact in r.get("facts", []):
        print(f"  - {fact}")
| Parameter | Type | Default | Description |
|---|---|---|---|
| query | str | required | Natural language search query |
| limit | int | 5 | Max results |
| graph_depth | int | 2 | Knowledge graph traversal depth |
| user_id | str | "default" | User to search |
| agent_id | str | None | Filter by agent |
| run_id | str | None | Filter by run |
| app_id | str | None | Filter by app |
| filters | dict | None | Metadata filters |
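Search results are often turned into a text block for an LLM prompt. A possible sketch, assuming each result is a dict with the entity, score, and facts keys used in the example above; format_results and the min_score cutoff are hypothetical and not part of the SDK.

```python
from typing import Any, Dict, List


def format_results(results: List[Dict[str, Any]], min_score: float = 0.0) -> str:
    """Render search results as text, skipping entries below min_score."""
    lines = []
    for r in results:
        score = r.get("score", 0.0)
        if score < min_score:
            continue
        lines.append(f"{r['entity']} (score: {score:.2f})")
        lines.extend(f"  - {fact}" for fact in r.get("facts", []))
    return "\n".join(lines)


# Illustrative data only; real results would come from m.search(...).
sample = [
    {"entity": "PostgreSQL", "score": 0.91, "facts": ["Preferred database"]},
    {"entity": "Redis", "score": 0.42, "facts": []},
]
print(format_results(sample, min_score=0.5))
```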
search_all(query, …)
Unified search across all three memory types: semantic, episodic, and procedural.
results = m.search_all("deployment")
print(results["semantic"]) # entities
print(results["episodic"]) # events
print(results["procedural"]) # workflows
get_all() / get_all_full()
memories = m.get_all() # list all entities
full = m.get_all_full() # with full facts, relations, knowledge
get(name) / delete(name)
entity = m.get("PostgreSQL") # get specific entity
m.delete("PostgreSQL") # delete entity
Cognitive Profile
get_profile(…)
Generate a Cognitive Profile — a ready-to-use system prompt summarizing a user.
profile = m.get_profile()
print(profile["system_prompt"]) # "You are talking to Ali..."
print(profile["facts_used"]) # number of facts used
| Parameter | Type | Default | Description |
|---|---|---|---|
| user_id | str | "default" | User to profile |
| force | bool | False | Regenerate (bypass cache) |
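Since the profile is described as a ready-to-use system prompt, one way to use it is to place it at the head of a chat message list. This is a sketch: with_profile is a hypothetical helper, and the role/content message shape mirrors the one accepted by add(); whether your LLM client accepts that shape is an assumption.

```python
from typing import Dict, List


def with_profile(profile: Dict[str, object], user_message: str) -> List[Dict[str, str]]:
    """Build a chat message list that starts with the profile's system prompt."""
    return [
        {"role": "system", "content": str(profile["system_prompt"])},
        {"role": "user", "content": user_message},
    ]


# Hypothetical usage:
# messages = with_profile(m.get_profile(), "What database should I use?")
```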
rules(…)
Generate a CLAUDE.md, .cursorrules, or .windsurfrules file from memory.
result = m.rules(format="claude_md")
print(result["content"]) # structured rules file
| Parameter | Type | Default | Description |
|---|---|---|---|
| format | str | "claude_md" | claude_md, cursorrules, or windsurf |
| force | bool | False | Regenerate |
| user_id | str | "default" | User |
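The returned content can be written straight to the matching file in a project. A minimal sketch: write_rules is hypothetical, and the mapping from format value to file name is inferred from the description above rather than confirmed by the SDK.

```python
from pathlib import Path
from typing import Dict

# File names come from the description above; the mapping itself is an assumption.
RULES_FILENAMES = {
    "claude_md": "CLAUDE.md",
    "cursorrules": ".cursorrules",
    "windsurf": ".windsurfrules",
}


def write_rules(result: Dict[str, str], fmt: str, directory: str = ".") -> Path:
    """Write result["content"] to the file that matches the requested format."""
    path = Path(directory) / RULES_FILENAMES[fmt]
    path.write_text(result["content"], encoding="utf-8")
    return path


# Hypothetical usage:
# write_rules(m.rules(format="cursorrules"), "cursorrules")
```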
Episodic Memory
episodes(…)
Search or list episodic memories (events, experiences, interactions).
events = m.episodes(query="auth bug", limit=5)
recent = m.episodes(limit=20)
jan = m.episodes(after="2026-01-01", before="2026-02-01")
| Parameter | Type | Default | Description |
|---|---|---|---|
| query | str | None | Search query (omit to list) |
| limit | int | 20 | Max results |
| after | str | None | ISO datetime start |
| before | str | None | ISO datetime end |
| user_id | str | "default" | User to query |
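The after and before values for a calendar month can be computed rather than typed by hand. A small sketch using only the standard library; month_range is a hypothetical helper, and it follows the January example above in using the first day of the next month as the end bound (whether before is inclusive or exclusive is not stated in the source).

```python
from datetime import date
from typing import Tuple


def month_range(year: int, month: int) -> Tuple[str, str]:
    """Return (after, before) ISO dates spanning one calendar month."""
    start = date(year, month, 1)
    end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
    return start.isoformat(), end.isoformat()


after, before = month_range(2026, 1)
print(after, before)  # 2026-01-01 2026-02-01
# Hypothetical usage:
# jan = m.episodes(after=after, before=before)
```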
Procedural Memory
procedures(…)
Search or list learned workflows.
procs = m.procedures(query="deploy")
all_procs = m.procedures(limit=50)
procedure_feedback(id, …)
Report whether a procedure run succeeded or failed. A failure reported with context triggers experience-driven evolution of the procedure.
m.procedure_feedback(proc_id, success=True)
m.procedure_feedback(proc_id, success=False,
                     context="Build OOM", failed_at_step=3)
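Feedback is easiest to keep accurate when it is reported by the same code that runs the steps. The sketch below wraps a list of callables and reports the outcome through procedure_feedback with the keyword arguments shown above. run_with_feedback is hypothetical, the client is duck-typed, and numbering steps from 1 is an assumption about how failed_at_step is interpreted.

```python
from typing import Any, Callable, Sequence


def run_with_feedback(
    client: Any, proc_id: str, steps: Sequence[Callable[[], None]]
) -> bool:
    """Run steps in order and report success or the first failure."""
    # 1-based step numbering is an assumption, not confirmed by the source.
    for number, step in enumerate(steps, start=1):
        try:
            step()
        except Exception as exc:
            client.procedure_feedback(
                proc_id, success=False, context=str(exc), failed_at_step=number
            )
            return False
    client.procedure_feedback(proc_id, success=True)
    return True


# Hypothetical usage:
# run_with_feedback(m, proc_id, [build, test, deploy])
```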
procedure_history(id) / procedure_evolution(id)
history = m.procedure_history(proc_id) # all versions
evolution = m.procedure_evolution(proc_id) # what changed and why
Knowledge Graph & Timeline
graph(…)
Get the full knowledge graph — nodes and edges.
g = m.graph()
print(g["nodes"]) # entities
print(g["edges"]) # relationships
timeline(…)
Temporal search — facts within a time range.
facts = m.timeline(after="2026-01-01", before="2026-02-01")
for f in facts:
    print(f["created_at"], f["entity"], f["fact"])
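Timeline output is a flat list, so grouping it by entity can make a time range easier to read. A sketch assuming each item carries the created_at, entity, and fact keys used above, and that created_at values are ISO strings in one consistent format so that string sorting matches chronological order. group_by_entity is a hypothetical helper.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def group_by_entity(facts: List[dict]) -> Dict[str, List[Tuple[str, str]]]:
    """Group timeline facts by entity, oldest first within each group."""
    grouped: Dict[str, List[Tuple[str, str]]] = defaultdict(list)
    for f in sorted(facts, key=lambda item: item["created_at"]):
        grouped[f["entity"]].append((f["created_at"], f["fact"]))
    return dict(grouped)


# Illustrative data only; real items would come from m.timeline(...).
sample_facts = [
    {"created_at": "2026-01-20T10:00:00", "entity": "PostgreSQL", "fact": "Upgraded to 16"},
    {"created_at": "2026-01-05T09:00:00", "entity": "PostgreSQL", "fact": "Chosen as primary DB"},
    {"created_at": "2026-01-10T12:00:00", "entity": "Redis", "fact": "Used as cache"},
]
for entity, items in group_by_entity(sample_facts).items():
    print(entity, items)
```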
feed(…)
Activity feed — recent memory changes.
stats(…)
Get usage statistics.
Memory Management
m.dedup() # find and merge duplicate entities
m.dedup_all() # deduplicate facts across all entities
m.dedup_entity("PostgreSQL") # deduplicate facts on one entity
m.merge("src", "target") # merge two entities
m.merge_user() # merge "User" entity into primary person
m.archive_fact("Entity", "old fact") # archive a fact
m.fix_entity_type("React", "technology") # fix entity type
m.reindex() # re-embed all entities
Insights & Reflections
m.reflect() # trigger reflection
insights = m.insights() # get AI insights
refs = m.reflections(scope="cross") # scope: entity, cross, temporal
Agents
m.run_agents() # run all agents
m.run_agents(agent="curator") # run specific agent
history = m.agent_history(limit=5) # run history
status = m.agent_status() # which agents are due
Smart Triggers
triggers = m.get_triggers() # get triggers
m.process_triggers() # fire pending triggers
m.dismiss_trigger(trigger_id) # dismiss without firing
m.detect_triggers(target_user_id="alice") # detect triggers for user
Webhooks
m.create_webhook(
    url="https://example.com/hook",
    event_types=["memory_add", "memory_update"],
    secret="hmac-secret",
)
hooks = m.get_webhooks()
m.update_webhook(webhook_id=1, active=False)
m.delete_webhook(webhook_id=1)
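On the receiving side, a secret like the one passed to create_webhook is typically used to verify that a delivery is genuine. The source does not document the signing scheme, so the following is only a sketch under a loud assumption: that the signature is a hex-encoded HMAC-SHA256 of the raw request body. The function name, the algorithm, and the encoding are all hypothetical; check the actual webhook documentation before relying on this.

```python
import hashlib
import hmac


def verify_signature(secret: str, body: bytes, signature_hex: str) -> bool:
    """Check a hex HMAC-SHA256 signature of the raw body (scheme is assumed)."""
    expected = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    # compare_digest avoids leaking information through timing differences.
    return hmac.compare_digest(expected, signature_hex)
```

Always verify against the raw bytes of the request body, not a re-serialized copy, because re-encoding JSON can change whitespace and break the comparison.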
Teams
team = m.create_team("Engineering", description="Shared memory")
code = team["invite_code"] # share this code
m.join_team("abc123") # join via invite code
teams = m.get_teams() # list your teams
members = m.team_members(team_id=1)
m.share_memory("PostgreSQL", team_id=1) # share with team
m.unshare_memory("PostgreSQL", team_id=1) # make personal again
m.leave_team(team_id=1)
m.delete_team(team_id=1) # owner only
API Keys
keys = m.list_keys()
new_key = m.create_key(name="production")
m.rename_key(key_id="...", name="staging")
m.revoke_key(key_id="...")
Billing
billing = m.get_billing() # plan, usage, quotas
checkout = m.create_checkout("pro") # upgrade to Pro
portal = m.create_portal() # manage subscription
Jobs
status = m.job_status("job-abc123") # check status
result = m.wait_for_job("job-abc123",        # poll until done
                        poll_interval=1.0, max_wait=60.0)
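To illustrate what poll_interval and max_wait control, here is a rough sketch of a polling loop. It is not the SDK's implementation: poll_job is hypothetical, the status lookup is passed in as a callable, and the "status" key and the terminal values "completed" and "failed" are assumptions.

```python
import time
from typing import Callable, Dict, Sequence


def poll_job(
    get_status: Callable[[str], Dict[str, str]],
    job_id: str,
    poll_interval: float = 1.0,
    max_wait: float = 60.0,
    done: Sequence[str] = ("completed", "failed"),  # assumed terminal states
) -> Dict[str, str]:
    """Call get_status until the job reaches a terminal state or time runs out."""
    deadline = time.monotonic() + max_wait
    while True:
        status = get_status(job_id)
        if status.get("status") in done:
            return status
        if time.monotonic() + poll_interval > deadline:
            raise TimeoutError(f"Job {job_id} not finished after {max_wait}s")
        time.sleep(poll_interval)


# Hypothetical usage:
# final = poll_job(m.job_status, "job-abc123")
```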
Import Data
# Import ChatGPT export
m.import_chatgpt("~/Downloads/chatgpt-export.zip")
# Import Obsidian vault
m.import_obsidian("~/Documents/MyVault")
# Import text/markdown files
m.import_files(["notes.md", "journal.txt"])
All import methods support user_id, chunk_size/chunk_chars, and an on_progress callback.
Error Handling
QuotaExceededError
Raised when your monthly plan limit is reached (HTTP 402). The client does not retry this error.
from mengram import Mengram
from mengram.cloud.client import QuotaExceededError
m = Mengram()
try:
    m.add([{"role": "user", "content": "Hello"}])
except QuotaExceededError as e:
    print(e.action)   # "add", "search", or "add_file"
    print(e.limit)    # 50
    print(e.current)  # 50
    print(e.plan)     # "free"
    print(e)          # "Quota exceeded for 'add': 50/50 (plan: free). Upgrade at ..."
| Attribute | Type | Description |
|---|---|---|
| action | str | Which action was blocked (add, search, add_file) |
| limit | int | Monthly limit for this action |
| current | int | Current usage count |
| plan | str | Current plan (free, starter, pro) |
Retry Behavior
The client automatically retries on transient errors (429, 502, 503, 504) with exponential backoff up to 3 attempts. 402 (quota exceeded) is never retried.
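The retry policy described above can be sketched as follows. This is an illustration, not the client's actual code: call_with_retry is hypothetical, send is any callable returning a (status_code, body) pair, "3 attempts" is read as three calls in total, and the 0.5 second base delay with doubling is an assumed backoff schedule.

```python
import time
from typing import Any, Callable, Tuple

RETRYABLE = {429, 502, 503, 504}  # transient errors listed above; 402 is absent


def call_with_retry(
    send: Callable[[], Tuple[int, Any]],
    max_attempts: int = 3,
    base_delay: float = 0.5,  # assumed; the real delays are not documented here
    sleep: Callable[[float], None] = time.sleep,
) -> Tuple[int, Any]:
    """Retry send() on transient status codes with exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        status, body = send()
        # Return on success, on non-retryable errors such as 402, or when out of attempts.
        if status not in RETRYABLE or attempt == max_attempts:
            return status, body
        sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

Passing sleep as a parameter keeps the sketch easy to test without real delays.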
Quota Usage
After any API call, check your current quota usage via the .quota property:
m.search("test")
print(m.quota)
# {"add": {"used": 5, "limit": 30}, "search": {"used": 12, "limit": 100}}
# Check before hitting the wall
if m.quota.get("search", {}).get("used", 0) >= m.quota.get("search", {}).get("limit", 1) * 0.8:
    print("Warning: approaching search quota limit")
The quota is read from response headers (X-Quota-Add-Used, X-Quota-Add-Limit, X-Quota-Search-Used, X-Quota-Search-Limit) and reflects usage at the time of the last API call.
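As a rough illustration of how those four headers map onto the dictionary shown above, the sketch below parses a plain header mapping. parse_quota and near_limit are hypothetical helpers, not SDK functions, and the lookup assumes the exact header casing given in the text (real HTTP headers are case-insensitive, which a plain dict does not model).

```python
from typing import Dict, Mapping


def parse_quota(headers: Mapping[str, str]) -> Dict[str, Dict[str, int]]:
    """Build {"add": {...}, "search": {...}} from X-Quota-* headers."""
    quota: Dict[str, Dict[str, int]] = {}
    for action in ("add", "search"):
        used = headers.get(f"X-Quota-{action.capitalize()}-Used")
        limit = headers.get(f"X-Quota-{action.capitalize()}-Limit")
        if used is not None and limit is not None:
            quota[action] = {"used": int(used), "limit": int(limit)}
    return quota


def near_limit(quota: Mapping[str, Mapping[str, int]], action: str, threshold: float = 0.8) -> bool:
    """Return True when usage for `action` has reached `threshold` of its limit."""
    entry = quota.get(action)
    if not entry or entry["limit"] <= 0:
        return False
    return entry["used"] / entry["limit"] >= threshold


# Illustrative header values matching the example output above.
example_headers = {
    "X-Quota-Add-Used": "5",
    "X-Quota-Add-Limit": "30",
    "X-Quota-Search-Used": "12",
    "X-Quota-Search-Limit": "100",
}
print(parse_quota(example_headers))
```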