Add server-side deduplication on ingest

This commit is contained in:
Agent Zero
2026-03-24 05:40:30 +00:00
parent 5d5c042dd1
commit 61d6448b44
8 changed files with 421 additions and 23 deletions

153
src/db.rs
View File

@@ -5,12 +5,12 @@
use anyhow::{Context, Result};
use deadpool_postgres::{Config, Pool, Runtime};
use pgvector::Vector;
use tokio_postgres::NoTls;
use tokio_postgres::{GenericClient, NoTls};
use tracing::info;
use uuid::Uuid;
use serde::Serialize;
use serde_json::Value;
use serde_json::{Map, Value};
use crate::config::DatabaseConfig;
/// Database wrapper with connection pool
@@ -42,6 +42,69 @@ pub struct MemoryMatch {
pub hybrid_score: f32,
}
#[derive(Debug, Clone)]
pub struct StoreMemoryResult {
pub id: Uuid,
pub deduplicated: bool,
pub expires_at: Option<chrono::DateTime<chrono::Utc>>,
}
#[derive(Debug, Clone)]
struct DedupMatch {
id: Uuid,
metadata: Value,
expires_at: Option<chrono::DateTime<chrono::Utc>>,
}
fn merge_metadata(existing: &Value, incoming: &Value) -> Value {
match (existing, incoming) {
(Value::Object(existing), Value::Object(incoming)) => {
let mut merged = Map::with_capacity(existing.len() + incoming.len());
for (key, value) in existing {
merged.insert(key.clone(), value.clone());
}
for (key, value) in incoming {
merged.insert(key.clone(), value.clone());
}
Value::Object(merged)
}
(_, Value::Null) => existing.clone(),
_ => incoming.clone(),
}
}
async fn find_dedup_match<C>(
client: &C,
agent_id: &str,
embedding: &Vector,
threshold: f32,
) -> Result<Option<DedupMatch>>
where
C: GenericClient + Sync,
{
let row = client
.query_opt(
r#"
SELECT id, metadata, expires_at
FROM memories
WHERE agent_id = $1
AND (expires_at IS NULL OR expires_at > NOW())
AND (1 - (embedding <=> $2)) >= $3
ORDER BY (1 - (embedding <=> $2)) DESC, created_at DESC
LIMIT 1
"#,
&[&agent_id, embedding, &threshold],
)
.await
.context("Failed to check for duplicate memory")?;
Ok(row.map(|row| DedupMatch {
id: row.get("id"),
metadata: row.get("metadata"),
expires_at: row.get("expires_at"),
}))
}
impl Database {
/// Create a new database connection pool
pub async fn new(config: &DatabaseConfig) -> Result<Self> {
@@ -77,11 +140,38 @@ impl Database {
keywords: &[String],
metadata: serde_json::Value,
expires_at: Option<chrono::DateTime<chrono::Utc>>,
) -> Result<Uuid> {
dedup_threshold: f32,
) -> Result<StoreMemoryResult> {
let client = self.pool.get().await?;
let id = Uuid::new_v4();
let vector = Vector::from(embedding.to_vec());
if let Some(existing) = find_dedup_match(&client, agent_id, &vector, dedup_threshold).await? {
let merged_metadata = merge_metadata(&existing.metadata, &metadata);
let refreshed_expires_at = expires_at.or(existing.expires_at);
client
.execute(
r#"
UPDATE memories
SET metadata = $2,
created_at = NOW(),
expires_at = $3
WHERE id = $1
"#,
&[&existing.id, &merged_metadata, &refreshed_expires_at],
)
.await
.context("Failed to update deduplicated memory")?;
return Ok(StoreMemoryResult {
id: existing.id,
deduplicated: true,
expires_at: refreshed_expires_at,
});
}
let id = Uuid::new_v4();
client
.execute(
r#"
@@ -93,7 +183,11 @@ impl Database {
.await
.context("Failed to store memory")?;
Ok(id)
Ok(StoreMemoryResult {
id,
deduplicated: false,
expires_at,
})
}
/// Query memories by vector similarity
@@ -257,6 +351,7 @@ impl Database {
pub struct BatchStoreResult {
pub id: String,
pub status: String,
pub deduplicated: bool,
pub expires_at: Option<String>,
}
@@ -272,23 +367,51 @@ impl Database {
Vec<String>,
Option<chrono::DateTime<chrono::Utc>>,
)>,
dedup_threshold: f32,
) -> Result<Vec<BatchStoreResult>> {
let mut client = self.pool.get().await?;
let transaction = client.transaction().await?;
let mut results = Vec::with_capacity(entries.len());
for (content, metadata, embedding, keywords, expires_at) in entries {
let id = Uuid::new_v4();
let vector = Vector::from(embedding);
transaction.execute(
r#"INSERT INTO memories (id, agent_id, content, embedding, keywords, metadata, expires_at) VALUES ($1, $2, $3, $4, $5, $6, $7)"#,
&[&id, &agent_id, &content, &vector, &keywords, &metadata, &expires_at],
).await?;
results.push(BatchStoreResult {
id: id.to_string(),
status: "stored".to_string(),
expires_at: expires_at.map(|ts| ts.to_rfc3339()),
});
if let Some(existing) =
find_dedup_match(&transaction, agent_id, &vector, dedup_threshold).await?
{
let merged_metadata = merge_metadata(&existing.metadata, &metadata);
let refreshed_expires_at = expires_at.or(existing.expires_at);
transaction
.execute(
r#"
UPDATE memories
SET metadata = $2,
created_at = NOW(),
expires_at = $3
WHERE id = $1
"#,
&[&existing.id, &merged_metadata, &refreshed_expires_at],
)
.await
.context("Failed to update deduplicated batch memory")?;
results.push(BatchStoreResult {
id: existing.id.to_string(),
status: "deduplicated".to_string(),
deduplicated: true,
expires_at: refreshed_expires_at.map(|ts| ts.to_rfc3339()),
});
} else {
let id = Uuid::new_v4();
transaction.execute(
r#"INSERT INTO memories (id, agent_id, content, embedding, keywords, metadata, expires_at) VALUES ($1, $2, $3, $4, $5, $6, $7)"#,
&[&id, &agent_id, &content, &vector, &keywords, &metadata, &expires_at],
).await?;
results.push(BatchStoreResult {
id: id.to_string(),
status: "stored".to_string(),
deduplicated: false,
expires_at: expires_at.map(|ts| ts.to_rfc3339()),
});
}
}
transaction.commit().await?;
Ok(results)