mirror of
https://gitea.ingwaz.work/Ingwaz/openbrain-mcp.git
synced 2026-06-15 22:07:08 +00:00
feat(truth): add background truth scoring worker (#36)
This commit is contained in:
46
src/db.rs
46
src/db.rs
@@ -640,6 +640,52 @@ impl Database {
|
||||
coverage_pct,
|
||||
})
|
||||
}
|
||||
|
||||
/// Find memories related to the given embedding vector, excluding the source memory.
|
||||
/// Used by the truth scoring worker for cross-referencing.
|
||||
pub async fn find_related_memories(
|
||||
&self,
|
||||
candidate_embedding: &[f32],
|
||||
exclude_id: Uuid,
|
||||
limit: i64,
|
||||
) -> Result<Vec<RelatedMemoryRow>> {
|
||||
let vector = pgvector::Vector::from(candidate_embedding.to_vec());
|
||||
let client = self.pool.get().await?;
|
||||
let rows = client
|
||||
.query(
|
||||
r#"
|
||||
SELECT id, content, truth_value, truth_confidence,
|
||||
1 - (embedding <=> $1) AS similarity
|
||||
FROM memories
|
||||
WHERE id != $2
|
||||
AND (expires_at IS NULL OR expires_at > NOW())
|
||||
ORDER BY embedding <=> $1
|
||||
LIMIT $3
|
||||
"#,
|
||||
&[&vector, &exclude_id, &limit],
|
||||
)
|
||||
.await
|
||||
.context("Failed to find related memories")?;
|
||||
|
||||
Ok(rows
|
||||
.iter()
|
||||
.map(|row| RelatedMemoryRow {
|
||||
similarity: row.get("similarity"),
|
||||
content: row.get("content"),
|
||||
truth_value: row.get("truth_value"),
|
||||
truth_confidence: row.get("truth_confidence"),
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
}
|
||||
|
||||
/// A row returned from the related memories query.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RelatedMemoryRow {
|
||||
pub similarity: f64,
|
||||
pub content: String,
|
||||
pub truth_value: Option<f32>,
|
||||
pub truth_confidence: Option<f32>,
|
||||
}
|
||||
|
||||
/// Result for a single batch entry
|
||||
|
||||
65
src/lib.rs
65
src/lib.rs
@@ -143,6 +143,71 @@ pub async fn run_server(config: Config, db: Database) -> Result<()> {
|
||||
});
|
||||
}
|
||||
|
||||
// Spawn truth scoring background worker if enabled
|
||||
if config.truth.enabled {
|
||||
let truth_state = state.clone();
|
||||
let truth_config = config.truth.clone();
|
||||
let scoring_interval = config.truth.scoring_interval_seconds;
|
||||
info!(
|
||||
"Truth scoring enabled (interval={}s, batch={})",
|
||||
scoring_interval, truth_config.batch_size
|
||||
);
|
||||
tokio::spawn(async move {
|
||||
let mut interval = tokio::time::interval(
|
||||
tokio::time::Duration::from_secs(scoring_interval),
|
||||
);
|
||||
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||
|
||||
// Wait for the embedding engine to be ready before starting
|
||||
loop {
|
||||
let readiness = truth_state.readiness.read().await;
|
||||
match &*readiness {
|
||||
ReadinessState::Ready => break,
|
||||
ReadinessState::Failed(_) => {
|
||||
error!("Embedding engine failed — truth scoring worker exiting");
|
||||
return;
|
||||
}
|
||||
_ => {
|
||||
drop(readiness);
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("Truth scoring worker started");
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
// Acquire embedding reference for this cycle
|
||||
let embedding_guard = truth_state.embedding.read().await;
|
||||
let embedding = match &*embedding_guard {
|
||||
Some(e) => e.clone(),
|
||||
None => {
|
||||
warn!("Embedding engine not available — skipping truth scoring cycle");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
drop(embedding_guard);
|
||||
|
||||
match truth::worker::run_scoring_cycle(
|
||||
&truth_state.db,
|
||||
&embedding,
|
||||
&truth_config,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(scored) if scored > 0 => {
|
||||
info!("Truth scoring cycle complete: {} memories scored", scored);
|
||||
}
|
||||
Ok(_) => {}
|
||||
Err(err) => {
|
||||
error!("Truth scoring cycle failed: {:?}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
// Create MCP state for SSE transport
|
||||
let mcp_state = McpState::new(state.clone());
|
||||
|
||||
|
||||
@@ -10,7 +10,10 @@
|
||||
//! truth values from evidence chains.
|
||||
//! - **ECAN** (Economic Attention Network): Manages short-term and long-term
|
||||
//! importance of memories, enabling natural prioritization of verified knowledge.
|
||||
//! - **Scorer**: Orchestrates PLN and ECAN into a unified scoring pipeline.
|
||||
//! - **Worker**: Background daemon that periodically scores unscored and stale memories.
|
||||
|
||||
pub mod ecan;
|
||||
pub mod pln;
|
||||
pub mod scorer;
|
||||
pub mod worker;
|
||||
|
||||
112
src/truth/worker.rs
Normal file
112
src/truth/worker.rs
Normal file
@@ -0,0 +1,112 @@
|
||||
//! Background truth scoring worker.
|
||||
//!
|
||||
//! Periodically fetches unscored and stale memories, runs them through
|
||||
//! the scoring pipeline (PLN + ECAN + cross-referencing), and writes
|
||||
//! truth scores back to the database.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Result;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::config::TruthConfig;
|
||||
use crate::db::{Database, TruthScoreUpdate};
|
||||
use crate::embedding::EmbeddingEngine;
|
||||
use crate::truth::ecan::EcanParams;
|
||||
use crate::truth::scorer::{RelatedMemory, ScorerConfig, score_memory};
|
||||
|
||||
/// Run a single scoring cycle: fetch candidates, score them, write results.
|
||||
///
|
||||
/// Returns the number of memories scored in this cycle.
|
||||
pub async fn run_scoring_cycle(
|
||||
db: &Database,
|
||||
_embedding: &Arc<EmbeddingEngine>,
|
||||
config: &TruthConfig,
|
||||
) -> Result<usize> {
|
||||
let scorer_config = ScorerConfig {
|
||||
pln_base_confidence: config.pln_base_confidence,
|
||||
contradiction_threshold: config.contradiction_threshold,
|
||||
verification_threshold: config.verification_threshold,
|
||||
ecan: EcanParams::new(config.ecan_decay_rate, config.ecan_spread_factor),
|
||||
};
|
||||
|
||||
let batch_size = config.batch_size as i64;
|
||||
let rescore_after = config.rescore_after_seconds as i64;
|
||||
let cross_ref_limit = config.cross_ref_limit as i64;
|
||||
|
||||
// Fetch candidates: unscored first, then stale
|
||||
let mut candidates = db.get_unscored_memories(batch_size).await?;
|
||||
let unscored_count = candidates.len();
|
||||
|
||||
if candidates.len() < batch_size as usize {
|
||||
let remaining = batch_size - candidates.len() as i64;
|
||||
let stale = db.get_stale_memories(rescore_after, remaining).await?;
|
||||
candidates.extend(stale);
|
||||
}
|
||||
|
||||
if candidates.is_empty() {
|
||||
debug!("No memories to score this cycle");
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
info!(
|
||||
"Scoring {} memories ({} unscored, {} stale)",
|
||||
candidates.len(),
|
||||
unscored_count,
|
||||
candidates.len() - unscored_count
|
||||
);
|
||||
|
||||
let mut updates: Vec<TruthScoreUpdate> = Vec::with_capacity(candidates.len());
|
||||
|
||||
for candidate in &candidates {
|
||||
// Cross-reference: find related memories using vector similarity
|
||||
let related_rows = match db
|
||||
.find_related_memories(&candidate.embedding, candidate.id, cross_ref_limit)
|
||||
.await
|
||||
{
|
||||
Ok(r) => r,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"Failed to cross-reference memory {}: {:?}",
|
||||
candidate.id, err
|
||||
);
|
||||
Vec::new()
|
||||
}
|
||||
};
|
||||
|
||||
// Convert DB rows to scorer's RelatedMemory type
|
||||
let related: Vec<RelatedMemory> = related_rows
|
||||
.into_iter()
|
||||
.map(|row| RelatedMemory {
|
||||
similarity: row.similarity,
|
||||
content: row.content,
|
||||
truth_value: row.truth_value,
|
||||
truth_confidence: row.truth_confidence,
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Score the memory
|
||||
let result = score_memory(
|
||||
&scorer_config,
|
||||
&candidate.content,
|
||||
&related,
|
||||
candidate.ecan_sti,
|
||||
candidate.ecan_lti,
|
||||
);
|
||||
|
||||
updates.push(TruthScoreUpdate {
|
||||
id: candidate.id,
|
||||
truth_value: result.truth_value,
|
||||
truth_confidence: result.truth_confidence,
|
||||
truth_category: result.category.to_string(),
|
||||
ecan_sti: result.ecan_sti,
|
||||
ecan_lti: result.ecan_lti,
|
||||
});
|
||||
}
|
||||
|
||||
// Batch write scores
|
||||
let count = db.batch_update_truth_scores(&updates).await?;
|
||||
info!("Updated truth scores for {} memories", count);
|
||||
|
||||
Ok(count)
|
||||
}
|
||||
Reference in New Issue
Block a user