diff --git a/.env.example b/.env.example index e53ce88..bf8ae92 100644 --- a/.env.example +++ b/.env.example @@ -26,6 +26,10 @@ OPENBRAIN__QUERY__TEXT_WEIGHT=0.4 # VECTOR_WEIGHT=0.6 # TEXT_WEIGHT=0.4 +# TTL / transient facts +# Background cleanup interval in seconds. Set to 0 to disable automatic cleanup. +OPENBRAIN__TTL__CLEANUP_INTERVAL_SECONDS=300 + # Authentication (optional) OPENBRAIN__AUTH__ENABLED=false # Comma-separated list of persistent API keys diff --git a/README.md b/README.md index d876f03..0548ca3 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ OpenBrain is a Model Context Protocol (MCP) server that provides AI agents with | Tool | Description | |------|-------------| -| `store` | Store a memory with automatic embedding generation and keyword extraction | +| `store` | Store a memory with automatic embedding generation and optional TTL for transient facts | | `batch_store` | Store 1-50 memories atomically in a single call | | `query` | Search memories by semantic similarity | | `purge` | Delete memories by agent ID or time range | @@ -86,6 +86,30 @@ OPENBRAIN_E2E_BASE_URL=https://ob.ingwaz.work OPENBRAIN__AUTH__ENABLED=true ``` +### TTL / Expiry + +Transient facts can be stored with an optional `ttl` string on `store`, or on +either the batch itself or individual entries for `batch_store`. + +Supported units: + +- `s` seconds +- `m` minutes +- `h` hours +- `d` days +- `w` weeks + +Examples: + +- `30s` +- `15m` +- `1h` +- `7d` + +Expired memories are filtered from `query` immediately, even before the +background cleanup loop deletes them physically. The cleanup interval is +configured with `OPENBRAIN__TTL__CLEANUP_INTERVAL_SECONDS` and defaults to 300. + The CI workflow uses this remote mode after `main` deploys so e2e coverage validates the VPS deployment rather than the local runner host. 
It now generates a random per-run e2e key, temporarily appends it to the deployed `OPENBRAIN__AUTH__API_KEYS`, runs the suite, then removes the key and restarts the service. For live deployments, keep `OPENBRAIN__AUTH__API_KEYS` for persistent non-test access only. The server accepts a comma-separated key list, so a practical split is: @@ -190,6 +214,7 @@ client runtime supports streamable HTTP. Codex should use `/mcp`. "arguments": { "content": "The user prefers dark mode and uses vim keybindings", "agent_id": "assistant-1", + "ttl": "7d", "metadata": {"source": "preferences"} } } @@ -229,6 +254,7 @@ client runtime supports streamable HTTP. Codex should use `/mcp`. "entries": [ { "content": "The user prefers dark mode", + "ttl": "24h", "metadata": {"category": "preference"} }, { diff --git a/migrations/V3__transient_fact_ttl.sql b/migrations/V3__transient_fact_ttl.sql new file mode 100644 index 0000000..f97b432 --- /dev/null +++ b/migrations/V3__transient_fact_ttl.sql @@ -0,0 +1,6 @@ +ALTER TABLE memories + ADD COLUMN IF NOT EXISTS expires_at TIMESTAMPTZ; + +CREATE INDEX IF NOT EXISTS idx_memories_expires_at + ON memories (expires_at) + WHERE expires_at IS NOT NULL; diff --git a/src/config.rs b/src/config.rs index f73a685..b24e2c3 100644 --- a/src/config.rs +++ b/src/config.rs @@ -12,6 +12,7 @@ pub struct Config { pub database: DatabaseConfig, pub embedding: EmbeddingConfig, pub query: QueryConfig, + pub ttl: TtlConfig, pub auth: AuthConfig, } @@ -55,6 +56,13 @@ pub struct QueryConfig { pub text_weight: f32, } +/// TTL / expiry configuration +#[derive(Debug, Clone, Deserialize)] +pub struct TtlConfig { + #[serde(default = "default_cleanup_interval_seconds")] + pub cleanup_interval_seconds: u64, +} + /// Authentication configuration #[derive(Debug, Clone, Deserialize)] pub struct AuthConfig { @@ -98,6 +106,7 @@ fn default_model_path() -> String { "models/all-MiniLM-L6-v2".to_string() } fn default_embedding_dim() -> usize { 384 } fn default_vector_weight() -> f32 { 
0.6 } fn default_text_weight() -> f32 { 0.4 } +fn default_cleanup_interval_seconds() -> u64 { 300 } fn default_auth_enabled() -> bool { false } impl Config { @@ -119,6 +128,11 @@ impl Config { // Query settings .set_default("query.vector_weight", default_vector_weight() as f64)? .set_default("query.text_weight", default_text_weight() as f64)? + // TTL settings + .set_default( + "ttl.cleanup_interval_seconds", + default_cleanup_interval_seconds() as i64, + )? // Auth settings .set_default("auth.enabled", default_auth_enabled())? // Load from environment with OPENBRAIN_ prefix @@ -170,6 +184,9 @@ impl Default for Config { vector_weight: default_vector_weight(), text_weight: default_text_weight(), }, + ttl: TtlConfig { + cleanup_interval_seconds: default_cleanup_interval_seconds(), + }, auth: AuthConfig { enabled: default_auth_enabled(), api_keys: Vec::new(), diff --git a/src/db.rs b/src/db.rs index 3ae7f5e..fac048e 100644 --- a/src/db.rs +++ b/src/db.rs @@ -29,6 +29,7 @@ pub struct MemoryRecord { pub keywords: Vec<String>, pub metadata: serde_json::Value, pub created_at: chrono::DateTime<chrono::Utc>, + pub expires_at: Option<chrono::DateTime<chrono::Utc>>, } /// Query result with similarity score @@ -75,6 +76,7 @@ impl Database { embedding: &[f32], keywords: &[String], metadata: serde_json::Value, + expires_at: Option<chrono::DateTime<chrono::Utc>>, ) -> Result<Uuid> { let client = self.pool.get().await?; let id = Uuid::new_v4(); @@ -83,10 +85,10 @@ impl Database { client .execute( r#" - INSERT INTO memories (id, agent_id, content, embedding, keywords, metadata) - VALUES ($1, $2, $3, $4, $5, $6) + INSERT INTO memories (id, agent_id, content, embedding, keywords, metadata, expires_at) + VALUES ($1, $2, $3, $4, $5, $6, $7) "#, - &[&id, &agent_id, &content, &vector, &keywords, &metadata], + &[&id, &agent_id, &content, &vector, &keywords, &metadata, &expires_at], ) .await .context("Failed to store memory")?; @@ -123,6 +125,7 @@ impl Database { keywords, metadata, created_at, + expires_at, (1 - (embedding <=> $1))::real AS vector_score, CASE WHEN
search_query.query_text IS NULL THEN 0::real @@ -133,6 +136,7 @@ impl Database { FROM memories CROSS JOIN search_query WHERE memories.agent_id = $3 + AND (memories.expires_at IS NULL OR memories.expires_at > NOW()) ), ranked AS ( SELECT @@ -147,6 +151,7 @@ impl Database { keywords, metadata, created_at, + expires_at, vector_score, text_score, CASE @@ -184,6 +189,7 @@ impl Database { keywords: row.get("keywords"), metadata: row.get("metadata"), created_at: row.get("created_at"), + expires_at: row.get("expires_at"), }, similarity: row.get("hybrid_score"), vector_score: row.get("vector_score"), @@ -224,12 +230,25 @@ impl Database { let client = self.pool.get().await?; let row = client .query_one( - "SELECT COUNT(*) as count FROM memories WHERE agent_id = $1", + "SELECT COUNT(*) as count FROM memories WHERE agent_id = $1 AND (expires_at IS NULL OR expires_at > NOW())", &[&agent_id], ) .await?; Ok(row.get("count")) } + + /// Delete expired memories across all agents + pub async fn cleanup_expired_memories(&self) -> Result<u64> { + let client = self.pool.get().await?; + let deleted = client + .execute( + "DELETE FROM memories WHERE expires_at IS NOT NULL AND expires_at <= NOW()", + &[], + ) + .await + .context("Failed to cleanup expired memories")?; + Ok(deleted) + } } @@ -238,6 +257,7 @@ impl Database { pub struct BatchStoreResult { pub id: String, pub status: String, + pub expires_at: Option<String>, } impl Database { @@ -245,20 +265,30 @@ impl Database { pub async fn batch_store_memories( &self, agent_id: &str, - entries: Vec<(String, Value, Vec<f32>, Vec<String>)>, + entries: Vec<( + String, + Value, + Vec<f32>, + Vec<String>, + Option<chrono::DateTime<chrono::Utc>>, + )>, ) -> Result<Vec<BatchStoreResult>> { let mut client = self.pool.get().await?; let transaction = client.transaction().await?; let mut results = Vec::with_capacity(entries.len()); - for (content, metadata, embedding, keywords) in entries { + for (content, metadata, embedding, keywords, expires_at) in entries { let id = Uuid::new_v4(); let vector = Vector::from(embedding);
transaction.execute( - r#"INSERT INTO memories (id, agent_id, content, embedding, keywords, metadata) VALUES ($1, $2, $3, $4, $5, $6)"#, - &[&id, &agent_id, &content, &vector, &keywords, &metadata], + r#"INSERT INTO memories (id, agent_id, content, embedding, keywords, metadata, expires_at) VALUES ($1, $2, $3, $4, $5, $6, $7)"#, + &[&id, &agent_id, &content, &vector, &keywords, &metadata, &expires_at], ).await?; - results.push(BatchStoreResult { id: id.to_string(), status: "stored".to_string() }); + results.push(BatchStoreResult { + id: id.to_string(), + status: "stored".to_string(), + expires_at: expires_at.map(|ts| ts.to_rfc3339()), + }); } transaction.commit().await?; Ok(results) diff --git a/src/lib.rs b/src/lib.rs index 808f9c6..c8a77ea 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,7 @@ pub mod config; pub mod db; pub mod embedding; pub mod migrations; +pub mod ttl; pub mod tools; pub mod transport; @@ -115,6 +116,30 @@ pub async fn run_server(config: Config, db: Database) -> Result<()> { } }); + if config.ttl.cleanup_interval_seconds > 0 { + let cleanup_state = state.clone(); + let cleanup_interval_seconds = config.ttl.cleanup_interval_seconds; + tokio::spawn(async move { + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs( + cleanup_interval_seconds, + )); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + interval.tick().await; + match cleanup_state.db.cleanup_expired_memories().await { + Ok(deleted) if deleted > 0 => { + info!("Cleaned up {} expired memories", deleted); + } + Ok(_) => {} + Err(err) => { + error!("Failed to cleanup expired memories: {:?}", err); + } + } + } + }); + } + // Create MCP state for SSE transport let mcp_state = McpState::new(state.clone()); diff --git a/src/tools/batch_store.rs b/src/tools/batch_store.rs index cb190b5..c2b0578 100644 --- a/src/tools/batch_store.rs +++ b/src/tools/batch_store.rs @@ -9,6 +9,7 @@ use std::sync::Arc; use tracing::info; use 
crate::embedding::extract_keywords; +use crate::ttl::expires_at_from_ttl; use crate::AppState; /// Maximum number of entries allowed per batch store call @@ -18,6 +19,7 @@ const MAX_BATCH_SIZE: usize = 50; /// /// Accepts: /// - `agent_id`: Optional agent identifier (defaults to "default") +/// - `ttl`: Optional default TTL string applied to entries without their own ttl /// - `entries`: Array of 1-50 entries, each with `content` (required) and `metadata` (optional) /// /// Returns: @@ -41,6 +43,7 @@ pub async fn execute(state: &Arc, arguments: Value) -> Result .get("entries") .and_then(|v| v.as_array()) .context("Missing required parameter: entries")?; + let default_ttl = arguments.get("ttl").and_then(|v| v.as_str()); // 3. Validate batch size if entries.is_empty() { @@ -79,6 +82,12 @@ pub async fn execute(state: &Arc, arguments: Value) -> Result .get("metadata") .cloned() .unwrap_or(serde_json::json!({})); + let ttl = entry + .get("ttl") + .and_then(|v| v.as_str()) + .or(default_ttl); + let expires_at = expires_at_from_ttl(ttl) + .with_context(|| format!("Invalid ttl for entry at index {}", idx))?; // Generate embedding for this entry let embedding = embedding_engine @@ -88,7 +97,7 @@ pub async fn execute(state: &Arc, arguments: Value) -> Result // Extract keywords let keywords = extract_keywords(content, 10); - processed_entries.push((content.to_string(), metadata, embedding, keywords)); + processed_entries.push((content.to_string(), metadata, embedding, keywords, expires_at)); } // 5. 
Batch DB insert (single transaction for atomicity) diff --git a/src/tools/mod.rs b/src/tools/mod.rs index c88733f..73e00da 100644 --- a/src/tools/mod.rs +++ b/src/tools/mod.rs @@ -30,6 +30,10 @@ pub fn get_tool_definitions() -> Vec { "metadata": { "type": "object", "description": "Optional metadata to attach to the memory" + }, + "ttl": { + "type": "string", + "description": "Optional time-to-live for transient facts, like 30s, 15m, 1h, 7d, or 2w" } }, "required": ["content"] @@ -45,6 +49,10 @@ pub fn get_tool_definitions() -> Vec { "type": "string", "description": "Unique identifier for the agent storing the memories (default: 'default')" }, + "ttl": { + "type": "string", + "description": "Optional default time-to-live applied to entries without their own ttl" + }, "entries": { "type": "array", "description": "Array of 1-50 memory entries to store atomically", @@ -58,6 +66,10 @@ pub fn get_tool_definitions() -> Vec { "metadata": { "type": "object", "description": "Optional metadata to attach to the memory" + }, + "ttl": { + "type": "string", + "description": "Optional per-entry time-to-live override like 30s, 15m, 1h, 7d, or 2w" } }, "required": ["content"] diff --git a/src/tools/query.rs b/src/tools/query.rs index 5eb8a66..609e888 100644 --- a/src/tools/query.rs +++ b/src/tools/query.rs @@ -81,7 +81,8 @@ pub async fn execute(state: &Arc, arguments: Value) -> Result "hybrid_score": m.hybrid_score, "keywords": m.record.keywords, "metadata": m.record.metadata, - "created_at": m.record.created_at.to_rfc3339() + "created_at": m.record.created_at.to_rfc3339(), + "expires_at": m.record.expires_at.as_ref().map(|ts| ts.to_rfc3339()) }) }) .collect(); diff --git a/src/tools/store.rs b/src/tools/store.rs index 2b8f5c5..520b3ed 100644 --- a/src/tools/store.rs +++ b/src/tools/store.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use tracing::info; use crate::embedding::extract_keywords; +use crate::ttl::expires_at_from_ttl; use crate::AppState; /// Execute the store tool @@ -32,6 
+33,9 @@ pub async fn execute(state: &Arc<AppState>, arguments: Value) -> Result<String> .cloned() .unwrap_or(serde_json::json!({})); + let ttl = arguments.get("ttl").and_then(|v| v.as_str()); + let expires_at = expires_at_from_ttl(ttl).context("Invalid ttl")?; + info!( "Storing memory for agent '{}': {} chars", agent_id, @@ -49,7 +53,14 @@ pub async fn execute(state: &Arc<AppState>, arguments: Value) -> Result<String> // Store in database let id = state .db - .store_memory(agent_id, content, &embedding, &keywords, metadata) + .store_memory( + agent_id, + content, + &embedding, + &keywords, + metadata, + expires_at.clone(), + ) .await .context("Failed to store memory")?; @@ -60,7 +71,9 @@ pub async fn execute(state: &Arc<AppState>, arguments: Value) -> Result<String> "id": id.to_string(), "agent_id": agent_id, "keywords": keywords, - "embedding_dimension": embedding.len() + "embedding_dimension": embedding.len(), + "ttl": ttl, + "expires_at": expires_at.as_ref().map(|ts| ts.to_rfc3339()) }) .to_string()) } diff --git a/src/ttl.rs b/src/ttl.rs new file mode 100644 index 0000000..24a3146 --- /dev/null +++ b/src/ttl.rs @@ -0,0 +1,49 @@ +use anyhow::{Result, anyhow}; +use chrono::{DateTime, Duration, Utc}; + +pub fn parse_ttl_spec(ttl: &str) -> Result<Duration> { + let ttl = ttl.trim(); + if ttl.is_empty() { + return Err(anyhow!("ttl must not be empty")); + } + + let (value, multiplier_seconds) = match ttl.chars().last() { + Some('s') | Some('S') => (&ttl[..ttl.len() - 1], 1i64), + Some('m') | Some('M') => (&ttl[..ttl.len() - 1], 60i64), + Some('h') | Some('H') => (&ttl[..ttl.len() - 1], 60i64 * 60), + Some('d') | Some('D') => (&ttl[..ttl.len() - 1], 60i64 * 60 * 24), + Some('w') | Some('W') => (&ttl[..ttl.len() - 1], 60i64 * 60 * 24 * 7), + _ => { + return Err(anyhow!( + "invalid ttl '{ttl}'. Use a positive duration like 30s, 15m, 1h, 7d, or 2w" + )); + } + }; + + let value: i64 = value + .trim() + .parse() + .map_err(|_| anyhow!("invalid ttl '{ttl}'.
Duration value must be a positive integer"))?; + if value <= 0 { + return Err(anyhow!("invalid ttl '{ttl}'. Duration value must be greater than zero")); + } + + let total_seconds = value + .checked_mul(multiplier_seconds) + .ok_or_else(|| anyhow!("invalid ttl '{ttl}'. Duration is too large"))?; + + Ok(Duration::seconds(total_seconds)) +} + +pub fn expires_at_from_ttl(ttl: Option<&str>) -> Result<Option<DateTime<Utc>>> { + match ttl { + Some(ttl) => { + let duration = parse_ttl_spec(ttl)?; + Utc::now() + .checked_add_signed(duration) + .map(Some) + .ok_or_else(|| anyhow!("ttl '{ttl}' overflows supported timestamp range")) + } + None => Ok(None), + } +} diff --git a/tests/e2e_mcp.rs b/tests/e2e_mcp.rs index 3d2b178..f31a8d0 100644 --- a/tests/e2e_mcp.rs +++ b/tests/e2e_mcp.rs @@ -73,11 +73,45 @@ Install pgvector for your active PostgreSQL major version, then run: CREATE EXTE embedding vector(384) NOT NULL, keywords TEXT[] DEFAULT '{}', metadata JSONB DEFAULT '{}', - created_at TIMESTAMPTZ DEFAULT NOW() + created_at TIMESTAMPTZ DEFAULT NOW(), + expires_at TIMESTAMPTZ ); + ALTER TABLE memories ADD COLUMN IF NOT EXISTS expires_at TIMESTAMPTZ; + ALTER TABLE memories ADD COLUMN IF NOT EXISTS tsv tsvector; + CREATE OR REPLACE FUNCTION memories_tsv_trigger() + RETURNS trigger + LANGUAGE plpgsql + AS $$ + BEGIN + NEW.tsv := + setweight(to_tsvector('pg_catalog.english', COALESCE(NEW.content, '')), 'A') || + setweight( + to_tsvector('pg_catalog.english', COALESCE(array_to_string(NEW.keywords, ' '), '')), + 'B' + ); + RETURN NEW; + END; + $$; + UPDATE memories + SET tsv = + setweight(to_tsvector('pg_catalog.english', COALESCE(content, '')), 'A') || + setweight( + to_tsvector('pg_catalog.english', COALESCE(array_to_string(keywords, ' '), '')), + 'B' + ) + WHERE tsv IS NULL; + DROP TRIGGER IF EXISTS memories_tsv_update ON memories; + CREATE TRIGGER memories_tsv_update + BEFORE INSERT OR UPDATE OF content, keywords ON memories + FOR EACH ROW + EXECUTE FUNCTION memories_tsv_trigger(); CREATE INDEX IF
NOT EXISTS idx_memories_agent ON memories(agent_id); CREATE INDEX IF NOT EXISTS idx_memories_embedding ON memories USING hnsw (embedding vector_cosine_ops); + CREATE INDEX IF NOT EXISTS idx_memories_tsv ON memories + USING GIN (tsv); + CREATE INDEX IF NOT EXISTS idx_memories_expires_at ON memories (expires_at) + WHERE expires_at IS NOT NULL; "#, ) .await