From 61d6448b44681f5646d05c50b189b7abf3414682 Mon Sep 17 00:00:00 2001 From: Agent Zero Date: Tue, 24 Mar 2026 05:40:30 +0000 Subject: [PATCH] Add server-side deduplication on ingest --- .env.example | 5 + README.md | 30 +++++- src/config.rs | 19 ++++ src/db.rs | 153 ++++++++++++++++++++++++--- src/tools/batch_store.rs | 2 +- src/tools/mod.rs | 4 +- src/tools/store.rs | 12 ++- tests/e2e_mcp.rs | 219 +++++++++++++++++++++++++++++++++++++++ 8 files changed, 421 insertions(+), 23 deletions(-) diff --git a/.env.example b/.env.example index bf8ae92..09364df 100644 --- a/.env.example +++ b/.env.example @@ -26,6 +26,11 @@ OPENBRAIN__QUERY__TEXT_WEIGHT=0.4 # VECTOR_WEIGHT=0.6 # TEXT_WEIGHT=0.4 +# Ingest deduplication +OPENBRAIN__DEDUP__THRESHOLD=0.90 +# Backward-compatible plain env alias +# DEDUP_THRESHOLD=0.90 + # TTL / transient facts # Background cleanup interval in seconds. Set to 0 to disable automatic cleanup. OPENBRAIN__TTL__CLEANUP_INTERVAL_SECONDS=300 diff --git a/README.md b/README.md index 0548ca3..cd6c005 100644 --- a/README.md +++ b/README.md @@ -11,14 +11,15 @@ OpenBrain is a Model Context Protocol (MCP) server that provides AI agents with - 🐘 **PostgreSQL + pgvector**: Production-grade vector storage with HNSW indexing - 🔌 **MCP Protocol**: Streamable HTTP plus legacy HTTP+SSE compatibility - 🔐 **Multi-Agent Support**: Isolated memory namespaces per agent +- ♻️ **Deduplicated Ingest**: Near-duplicate facts are merged instead of stored repeatedly - ⚡ **High Performance**: Rust implementation with async I/O ## MCP Tools | Tool | Description | |------|-------------| -| `store` | Store a memory with automatic embedding generation and optional TTL for transient facts | -| `batch_store` | Store 1-50 memories atomically in a single call | +| `store` | Store a memory with automatic embedding generation, optional TTL, and automatic deduplication | +| `batch_store` | Store 1-50 memories atomically in a single call with the same deduplication rules | | `query` | Search memories by semantic similarity | | `purge` | Delete memories by agent ID or time range | @@ -123,6 +124,31 @@ In Gitea Actions, that means: If you want prod e2e coverage without leaving a standing CI key on the server, the workflow-generated ephemeral key handles that automatically. +### Deduplication on Ingest + +OpenBrain checks every `store` and `batch_store` write for an existing memory in +the same `agent_id` namespace whose vector similarity meets the configured +dedup threshold. + +Default behavior: + +- deduplication is always on +- only same-agent memories are considered +- expired memories are ignored +- if a duplicate is found, the existing memory is refreshed instead of inserting a new row +- metadata is merged with new keys overriding old values +- `created_at` is updated to `now()` +- `expires_at` is preserved unless the new write supplies a fresh TTL + +Configure the threshold with either: + +- `OPENBRAIN__DEDUP__THRESHOLD=0.90` +- `DEDUP_THRESHOLD=0.90` + +Tool responses expose whether a write deduplicated an existing row via the +`deduplicated` flag. `batch_store` also returns a `status` of either +`stored` or `deduplicated` per entry. + ## Agent Zero Developer Prompt For Agent Zero / A0, add the following section to the Developer agent role diff --git a/src/config.rs b/src/config.rs index b24e2c3..b444746 100644 --- a/src/config.rs +++ b/src/config.rs @@ -12,6 +12,7 @@ pub struct Config { pub database: DatabaseConfig, pub embedding: EmbeddingConfig, pub query: QueryConfig, + pub dedup: DedupConfig, pub ttl: TtlConfig, pub auth: AuthConfig, } @@ -56,6 +57,13 @@ pub struct QueryConfig { pub text_weight: f32, } +/// Deduplication configuration +#[derive(Debug, Clone, Deserialize)] +pub struct DedupConfig { + #[serde(default = "default_dedup_threshold")] + pub threshold: f32, +} + /// TTL / expiry configuration #[derive(Debug, Clone, Deserialize)] pub struct TtlConfig { @@ -106,6 +114,7 @@ fn default_model_path() -> String { "models/all-MiniLM-L6-v2".to_string() } fn default_embedding_dim() -> usize { 384 } fn default_vector_weight() -> f32 { 0.6 } fn default_text_weight() -> f32 { 0.4 } +fn default_dedup_threshold() -> f32 { 0.90 } fn default_cleanup_interval_seconds() -> u64 { 300 } fn default_auth_enabled() -> bool { false } @@ -128,6 +137,8 @@ impl Config { // Query settings .set_default("query.vector_weight", default_vector_weight() as f64)? .set_default("query.text_weight", default_text_weight() as f64)? + // Dedup settings + .set_default("dedup.threshold", default_dedup_threshold() as f64)? // TTL settings .set_default( "ttl.cleanup_interval_seconds", @@ -156,6 +167,11 @@ impl Config { config.query.text_weight = parsed; } } + if let Ok(dedup_threshold) = std::env::var("DEDUP_THRESHOLD") { + if let Ok(parsed) = dedup_threshold.parse::() { + config.dedup.threshold = parsed; + } + } Ok(config) } @@ -184,6 +200,9 @@ impl Default for Config { vector_weight: default_vector_weight(), text_weight: default_text_weight(), }, + dedup: DedupConfig { + threshold: default_dedup_threshold(), + }, ttl: TtlConfig { cleanup_interval_seconds: default_cleanup_interval_seconds(), }, diff --git a/src/db.rs b/src/db.rs index fac048e..c3f7c36 100644 --- a/src/db.rs +++ b/src/db.rs @@ -5,12 +5,12 @@ use anyhow::{Context, Result}; use deadpool_postgres::{Config, Pool, Runtime}; use pgvector::Vector; -use tokio_postgres::NoTls; +use tokio_postgres::{GenericClient, NoTls}; use tracing::info; use uuid::Uuid; use serde::Serialize; -use serde_json::Value; +use serde_json::{Map, Value}; use crate::config::DatabaseConfig; /// Database wrapper with connection pool @@ -42,6 +42,69 @@ pub struct MemoryMatch { pub hybrid_score: f32, } +#[derive(Debug, Clone)] +pub struct StoreMemoryResult { + pub id: Uuid, + pub deduplicated: bool, + pub expires_at: Option>, +} + +#[derive(Debug, Clone)] +struct DedupMatch { + id: Uuid, + metadata: Value, + expires_at: Option>, +} + +fn merge_metadata(existing: &Value, incoming: &Value) -> Value { + match (existing, incoming) { + (Value::Object(existing), Value::Object(incoming)) => { + let mut merged = Map::with_capacity(existing.len() + incoming.len()); + for (key, value) in existing { + merged.insert(key.clone(), value.clone()); + } + for (key, value) in incoming { + merged.insert(key.clone(), value.clone()); + } + Value::Object(merged) + } + (_, Value::Null) => existing.clone(), + _ => incoming.clone(), + } +} + +async fn find_dedup_match( + client: &C, + agent_id: &str, + embedding: &Vector, + threshold: f32, +) -> Result> +where + C: GenericClient + Sync, +{ + let row = client + .query_opt( + r#" + SELECT id, metadata, expires_at + FROM memories + WHERE agent_id = $1 + AND (expires_at IS NULL OR expires_at > NOW()) + AND (1 - (embedding <=> $2)) >= $3 + ORDER BY (1 - (embedding <=> $2)) DESC, created_at DESC + LIMIT 1 + "#, + &[&agent_id, embedding, &threshold], + ) + .await + .context("Failed to check for duplicate memory")?; + + Ok(row.map(|row| DedupMatch { + id: row.get("id"), + metadata: row.get("metadata"), + expires_at: row.get("expires_at"), + })) +} + impl Database { /// Create a new database connection pool pub async fn new(config: &DatabaseConfig) -> Result { @@ -77,11 +140,38 @@ impl Database { keywords: &[String], metadata: serde_json::Value, expires_at: Option>, - ) -> Result { + dedup_threshold: f32, + ) -> Result { let client = self.pool.get().await?; - let id = Uuid::new_v4(); let vector = Vector::from(embedding.to_vec()); + if let Some(existing) = find_dedup_match(&client, agent_id, &vector, dedup_threshold).await? { + let merged_metadata = merge_metadata(&existing.metadata, &metadata); + let refreshed_expires_at = expires_at.or(existing.expires_at); + + client + .execute( + r#" + UPDATE memories + SET metadata = $2, + created_at = NOW(), + expires_at = $3 + WHERE id = $1 + "#, + &[&existing.id, &merged_metadata, &refreshed_expires_at], + ) + .await + .context("Failed to update deduplicated memory")?; + + return Ok(StoreMemoryResult { + id: existing.id, + deduplicated: true, + expires_at: refreshed_expires_at, + }); + } + + let id = Uuid::new_v4(); + client .execute( r#" @@ -93,7 +183,11 @@ impl Database { .await .context("Failed to store memory")?; - Ok(id) + Ok(StoreMemoryResult { + id, + deduplicated: false, + expires_at, + }) } /// Query memories by vector similarity @@ -257,6 +351,7 @@ impl Database { pub struct BatchStoreResult { pub id: String, pub status: String, + pub deduplicated: bool, pub expires_at: Option, } @@ -272,23 +367,51 @@ impl Database { Vec, Option>, )>, + dedup_threshold: f32, ) -> Result> { let mut client = self.pool.get().await?; let transaction = client.transaction().await?; let mut results = Vec::with_capacity(entries.len()); for (content, metadata, embedding, keywords, expires_at) in entries { - let id = Uuid::new_v4(); let vector = Vector::from(embedding); - transaction.execute( - r#"INSERT INTO memories (id, agent_id, content, embedding, keywords, metadata, expires_at) VALUES ($1, $2, $3, $4, $5, $6, $7)"#, - &[&id, &agent_id, &content, &vector, &keywords, &metadata, &expires_at], - ).await?; - results.push(BatchStoreResult { - id: id.to_string(), - status: "stored".to_string(), - expires_at: expires_at.map(|ts| ts.to_rfc3339()), - }); + if let Some(existing) = + find_dedup_match(&transaction, agent_id, &vector, dedup_threshold).await? + { + let merged_metadata = merge_metadata(&existing.metadata, &metadata); + let refreshed_expires_at = expires_at.or(existing.expires_at); + transaction + .execute( + r#" + UPDATE memories + SET metadata = $2, + created_at = NOW(), + expires_at = $3 + WHERE id = $1 + "#, + &[&existing.id, &merged_metadata, &refreshed_expires_at], + ) + .await + .context("Failed to update deduplicated batch memory")?; + results.push(BatchStoreResult { + id: existing.id.to_string(), + status: "deduplicated".to_string(), + deduplicated: true, + expires_at: refreshed_expires_at.map(|ts| ts.to_rfc3339()), + }); + } else { + let id = Uuid::new_v4(); + transaction.execute( + r#"INSERT INTO memories (id, agent_id, content, embedding, keywords, metadata, expires_at) VALUES ($1, $2, $3, $4, $5, $6, $7)"#, + &[&id, &agent_id, &content, &vector, &keywords, &metadata, &expires_at], + ).await?; + results.push(BatchStoreResult { + id: id.to_string(), + status: "stored".to_string(), + deduplicated: false, + expires_at: expires_at.map(|ts| ts.to_rfc3339()), + }); + } } transaction.commit().await?; Ok(results) diff --git a/src/tools/batch_store.rs b/src/tools/batch_store.rs index c2b0578..bb14d43 100644 --- a/src/tools/batch_store.rs +++ b/src/tools/batch_store.rs @@ -103,7 +103,7 @@ pub async fn execute(state: &Arc, arguments: Value) -> Result // 5. Batch DB insert (single transaction for atomicity) let results = state .db - .batch_store_memories(agent_id, processed_entries) + .batch_store_memories(agent_id, processed_entries, state.config.dedup.threshold) .await .context("Failed to batch store memories")?; diff --git a/src/tools/mod.rs b/src/tools/mod.rs index 73e00da..e9358fb 100644 --- a/src/tools/mod.rs +++ b/src/tools/mod.rs @@ -15,7 +15,7 @@ pub fn get_tool_definitions() -> Vec { vec![ json!({ "name": "store", - "description": "Store a memory with automatic embedding generation and keyword extraction. The memory will be associated with the agent_id for isolated retrieval.", + "description": "Store a memory with automatic embedding generation and keyword extraction. Near-duplicate memories for the same agent are deduplicated automatically by similarity, with metadata merged and timestamps refreshed.", "inputSchema": { "type": "object", "properties": { @@ -41,7 +41,7 @@ pub fn get_tool_definitions() -> Vec { }), json!({ "name": "batch_store", - "description": "Store multiple memories with automatic embedding generation and keyword extraction. Accepts 1-50 entries and stores them atomically in a single transaction.", + "description": "Store multiple memories with automatic embedding generation and keyword extraction. Accepts 1-50 entries, stores them atomically in a single transaction, and applies the same automatic deduplication rules as single-store.", "inputSchema": { "type": "object", "properties": { diff --git a/src/tools/store.rs b/src/tools/store.rs index 520b3ed..e9b118d 100644 --- a/src/tools/store.rs +++ b/src/tools/store.rs @@ -60,20 +60,26 @@ pub async fn execute(state: &Arc, arguments: Value) -> Result &keywords, metadata, expires_at.clone(), + state.config.dedup.threshold, ) .await .context("Failed to store memory")?; - info!("Memory stored with ID: {}", id); + info!( + "Memory {} with ID: {}", + if id.deduplicated { "deduplicated" } else { "stored" }, + id.id + ); Ok(serde_json::json!({ "success": true, - "id": id.to_string(), + "id": id.id.to_string(), "agent_id": agent_id, + "deduplicated": id.deduplicated, "keywords": keywords, "embedding_dimension": embedding.len(), "ttl": ttl, - "expires_at": expires_at.as_ref().map(|ts| ts.to_rfc3339()) + "expires_at": id.expires_at.as_ref().map(|ts| ts.to_rfc3339()) }) .to_string()) } diff --git a/tests/e2e_mcp.rs b/tests/e2e_mcp.rs index f31a8d0..0176b06 100644 --- a/tests/e2e_mcp.rs +++ b/tests/e2e_mcp.rs @@ -1242,5 +1242,224 @@ async fn e2e_existing_store_unchanged() -> anyhow::Result<()> { .await; assert!(result["success"].as_bool().unwrap_or(false)); + assert_eq!(result["deduplicated"].as_bool(), Some(false)); + Ok(()) +} + + +// ============================================================================= +// Deduplication Tests (Issue #14) +// ============================================================================= + +#[tokio::test] +async fn e2e_store_deduplicates_and_merges_metadata() -> anyhow::Result<()> { + let base = base_url(); + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(20)) + .build() + .expect("reqwest client"); + + ensure_schema().await; + wait_until_ready(&client, &base).await; + + let agent = format!("dedup_{}", uuid::Uuid::new_v4()); + let content = format!("Dedup fact {} prefers concise replies", uuid::Uuid::new_v4()); + let _ = call_tool( + &client, + &base, + "purge", + json!({ "agent_id": agent.clone(), "confirm": true }), + ) + .await; + + let first = call_tool(&client, &base, "store", json!({ + "agent_id": agent.clone(), + "content": content.clone(), + "metadata": { + "source": "first", + "keep": true, + "override": "old" + } + })) + .await; + + assert_eq!(first["deduplicated"].as_bool(), Some(false)); + + let first_query = call_tool(&client, &base, "query", json!({ + "agent_id": agent.clone(), + "query": content.clone(), + "limit": 5, + "threshold": 0.0 + })) + .await; + let first_created_at = first_query["results"] + .as_array() + .and_then(|items| items.first()) + .and_then(|item| item.get("created_at")) + .and_then(Value::as_str) + .expect("first created_at") + .to_string(); + + tokio::time::sleep(Duration::from_millis(1100)).await; + + let second = call_tool(&client, &base, "store", json!({ + "agent_id": agent.clone(), + "content": content.clone(), + "metadata": { + "override": "new", + "second": true + } + })) + .await; + + assert_eq!(second["deduplicated"].as_bool(), Some(true)); + assert_eq!(second["id"], first["id"]); + + let query = call_tool(&client, &base, "query", json!({ + "agent_id": agent.clone(), + "query": content.clone(), + "limit": 5, + "threshold": 0.0 + })) + .await; + + assert_eq!(query["count"].as_u64(), Some(1)); + let stored = query["results"] + .as_array() + .and_then(|items| items.first()) + .expect("dedup query result"); + + assert_eq!(stored["metadata"]["source"], "first"); + assert_eq!(stored["metadata"]["keep"], true); + assert_eq!(stored["metadata"]["override"], "new"); + assert_eq!(stored["metadata"]["second"], true); + + let second_created_at = stored["created_at"] + .as_str() + .expect("second created_at"); + assert!( + second_created_at > first_created_at.as_str(), + "deduplicated write should refresh created_at" + ); + + let _ = call_tool( + &client, + &base, + "purge", + json!({ "agent_id": agent, "confirm": true }), + ) + .await; + + Ok(()) +} + +#[tokio::test] +async fn e2e_store_dedup_is_agent_scoped() -> anyhow::Result<()> { + let base = base_url(); + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(20)) + .build() + .expect("reqwest client"); + + ensure_schema().await; + wait_until_ready(&client, &base).await; + + let agent_a = format!("dedup_scope_a_{}", uuid::Uuid::new_v4()); + let agent_b = format!("dedup_scope_b_{}", uuid::Uuid::new_v4()); + let content = format!("Shared cross-agent fact {}", uuid::Uuid::new_v4()); + + let _ = call_tool(&client, &base, "purge", json!({ "agent_id": agent_a.clone(), "confirm": true })).await; + let _ = call_tool(&client, &base, "purge", json!({ "agent_id": agent_b.clone(), "confirm": true })).await; + + let first = call_tool(&client, &base, "store", json!({ + "agent_id": agent_a.clone(), + "content": content.clone() + })) + .await; + let second = call_tool(&client, &base, "store", json!({ + "agent_id": agent_b.clone(), + "content": content.clone() + })) + .await; + + assert_eq!(first["deduplicated"].as_bool(), Some(false)); + assert_eq!(second["deduplicated"].as_bool(), Some(false)); + assert_ne!(first["id"], second["id"]); + + let _ = call_tool(&client, &base, "purge", json!({ "agent_id": agent_a, "confirm": true })).await; + let _ = call_tool(&client, &base, "purge", json!({ "agent_id": agent_b, "confirm": true })).await; + + Ok(()) +} + +#[tokio::test] +async fn e2e_batch_store_deduplicates_within_batch() -> anyhow::Result<()> { + let base = base_url(); + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(20)) + .build() + .expect("reqwest client"); + + ensure_schema().await; + wait_until_ready(&client, &base).await; + + let agent = format!("batch_dedup_{}", uuid::Uuid::new_v4()); + let content = format!("Batch dedup fact {}", uuid::Uuid::new_v4()); + let _ = call_tool( + &client, + &base, + "purge", + json!({ "agent_id": agent.clone(), "confirm": true }), + ) + .await; + + let result = call_tool(&client, &base, "batch_store", json!({ + "agent_id": agent.clone(), + "entries": [ + { + "content": content.clone(), + "metadata": { "source": "first", "keep": "yes" } + }, + { + "content": content.clone(), + "metadata": { "source": "second", "merged": "yes" } + } + ] + })) + .await; + + let results = result["results"].as_array().expect("batch results"); + assert_eq!(result["count"].as_u64(), Some(2)); + assert_eq!(results[0]["deduplicated"].as_bool(), Some(false)); + assert_eq!(results[0]["status"], "stored"); + assert_eq!(results[1]["deduplicated"].as_bool(), Some(true)); + assert_eq!(results[1]["status"], "deduplicated"); + assert_eq!(results[0]["id"], results[1]["id"]); + + let query = call_tool(&client, &base, "query", json!({ + "agent_id": agent.clone(), + "query": content.clone(), + "limit": 5, + "threshold": 0.0 + })) + .await; + + assert_eq!(query["count"].as_u64(), Some(1)); + let stored = query["results"] + .as_array() + .and_then(|items| items.first()) + .expect("batch dedup query result"); + assert_eq!(stored["metadata"]["source"], "second"); + assert_eq!(stored["metadata"]["keep"], "yes"); + assert_eq!(stored["metadata"]["merged"], "yes"); + + let _ = call_tool( + &client, + &base, + "purge", + json!({ "agent_id": agent, "confirm": true }), + ) + .await; + Ok(()) }