From 2aae2884e9e965afa02dfc702bd997b25f1701e9 Mon Sep 17 00:00:00 2001 From: Ion Suman <47307091+isum@users.noreply.github.com> Date: Mon, 8 Dec 2025 15:30:15 +0200 Subject: [PATCH] feat(core, store): load raw subgraph manifests from store --- Cargo.lock | 1 + core/src/lib.rs | 3 +- core/src/subgraph/instance_manager.rs | 69 +++++++------------ core/src/subgraph_manifest.rs | 93 ++++++++++++++++++++++++++ core/src/subgraph_provider.rs | 59 ++++++---------- graph/src/components/store/traits.rs | 24 +++++-- node/src/launcher.rs | 1 + node/src/manager/commands/run.rs | 1 + store/postgres/Cargo.toml | 1 + store/postgres/src/deployment.rs | 26 +++++-- store/postgres/src/deployment_store.rs | 24 +++++-- store/postgres/src/subgraph_store.rs | 44 +++++++++--- tests/src/fixture/mod.rs | 1 + 13 files changed, 240 insertions(+), 107 deletions(-) create mode 100644 core/src/subgraph_manifest.rs diff --git a/Cargo.lock b/Cargo.lock index f1af838abc8..dc1e4cedec0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3059,6 +3059,7 @@ dependencies = [ "rand 0.9.2", "serde", "serde_json", + "serde_yaml", "sqlparser 0.59.0", "stable-hash 0.3.4", "thiserror 2.0.16", diff --git a/core/src/lib.rs b/core/src/lib.rs index 61de81c0b64..118ce3ebaae 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,5 +1,6 @@ +mod subgraph_manifest; + pub mod amp_subgraph; pub mod polling_monitor; - pub mod subgraph; pub mod subgraph_provider; diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index 77b4b7288f1..ac19fdf361c 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -12,7 +12,6 @@ use crate::subgraph::runner::SubgraphRunner; use graph::amp; use graph::blockchain::block_stream::{BlockStreamMetrics, TriggersAdapterWrapper}; use graph::blockchain::{Blockchain, BlockchainKind, DataSource, NodeCapabilities}; -use graph::components::link_resolver::LinkResolverContext; use graph::components::metrics::gas::GasMetrics; use graph::components::metrics::subgraph::DeploymentStatusMetric; use graph::components::store::SourceableStore; @@ -29,7 +28,7 @@ use tokio::task; use super::context::OffchainMonitor; use super::SubgraphTriggerProcessor; -use crate::subgraph::runner::SubgraphRunnerError; +use crate::{subgraph::runner::SubgraphRunnerError, subgraph_manifest}; #[derive(Clone)] pub struct SubgraphInstanceManager { @@ -83,30 +82,22 @@ where let deployment_status_metric = deployment_status_metric.clone(); async move { - let link_resolver = self - .link_resolver - .for_manifest(&loc.hash.to_string()) - .map_err(SubgraphAssignmentProviderError::ResolveError)?; - - let file_bytes = link_resolver - .cat( - &LinkResolverContext::new(&loc.hash, &logger), - &loc.hash.to_ipfs_link(), - ) - .await - .map_err(SubgraphAssignmentProviderError::ResolveError)?; - - let manifest: serde_yaml::Mapping = serde_yaml::from_slice(&file_bytes) - .map_err(|e| SubgraphAssignmentProviderError::ResolveError(e.into()))?; - - match BlockchainKind::from_manifest(&manifest)? { + let raw_manifest = subgraph_manifest::load_raw_subgraph_manifest( + &logger, + &*instance_manager.subgraph_store, + &*instance_manager.link_resolver, + &loc.hash, + ) + .await?; + + match BlockchainKind::from_manifest(&raw_manifest)? { BlockchainKind::Ethereum => { let runner = instance_manager .build_subgraph_runner::( logger.clone(), self.env_vars.cheap_clone(), loc.clone(), - manifest, + raw_manifest, stop_block, Box::new(SubgraphTriggerProcessor {}), deployment_status_metric, @@ -121,7 +112,7 @@ where logger.clone(), self.env_vars.cheap_clone(), loc.clone(), - manifest, + raw_manifest, stop_block, Box::new(SubgraphTriggerProcessor {}), deployment_status_metric, @@ -136,7 +127,7 @@ where logger.clone(), self.env_vars.cheap_clone(), loc.cheap_clone(), - manifest, + raw_manifest, stop_block, Box::new(graph_chain_substreams::TriggerProcessor::new( loc.clone(), @@ -251,7 +242,7 @@ impl SubgraphInstanceManager { logger: Logger, env_vars: Arc, deployment: DeploymentLocator, - manifest: serde_yaml::Mapping, + raw_manifest: serde_yaml::Mapping, stop_block: Option, tp: Box>>, deployment_status_metric: DeploymentStatusMetric, @@ -264,7 +255,7 @@ impl SubgraphInstanceManager { logger, env_vars, deployment, - manifest, + raw_manifest, stop_block, tp, deployment_status_metric, @@ -278,7 +269,7 @@ impl SubgraphInstanceManager { logger: Logger, env_vars: Arc, deployment: DeploymentLocator, - manifest: serde_yaml::Mapping, + raw_manifest: serde_yaml::Mapping, stop_block: Option, tp: Box>>, deployment_status_metric: DeploymentStatusMetric, @@ -291,8 +282,8 @@ impl SubgraphInstanceManager { let subgraph_store = self.subgraph_store.cheap_clone(); let registry = self.metrics_registry.cheap_clone(); - let raw_yaml = serde_yaml::to_string(&manifest).unwrap(); - let manifest = UnresolvedSubgraphManifest::parse(deployment.hash.cheap_clone(), manifest)?; + let manifest = + UnresolvedSubgraphManifest::parse(deployment.hash.cheap_clone(), raw_manifest)?; // Allow for infinite retries for subgraph definition files. let link_resolver = Arc::from( @@ -302,24 +293,16 @@ impl SubgraphInstanceManager { .with_retries(), ); - // Make sure the `raw_yaml` is present on both this subgraph and the graft base. - self.subgraph_store - .set_manifest_raw_yaml(&deployment.hash, raw_yaml) - .await?; if let Some(graft) = &manifest.graft { if self.subgraph_store.is_deployed(&graft.base)? { - let file_bytes = self - .link_resolver - .cat( - &LinkResolverContext::new(&deployment.hash, &logger), - &graft.base.to_ipfs_link(), - ) - .await?; - let yaml = String::from_utf8(file_bytes)?; - - self.subgraph_store - .set_manifest_raw_yaml(&graft.base, yaml) - .await?; + // Makes sure the raw manifest is cached in the subgraph store + let _raw_manifest = subgraph_manifest::load_raw_subgraph_manifest( + &logger, + &*self.subgraph_store, + &*self.link_resolver, + &graft.base, + ) + .await?; } } diff --git a/core/src/subgraph_manifest.rs b/core/src/subgraph_manifest.rs new file mode 100644 index 00000000000..c02497c10c5 --- /dev/null +++ b/core/src/subgraph_manifest.rs @@ -0,0 +1,93 @@ +use graph::{ + cheap_clone::CheapClone as _, + components::{ + link_resolver::{LinkResolver, LinkResolverContext}, + store::{StoreError, SubgraphStore}, + }, + data::subgraph::DeploymentHash, +}; +use slog::{debug, Logger}; + +pub(super) async fn load_raw_subgraph_manifest( + logger: &Logger, + subgraph_store: &dyn SubgraphStore, + link_resolver: &dyn LinkResolver, + hash: &DeploymentHash, +) -> Result { + if let Some(raw_manifest) = + subgraph_store + .raw_manifest(hash) + .await + .map_err(|e| Error::LoadManifest { + hash: hash.cheap_clone(), + source: anyhow::Error::from(e), + })? + { + debug!(logger, "Loaded raw manifest from the subgraph store"); + return Ok(raw_manifest); + } + + debug!(logger, "Loading raw manifest using link resolver"); + + let link_resolver = + link_resolver + .for_manifest(&hash.to_string()) + .map_err(|e| Error::CreateLinkResolver { + hash: hash.cheap_clone(), + source: e, + })?; + + let file_bytes = link_resolver + .cat( + &LinkResolverContext::new(hash, logger), + &hash.to_ipfs_link(), + ) + .await + .map_err(|e| Error::LoadManifest { + hash: hash.cheap_clone(), + source: e, + })?; + + let raw_manifest: serde_yaml::Mapping = + serde_yaml::from_slice(&file_bytes).map_err(|e| Error::ParseManifest { + hash: hash.cheap_clone(), + source: e, + })?; + + subgraph_store + .set_raw_manifest_once(hash, &raw_manifest) + .await + .map_err(|e| Error::StoreManifest { + hash: hash.cheap_clone(), + source: e, + })?; + + Ok(raw_manifest) +} + +#[derive(Debug, thiserror::Error)] +pub(super) enum Error { + #[error("failed to create link resolver for '{hash}': {source:#}")] + CreateLinkResolver { + hash: DeploymentHash, + source: anyhow::Error, + }, + + #[error("failed to load manifest for '{hash}': {source:#}")] + LoadManifest { + hash: DeploymentHash, + source: anyhow::Error, + }, + + #[error("failed to parse manifest for '{hash}': {source:#}")] + ParseManifest { + hash: DeploymentHash, + source: serde_yaml::Error, + }, + + #[error("failed to store manifest for '{hash}': {source:#}")] + StoreManifest { + hash: DeploymentHash, + source: StoreError, + }, +} diff --git a/core/src/subgraph_provider.rs b/core/src/subgraph_provider.rs index 65eeb7eb728..cbfb60a5e11 100644 --- a/core/src/subgraph_provider.rs +++ b/core/src/subgraph_provider.rs @@ -4,9 +4,9 @@ use graph::{ amp, cheap_clone::CheapClone as _, components::{ - link_resolver::{LinkResolver, LinkResolverContext}, + link_resolver::LinkResolver, metrics::subgraph::SubgraphCountMetric, - store::DeploymentLocator, + store::{DeploymentLocator, SubgraphStore}, subgraph::SubgraphInstanceManager, }, log::factory::LoggerFactory, @@ -16,6 +16,8 @@ use parking_lot::RwLock; use slog::{debug, error}; use tokio_util::sync::CancellationToken; +use super::subgraph_manifest; + /// Starts and stops subgraph deployments. /// /// For each subgraph deployment, checks the subgraph processing kind @@ -27,6 +29,7 @@ use tokio_util::sync::CancellationToken; pub struct SubgraphProvider { logger_factory: LoggerFactory, count_metrics: Arc, + subgraph_store: Arc, link_resolver: Arc, /// Stops active subgraph start request tasks. @@ -54,12 +57,14 @@ impl SubgraphProvider { /// # Arguments /// - `logger_factory`: Creates loggers for each subgraph deployment start/stop request /// - `count_metrics`: Tracks the number of started subgraph deployments + /// - `subgraph_store`: Loads subgraph manifests to determine the subgraph processing kinds /// - `link_resolver`: Loads subgraph manifests to determine the subgraph processing kinds /// - `cancel_token`: Stops active subgraph start request tasks /// - `instance_managers`: Contains the enabled subgraph instance managers pub fn new( logger_factory: &LoggerFactory, count_metrics: Arc, + subgraph_store: Arc, link_resolver: Arc, cancel_token: CancellationToken, instance_managers: SubgraphInstanceManagers, @@ -74,6 +79,7 @@ impl SubgraphProvider { Self { logger_factory, count_metrics, + subgraph_store, link_resolver, cancel_token, instance_managers, @@ -94,30 +100,17 @@ impl SubgraphProvider { ) -> Result<(), Error> { let logger = self.logger_factory.subgraph_logger(&loc); - let link_resolver = self - .link_resolver - .for_manifest(&loc.hash.to_string()) - .map_err(|e| Error::CreateLinkResolver { - loc: loc.cheap_clone(), - source: e, - })?; - - let file_bytes = link_resolver - .cat( - &LinkResolverContext::new(&loc.hash, &logger), - &loc.hash.to_ipfs_link(), - ) - .await - .map_err(|e| Error::LoadManifest { - loc: loc.cheap_clone(), - source: e, - })?; - - let raw_manifest: serde_yaml::Mapping = - serde_yaml::from_slice(&file_bytes).map_err(|e| Error::ParseManifest { - loc: loc.cheap_clone(), - source: e, - })?; + let raw_manifest = subgraph_manifest::load_raw_subgraph_manifest( + &logger, + &*self.subgraph_store, + &*self.link_resolver, + &loc.hash, + ) + .await + .map_err(|e| Error::LoadManifest { + loc: loc.cheap_clone(), + source: e, + })?; let subgraph_kind = SubgraphProcessingKind::from_manifest(&raw_manifest); self.assignments.set_subgraph_kind(&loc, subgraph_kind); @@ -220,22 +213,10 @@ impl SubgraphInstanceManager for SubgraphProvider { /// Enumerates all possible errors of the subgraph provider. #[derive(Debug, thiserror::Error)] enum Error { - #[error("failed to create link resolver for '{loc}': {source:#}")] - CreateLinkResolver { - loc: DeploymentLocator, - source: anyhow::Error, - }, - #[error("failed to load manifest for '{loc}': {source:#}")] LoadManifest { loc: DeploymentLocator, - source: anyhow::Error, - }, - - #[error("failed to parse manifest for '{loc}': {source:#}")] - ParseManifest { - loc: DeploymentLocator, - source: serde_yaml::Error, + source: subgraph_manifest::Error, }, #[error("failed to get instance manager for '{loc}' with kind '{subgraph_kind}'")] diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index fff49c8f8ee..85bae95cffb 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -211,17 +211,27 @@ pub trait SubgraphStore: Send + Sync + 'static { /// hash. Returns `None` if there is no deployment with that hash fn active_locator(&self, hash: &str) -> Result, StoreError>; - /// This migrates subgraphs that existed before the raw_yaml column was added. - async fn set_manifest_raw_yaml( - &self, - hash: &DeploymentHash, - raw_yaml: String, - ) -> Result<(), StoreError>; - /// Return `true` if the `instrument` flag for the deployment is set. /// When this flag is set, indexing of the deployment should log /// additional diagnostic information fn instrument(&self, deployment: &DeploymentLocator) -> Result; + + /// Sets the raw subgraph manifest for the given deployment hash, if not previously set. + /// + /// Returns `Ok(())` regardless of whether the value was updated. + async fn set_raw_manifest_once( + &self, + hash: &DeploymentHash, + raw_manifest: &serde_yaml::Mapping, + ) -> Result<(), StoreError>; + + /// Returns the raw subgraph manifest for the given deployment hash, if one exists. + /// + /// Returns `None` if no manifest is found for the specified deployment hash. + async fn raw_manifest( + &self, + hash: &DeploymentHash, + ) -> Result, StoreError>; } pub trait ReadStore: Send + Sync + 'static { diff --git a/node/src/launcher.rs b/node/src/launcher.rs index 3bbccf5cf0e..1269e86fcb4 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs @@ -328,6 +328,7 @@ where let subgraph_provider = graph_core::subgraph_provider::SubgraphProvider::new( &logger_factory, sg_count.cheap_clone(), + network_store.subgraph_store(), link_resolver.cheap_clone(), tokio_util::sync::CancellationToken::new(), subgraph_instance_managers, diff --git a/node/src/manager/commands/run.rs b/node/src/manager/commands/run.rs index 473a12e5d17..32b6fbc9365 100644 --- a/node/src/manager/commands/run.rs +++ b/node/src/manager/commands/run.rs @@ -201,6 +201,7 @@ pub async fn run( let subgraph_provider = Arc::new(graph_core::subgraph_provider::SubgraphProvider::new( &logger_factory, sg_metrics.cheap_clone(), + subgraph_store.clone(), link_resolver.cheap_clone(), cancel_token.clone(), subgraph_instance_managers, diff --git a/store/postgres/Cargo.toml b/store/postgres/Cargo.toml index c3c992329eb..18080b8618a 100644 --- a/store/postgres/Cargo.toml +++ b/store/postgres/Cargo.toml @@ -34,6 +34,7 @@ hex = "0.4.3" pretty_assertions = "1.4.1" sqlparser = { workspace = true } thiserror = { workspace = true } +serde_yaml.workspace = true [dev-dependencies] clap.workspace = true diff --git a/store/postgres/src/deployment.rs b/store/postgres/src/deployment.rs index 340d80d1184..3c60ce2cedb 100644 --- a/store/postgres/src/deployment.rs +++ b/store/postgres/src/deployment.rs @@ -400,22 +400,40 @@ pub fn set_history_blocks( .map_err(StoreError::from) } -/// This migrates subgraphs that existed before the raw_yaml column was added. -pub fn set_manifest_raw_yaml( +/// Sets the raw subgraph manifest for the given `site`, if not previously set. +/// +/// Returns `Ok(())` regardless of whether the value was updated. +pub fn set_raw_manifest_once( conn: &mut PgConnection, site: &Site, - raw_yaml: &str, + raw_manifest: &str, ) -> Result<(), StoreError> { use subgraph_manifest as sm; update(sm::table.filter(sm::id.eq(site.id))) .filter(sm::raw_yaml.is_null()) - .set(sm::raw_yaml.eq(raw_yaml)) + .set(sm::raw_yaml.eq(raw_manifest)) .execute(conn) .map(|_| ()) .map_err(|e| e.into()) } +/// Returns the raw subgraph manifest for the given `site`, if one exists. +/// +/// Returns `None` if no manifest is found for this `site`. +pub fn load_raw_manifest( + conn: &mut PgConnection, + site: &Site, +) -> Result, StoreError> { + use subgraph_manifest as sm; + + sm::table + .select(sm::raw_yaml) + .filter(sm::id.eq(site.id)) + .first(conn) + .map_err(StoreError::from) +} + /// Most of the time, this will be a noop; the only time we actually modify /// the deployment table is the first forward block after a reorg fn reset_reorg_count(conn: &mut PgConnection, site: &Site) -> StoreResult<()> { diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index f9aa0dfde75..e1ec5644c45 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1878,19 +1878,33 @@ impl DeploymentStore { .await } - pub(crate) async fn set_manifest_raw_yaml( + fn is_source(&self, site: &Site) -> Result { + self.primary.is_source(site) + } + + /// Sets the raw subgraph manifest for the given `site`, if not previously set. + /// + /// Returns `Ok(())` regardless of whether the value was updated. + pub(crate) async fn set_raw_manifest_once( &self, site: Arc, - raw_yaml: String, + raw_manifest: String, ) -> Result<(), StoreError> { self.with_conn(move |conn, _| { - deployment::set_manifest_raw_yaml(conn, &site, &raw_yaml).map_err(Into::into) + deployment::set_raw_manifest_once(conn, &site, &raw_manifest) + .map_err(CancelableError::from) }) .await } - fn is_source(&self, site: &Site) -> Result { - self.primary.is_source(site) + /// Returns the raw subgraph manifest for the given `site`, if one exists. + /// + /// Returns `None` if no manifest is found for this `site`. + pub(crate) async fn raw_manifest(&self, site: Arc) -> Result, StoreError> { + self.with_conn(move |conn, _| { + deployment::load_raw_manifest(conn, &site).map_err(CancelableError::from) + }) + .await } } diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 7f5993735c2..181a1449a9b 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -1632,20 +1632,48 @@ impl SubgraphStoreTrait for SubgraphStore { Ok(sites.first().map(DeploymentLocator::from)) } - async fn set_manifest_raw_yaml( + fn instrument(&self, deployment: &DeploymentLocator) -> Result { + let site = self.find_site(deployment.id.into())?; + let store = self.for_site(&site)?; + + let info = store.subgraph_info(site)?; + Ok(info.instrument) + } + + async fn set_raw_manifest_once( &self, hash: &DeploymentHash, - raw_yaml: String, + raw_manifest: &serde_yaml::Mapping, ) -> Result<(), StoreError> { let (store, site) = self.store(hash)?; - store.set_manifest_raw_yaml(site, raw_yaml).await + + store + .set_raw_manifest_once( + site, + serde_yaml::to_string(raw_manifest).map_err(|_e| { + StoreError::InternalError("invalid subgraph manifest".to_string()) + })?, + ) + .await } - fn instrument(&self, deployment: &DeploymentLocator) -> Result { - let site = self.find_site(deployment.id.into())?; - let store = self.for_site(&site)?; + async fn raw_manifest( + &self, + hash: &DeploymentHash, + ) -> Result, StoreError> { + let (store, site) = self.store(hash)?; - let info = store.subgraph_info(site)?; - Ok(info.instrument) + store + .raw_manifest(site) + .await + .and_then(|optional_raw_manifest| { + optional_raw_manifest + .map(|raw_manifest| { + serde_yaml::from_str(&raw_manifest).map_err(|_e| { + StoreError::InternalError("invalid subgraph manifest".to_string()) + }) + }) + .transpose() + }) } } diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index 27daa844c1c..f32a9d64f7b 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -575,6 +575,7 @@ pub async fn setup_inner( let subgraph_provider = Arc::new(graph_core::subgraph_provider::SubgraphProvider::new( &logger_factory, sg_count.cheap_clone(), + subgraph_store.clone(), link_resolver.cheap_clone(), tokio_util::sync::CancellationToken::new(), subgraph_instance_managers,