From 8d0f94451380f2a9eb5875a745b0cd6d6aa04b3b Mon Sep 17 00:00:00 2001 From: Agent Zero Date: Sat, 4 Apr 2026 04:07:58 +0000 Subject: [PATCH] feat(truth): add background truth scoring worker (#36) --- src/db.rs | 46 ++++++++++++++++++ src/lib.rs | 65 +++++++++++++++++++++++++ src/truth/mod.rs | 3 ++ src/truth/worker.rs | 112 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 226 insertions(+) create mode 100644 src/truth/worker.rs diff --git a/src/db.rs b/src/db.rs index e4a126b..e844a69 100644 --- a/src/db.rs +++ b/src/db.rs @@ -640,6 +640,52 @@ impl Database { coverage_pct, }) } + + /// Find memories related to the given embedding vector, excluding the source memory. + /// Used by the truth scoring worker for cross-referencing. + pub async fn find_related_memories( + &self, + candidate_embedding: &[f32], + exclude_id: Uuid, + limit: i64, + ) -> Result> { + let vector = pgvector::Vector::from(candidate_embedding.to_vec()); + let client = self.pool.get().await?; + let rows = client + .query( + r#" + SELECT id, content, truth_value, truth_confidence, + 1 - (embedding <=> $1) AS similarity + FROM memories + WHERE id != $2 + AND (expires_at IS NULL OR expires_at > NOW()) + ORDER BY embedding <=> $1 + LIMIT $3 + "#, + &[&vector, &exclude_id, &limit], + ) + .await + .context("Failed to find related memories")?; + + Ok(rows + .iter() + .map(|row| RelatedMemoryRow { + similarity: row.get("similarity"), + content: row.get("content"), + truth_value: row.get("truth_value"), + truth_confidence: row.get("truth_confidence"), + }) + .collect()) + } +} + +/// A row returned from the related memories query. +#[derive(Debug, Clone)] +pub struct RelatedMemoryRow { + pub similarity: f64, + pub content: String, + pub truth_value: Option, + pub truth_confidence: Option, } /// Result for a single batch entry diff --git a/src/lib.rs b/src/lib.rs index 52be22a..24fb76d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -143,6 +143,71 @@ pub async fn run_server(config: Config, db: Database) -> Result<()> { }); } + // Spawn truth scoring background worker if enabled + if config.truth.enabled { + let truth_state = state.clone(); + let truth_config = config.truth.clone(); + let scoring_interval = config.truth.scoring_interval_seconds; + info!( + "Truth scoring enabled (interval={}s, batch={})", + scoring_interval, truth_config.batch_size + ); + tokio::spawn(async move { + let mut interval = tokio::time::interval( + tokio::time::Duration::from_secs(scoring_interval), + ); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + // Wait for the embedding engine to be ready before starting + loop { + let readiness = truth_state.readiness.read().await; + match &*readiness { + ReadinessState::Ready => break, + ReadinessState::Failed(_) => { + error!("Embedding engine failed — truth scoring worker exiting"); + return; + } + _ => { + drop(readiness); + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + } + } + } + info!("Truth scoring worker started"); + + loop { + interval.tick().await; + // Acquire embedding reference for this cycle + let embedding_guard = truth_state.embedding.read().await; + let embedding = match &*embedding_guard { + Some(e) => e.clone(), + None => { + warn!("Embedding engine not available — skipping truth scoring cycle"); + continue; + } + }; + drop(embedding_guard); + + match truth::worker::run_scoring_cycle( + &truth_state.db, + &embedding, + &truth_config, + ) + .await + { + Ok(scored) if scored > 0 => { + info!("Truth scoring cycle complete: {} memories scored", scored); + } + Ok(_) => {} + Err(err) => { + error!("Truth scoring cycle failed: {:?}", err); + } + } + } + }); + } + + // Create MCP state for SSE transport let mcp_state = McpState::new(state.clone()); diff --git a/src/truth/mod.rs b/src/truth/mod.rs index 3cc2415..0406297 100644 --- a/src/truth/mod.rs +++ b/src/truth/mod.rs @@ -10,7 +10,10 @@ //! truth values from evidence chains. //! - **ECAN** (Economic Attention Network): Manages short-term and long-term //! importance of memories, enabling natural prioritization of verified knowledge. +//! - **Scorer**: Orchestrates PLN and ECAN into a unified scoring pipeline. +//! - **Worker**: Background daemon that periodically scores unscored and stale memories. pub mod ecan; pub mod pln; pub mod scorer; +pub mod worker; diff --git a/src/truth/worker.rs b/src/truth/worker.rs new file mode 100644 index 0000000..2995df3 --- /dev/null +++ b/src/truth/worker.rs @@ -0,0 +1,112 @@ +//! Background truth scoring worker. +//! +//! Periodically fetches unscored and stale memories, runs them through +//! the scoring pipeline (PLN + ECAN + cross-referencing), and writes +//! truth scores back to the database. + +use std::sync::Arc; + +use anyhow::Result; +use tracing::{debug, error, info, warn}; + +use crate::config::TruthConfig; +use crate::db::{Database, TruthScoreUpdate}; +use crate::embedding::EmbeddingEngine; +use crate::truth::ecan::EcanParams; +use crate::truth::scorer::{RelatedMemory, ScorerConfig, score_memory}; + +/// Run a single scoring cycle: fetch candidates, score them, write results. +/// +/// Returns the number of memories scored in this cycle. +pub async fn run_scoring_cycle( + db: &Database, + _embedding: &Arc, + config: &TruthConfig, +) -> Result { + let scorer_config = ScorerConfig { + pln_base_confidence: config.pln_base_confidence, + contradiction_threshold: config.contradiction_threshold, + verification_threshold: config.verification_threshold, + ecan: EcanParams::new(config.ecan_decay_rate, config.ecan_spread_factor), + }; + + let batch_size = config.batch_size as i64; + let rescore_after = config.rescore_after_seconds as i64; + let cross_ref_limit = config.cross_ref_limit as i64; + + // Fetch candidates: unscored first, then stale + let mut candidates = db.get_unscored_memories(batch_size).await?; + let unscored_count = candidates.len(); + + if candidates.len() < batch_size as usize { + let remaining = batch_size - candidates.len() as i64; + let stale = db.get_stale_memories(rescore_after, remaining).await?; + candidates.extend(stale); + } + + if candidates.is_empty() { + debug!("No memories to score this cycle"); + return Ok(0); + } + + info!( + "Scoring {} memories ({} unscored, {} stale)", + candidates.len(), + unscored_count, + candidates.len() - unscored_count + ); + + let mut updates: Vec = Vec::with_capacity(candidates.len()); + + for candidate in &candidates { + // Cross-reference: find related memories using vector similarity + let related_rows = match db + .find_related_memories(&candidate.embedding, candidate.id, cross_ref_limit) + .await + { + Ok(r) => r, + Err(err) => { + warn!( + "Failed to cross-reference memory {}: {:?}", + candidate.id, err + ); + Vec::new() + } + }; + + // Convert DB rows to scorer's RelatedMemory type + let related: Vec = related_rows + .into_iter() + .map(|row| RelatedMemory { + similarity: row.similarity, + content: row.content, + truth_value: row.truth_value, + truth_confidence: row.truth_confidence, + }) + .collect(); + + // Score the memory + let result = score_memory( + &scorer_config, + &candidate.content, + &related, + candidate.ecan_sti, + candidate.ecan_lti, + ); + + updates.push(TruthScoreUpdate { + id: candidate.id, + truth_value: result.truth_value, + truth_confidence: result.truth_confidence, + truth_category: result.category.to_string(), + ecan_sti: result.ecan_sti, + ecan_lti: result.ecan_lti, + }); + } + + // Batch write scores + let count = db.batch_update_truth_scores(&updates).await?; + info!("Updated truth scores for {} memories", count); + + Ok(count) +}