//! Database module for PostgreSQL with pgvector support //! //! Provides connection pooling and query helpers for vector operations. use anyhow::{Context, Result}; use deadpool_postgres::{Config, GenericClient, Pool, Runtime}; use pgvector::Vector; use tokio_postgres::NoTls; use tracing::info; use uuid::Uuid; use crate::config::DatabaseConfig; use serde::Serialize; use serde_json::{Map, Value}; /// Database wrapper with connection pool #[derive(Clone)] pub struct Database { pool: Pool, } /// A memory record stored in the database #[derive(Debug, Clone)] pub struct MemoryRecord { pub id: Uuid, pub agent_id: String, pub content: String, pub embedding: Vec, pub keywords: Vec, pub metadata: serde_json::Value, pub created_at: chrono::DateTime, pub expires_at: Option>, // Truth scoring fields (populated by background worker) pub truth_value: Option, pub truth_confidence: Option, pub truth_category: Option, pub truth_evaluated_at: Option>, pub ecan_sti: Option, pub ecan_lti: Option, } /// Query result with similarity score #[derive(Debug, Clone)] pub struct MemoryMatch { pub record: MemoryRecord, pub similarity: f32, pub vector_score: f32, pub text_score: f32, pub hybrid_score: f32, } #[derive(Debug, Clone)] pub struct StoreMemoryResult { pub id: Uuid, pub deduplicated: bool, pub expires_at: Option>, } #[derive(Debug, Clone)] struct DedupMatch { id: Uuid, metadata: Value, expires_at: Option>, } fn merge_metadata(existing: &Value, incoming: &Value) -> Value { match (existing, incoming) { (Value::Object(existing), Value::Object(incoming)) => { let mut merged = Map::with_capacity(existing.len() + incoming.len()); for (key, value) in existing { merged.insert(key.clone(), value.clone()); } for (key, value) in incoming { merged.insert(key.clone(), value.clone()); } Value::Object(merged) } (_, Value::Null) => existing.clone(), _ => incoming.clone(), } } async fn find_dedup_match( client: &C, auth_scope: &str, agent_id: &str, embedding: &Vector, threshold: f64, ) -> Result> where C: GenericClient + Sync, { let row = client .query_opt( r#" SELECT id, metadata, expires_at FROM memories WHERE auth_scope = $1 AND agent_id = $2 AND (expires_at IS NULL OR expires_at > NOW()) AND (1 - (embedding <=> $3)) >= $4 ORDER BY (1 - (embedding <=> $3)) DESC, created_at DESC LIMIT 1 "#, &[&auth_scope, &agent_id, embedding, &threshold], ) .await .context("Failed to check for duplicate memory")?; Ok(row.map(|row| DedupMatch { id: row.get("id"), metadata: row.get("metadata"), expires_at: row.get("expires_at"), })) } impl Database { /// Create a new database connection pool pub async fn new(config: &DatabaseConfig) -> Result { let mut cfg = Config::new(); cfg.host = Some(config.host.clone()); cfg.port = Some(config.port); cfg.dbname = Some(config.name.clone()); cfg.user = Some(config.user.clone()); cfg.password = Some(config.password.clone()); let pool = cfg .create_pool(Some(Runtime::Tokio1), NoTls) .context("Failed to create database pool")?; // Test connection let client = pool .get() .await .context("Failed to get database connection")?; client .simple_query("SELECT 1") .await .context("Failed to execute test query")?; info!( "Database connection pool created with {} connections", config.pool_size ); Ok(Self { pool }) } /// Store a memory record pub async fn store_memory( &self, auth_scope: &str, agent_id: &str, content: &str, embedding: &[f32], keywords: &[String], metadata: serde_json::Value, expires_at: Option>, dedup_threshold: f32, ) -> Result { let client = self.pool.get().await?; let vector = Vector::from(embedding.to_vec()); let dedup_threshold = dedup_threshold as f64; if let Some(existing) = find_dedup_match(&client, auth_scope, agent_id, &vector, dedup_threshold).await? { let merged_metadata = merge_metadata(&existing.metadata, &metadata); let refreshed_expires_at = expires_at.or(existing.expires_at); client .execute( r#" UPDATE memories SET metadata = $2, created_at = NOW(), expires_at = $3 WHERE id = $1 "#, &[&existing.id, &merged_metadata, &refreshed_expires_at], ) .await .context("Failed to update deduplicated memory")?; return Ok(StoreMemoryResult { id: existing.id, deduplicated: true, expires_at: refreshed_expires_at, }); } let id = Uuid::new_v4(); client .execute( r#" INSERT INTO memories (id, auth_scope, agent_id, content, embedding, keywords, metadata, expires_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) "#, &[&id, &auth_scope, &agent_id, &content, &vector, &keywords, &metadata, &expires_at], ) .await .context("Failed to store memory")?; Ok(StoreMemoryResult { id, deduplicated: false, expires_at, }) } /// Query memories by vector similarity pub async fn query_memories( &self, auth_scope: &str, source_agent_id: Option<&str>, query_text: &str, embedding: &[f32], limit: i64, threshold: f32, vector_weight: f32, text_weight: f32, ) -> Result> { let client = self.pool.get().await?; let vector = Vector::from(embedding.to_vec()); let rows = client .query( r#" WITH search_query AS ( SELECT NULLIF(plainto_tsquery('pg_catalog.english', $2)::text, '') AS query_text, plainto_tsquery('pg_catalog.english', $2) AS ts_query ), scored AS ( SELECT id, agent_id, content, keywords, metadata, created_at, expires_at, (1 - (embedding <=> $1))::real AS vector_score, CASE WHEN search_query.query_text IS NULL THEN 0::real WHEN memories.tsv @@ search_query.ts_query THEN ts_rank(memories.tsv, search_query.ts_query, 32)::real ELSE 0::real END AS text_score FROM memories CROSS JOIN search_query WHERE memories.auth_scope = $3 AND ($4::text IS NULL OR memories.agent_id = $4) AND (memories.expires_at IS NULL OR memories.expires_at > NOW()) ), ranked AS ( SELECT *, MAX(CASE WHEN text_score > 0 THEN 1 ELSE 0 END) OVER () AS has_text_match FROM scored ) SELECT id, agent_id, content, keywords, metadata, created_at, expires_at, vector_score, text_score, CASE WHEN has_text_match = 1 THEN (($6 * vector_score) + ($7 * text_score))::real ELSE vector_score END AS hybrid_score FROM ranked WHERE vector_score >= $5 OR text_score > 0 ORDER BY hybrid_score DESC, vector_score DESC LIMIT $8 "#, &[ &vector, &query_text, &auth_scope, &source_agent_id, &threshold, &vector_weight, &text_weight, &limit, ], ) .await .context("Failed to query memories")?; let matches = rows .iter() .map(|row| MemoryMatch { record: MemoryRecord { id: row.get("id"), agent_id: row.get("agent_id"), content: row.get("content"), // Query responses do not include raw embedding payloads. embedding: Vec::new(), keywords: row.get("keywords"), metadata: row.get("metadata"), created_at: row.get("created_at"), expires_at: row.get("expires_at"), // Truth fields will be populated by issue #39 truth_value: None, truth_confidence: None, truth_category: None, truth_evaluated_at: None, ecan_sti: None, ecan_lti: None, }, similarity: row.get("hybrid_score"), vector_score: row.get("vector_score"), text_score: row.get("text_score"), hybrid_score: row.get("hybrid_score"), }) .collect(); Ok(matches) } /// Delete memories visible to an auth scope with an optional provenance filter pub async fn purge_memories( &self, auth_scope: &str, source_agent_id: Option<&str>, before: Option>, ) -> Result { let client = self.pool.get().await?; let count = client .execute( r#" DELETE FROM memories WHERE auth_scope = $1 AND ($2::text IS NULL OR agent_id = $2) AND ($3::timestamptz IS NULL OR created_at < $3) "#, &[&auth_scope, &source_agent_id, &before], ) .await?; Ok(count) } /// Get memory count for a token-visible scope and optional provenance filter pub async fn count_memories( &self, auth_scope: &str, source_agent_id: Option<&str>, ) -> Result { let client = self.pool.get().await?; let row = client .query_one( r#" SELECT COUNT(*) as count FROM memories WHERE auth_scope = $1 AND ($2::text IS NULL OR agent_id = $2) AND (expires_at IS NULL OR expires_at > NOW()) "#, &[&auth_scope, &source_agent_id], ) .await?; Ok(row.get("count")) } /// Delete expired memories across all agents pub async fn cleanup_expired_memories(&self) -> Result { let client = self.pool.get().await?; let deleted = client .execute( "DELETE FROM memories WHERE expires_at IS NOT NULL AND expires_at <= NOW()", &[], ) .await .context("Failed to cleanup expired memories")?; Ok(deleted) } } /// Result for a single batch entry #[derive(Debug, Clone, Serialize)] pub struct BatchStoreResult { pub id: String, pub status: String, pub deduplicated: bool, pub expires_at: Option, } impl Database { /// Store multiple memories in a single transaction pub async fn batch_store_memories( &self, auth_scope: &str, agent_id: &str, entries: Vec<( String, Value, Vec, Vec, Option>, )>, dedup_threshold: f32, ) -> Result> { let mut client = self.pool.get().await?; let transaction = client.transaction().await?; let mut results = Vec::with_capacity(entries.len()); let dedup_threshold = dedup_threshold as f64; for (content, metadata, embedding, keywords, expires_at) in entries { let vector = Vector::from(embedding); if let Some(existing) = find_dedup_match(&transaction, auth_scope, agent_id, &vector, dedup_threshold) .await? { let merged_metadata = merge_metadata(&existing.metadata, &metadata); let refreshed_expires_at = expires_at.or(existing.expires_at); transaction .execute( r#" UPDATE memories SET metadata = $2, created_at = NOW(), expires_at = $3 WHERE id = $1 "#, &[&existing.id, &merged_metadata, &refreshed_expires_at], ) .await .context("Failed to update deduplicated batch memory")?; results.push(BatchStoreResult { id: existing.id.to_string(), status: "deduplicated".to_string(), deduplicated: true, expires_at: refreshed_expires_at.map(|ts| ts.to_rfc3339()), }); } else { let id = Uuid::new_v4(); transaction.execute( r#"INSERT INTO memories (id, auth_scope, agent_id, content, embedding, keywords, metadata, expires_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)"#, &[&id, &auth_scope, &agent_id, &content, &vector, &keywords, &metadata, &expires_at], ).await?; results.push(BatchStoreResult { id: id.to_string(), status: "stored".to_string(), deduplicated: false, expires_at: expires_at.map(|ts| ts.to_rfc3339()), }); } } transaction.commit().await?; Ok(results) } }