diff --git a/crates/core/src/ampc/coordinator.rs b/crates/core/src/ampc/coordinator.rs index ad1b6b4a4..37f6a1c30 100644 --- a/crates/core/src/ampc/coordinator.rs +++ b/crates/core/src/ampc/coordinator.rs @@ -24,6 +24,8 @@ use super::{DhtConn, Finisher, Job, JobScheduled, RemoteWorker, Setup, Worker, W use crate::{distributed::retry_strategy::ExponentialBackoff, Result}; use anyhow::anyhow; +/// A coordinator is responsible for scheduling jobs on workers and coordinating +/// between rounds of computation. pub struct Coordinator where J: Job, diff --git a/crates/core/src/ampc/dht/mod.rs b/crates/core/src/ampc/dht/mod.rs index e3910068d..5dc6f0e8e 100644 --- a/crates/core/src/ampc/dht/mod.rs +++ b/crates/core/src/ampc/dht/mod.rs @@ -21,7 +21,7 @@ //! with multiple shards. Each shard cluster //! is a Raft cluster, and each key is then routed to the correct //! cluster based on hash(key) % number_of_shards. The keys -//! are currently *not* rebalanced if the number of shards change, so +//! are currently *not* re-balanced if the number of shards change, so //! if an entire shard becomes unavailable or a new shard is added, all //! keys in the entire DHT is essentially lost as the //! keys might hash incorrectly. diff --git a/crates/core/src/ampc/dht/store.rs b/crates/core/src/ampc/dht/store.rs index 6457e8e7f..cc41a150f 100644 --- a/crates/core/src/ampc/dht/store.rs +++ b/crates/core/src/ampc/dht/store.rs @@ -13,6 +13,7 @@ // // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see + use std::collections::BTreeMap; use std::fmt::Debug; use std::io::Cursor; diff --git a/crates/core/src/ampc/finisher.rs b/crates/core/src/ampc/finisher.rs index 72a9dd92e..0fd83c24c 100644 --- a/crates/core/src/ampc/finisher.rs +++ b/crates/core/src/ampc/finisher.rs @@ -16,6 +16,8 @@ use super::prelude::Job; +/// A finisher is responsible for determining if the computation is finished +/// or if another round of computation is needed. pub trait Finisher { type Job: Job; diff --git a/crates/core/src/ampc/mapper.rs b/crates/core/src/ampc/mapper.rs index 6150cb1be..1de4a1667 100644 --- a/crates/core/src/ampc/mapper.rs +++ b/crates/core/src/ampc/mapper.rs @@ -16,6 +16,7 @@ use super::{prelude::Job, DhtConn}; +/// A mapper is the specific computation to be run on the graph. pub trait Mapper: bincode::Encode + bincode::Decode + Send + Sync + Clone { type Job: Job; diff --git a/crates/core/src/ampc/mod.rs b/crates/core/src/ampc/mod.rs index 006377d73..c16ce05a7 100644 --- a/crates/core/src/ampc/mod.rs +++ b/crates/core/src/ampc/mod.rs @@ -14,6 +14,33 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +//! # Framework for Adaptive Massively Parallel Computation (AMPC). +//! +//! AMPC is a system for implementing large-scale distributed graph algorithms efficiently. +//! It provides a framework for parallel computation across clusters of machines. +//! +//! While similar in concept to MapReduce, AMPC uses a distributed hash table (DHT) as its +//! underlying data structure rather than the traditional map and reduce phases. This key +//! architectural difference enables more flexible and efficient computation patterns. +//! +//! The main advantage over MapReduce is that workers can dynamically access any keys in +//! the DHT during computation. This is in contrast to MapReduce where the keyspace must +//! be statically partitioned between reducers before computation begins. The dynamic +//! 
access pattern allows for more natural expression of graph algorithms in a distributed +//! setting. +//! +//! This is roughly inspired by +//! [Massively Parallel Graph Computation: From Theory to Practice](https://research.google/blog/massively-parallel-graph-computation-from-theory-to-practice/) +//! +//! ## Key concepts +//! +//! * **DHT**: A distributed hash table is used to store the result of the computation for +//! each round. +//! * **Worker**: A worker owns a subset of the overall graph and is responsible for +//! executing mappers on its portion of the graph and sending results to the DHT. +//! * **Mapper**: A mapper is the specific computation to be run on the graph. +//! * **Coordinator**: The coordinator is responsible for scheduling the jobs on the workers. + use self::{job::Job, worker::WorkerRef}; use crate::distributed::sonic; diff --git a/crates/core/src/ampc/setup.rs b/crates/core/src/ampc/setup.rs index 14ef5761f..c0bda974a 100644 --- a/crates/core/src/ampc/setup.rs +++ b/crates/core/src/ampc/setup.rs @@ -16,12 +16,24 @@ use super::DhtConn; +/// A setup is responsible for initializing the DHT and for preparing state before each round of computation. pub trait Setup { type DhtTables; + /// Set up the initial state of the DHT. fn init_dht(&self) -> DhtConn; + + /// Set up state for a new round. + /// + /// This is called once for each round of computation. + /// The first round will run `setup_first_round` first + /// but will still call `setup_round` after that. #[allow(unused_variables)] // reason = "dht might be used by implementors" fn setup_round(&self, dht: &Self::DhtTables) {} + + /// Set up state for the first round. + /// + /// This is called once before the first round of computation. fn setup_first_round(&self, dht: &Self::DhtTables) { self.setup_round(dht); } diff --git a/crates/core/src/ampc/worker.rs b/crates/core/src/ampc/worker.rs index 469656c0d..5bde34c5c 100644 --- a/crates/core/src/ampc/worker.rs +++ b/crates/core/src/ampc/worker.rs @@ -23,6 +23,8 @@ use crate::Result; use anyhow::anyhow; use tokio::net::ToSocketAddrs; +/// A worker is responsible for executing a mapper on its portion of the graph and +/// sending results to the DHT. pub trait Worker: Send + Sync { type Remote: RemoteWorker; diff --git a/crates/core/src/crawler/mod.rs b/crates/core/src/crawler/mod.rs index 69a25e787..e8aaddf74 100644 --- a/crates/core/src/crawler/mod.rs +++ b/crates/core/src/crawler/mod.rs @@ -14,6 +14,15 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +//! # Crawler +//! +//! The crawler is responsible for fetching webpages and storing them in WARC files +//! for later processing. +//! +//! Before starting a crawl, a plan needs to be created. This plan is then used by +//! the crawler coordinator to assign sites to crawl to different workers. +//! A site is only assigned to one worker at a time for politeness.
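The `dht/mod.rs` module docs above describe routing each key to a shard cluster with `hash(key) % number_of_shards`, and warn that keys are not re-balanced when the number of shards changes. A minimal, self-contained sketch of that routing rule, assuming nothing beyond the standard library; the function name and hasher choice are illustrative, not the crate's actual implementation:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Map a key to one of `num_shards` shard clusters, as described in the module docs.
fn shard_for_key<K: Hash>(key: &K, num_shards: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % num_shards
}

fn main() {
    // The mapping is deterministic for a fixed shard count, but changing
    // `num_shards` changes it for most keys, which is why the docs warn that
    // adding or losing a shard effectively invalidates the whole DHT.
    for key in ["node-a", "node-b", "node-c"] {
        println!("{key} -> shard {}", shard_for_key(&key, 3));
    }
}
```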
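The `Setup` trait documented in `setup.rs` above gives `setup_first_round` a default implementation that simply falls through to `setup_round`. A simplified, self-contained sketch of how an implementor might use the two hooks; the trait below is a local stand-in with `DhtTables` reduced to a unit type, and `CentralitySetup` is a made-up example, not the crate's real code:

```rust
// Local stand-in mirroring the hook structure shown in the diff.
trait Setup {
    type DhtTables;

    /// Called before every round.
    fn setup_round(&self, _dht: &Self::DhtTables) {}

    /// Called before the first round; by default it just runs `setup_round`,
    /// matching the default impl shown in the diff.
    fn setup_first_round(&self, dht: &Self::DhtTables) {
        self.setup_round(dht);
    }
}

struct CentralitySetup;

impl Setup for CentralitySetup {
    type DhtTables = ();

    fn setup_round(&self, _dht: &()) {
        println!("reset per-round state");
    }

    fn setup_first_round(&self, dht: &()) {
        println!("seed initial scores");
        // Keep the per-round behaviour as well, as the doc comment describes.
        self.setup_round(dht);
    }
}

fn main() {
    let setup = CentralitySetup;
    setup.setup_first_round(&()); // first round: seed + per-round reset
    setup.setup_round(&());       // later rounds: per-round reset only
}
```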
+ use std::{collections::VecDeque, future::Future, net::SocketAddr, sync::Arc}; type HashMap = std::collections::HashMap; @@ -35,7 +44,7 @@ pub use router::Router; mod file_queue; pub mod planner; pub mod robot_client; -mod wander_prirotiser; +mod wander_prioritiser; mod warc_writer; mod worker; @@ -304,7 +313,7 @@ impl Crawler { } } -pub trait DatumStream: Send + Sync { +pub trait DatumSink: Send + Sync { fn write(&self, crawl_datum: CrawlDatum) -> impl Future> + Send; fn finish(&self) -> impl Future> + Send; } diff --git a/crates/core/src/crawler/planner.rs b/crates/core/src/crawler/planner.rs index bbe8194af..fa7cdeda9 100644 --- a/crates/core/src/crawler/planner.rs +++ b/crates/core/src/crawler/planner.rs @@ -13,6 +13,7 @@ // // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . + use anyhow::{anyhow, Result}; use futures::stream::FuturesOrdered; use futures::StreamExt; @@ -71,6 +72,7 @@ impl From for Url { } } +/// Store urls in groups on disk based on their harmonic rank. struct UrlGrouper { groups: Vec>, folder: std::path::PathBuf, @@ -169,6 +171,7 @@ struct Budget { remaining_schedulable: u64, } +/// Create a crawl plan based on the harmonic rank of the hosts. pub struct CrawlPlanner { host_centrality: Arc>, host_centrality_rank: Arc>, diff --git a/crates/core/src/crawler/robot_client.rs b/crates/core/src/crawler/robot_client.rs index 5fbf3ad6d..5fe44ca11 100644 --- a/crates/core/src/crawler/robot_client.rs +++ b/crates/core/src/crawler/robot_client.rs @@ -45,6 +45,7 @@ pub(super) fn reqwest_client(config: &CrawlerConfig) -> Result .map_err(|e| Error::from(anyhow!(e))) } +/// Reqwest client that respects robots.txt for each request. #[derive(Clone)] pub struct RobotClient { robots_txt_manager: RobotsTxtManager, diff --git a/crates/core/src/crawler/wander_prirotiser.rs b/crates/core/src/crawler/wander_prioritiser.rs similarity index 100% rename from crates/core/src/crawler/wander_prirotiser.rs rename to crates/core/src/crawler/wander_prioritiser.rs diff --git a/crates/core/src/crawler/warc_writer.rs b/crates/core/src/crawler/warc_writer.rs index 7b0a7733b..d3aa1d082 100644 --- a/crates/core/src/crawler/warc_writer.rs +++ b/crates/core/src/crawler/warc_writer.rs @@ -21,7 +21,7 @@ use crate::{ warc, }; -use super::{CrawlDatum, DatumStream, Error, Result}; +use super::{CrawlDatum, DatumSink, Error, Result}; use anyhow::anyhow; /// The WarcWriter is responsible for storing the crawl datums @@ -30,7 +30,7 @@ pub struct WarcWriter { tx: tokio::sync::mpsc::Sender, } -impl DatumStream for WarcWriter { +impl DatumSink for WarcWriter { async fn write(&self, crawl_datum: CrawlDatum) -> Result<()> { self.tx .send(WarcWriterMessage::Crawl(crawl_datum)) diff --git a/crates/core/src/crawler/worker.rs b/crates/core/src/crawler/worker.rs index 76746efae..28f9cc18c 100644 --- a/crates/core/src/crawler/worker.rs +++ b/crates/core/src/crawler/worker.rs @@ -39,8 +39,8 @@ use crate::{ }; use super::{ - encoded_body, robot_client::RobotClient, wander_prirotiser::WanderPrioritiser, CrawlDatum, - DatumStream, Domain, Error, Result, RetrieableUrl, Site, WarcWriter, WeightedUrl, WorkerJob, + encoded_body, robot_client::RobotClient, wander_prioritiser::WanderPrioritiser, CrawlDatum, + DatumSink, Domain, Error, Result, RetrieableUrl, Site, WarcWriter, WeightedUrl, WorkerJob, MAX_CONTENT_LENGTH, MAX_OUTGOING_URLS_PER_PAGE, }; @@ -126,7 +126,8 @@ impl WorkerThread { } } -pub struct JobExecutor { +/// JobExecutor receives a job from the coordinator 
and crawls the urls in the job. +pub struct JobExecutor { writer: Arc, client: RobotClient, has_gotten_429_response: bool, @@ -144,7 +145,7 @@ pub struct JobExecutor { job: WorkerJob, } -impl JobExecutor { +impl JobExecutor { pub fn new( job: WorkerJob, config: Arc, diff --git a/crates/core/src/entrypoint/configure.rs b/crates/core/src/entrypoint/configure.rs index 6b07e7c28..76c430163 100644 --- a/crates/core/src/entrypoint/configure.rs +++ b/crates/core/src/entrypoint/configure.rs @@ -17,7 +17,7 @@ use tokio::fs::File; use tokio::io; use tokio_stream::StreamExt; -use tracing::{debug, info}; +use tracing::info; use crate::config::{ defaults, IndexerConfig, IndexerDualEncoderConfig, IndexerGraphConfig, LocalConfig, @@ -73,7 +73,7 @@ fn download_files() { } fn build_spellchecker() -> Result<()> { - debug!("Building spellchecker"); + info!("Building spellchecker"); let spellchecker_path = Path::new(DATA_PATH).join("web_spell"); if !spellchecker_path.exists() { @@ -97,7 +97,7 @@ fn build_spellchecker() -> Result<()> { } fn create_webgraph() -> Result<()> { - debug!("Creating webgraph"); + info!("Creating webgraph"); let out_path = Path::new(DATA_PATH).join("webgraph"); if out_path.exists() { @@ -128,7 +128,7 @@ fn create_webgraph() -> Result<()> { } fn calculate_centrality() { - debug!("Calculating centrality"); + info!("Calculating centrality"); let webgraph_path = Path::new(DATA_PATH).join("webgraph"); let out_path = Path::new(DATA_PATH).join("centrality"); @@ -144,7 +144,7 @@ fn calculate_centrality() { } fn create_inverted_index() -> Result<()> { - debug!("Creating inverted index"); + info!("Creating inverted index"); let out_path = Path::new(DATA_PATH).join("index"); if out_path.exists() { @@ -209,6 +209,7 @@ fn create_inverted_index() -> Result<()> { } fn create_entity_index() -> Result<()> { + info!("Creating entity index"); let out_path = Path::new(DATA_PATH).join("entity"); if out_path.exists() { std::fs::remove_dir_all(&out_path)?; diff --git a/crates/core/src/entrypoint/search_server.rs b/crates/core/src/entrypoint/search_server.rs index 9804e74c4..c166216ca 100644 --- a/crates/core/src/entrypoint/search_server.rs +++ b/crates/core/src/entrypoint/search_server.rs @@ -137,7 +137,7 @@ impl_search!([ ]); pub struct SearchService { - local_searcher: LocalSearcher>>, + local_searcher: LocalSearcher, // dropping the handle leaves the cluster #[allow(unused)] cluster_handle: Arc, diff --git a/crates/core/src/generic_query/get_homepage.rs b/crates/core/src/generic_query/get_homepage.rs index 3fd5b92d5..8e84c66d5 100644 --- a/crates/core/src/generic_query/get_homepage.rs +++ b/crates/core/src/generic_query/get_homepage.rs @@ -72,7 +72,7 @@ impl GenericQuery for GetHomepageQuery { FirstDocCollector::with_shard_id(ctx.shard_id) } - fn remote_collector(&self) -> Self::Collector { + fn coordinator_collector(&self) -> Self::Collector { FirstDocCollector::without_shard_id() } diff --git a/crates/core/src/generic_query/get_site_urls.rs b/crates/core/src/generic_query/get_site_urls.rs index fe9c754f5..56dee00a1 100644 --- a/crates/core/src/generic_query/get_site_urls.rs +++ b/crates/core/src/generic_query/get_site_urls.rs @@ -65,7 +65,7 @@ impl GenericQuery for GetSiteUrlsQuery { .disable_offset() } - fn remote_collector(&self) -> Self::Collector { + fn coordinator_collector(&self) -> Self::Collector { Self::Collector::new() .with_limit(self.limit as usize) .with_offset(self.offset.unwrap_or(0) as usize) diff --git a/crates/core/src/generic_query/get_webpage.rs 
b/crates/core/src/generic_query/get_webpage.rs index 25ebdcc14..a20c5e62d 100644 --- a/crates/core/src/generic_query/get_webpage.rs +++ b/crates/core/src/generic_query/get_webpage.rs @@ -68,7 +68,7 @@ impl GenericQuery for GetWebpageQuery { FirstDocCollector::with_shard_id(ctx.shard_id) } - fn remote_collector(&self) -> Self::Collector { + fn coordinator_collector(&self) -> Self::Collector { FirstDocCollector::without_shard_id() } diff --git a/crates/core/src/generic_query/mod.rs b/crates/core/src/generic_query/mod.rs index ccfb382ca..6a5522e41 100644 --- a/crates/core/src/generic_query/mod.rs +++ b/crates/core/src/generic_query/mod.rs @@ -14,6 +14,26 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +//! # Main flow +//! ```md +//! `coordinator` <------> `searcher` +//! ----------------------------------- +//! send query to searcher +//! search index +//! collect fruits +//! send fruits to coordinator +//! merge fruits +//! filter fruits +//! for each shard +//! send fruits to searchers +//! construct intermediate output +//! from fruits +//! send intermediate output to coordinator +//! merge intermediate outputs +//! return final output +//! --------------------------------------------------- +//! ``` + use crate::{inverted_index::ShardId, search_ctx, Result}; pub mod top_key_phrases; @@ -34,6 +54,8 @@ pub use get_site_urls::GetSiteUrlsQuery; pub mod collector; pub use collector::Collector; +/// A generic query that can be executed on a searcher +/// against an index. pub trait GenericQuery: Send + Sync + bincode::Encode + bincode::Decode + Clone { type Collector: Collector; type TantivyQuery: tantivy::query::Query; @@ -42,7 +64,7 @@ pub trait GenericQuery: Send + Sync + bincode::Encode + bincode::Decode + Clone fn tantivy_query(&self, ctx: &search_ctx::Ctx) -> Self::TantivyQuery; fn collector(&self, ctx: &search_ctx::Ctx) -> Self::Collector; - fn remote_collector(&self) -> Self::Collector; + fn coordinator_collector(&self) -> Self::Collector; fn filter_fruit_shards( &self, diff --git a/crates/core/src/generic_query/size.rs b/crates/core/src/generic_query/size.rs index 90988c0a9..e05ff3524 100644 --- a/crates/core/src/generic_query/size.rs +++ b/crates/core/src/generic_query/size.rs @@ -70,7 +70,7 @@ impl GenericQuery for SizeQuery { SizeCollector::new().with_shard_id(ctx.shard_id) } - fn remote_collector(&self) -> Self::Collector { + fn coordinator_collector(&self) -> Self::Collector { SizeCollector::new() } diff --git a/crates/core/src/generic_query/top_key_phrases.rs b/crates/core/src/generic_query/top_key_phrases.rs index ccf9e1f6a..7f6a57958 100644 --- a/crates/core/src/generic_query/top_key_phrases.rs +++ b/crates/core/src/generic_query/top_key_phrases.rs @@ -47,7 +47,7 @@ impl GenericQuery for TopKeyPhrasesQuery { TopKeyPhrasesCollector::new(self.top_n).with_shard_id(ctx.shard_id) } - fn remote_collector(&self) -> Self::Collector { + fn coordinator_collector(&self) -> Self::Collector { TopKeyPhrasesCollector::new(self.top_n) } diff --git a/crates/core/src/index.rs b/crates/core/src/index.rs index 793fa02fa..b57cf8bdb 100644 --- a/crates/core/src/index.rs +++ b/crates/core/src/index.rs @@ -51,6 +51,14 @@ impl Index { }) } + pub fn inverted_index(&self) -> &InvertedIndex { + &self.inverted_index + } + + pub fn region_count(&self) -> &Mutex { + &self.region_count + } + pub fn path(&self) -> PathBuf { PathBuf::from(&self.path) } diff --git a/crates/core/src/inverted_index/search.rs 
b/crates/core/src/inverted_index/search.rs index 298073d61..e12732342 100644 --- a/crates/core/src/inverted_index/search.rs +++ b/crates/core/src/inverted_index/search.rs @@ -341,7 +341,9 @@ impl InvertedIndex { From<::Fruit>, { let fruit = self.search_initial_generic(query)?; - let mut fruit = query.remote_collector().merge_fruits(vec![fruit.into()])?; + let mut fruit = query + .coordinator_collector() + .merge_fruits(vec![fruit.into()])?; if let Some(shard_id) = self.shard_id { fruit = query.filter_fruit_shards(shard_id, fruit); diff --git a/crates/core/src/live_index/crawler/crawlable_site.rs b/crates/core/src/live_index/crawler/crawlable_site.rs index 9ccddb343..de37fc467 100644 --- a/crates/core/src/live_index/crawler/crawlable_site.rs +++ b/crates/core/src/live_index/crawler/crawlable_site.rs @@ -136,7 +136,7 @@ impl CrawlableSite { } } -impl crawler::DatumStream for tokio::sync::Mutex> { +impl crawler::DatumSink for tokio::sync::Mutex> { async fn write(&self, crawl_datum: crawler::CrawlDatum) -> Result<(), crawler::Error> { self.lock().await.push(crawl_datum); Ok(()) diff --git a/crates/core/src/main.rs b/crates/core/src/main.rs index 11ccdda0b..f636ab0dc 100644 --- a/crates/core/src/main.rs +++ b/crates/core/src/main.rs @@ -66,20 +66,14 @@ enum Commands { }, /// Deploy the search server. - SearchServer { - config_path: String, - }, + SearchServer { config_path: String }, /// Deploy the entity search server. - EntitySearchServer { - config_path: String, - }, + EntitySearchServer { config_path: String }, /// Deploy the json http api. The api interacts with /// the search servers, webgraph servers etc. to provide the necesarry functionality. - Api { - config_path: String, - }, + Api { config_path: String }, /// Deploy the crawler. Crawler { @@ -102,23 +96,19 @@ enum Commands { ml: bool, }, - // Commands for the live index. + /// Commands for the live index. LiveIndex { #[clap(subcommand)] options: LiveIndex, }, - // Build spell correction model. - WebSpell { - config_path: String, - }, + /// Build spell correction model. + WebSpell { config_path: String }, - // Compute statistics for sites. - SiteStats { - config_path: String, - }, + /// Compute statistics for sites. + SiteStats { config_path: String }, - // Commands to compute distributed graph algorithms. + /// Commands to compute distributed graph algorithms. Ampc { #[clap(subcommand)] options: AmpcOptions, @@ -155,20 +145,22 @@ enum AmpcOptions { #[derive(Subcommand)] enum AdminOptions { - Init { - host: SocketAddr, - }, + /// Create the admin config file. Run this before any other admin commands so the client knows where to connect. + Init { host: SocketAddr }, + + /// Print the reachable cluster members and which service they are running. Status, - TopKeyphrases { - top: usize, - }, + /// Export the top most common phrases in the index. + TopKeyphrases { top: usize }, + + /// Get statistics about the index. #[clap(subcommand)] - Index(AdminIndexOptions), + IndexStats(AdminIndexStatsOptions), } #[derive(Subcommand)] -enum AdminIndexOptions { +enum AdminIndexStatsOptions { /// Get the size of the index Size, } @@ -251,9 +243,7 @@ enum WebgraphOptions { #[derive(Subcommand)] enum IndexingOptions { /// Create the search index. - Search { - config_path: String, - }, + Search { config_path: String }, /// Merge multiple search indexes into a single index. MergeSearch { @@ -267,10 +257,8 @@ enum IndexingOptions { output_path: String, }, - // Create an index of canonical urls. 
- Canonical { - config_path: String, - }, + /// Create an index of canonical urls. + Canonical { config_path: String }, } fn load_toml_config>(path: P) -> T { @@ -511,8 +499,8 @@ fn main() -> Result<()> { .block_on(entrypoint::admin::top_keyphrases(top))?; } - AdminOptions::Index(index_options) => match index_options { - AdminIndexOptions::Size => { + AdminOptions::IndexStats(index_options) => match index_options { + AdminIndexStatsOptions::Size => { tokio::runtime::Builder::new_current_thread() .enable_all() .build()? diff --git a/crates/core/src/ranking/computer/mod.rs b/crates/core/src/ranking/computer/mod.rs index 86e2edf4b..5dc99d6d4 100644 --- a/crates/core/src/ranking/computer/mod.rs +++ b/crates/core/src/ranking/computer/mod.rs @@ -14,6 +14,19 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +//! The ranking computer is responsible for computing the core ranking signals for +//! each potential page in the result set. This module handles the initial ranking phase +//! that runs independently on each search node in the distributed search cluster. +//! +//! The computer evaluates a set of core ranking signals for each candidate page, +//! including text-based relevance scores like BM25 and authority scores (harmonic centrality). +//! These signals are combined using a linear model to produce an initial ranking score. +//! The top pages are passed to the coordinator node for the final ranking phase. +//! +//! The core signals computed here are designed to be fast to calculate while still +//! providing strong relevance signals. More expensive ranking features are deferred +//! to the final ranking phase on the coordinator. + use crate::query::optic::AsSearchableRule; use crate::query::{Query, MAX_TERMS_FOR_NGRAM_LOOKUPS}; use crate::ranking::bm25f::MultiBm25FWeight; diff --git a/crates/core/src/ranking/mod.rs b/crates/core/src/ranking/mod.rs index b402fdced..4f8e0e8b1 100644 --- a/crates/core/src/ranking/mod.rs +++ b/crates/core/src/ranking/mod.rs @@ -14,6 +14,12 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +//! The ranking module is responsible for ranking pages based on their relevance to a query. +//! +//! The core ranking signals are computed by the `computer` module, which runs independently +//! on each search shard in the search cluster. Increasingly complex stages +//! run in the ranking pipeline on the coordinator node to produce the final ranking. + pub mod bitvec_similarity; pub mod bm25; pub mod bm25f; diff --git a/crates/core/src/ranking/models/cross_encoder.rs b/crates/core/src/ranking/models/cross_encoder.rs index 8cf958435..e829e97e8 100644 --- a/crates/core/src/ranking/models/cross_encoder.rs +++ b/crates/core/src/ranking/models/cross_encoder.rs @@ -1,5 +1,5 @@ // Stract is an open source web search engine. -// Copyright (C) 2023 Stract ApS +// Copyright (C) 2024 Stract ApS // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as @@ -29,6 +29,9 @@ use crate::models::bert::BertModel; const TRUNCATE_INPUT: usize = 128; +/// A cross-encoder model for ranking pages. +/// +/// Takes a query and a page body as input and returns a score for the page. 
pub struct CrossEncoderModel { tokenizer: tokenizers::Tokenizer, encoder: BertModel, diff --git a/crates/core/src/ranking/models/lambdamart.rs b/crates/core/src/ranking/models/lambdamart.rs index 0ec3b472f..db14ae4e2 100644 --- a/crates/core/src/ranking/models/lambdamart.rs +++ b/crates/core/src/ranking/models/lambdamart.rs @@ -1,5 +1,5 @@ // Stract is an open source web search engine. -// Copyright (C) 2023 Stract ApS +// Copyright (C) 2024 Stract ApS // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as @@ -240,6 +240,9 @@ impl Header { } } +/// A LambdaMART model for ranking pages. +/// +/// Designed for efficient inference of lightgbm compatible models. pub struct LambdaMART { trees: Vec, } diff --git a/crates/core/src/ranking/models/linear.rs b/crates/core/src/ranking/models/linear.rs index 0e5c63e67..cb550770e 100644 --- a/crates/core/src/ranking/models/linear.rs +++ b/crates/core/src/ranking/models/linear.rs @@ -1,5 +1,5 @@ // Stract is an open source web search engine. -// Copyright (C) 2023 Stract ApS +// Copyright (C) 2024 Stract ApS // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as diff --git a/crates/core/src/ranking/pipeline/mod.rs b/crates/core/src/ranking/pipeline/mod.rs index 6a25a08f4..78f137969 100644 --- a/crates/core/src/ranking/pipeline/mod.rs +++ b/crates/core/src/ranking/pipeline/mod.rs @@ -68,10 +68,10 @@ impl StageOrModifier where T: RankableWebpage + Send + Sync, { - fn top_n(&self) -> Top { + fn top(&self) -> Top { match self { - StageOrModifier::Stage(stage) => stage.top_n(), - StageOrModifier::Modifier(modifier) => modifier.top_n(), + StageOrModifier::Stage(stage) => stage.top(), + StageOrModifier::Modifier(modifier) => modifier.top(), } } @@ -139,7 +139,7 @@ where let coefficients = query.signal_coefficients(); for stage_or_modifier in self.stages_or_modifiers.iter() { - let webpages = if let Top::Limit(top_n) = stage_or_modifier.top_n() { + let webpages = if let Top::Limit(top_n) = stage_or_modifier.top() { if query.offset() > top_n { continue; } diff --git a/crates/core/src/ranking/pipeline/modifiers/mod.rs b/crates/core/src/ranking/pipeline/modifiers/mod.rs index fa481e3ce..3809a4525 100644 --- a/crates/core/src/ranking/pipeline/modifiers/mod.rs +++ b/crates/core/src/ranking/pipeline/modifiers/mod.rs @@ -14,28 +14,48 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +//! Modifiers are used to modify the ranking of pages. +//! +//! Each page is ranked by a linear combination of the signals like +//! `score = boost * (signal_1 * weight_1 + signal_2 * weight_2 + ...)` +//! +//! Modifiers can either modify the multiplicative boost factor for +//! each page or override the ranking entirely (if we want to rank +//! for something other than the score). + mod inbound_similarity; use super::{RankableWebpage, Top}; pub use inbound_similarity::InboundSimilarity; +/// A modifier that gives full control over the ranking. pub trait FullModifier: Send + Sync { type Webpage: RankableWebpage; + /// Modify the boost factor for each page. fn update_boosts(&self, webpages: &mut [Self::Webpage]); + /// Override ranking of the pages. 
fn rank(&self, webpages: &mut [Self::Webpage]) { webpages.sort_by(|a, b| b.score().partial_cmp(&a.score()).unwrap()); } - fn top_n(&self) -> Top { + /// The number of pages to return from this part of the pipeline. + fn top(&self) -> Top { Top::Unlimited } } +/// A modifier that modifies the multiplicative boost factor for each page. +/// +/// This is the most common type of modifier. pub trait Modifier: Send + Sync { type Webpage: RankableWebpage; + /// Modify the boost factor for a page. + /// + /// The new boost factor will be multiplied with the page's current boost factor. fn boost(&self, webpage: &Self::Webpage) -> f64; + /// The number of pages to return from this part of the pipeline. fn top(&self) -> Top { Top::Unlimited } @@ -54,7 +74,7 @@ where } } - fn top_n(&self) -> Top { + fn top(&self) -> Top { Modifier::top(self) } } diff --git a/crates/core/src/ranking/pipeline/scorers/lambdamart.rs b/crates/core/src/ranking/pipeline/scorers/lambdamart.rs index fa4932a94..9f039efac 100644 --- a/crates/core/src/ranking/pipeline/scorers/lambdamart.rs +++ b/crates/core/src/ranking/pipeline/scorers/lambdamart.rs @@ -36,7 +36,7 @@ impl RankingStage for Arc { ) } - fn top_n(&self) -> Top { + fn top(&self) -> Top { Top::Limit(20) } } @@ -59,7 +59,7 @@ impl RankingStage for PrecisionLambda { ) } - fn top_n(&self) -> Top { + fn top(&self) -> Top { Top::Limit(20) } } diff --git a/crates/core/src/ranking/pipeline/scorers/mod.rs b/crates/core/src/ranking/pipeline/scorers/mod.rs index bc561e524..2365397f7 100644 --- a/crates/core/src/ranking/pipeline/scorers/mod.rs +++ b/crates/core/src/ranking/pipeline/scorers/mod.rs @@ -14,6 +14,10 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +//! Scorers are used to compute the ranking signals in the ranking pipeline. +//! +//! Each scorer computes a single signal which is then used to rank the pages. + pub mod embedding; pub mod inbound_similarity; pub mod lambdamart; @@ -26,14 +30,23 @@ use crate::ranking::{SignalCalculation, SignalCoefficients, SignalEnum}; use super::{RankableWebpage, Top}; +/// A ranking stage that computes some signals for each page. +/// +/// This trait is implemented for all scorers. +/// Most of the time you will want to implement the [`RankingStage`] trait instead, +/// but this trait gives you more control over the ranking pipeline. pub trait FullRankingStage: Send + Sync { type Webpage: RankableWebpage; + /// Compute the signal for each page. fn compute(&self, webpages: &mut [Self::Webpage]); - fn top_n(&self) -> Top { + + /// The number of pages to return from this part of the pipeline. + fn top(&self) -> Top { Top::Unlimited } + /// Update the score for each page. fn update_scores(&self, webpages: &mut [Self::Webpage], coefficients: &SignalCoefficients) { for webpage in webpages.iter_mut() { webpage.set_raw_score(webpage.signals().iter().fold(0.0, |acc, (signal, calc)| { @@ -42,16 +55,21 @@ pub trait FullRankingStage: Send + Sync { } } + /// Rank the pages by their score. fn rank(&self, webpages: &mut [Self::Webpage]) { webpages.sort_by(|a, b| b.score().partial_cmp(&a.score()).unwrap()); } } +/// A ranking stage that computes a single signal for each page. pub trait RankingStage: Send + Sync { type Webpage: RankableWebpage; + /// Compute the signal for a single page. fn compute(&self, webpage: &Self::Webpage) -> (SignalEnum, SignalCalculation); - fn top_n(&self) -> Top { + + /// The number of pages to return from this part of the pipeline. 
+ fn top(&self) -> Top { Top::Unlimited } } @@ -69,7 +87,7 @@ where } } - fn top_n(&self) -> Top { - self.top_n() + fn top(&self) -> Top { + self.top() } } diff --git a/crates/core/src/ranking/pipeline/scorers/reranker.rs b/crates/core/src/ranking/pipeline/scorers/reranker.rs index 6caac87d3..d0bb0ebf8 100644 --- a/crates/core/src/ranking/pipeline/scorers/reranker.rs +++ b/crates/core/src/ranking/pipeline/scorers/reranker.rs @@ -68,7 +68,7 @@ impl FullRankingStage for ReRanker { self.crossencoder_score_webpages(webpages); } - fn top_n(&self) -> Top { + fn top(&self) -> Top { Top::Limit(20) } } diff --git a/crates/core/src/ranking/pipeline/stages/precision.rs b/crates/core/src/ranking/pipeline/stages/precision.rs index 59847eaa0..b336c8a6d 100644 --- a/crates/core/src/ranking/pipeline/stages/precision.rs +++ b/crates/core/src/ranking/pipeline/stages/precision.rs @@ -14,6 +14,11 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +//! The precision stage of the ranking pipeline. +//! +//! This stage focusses on refining the first page of results +//! from the recall stage. + use std::sync::Arc; use crate::{ diff --git a/crates/core/src/ranking/pipeline/stages/recall.rs b/crates/core/src/ranking/pipeline/stages/recall.rs index 688af1694..897012af8 100644 --- a/crates/core/src/ranking/pipeline/stages/recall.rs +++ b/crates/core/src/ranking/pipeline/stages/recall.rs @@ -14,6 +14,10 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +//! The recall stage of the ranking pipeline. +//! +//! This stage focusses on getting the best pages into the precision stage. + use std::sync::Arc; use crate::{ diff --git a/crates/core/src/searcher/distributed.rs b/crates/core/src/searcher/distributed.rs index 2978bbfd2..b758b5ffb 100644 --- a/crates/core/src/searcher/distributed.rs +++ b/crates/core/src/searcher/distributed.rs @@ -31,7 +31,6 @@ use crate::{ entity_search_server, live_index::LiveIndexService, search_server::{self, RetrieveReq, SearchService} }, generic_query::{self, Collector}, - index::Index, inverted_index::{RetrievedWebpage, ShardId, WebpagePointer}, ranking::pipeline::{PrecisionRankingWebpage, RecallRankingWebpage}, Result, @@ -44,7 +43,7 @@ use futures::{future::join_all, stream::FuturesUnordered, StreamExt}; use itertools::Itertools; use std::future::Future; use thiserror::Error; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::Mutex; use super::{InitialWebsiteResult, LocalSearcher, SearchQuery}; @@ -284,6 +283,7 @@ impl ReusableClientManager for LiveIndexService { } } +/// A searcher that runs the search on a remote cluster. pub struct DistributedSearcher { client: Mutex>, } @@ -414,7 +414,7 @@ impl SearchClient for DistributedSearcher { >: From<>::Response>, <::Child as tantivy::collector::SegmentCollector>::Fruit: From<::Fruit> { - let collector = query.remote_collector(); + let collector = query.coordinator_collector(); let res = self.conn().await .send(query, &AllShardsSelector, &RandomReplicaSelector) @@ -518,7 +518,7 @@ impl SearchClient for DistributedSearcher { queries .iter() .zip_eq(fruits.into_iter()) - .map(|(query, shard_fruits)| query.remote_collector().merge_fruits(shard_fruits)) + .map(|(query, shard_fruits)| query.coordinator_collector().merge_fruits(shard_fruits)) .collect::, _>>() } @@ -584,9 +584,9 @@ impl SearchClient for DistributedSearcher { } /// This should only be used for testing and benchmarks. 
-pub struct LocalSearchClient(LocalSearcher>>); -impl From>>> for LocalSearchClient { - fn from(searcher: LocalSearcher>>) -> Self { +pub struct LocalSearchClient(LocalSearcher); +impl From for LocalSearchClient { + fn from(searcher: LocalSearcher) -> Self { Self(searcher) } } diff --git a/crates/core/src/searcher/local/guard.rs b/crates/core/src/searcher/local/guard.rs deleted file mode 100644 index 2b7712746..000000000 --- a/crates/core/src/searcher/local/guard.rs +++ /dev/null @@ -1,33 +0,0 @@ -// Stract is an open source web search engine. -// Copyright (C) 2024 Stract ApS -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -use tokio::sync::OwnedRwLockReadGuard; - -use crate::index::Index; -use crate::inverted_index::InvertedIndex; - -pub trait ReadGuard: Send + Sync { - fn search_index(&self) -> &Index; - fn inverted_index(&self) -> &InvertedIndex { - &self.search_index().inverted_index - } -} - -impl ReadGuard for OwnedRwLockReadGuard { - fn search_index(&self) -> &Index { - self - } -} diff --git a/crates/core/src/searcher/local/inner.rs b/crates/core/src/searcher/local/inner.rs index f3ef0947a..62bb66ebf 100644 --- a/crates/core/src/searcher/local/inner.rs +++ b/crates/core/src/searcher/local/inner.rs @@ -16,32 +16,31 @@ use crate::{ generic_query::{self, GenericQuery}, + index::Index, inverted_index, ranking::{LocalRanker, SignalComputer}, searcher::InitialWebsiteResult, Result, }; use std::sync::Arc; +use tokio::sync::{OwnedRwLockReadGuard, RwLock}; use crate::{ config::CollectorConfig, models::dual_encoder::DualEncoder, query::Query, ranking::models::linear::LinearRegression, search_ctx::Ctx, searcher::SearchQuery, }; -use super::{InvertedIndexResult, ReadGuard, SearchableIndex}; +use super::InvertedIndexResult; -pub struct InnerLocalSearcher { - index: I, +pub struct InnerLocalSearcher { + index: Arc>, linear_regression: Option>, dual_encoder: Option>, collector_config: CollectorConfig, } -impl InnerLocalSearcher -where - I: SearchableIndex, -{ - pub fn new(index: I) -> Self { +impl InnerLocalSearcher { + pub fn new(index: Arc>) -> Self { Self { index, linear_regression: None, @@ -50,8 +49,8 @@ where } } - pub async fn guard(&self) -> I::ReadGuard { - self.index.read_guard().await + pub async fn guard(&self) -> OwnedRwLockReadGuard { + self.index.clone().read_owned().await } pub fn set_linear_model(&mut self, model: LinearRegression) { @@ -66,19 +65,19 @@ where self.collector_config = config; } - fn parse_query( + fn parse_query( &self, ctx: &Ctx, - guard: &G, + guard: &OwnedRwLockReadGuard, query: &SearchQuery, ) -> Result { Query::parse(ctx, query, guard.inverted_index()) } - fn ranker( + fn ranker( &self, query: &Query, - guard: &G, + guard: &OwnedRwLockReadGuard, de_rank_similar: bool, computer: SignalComputer, ) -> Result { @@ -99,10 +98,10 @@ where .with_offset(query.offset())) } - fn search_inverted_index( + fn search_inverted_index( &self, ctx: &Ctx, - guard: &G, + guard: 
&OwnedRwLockReadGuard, query: &SearchQuery, de_rank_similar: bool, ) -> Result { @@ -112,8 +111,7 @@ where computer.set_region_count( guard - .search_index() - .region_count + .region_count() .lock() .unwrap_or_else(|e| e.into_inner()) .clone(), @@ -149,7 +147,7 @@ where pub fn search_initial( &self, query: &SearchQuery, - guard: &I::ReadGuard, + guard: &OwnedRwLockReadGuard, de_rank_similar: bool, ) -> Result { let query = query.clone(); @@ -168,7 +166,7 @@ where &self, websites: &[inverted_index::WebpagePointer], query: &str, - guard: &I::ReadGuard, + guard: &OwnedRwLockReadGuard, ) -> Result> { let ctx = guard.inverted_index().local_search_ctx(); let query = SearchQuery { @@ -183,7 +181,7 @@ where pub fn search_initial_generic( &self, query: &Q, - guard: &I::ReadGuard, + guard: &OwnedRwLockReadGuard, ) -> Result<::Fruit> { guard.inverted_index().search_initial_generic(query) } @@ -192,7 +190,7 @@ where &self, query: &Q, fruit: ::Fruit, - guard: &I::ReadGuard, + guard: &OwnedRwLockReadGuard, ) -> Result { guard.inverted_index().retrieve_generic(query, fruit) } @@ -200,7 +198,7 @@ where pub fn search_generic( &self, query: Q, - guard: &I::ReadGuard, + guard: &OwnedRwLockReadGuard, ) -> Result { let fruit = self.search_initial_generic(&query, guard)?; Ok(Q::merge_results(vec![ diff --git a/crates/core/src/searcher/local/mod.rs b/crates/core/src/searcher/local/mod.rs index 8b7439918..f787ad80e 100644 --- a/crates/core/src/searcher/local/mod.rs +++ b/crates/core/src/searcher/local/mod.rs @@ -14,15 +14,13 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -mod guard; -use guard::ReadGuard; +//! The local searcher runs the search against a local index. mod inner; use inner::InnerLocalSearcher; -use tokio::sync::{OwnedRwLockReadGuard, RwLock}; +use tokio::sync::RwLock; use std::collections::HashMap; -use std::future::Future; use std::sync::Arc; use itertools::Itertools; @@ -44,29 +42,12 @@ use crate::{inverted_index, Result}; use super::WebsitesResult; use super::{InitialWebsiteResult, SearchQuery}; -pub trait SearchableIndex: Send + Sync + 'static { - type ReadGuard: ReadGuard; - - fn read_guard(&self) -> impl Future; -} - -impl SearchableIndex for Arc> { - type ReadGuard = OwnedRwLockReadGuard; - - async fn read_guard(&self) -> Self::ReadGuard { - self.clone().read_owned().await - } -} - -pub struct LocalSearcherBuilder { - inner: InnerLocalSearcher, +pub struct LocalSearcherBuilder { + inner: InnerLocalSearcher, } -impl LocalSearcherBuilder -where - I: SearchableIndex, -{ - pub fn new(index: I) -> Self { +impl LocalSearcherBuilder { + pub fn new(index: Arc>) -> Self { Self { inner: InnerLocalSearcher::new(index), } @@ -87,22 +68,19 @@ where self } - pub fn build(self) -> LocalSearcher { + pub fn build(self) -> LocalSearcher { LocalSearcher { inner: Arc::new(self.inner), } } } -pub struct LocalSearcher { - inner: Arc>, +pub struct LocalSearcher { + inner: Arc, } -impl LocalSearcher -where - I: SearchableIndex, -{ - pub fn builder(index: I) -> LocalSearcherBuilder { +impl LocalSearcher { + pub fn builder(index: Arc>) -> LocalSearcherBuilder { LocalSearcherBuilder::new(index) } @@ -203,7 +181,7 @@ where }) } - /// This function is mainly used for tests and benchmarks + /// This function is only used for tests and benchmarks pub fn search_sync(&self, query: &SearchQuery) -> Result { crate::block_on(self.search(query)) } diff --git a/crates/core/src/searcher/mod.rs b/crates/core/src/searcher/mod.rs index 83ac21480..3c31f3f05 
100644 --- a/crates/core/src/searcher/mod.rs +++ b/crates/core/src/searcher/mod.rs @@ -14,6 +14,12 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +//! Searchers are responsible for executing search queries against an index. +//! There are two types of searchers: +//! - [`local::LocalSearcher`] which runs the search on the local machine. +//! - [`distributed::DistributedSearcher`] which runs the search on a remote cluster. Each node +//! will run a local searcher and then the results are merged on the coordinator node. + pub mod api; pub mod distributed; pub mod local; diff --git a/crates/core/src/webgraph/query/backlink.rs b/crates/core/src/webgraph/query/backlink.rs index dce2e583f..77a5e7912 100644 --- a/crates/core/src/webgraph/query/backlink.rs +++ b/crates/core/src/webgraph/query/backlink.rs @@ -182,7 +182,7 @@ impl Query for BacklinksQuery { collector } - fn remote_collector(&self) -> Self::Collector { + fn coordinator_collector(&self) -> Self::Collector { TopDocsCollector::from(self.limit).enable_offset() } @@ -314,7 +314,7 @@ impl Query for HostBacklinksQuery { collector } - fn remote_collector(&self) -> Self::Collector { + fn coordinator_collector(&self) -> Self::Collector { TopDocsCollector::from(self.limit) .enable_offset() .with_deduplicator(HostDeduplicator) @@ -467,7 +467,7 @@ impl Query for FullBacklinksQuery { collector } - fn remote_collector(&self) -> Self::Collector { + fn coordinator_collector(&self) -> Self::Collector { TopDocsCollector::from(self.limit).enable_offset() } @@ -617,7 +617,7 @@ impl Query for FullHostBacklinksQuery { collector } - fn remote_collector(&self) -> Self::Collector { + fn coordinator_collector(&self) -> Self::Collector { TopDocsCollector::from(self.limit) .enable_offset() .with_deduplicator(HostDeduplicator) @@ -742,7 +742,7 @@ impl Query for BacklinksWithLabelsQuery { collector } - fn remote_collector(&self) -> Self::Collector { + fn coordinator_collector(&self) -> Self::Collector { TopDocsCollector::from(self.limit).enable_offset() } diff --git a/crates/core/src/webgraph/query/between.rs b/crates/core/src/webgraph/query/between.rs index 90a35ca74..2577f9da8 100644 --- a/crates/core/src/webgraph/query/between.rs +++ b/crates/core/src/webgraph/query/between.rs @@ -113,7 +113,7 @@ impl Query for FullLinksBetweenQuery { collector } - fn remote_collector(&self) -> Self::Collector { + fn coordinator_collector(&self) -> Self::Collector { TopDocsCollector::from(self.limit).enable_offset() } diff --git a/crates/core/src/webgraph/query/degree.rs b/crates/core/src/webgraph/query/degree.rs index 8a021c9bb..d84d2b42b 100644 --- a/crates/core/src/webgraph/query/degree.rs +++ b/crates/core/src/webgraph/query/degree.rs @@ -53,7 +53,7 @@ impl Query for InDegreeQuery { .with_shard_id(searcher.shard()) } - fn remote_collector(&self) -> Self::Collector { + fn coordinator_collector(&self) -> Self::Collector { FastCountCollector::new( ToId.name().to_string(), FastCountValue::U128(self.node.as_u128()), diff --git a/crates/core/src/webgraph/query/forwardlink.rs b/crates/core/src/webgraph/query/forwardlink.rs index f9d187852..d1ca759ca 100644 --- a/crates/core/src/webgraph/query/forwardlink.rs +++ b/crates/core/src/webgraph/query/forwardlink.rs @@ -135,7 +135,7 @@ impl Query for ForwardlinksQuery { collector } - fn remote_collector(&self) -> Self::Collector { + fn coordinator_collector(&self) -> Self::Collector { TopDocsCollector::from(self.limit).enable_offset() } @@ -286,7 +286,7 @@ impl 
Query for HostForwardlinksQuery { collector } - fn remote_collector(&self) -> Self::Collector { + fn coordinator_collector(&self) -> Self::Collector { TopDocsCollector::from(self.limit) .enable_offset() .with_deduplicator(HostDeduplicator) @@ -437,7 +437,7 @@ impl Query for FullForwardlinksQuery { collector } - fn remote_collector(&self) -> Self::Collector { + fn coordinator_collector(&self) -> Self::Collector { TopDocsCollector::from(self.limit).enable_offset() } @@ -576,7 +576,7 @@ impl Query for FullHostForwardlinksQuery { collector } - fn remote_collector(&self) -> Self::Collector { + fn coordinator_collector(&self) -> Self::Collector { TopDocsCollector::from(self.limit) .enable_offset() .with_deduplicator(HostDeduplicator) diff --git a/crates/core/src/webgraph/query/group_by.rs b/crates/core/src/webgraph/query/group_by.rs index 5f751625f..03d723102 100644 --- a/crates/core/src/webgraph/query/group_by.rs +++ b/crates/core/src/webgraph/query/group_by.rs @@ -157,7 +157,7 @@ impl Query for HostGroupSketchQuery { collector } - fn remote_collector(&self) -> Self::Collector { + fn coordinator_collector(&self) -> Self::Collector { GroupSketchCollector::new(self.group, self.value) } @@ -306,7 +306,7 @@ impl Query for HostGroupQuery { collector } - fn remote_collector(&self) -> Self::Collector { + fn coordinator_collector(&self) -> Self::Collector { GroupExactCollector::new(self.group, self.value) } diff --git a/crates/core/src/webgraph/query/id2node.rs b/crates/core/src/webgraph/query/id2node.rs index 1332c57a0..a4e16b19b 100644 --- a/crates/core/src/webgraph/query/id2node.rs +++ b/crates/core/src/webgraph/query/id2node.rs @@ -83,7 +83,7 @@ impl Query for Id2NodeQuery { })) } - fn remote_collector(&self) -> Self::Collector { + fn coordinator_collector(&self) -> Self::Collector { FirstDocCollector::without_shard_id() } diff --git a/crates/core/src/webgraph/query/mod.rs b/crates/core/src/webgraph/query/mod.rs index c7c450ae0..e802f3cde 100644 --- a/crates/core/src/webgraph/query/mod.rs +++ b/crates/core/src/webgraph/query/mod.rs @@ -14,6 +14,26 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +//! # Main flow +//! ```md +//! `coordinator` <------> `searcher` +//! ----------------------------------- +//! send query to searcher +//! search index +//! collect fruits +//! send fruits to coordinator +//! merge fruits +//! filter fruits +//! for each shard +//! send fruits to searchers +//! construct intermediate output +//! from fruits +//! send intermediate output to coordinator +//! merge intermediate outputs +//! return final output +//! --------------------------------------------------- +//! 
``` + use super::searcher::Searcher; use crate::{ampc::dht::ShardId, Result}; pub use collector::Collector; @@ -49,7 +69,7 @@ pub trait Query: Send + Sync + bincode::Encode + bincode::Decode + Clone { fn tantivy_query(&self, searcher: &Searcher) -> Self::TantivyQuery; fn collector(&self, searcher: &Searcher) -> Self::Collector; - fn remote_collector(&self) -> Self::Collector; + fn coordinator_collector(&self) -> Self::Collector; fn filter_fruit_shards( &self, diff --git a/crates/core/src/webgraph/remote.rs b/crates/core/src/webgraph/remote.rs index 09e7189b6..feafc695f 100644 --- a/crates/core/src/webgraph/remote.rs +++ b/crates/core/src/webgraph/remote.rs @@ -120,7 +120,7 @@ impl RemoteWebgraph { <::Child as tantivy::collector::SegmentCollector>::Fruit: From<::Fruit>, { - let collector = query.remote_collector(); + let collector = query.coordinator_collector(); let res = self .conn() @@ -250,7 +250,7 @@ impl RemoteWebgraph { queries .iter() .zip_eq(fruits.into_iter()) - .map(|(query, shard_fruits)| query.remote_collector().merge_fruits(shard_fruits)) + .map(|(query, shard_fruits)| query.coordinator_collector().merge_fruits(shard_fruits)) .collect::, _>>() } diff --git a/crates/core/src/webgraph/store.rs b/crates/core/src/webgraph/store.rs index f9c499d6d..34e574e57 100644 --- a/crates/core/src/webgraph/store.rs +++ b/crates/core/src/webgraph/store.rs @@ -271,7 +271,9 @@ impl EdgeStore { From<::Fruit>, { let fruit = self.search_initial(query)?; - let fruit = query.remote_collector().merge_fruits(vec![fruit.into()])?; + let fruit = query + .coordinator_collector() + .merge_fruits(vec![fruit.into()])?; let res = self.retrieve(query, fruit)?; Ok(Q::merge_results(vec![res])) }
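The "Main flow" docs added for both the generic queries and the webgraph queries, as well as the `coordinator_collector().merge_fruits(...)` calls above, share one pattern: every searcher produces a per-shard fruit, and the coordinator-side collector merges those fruits into a single result. A self-contained sketch of that merge step; the `Collector` trait and `TopDocs` type below are simplified stand-ins rather than the crate's real types:

```rust
// Toy stand-in for the coordinator-side merge described in the "Main flow" docs.
trait Collector {
    type Fruit;
    fn merge_fruits(&self, fruits: Vec<Self::Fruit>) -> Self::Fruit;
}

/// Keeps the `limit` highest-scoring (doc id, score) pairs across shards.
struct TopDocs {
    limit: usize,
}

impl Collector for TopDocs {
    type Fruit = Vec<(u64, f64)>;

    fn merge_fruits(&self, fruits: Vec<Self::Fruit>) -> Self::Fruit {
        let mut merged: Vec<(u64, f64)> = fruits.into_iter().flatten().collect();
        merged.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
        merged.truncate(self.limit);
        merged
    }
}

fn main() {
    // Each searcher returns its own fruit for the query...
    let shard_a = vec![(1, 0.9), (2, 0.4)];
    let shard_b = vec![(3, 0.7)];

    // ...and the coordinator merges them with the coordinator-side collector.
    let top = TopDocs { limit: 2 }.merge_fruits(vec![shard_a, shard_b]);
    assert_eq!(top, vec![(1, 0.9), (3, 0.7)]);
}
```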
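The `modifiers` module docs earlier in this diff give the scoring formula `score = boost * (signal_1 * weight_1 + signal_2 * weight_2 + ...)`, with modifiers adjusting only the multiplicative boost. A small, self-contained illustration of that combination; the signal names, weights, and the popularity boost are invented for the example and are not the crate's pipeline code:

```rust
// Toy page with two signals and a boost factor, mirroring the formula from the
// modifiers docs: score = boost * (signal_1 * weight_1 + signal_2 * weight_2 + ...).
struct Page {
    bm25: f64,
    harmonic_centrality: f64,
    boost: f64,
}

impl Page {
    fn score(&self, bm25_weight: f64, centrality_weight: f64) -> f64 {
        self.boost * (self.bm25 * bm25_weight + self.harmonic_centrality * centrality_weight)
    }
}

fn main() {
    let mut page = Page {
        bm25: 2.0,
        harmonic_centrality: 0.5,
        boost: 1.0,
    };

    // A (hypothetical) modifier multiplies a new factor onto the existing boost,
    // leaving the underlying signal combination untouched.
    let popularity_boost = 1.2;
    page.boost *= popularity_boost;

    // score = 1.2 * (2.0 * 3.0 + 0.5 * 10.0) = 13.2
    assert!((page.score(3.0, 10.0) - 13.2).abs() < 1e-9);
}
```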