Installation

pip install mengram-ai
For async support:
pip install mengram-ai[async]

Initialize

from mengram import Mengram

# Pass API key directly
m = Mengram(api_key="om-your-key")

# Or use environment variable
# export MENGRAM_API_KEY=om-your-key
m = Mengram()

Core Methods

add(messages, …)

Add memories from a conversation. Automatically extracts entities, facts, episodes, and procedures.
result = m.add([
    {"role": "user", "content": "We fixed the OOM with Redis cache"},
    {"role": "assistant", "content": "Noted the Redis cache fix."},
])
# Returns: {"status": "accepted", "job_id": "job-..."}
Parameter        Type        Default     Description
messages         list[dict]  required    Chat messages with role and content
user_id          str         "default"   User identifier for multi-user isolation
agent_id         str         None        Agent identifier
run_id           str         None        Session/run identifier
app_id           str         None        Application identifier
expiration_date  str         None        ISO datetime — facts auto-expire
source           str         None        Provenance source (e.g. "discord", "slack", "email")
metadata         dict        None        Arbitrary provenance metadata
agent_mode       bool        False       Extract from all speakers (user + assistant). Auto-enabled when agent_id is set.
# With provenance tracking
m.add(messages, source="slack", metadata={"channel": "#engineering"})

# Agent mode — extract from both user and assistant messages
m.add(messages, agent_mode=True)
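The expiration_date parameter takes an ISO datetime string. A small helper for building one (this helper is not part of the SDK, just a sketch):

```python
from datetime import datetime, timedelta, timezone

def expires_in(days: int) -> str:
    """Build an ISO 8601 datetime string `days` from now, in UTC."""
    return (datetime.now(timezone.utc) + timedelta(days=days)).isoformat()

# Fact auto-expires in 30 days
# m.add(messages, expiration_date=expires_in(30))
```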

add_text(text, …)

Add memories from plain text instead of chat messages.
m.add_text("Meeting notes: decided to migrate to PostgreSQL 16")

# With provenance
m.add_text("Bug fix deployed", source="discord", metadata={"server": "dev-team"})
Takes the same parameters as add(), with text in place of messages.

add_file(file_path, …)

Upload a file and extract structured memories. Supports PDF (vision AI extraction), DOCX, TXT, and MD. Each page/chunk counts as one add against your quota. Returns immediately — processing happens in the background.
result = m.add_file("meeting-notes.pdf")
# {"status": "accepted", "job_id": "job-...", "file_type": "pdf", "page_count": 12, "quota_used": 12}

# Poll for completion
m.wait_for_job(result["job_id"])
Parameter  Type  Default    Description
file_path  str   required   Path to file (.pdf, .docx, .txt, .md)
user_id    str   "default"  User identifier
agent_id   str   None       Agent identifier
run_id     str   None       Session/run identifier
app_id     str   None       Application identifier
File size limits: Free 10MB, Pro 50MB, Business 100MB.
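Since each page is charged before processing finishes, a client-side size check before uploading can save quota. A sketch using the limits above (the SDK may not expose such a check itself):

```python
import os

# Per-plan upload limits from the table above, in bytes
PLAN_LIMITS = {"free": 10 * 1024**2, "pro": 50 * 1024**2, "business": 100 * 1024**2}

def check_upload_size(path: str, plan: str = "free") -> None:
    """Raise before uploading a file the plan would reject anyway."""
    size = os.path.getsize(path)
    if size > PLAN_LIMITS[plan]:
        raise ValueError(f"{path}: {size} bytes exceeds the {plan} plan limit")
```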

search(query, …)

Semantic search across the knowledge graph with re-ranking.
results = m.search("database preferences", limit=10)
for r in results:
    print(f"{r['entity']} — score: {r['score']:.2f}")
    for fact in r.get("facts", []):
        print(f"  - {fact}")
Parameter    Type  Default    Description
query        str   required   Natural language search query
limit        int   5          Max results
graph_depth  int   2          Knowledge graph traversal depth
user_id      str   "default"  User to search
agent_id     str   None       Filter by agent
run_id       str   None       Filter by run
app_id       str   None       Filter by app
filters      dict  None       Metadata filters

search_all(query, …)

Unified search across all three memory types: semantic, episodic, and procedural.
results = m.search_all("deployment")
print(results["semantic"])     # entities
print(results["episodic"])     # events
print(results["procedural"])   # workflows

get_all() / get_all_full()

memories = m.get_all()           # list all entities
full = m.get_all_full()          # with full facts, relations, knowledge

get(name) / delete(name)

entity = m.get("PostgreSQL")     # get specific entity
m.delete("PostgreSQL")           # delete entity

Cognitive Profile

get_profile(…)

Generate a Cognitive Profile — a ready-to-use system prompt summarizing a user.
profile = m.get_profile()
print(profile["system_prompt"])   # "You are talking to Ali..."
print(profile["facts_used"])      # number of facts used
Parameter  Type  Default    Description
user_id    str   "default"  User to profile
force      bool  False      Regenerate (bypass cache)

rules(…)

Generate a CLAUDE.md, .cursorrules, or .windsurfrules file from memory.
result = m.rules(format="claude_md")
print(result["content"])   # structured rules file
Parameter  Type  Default      Description
format     str   "claude_md"  claude_md, cursorrules, or windsurf
force      bool  False        Regenerate
user_id    str   "default"    User

Episodic Memory

episodes(…)

Search or list episodic memories (events, experiences, interactions).
events = m.episodes(query="auth bug", limit=5)
recent = m.episodes(limit=20)
jan = m.episodes(after="2026-01-01", before="2026-02-01")
Parameter  Type  Default    Description
query      str   None       Search query (omit to list)
limit      int   20         Max results
after      str   None       ISO datetime start
before     str   None       ISO datetime end
user_id    str   "default"  User to query
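The after/before parameters take ISO datetime strings. A helper for querying a recent window (the helper is not part of the SDK):

```python
from datetime import datetime, timedelta, timezone

def last_n_days(n: int) -> tuple[str, str]:
    """Return (after, before) ISO strings covering the last n days, in UTC."""
    now = datetime.now(timezone.utc)
    return (now - timedelta(days=n)).isoformat(), now.isoformat()

# after, before = last_n_days(7)
# recent = m.episodes(after=after, before=before)
```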

Procedural Memory

procedures(…)

Search or list learned workflows.
procs = m.procedures(query="deploy")
all_procs = m.procedures(limit=50)

procedure_feedback(id, …)

Report success or failure for a procedure. A failure reported with context triggers experience-driven evolution of the workflow.
m.procedure_feedback(proc_id, success=True)
m.procedure_feedback(proc_id, success=False,
    context="Build OOM", failed_at_step=3)

procedure_history(id) / procedure_evolution(id)

history = m.procedure_history(proc_id)   # all versions
evolution = m.procedure_evolution(proc_id)  # what changed and why

Knowledge Graph & Timeline

graph(…)

Get the full knowledge graph — nodes and edges.
g = m.graph()
print(g["nodes"])  # entities
print(g["edges"])  # relationships

timeline(…)

Temporal search — facts within a time range.
facts = m.timeline(after="2026-01-01", before="2026-02-01")
for f in facts:
    print(f["created_at"], f["entity"], f["fact"])

feed(…)

Activity feed — recent memory changes.
feed = m.feed(limit=20)

stats(…)

Get usage statistics.
s = m.stats()

Memory Management

m.dedup()                       # find and merge duplicate entities
m.dedup_all()                   # deduplicate facts across all entities
m.dedup_entity("PostgreSQL")    # deduplicate facts on one entity
m.merge("src", "target")       # merge two entities
m.merge_user()                  # merge "User" entity into primary person
m.archive_fact("Entity", "old fact")  # archive a fact
m.fix_entity_type("React", "technology")  # fix entity type
m.reindex()                     # re-embed all entities

Insights & Reflections

m.reflect()                    # trigger reflection
insights = m.insights()        # get AI insights
refs = m.reflections(scope="cross")  # scope: entity, cross, temporal

Agents

m.run_agents()                 # run all agents
m.run_agents(agent="curator")  # run specific agent
history = m.agent_history(limit=5)  # run history
status = m.agent_status()      # which agents are due

Smart Triggers

triggers = m.get_triggers()                    # get triggers
m.process_triggers()                           # fire pending triggers
m.dismiss_trigger(trigger_id)                  # dismiss without firing
m.detect_triggers(target_user_id="alice")      # detect triggers for user

Webhooks

m.create_webhook(
    url="https://example.com/hook",
    event_types=["memory_add", "memory_update"],
    secret="hmac-secret",
)
hooks = m.get_webhooks()
m.update_webhook(webhook_id=1, active=False)
m.delete_webhook(webhook_id=1)
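The secret is used to sign deliveries with HMAC. On the receiving end, a verification step might look like the sketch below; the exact header name and signature scheme are assumptions (check the webhook docs), but the constant-time comparison is the part that matters:

```python
import hashlib
import hmac

def verify_signature(payload: bytes, signature: str, secret: str) -> bool:
    """Compare an HMAC-SHA256 hex digest of the raw request body against
    the signature sent with the delivery, in constant time."""
    expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature)
```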

Teams

team = m.create_team("Engineering", description="Shared memory")
code = team["invite_code"]      # share this code

m.join_team("abc123")           # join via invite code
teams = m.get_teams()           # list your teams
members = m.team_members(team_id=1)

m.share_memory("PostgreSQL", team_id=1)    # share with team
m.unshare_memory("PostgreSQL", team_id=1)  # make personal again
m.leave_team(team_id=1)
m.delete_team(team_id=1)        # owner only

API Keys

keys = m.list_keys()
new_key = m.create_key(name="production")
m.rename_key(key_id="...", name="staging")
m.revoke_key(key_id="...")

Billing

billing = m.get_billing()             # plan, usage, quotas
checkout = m.create_checkout("pro")   # upgrade to Pro
portal = m.create_portal()           # manage subscription

Jobs

status = m.job_status("job-abc123")          # check status
result = m.wait_for_job("job-abc123",        # poll until done
    poll_interval=1.0, max_wait=60.0)

Import Data

# Import ChatGPT export
m.import_chatgpt("~/Downloads/chatgpt-export.zip")

# Import Obsidian vault
m.import_obsidian("~/Documents/MyVault")

# Import text/markdown files
m.import_files(["notes.md", "journal.txt"])
All import methods accept user_id, chunk_size/chunk_chars, and an on_progress callback.

Error Handling

QuotaExceededError

Raised when your monthly plan limit is reached (HTTP 402). Not retried.
from mengram import Mengram
from mengram.cloud.client import QuotaExceededError

m = Mengram()

try:
    m.add([{"role": "user", "content": "Hello"}])
except QuotaExceededError as e:
    print(e.action)   # "add" or "search"
    print(e.limit)    # 50
    print(e.current)  # 50
    print(e.plan)     # "free"
    print(e)          # "Quota exceeded for 'add': 50/50 (plan: free). Upgrade at ..."
Attribute  Type  Description
action     str   Which action was blocked (add, search, add_file)
limit      int   Monthly limit for this action
current    int   Current usage count
plan       str   Current plan (free, starter, pro)

Retry Behavior

The client automatically retries on transient errors (429, 502, 503, 504) with exponential backoff up to 3 attempts. 402 (quota exceeded) is never retried.
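The same policy can be sketched in plain Python. HTTPError below is a stand-in for whatever exception type the client actually raises; the retry set and backoff shape mirror the description above:

```python
import time

RETRYABLE = {429, 502, 503, 504}  # transient errors; 402 is never retried

class HTTPError(Exception):
    """Stand-in for the client's HTTP error type."""
    def __init__(self, status: int):
        super().__init__(f"HTTP {status}")
        self.status = status

def with_retries(call, attempts: int = 3, base_delay: float = 0.5):
    """Retry `call()` on transient errors with exponential backoff."""
    for attempt in range(attempts):
        try:
            return call()
        except HTTPError as e:
            if e.status not in RETRYABLE or attempt == attempts - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)
```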

Quota Usage

After any API call, check your current quota usage via the .quota property:
m.search("test")
print(m.quota)
# {"add": {"used": 5, "limit": 30}, "search": {"used": 12, "limit": 100}}

# Warn before hitting the wall
search_quota = m.quota.get("search", {})
if search_quota.get("used", 0) >= 0.8 * search_quota.get("limit", 1):
    print("Warning: approaching search quota limit")
The quota is read from response headers (X-Quota-Add-Used, X-Quota-Add-Limit, X-Quota-Search-Used, X-Quota-Search-Limit) and reflects usage at the time of the last API call.