Files
openbrain-mcp/src/db.rs
Agent Zero 5f9d884187 feat(db): add truth scoring columns migration (#31)
Add V5__truth_scoring.sql migration:
- truth_value (REAL): 0.0-1.0 truth score from PLN reasoning
- truth_confidence (REAL): 0.0-1.0 confidence in the score
- truth_category (TEXT): verified/plausible/unverified/contradicted
- truth_evaluated_at (TIMESTAMPTZ): when last scored
- ecan_sti (REAL): Short-Term Importance (recency-weighted)
- ecan_lti (REAL): Long-Term Importance (reliability)

Partial indexes for efficient unscored memory queries.
Update MemoryRecord struct with optional truth fields.

Part of #29
2026-04-04 02:10:15 +00:00

461 lines
15 KiB
Rust

//! Database module for PostgreSQL with pgvector support
//!
//! Provides connection pooling and query helpers for vector operations.
use anyhow::{Context, Result};
use deadpool_postgres::{Config, GenericClient, Pool, Runtime};
use pgvector::Vector;
use tokio_postgres::NoTls;
use tracing::info;
use uuid::Uuid;
use crate::config::DatabaseConfig;
use serde::Serialize;
use serde_json::{Map, Value};
/// Database wrapper with connection pool
#[derive(Clone)]
pub struct Database {
pool: Pool,
}
/// A memory record stored in the database
#[derive(Debug, Clone)]
pub struct MemoryRecord {
pub id: Uuid,
pub agent_id: String,
pub content: String,
pub embedding: Vec<f32>,
pub keywords: Vec<String>,
pub metadata: serde_json::Value,
pub created_at: chrono::DateTime<chrono::Utc>,
pub expires_at: Option<chrono::DateTime<chrono::Utc>>,
// Truth scoring fields (populated by background worker)
pub truth_value: Option<f32>,
pub truth_confidence: Option<f32>,
pub truth_category: Option<String>,
pub truth_evaluated_at: Option<chrono::DateTime<chrono::Utc>>,
pub ecan_sti: Option<f32>,
pub ecan_lti: Option<f32>,
}
/// Query result with similarity score
#[derive(Debug, Clone)]
pub struct MemoryMatch {
pub record: MemoryRecord,
pub similarity: f32,
pub vector_score: f32,
pub text_score: f32,
pub hybrid_score: f32,
}
#[derive(Debug, Clone)]
pub struct StoreMemoryResult {
pub id: Uuid,
pub deduplicated: bool,
pub expires_at: Option<chrono::DateTime<chrono::Utc>>,
}
#[derive(Debug, Clone)]
struct DedupMatch {
id: Uuid,
metadata: Value,
expires_at: Option<chrono::DateTime<chrono::Utc>>,
}
fn merge_metadata(existing: &Value, incoming: &Value) -> Value {
match (existing, incoming) {
(Value::Object(existing), Value::Object(incoming)) => {
let mut merged = Map::with_capacity(existing.len() + incoming.len());
for (key, value) in existing {
merged.insert(key.clone(), value.clone());
}
for (key, value) in incoming {
merged.insert(key.clone(), value.clone());
}
Value::Object(merged)
}
(_, Value::Null) => existing.clone(),
_ => incoming.clone(),
}
}
async fn find_dedup_match<C>(
client: &C,
auth_scope: &str,
agent_id: &str,
embedding: &Vector,
threshold: f64,
) -> Result<Option<DedupMatch>>
where
C: GenericClient + Sync,
{
let row = client
.query_opt(
r#"
SELECT id, metadata, expires_at
FROM memories
WHERE auth_scope = $1
AND agent_id = $2
AND (expires_at IS NULL OR expires_at > NOW())
AND (1 - (embedding <=> $3)) >= $4
ORDER BY (1 - (embedding <=> $3)) DESC, created_at DESC
LIMIT 1
"#,
&[&auth_scope, &agent_id, embedding, &threshold],
)
.await
.context("Failed to check for duplicate memory")?;
Ok(row.map(|row| DedupMatch {
id: row.get("id"),
metadata: row.get("metadata"),
expires_at: row.get("expires_at"),
}))
}
impl Database {
/// Create a new database connection pool
pub async fn new(config: &DatabaseConfig) -> Result<Self> {
let mut cfg = Config::new();
cfg.host = Some(config.host.clone());
cfg.port = Some(config.port);
cfg.dbname = Some(config.name.clone());
cfg.user = Some(config.user.clone());
cfg.password = Some(config.password.clone());
let pool = cfg
.create_pool(Some(Runtime::Tokio1), NoTls)
.context("Failed to create database pool")?;
// Test connection
let client = pool
.get()
.await
.context("Failed to get database connection")?;
client
.simple_query("SELECT 1")
.await
.context("Failed to execute test query")?;
info!(
"Database connection pool created with {} connections",
config.pool_size
);
Ok(Self { pool })
}
/// Store a memory record
pub async fn store_memory(
&self,
auth_scope: &str,
agent_id: &str,
content: &str,
embedding: &[f32],
keywords: &[String],
metadata: serde_json::Value,
expires_at: Option<chrono::DateTime<chrono::Utc>>,
dedup_threshold: f32,
) -> Result<StoreMemoryResult> {
let client = self.pool.get().await?;
let vector = Vector::from(embedding.to_vec());
let dedup_threshold = dedup_threshold as f64;
if let Some(existing) =
find_dedup_match(&client, auth_scope, agent_id, &vector, dedup_threshold).await?
{
let merged_metadata = merge_metadata(&existing.metadata, &metadata);
let refreshed_expires_at = expires_at.or(existing.expires_at);
client
.execute(
r#"
UPDATE memories
SET metadata = $2,
created_at = NOW(),
expires_at = $3
WHERE id = $1
"#,
&[&existing.id, &merged_metadata, &refreshed_expires_at],
)
.await
.context("Failed to update deduplicated memory")?;
return Ok(StoreMemoryResult {
id: existing.id,
deduplicated: true,
expires_at: refreshed_expires_at,
});
}
let id = Uuid::new_v4();
client
.execute(
r#"
INSERT INTO memories (id, auth_scope, agent_id, content, embedding, keywords, metadata, expires_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
"#,
&[&id, &auth_scope, &agent_id, &content, &vector, &keywords, &metadata, &expires_at],
)
.await
.context("Failed to store memory")?;
Ok(StoreMemoryResult {
id,
deduplicated: false,
expires_at,
})
}
/// Query memories by vector similarity
pub async fn query_memories(
&self,
auth_scope: &str,
source_agent_id: Option<&str>,
query_text: &str,
embedding: &[f32],
limit: i64,
threshold: f32,
vector_weight: f32,
text_weight: f32,
) -> Result<Vec<MemoryMatch>> {
let client = self.pool.get().await?;
let vector = Vector::from(embedding.to_vec());
let rows = client
.query(
r#"
WITH search_query AS (
SELECT
NULLIF(plainto_tsquery('pg_catalog.english', $2)::text, '') AS query_text,
plainto_tsquery('pg_catalog.english', $2) AS ts_query
),
scored AS (
SELECT
id,
agent_id,
content,
keywords,
metadata,
created_at,
expires_at,
(1 - (embedding <=> $1))::real AS vector_score,
CASE
WHEN search_query.query_text IS NULL THEN 0::real
WHEN memories.tsv @@ search_query.ts_query
THEN ts_rank(memories.tsv, search_query.ts_query, 32)::real
ELSE 0::real
END AS text_score
FROM memories
CROSS JOIN search_query
WHERE memories.auth_scope = $3
AND ($4::text IS NULL OR memories.agent_id = $4)
AND (memories.expires_at IS NULL OR memories.expires_at > NOW())
),
ranked AS (
SELECT
*,
MAX(CASE WHEN text_score > 0 THEN 1 ELSE 0 END) OVER () AS has_text_match
FROM scored
)
SELECT
id,
agent_id,
content,
keywords,
metadata,
created_at,
expires_at,
vector_score,
text_score,
CASE
WHEN has_text_match = 1
THEN (($6 * vector_score) + ($7 * text_score))::real
ELSE vector_score
END AS hybrid_score
FROM ranked
WHERE vector_score >= $5 OR text_score > 0
ORDER BY hybrid_score DESC, vector_score DESC
LIMIT $8
"#,
&[
&vector,
&query_text,
&auth_scope,
&source_agent_id,
&threshold,
&vector_weight,
&text_weight,
&limit,
],
)
.await
.context("Failed to query memories")?;
let matches = rows
.iter()
.map(|row| MemoryMatch {
record: MemoryRecord {
id: row.get("id"),
agent_id: row.get("agent_id"),
content: row.get("content"),
// Query responses do not include raw embedding payloads.
embedding: Vec::new(),
keywords: row.get("keywords"),
metadata: row.get("metadata"),
created_at: row.get("created_at"),
expires_at: row.get("expires_at"),
// Truth fields will be populated by issue #39
truth_value: None,
truth_confidence: None,
truth_category: None,
truth_evaluated_at: None,
ecan_sti: None,
ecan_lti: None,
},
similarity: row.get("hybrid_score"),
vector_score: row.get("vector_score"),
text_score: row.get("text_score"),
hybrid_score: row.get("hybrid_score"),
})
.collect();
Ok(matches)
}
/// Delete memories visible to an auth scope with an optional provenance filter
pub async fn purge_memories(
&self,
auth_scope: &str,
source_agent_id: Option<&str>,
before: Option<chrono::DateTime<chrono::Utc>>,
) -> Result<u64> {
let client = self.pool.get().await?;
let count = client
.execute(
r#"
DELETE FROM memories
WHERE auth_scope = $1
AND ($2::text IS NULL OR agent_id = $2)
AND ($3::timestamptz IS NULL OR created_at < $3)
"#,
&[&auth_scope, &source_agent_id, &before],
)
.await?;
Ok(count)
}
/// Get memory count for a token-visible scope and optional provenance filter
pub async fn count_memories(
&self,
auth_scope: &str,
source_agent_id: Option<&str>,
) -> Result<i64> {
let client = self.pool.get().await?;
let row = client
.query_one(
r#"
SELECT COUNT(*) as count
FROM memories
WHERE auth_scope = $1
AND ($2::text IS NULL OR agent_id = $2)
AND (expires_at IS NULL OR expires_at > NOW())
"#,
&[&auth_scope, &source_agent_id],
)
.await?;
Ok(row.get("count"))
}
/// Delete expired memories across all agents
pub async fn cleanup_expired_memories(&self) -> Result<u64> {
let client = self.pool.get().await?;
let deleted = client
.execute(
"DELETE FROM memories WHERE expires_at IS NOT NULL AND expires_at <= NOW()",
&[],
)
.await
.context("Failed to cleanup expired memories")?;
Ok(deleted)
}
}
/// Result for a single batch entry
#[derive(Debug, Clone, Serialize)]
pub struct BatchStoreResult {
pub id: String,
pub status: String,
pub deduplicated: bool,
pub expires_at: Option<String>,
}
impl Database {
/// Store multiple memories in a single transaction
pub async fn batch_store_memories(
&self,
auth_scope: &str,
agent_id: &str,
entries: Vec<(
String,
Value,
Vec<f32>,
Vec<String>,
Option<chrono::DateTime<chrono::Utc>>,
)>,
dedup_threshold: f32,
) -> Result<Vec<BatchStoreResult>> {
let mut client = self.pool.get().await?;
let transaction = client.transaction().await?;
let mut results = Vec::with_capacity(entries.len());
let dedup_threshold = dedup_threshold as f64;
for (content, metadata, embedding, keywords, expires_at) in entries {
let vector = Vector::from(embedding);
if let Some(existing) =
find_dedup_match(&transaction, auth_scope, agent_id, &vector, dedup_threshold)
.await?
{
let merged_metadata = merge_metadata(&existing.metadata, &metadata);
let refreshed_expires_at = expires_at.or(existing.expires_at);
transaction
.execute(
r#"
UPDATE memories
SET metadata = $2,
created_at = NOW(),
expires_at = $3
WHERE id = $1
"#,
&[&existing.id, &merged_metadata, &refreshed_expires_at],
)
.await
.context("Failed to update deduplicated batch memory")?;
results.push(BatchStoreResult {
id: existing.id.to_string(),
status: "deduplicated".to_string(),
deduplicated: true,
expires_at: refreshed_expires_at.map(|ts| ts.to_rfc3339()),
});
} else {
let id = Uuid::new_v4();
transaction.execute(
r#"INSERT INTO memories (id, auth_scope, agent_id, content, embedding, keywords, metadata, expires_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)"#,
&[&id, &auth_scope, &agent_id, &content, &vector, &keywords, &metadata, &expires_at],
).await?;
results.push(BatchStoreResult {
id: id.to_string(),
status: "stored".to_string(),
deduplicated: false,
expires_at: expires_at.map(|ts| ts.to_rfc3339()),
});
}
}
transaction.commit().await?;
Ok(results)
}
}