Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod subgraph_manifest;

pub mod amp_subgraph;
pub mod polling_monitor;

pub mod subgraph;
pub mod subgraph_provider;
69 changes: 26 additions & 43 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use crate::subgraph::runner::SubgraphRunner;
use graph::amp;
use graph::blockchain::block_stream::{BlockStreamMetrics, TriggersAdapterWrapper};
use graph::blockchain::{Blockchain, BlockchainKind, DataSource, NodeCapabilities};
use graph::components::link_resolver::LinkResolverContext;
use graph::components::metrics::gas::GasMetrics;
use graph::components::metrics::subgraph::DeploymentStatusMetric;
use graph::components::store::SourceableStore;
Expand All @@ -29,7 +28,7 @@ use tokio::task;

use super::context::OffchainMonitor;
use super::SubgraphTriggerProcessor;
use crate::subgraph::runner::SubgraphRunnerError;
use crate::{subgraph::runner::SubgraphRunnerError, subgraph_manifest};

#[derive(Clone)]
pub struct SubgraphInstanceManager<S: SubgraphStore, AC> {
Expand Down Expand Up @@ -83,30 +82,22 @@ where
let deployment_status_metric = deployment_status_metric.clone();

async move {
let link_resolver = self
.link_resolver
.for_manifest(&loc.hash.to_string())
.map_err(SubgraphAssignmentProviderError::ResolveError)?;

let file_bytes = link_resolver
.cat(
&LinkResolverContext::new(&loc.hash, &logger),
&loc.hash.to_ipfs_link(),
)
.await
.map_err(SubgraphAssignmentProviderError::ResolveError)?;

let manifest: serde_yaml::Mapping = serde_yaml::from_slice(&file_bytes)
.map_err(|e| SubgraphAssignmentProviderError::ResolveError(e.into()))?;

match BlockchainKind::from_manifest(&manifest)? {
let raw_manifest = subgraph_manifest::load_raw_subgraph_manifest(
&logger,
&*instance_manager.subgraph_store,
&*instance_manager.link_resolver,
&loc.hash,
)
.await?;

match BlockchainKind::from_manifest(&raw_manifest)? {
BlockchainKind::Ethereum => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_ethereum::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.clone(),
manifest,
raw_manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
deployment_status_metric,
Expand All @@ -121,7 +112,7 @@ where
logger.clone(),
self.env_vars.cheap_clone(),
loc.clone(),
manifest,
raw_manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
deployment_status_metric,
Expand All @@ -136,7 +127,7 @@ where
logger.clone(),
self.env_vars.cheap_clone(),
loc.cheap_clone(),
manifest,
raw_manifest,
stop_block,
Box::new(graph_chain_substreams::TriggerProcessor::new(
loc.clone(),
Expand Down Expand Up @@ -251,7 +242,7 @@ impl<S: SubgraphStore, AC: amp::Client> SubgraphInstanceManager<S, AC> {
logger: Logger,
env_vars: Arc<EnvVars>,
deployment: DeploymentLocator,
manifest: serde_yaml::Mapping,
raw_manifest: serde_yaml::Mapping,
stop_block: Option<BlockNumber>,
tp: Box<dyn TriggerProcessor<C, RuntimeHostBuilder<C>>>,
deployment_status_metric: DeploymentStatusMetric,
Expand All @@ -264,7 +255,7 @@ impl<S: SubgraphStore, AC: amp::Client> SubgraphInstanceManager<S, AC> {
logger,
env_vars,
deployment,
manifest,
raw_manifest,
stop_block,
tp,
deployment_status_metric,
Expand All @@ -278,7 +269,7 @@ impl<S: SubgraphStore, AC: amp::Client> SubgraphInstanceManager<S, AC> {
logger: Logger,
env_vars: Arc<EnvVars>,
deployment: DeploymentLocator,
manifest: serde_yaml::Mapping,
raw_manifest: serde_yaml::Mapping,
stop_block: Option<BlockNumber>,
tp: Box<dyn TriggerProcessor<C, RuntimeHostBuilder<C>>>,
deployment_status_metric: DeploymentStatusMetric,
Expand All @@ -291,8 +282,8 @@ impl<S: SubgraphStore, AC: amp::Client> SubgraphInstanceManager<S, AC> {
let subgraph_store = self.subgraph_store.cheap_clone();
let registry = self.metrics_registry.cheap_clone();

let raw_yaml = serde_yaml::to_string(&manifest).unwrap();
let manifest = UnresolvedSubgraphManifest::parse(deployment.hash.cheap_clone(), manifest)?;
let manifest =
UnresolvedSubgraphManifest::parse(deployment.hash.cheap_clone(), raw_manifest)?;

// Allow for infinite retries for subgraph definition files.
let link_resolver = Arc::from(
Expand All @@ -302,24 +293,16 @@ impl<S: SubgraphStore, AC: amp::Client> SubgraphInstanceManager<S, AC> {
.with_retries(),
);

// Make sure the `raw_yaml` is present on both this subgraph and the graft base.
self.subgraph_store
.set_manifest_raw_yaml(&deployment.hash, raw_yaml)
.await?;
if let Some(graft) = &manifest.graft {
if self.subgraph_store.is_deployed(&graft.base)? {
let file_bytes = self
.link_resolver
.cat(
&LinkResolverContext::new(&deployment.hash, &logger),
&graft.base.to_ipfs_link(),
)
.await?;
let yaml = String::from_utf8(file_bytes)?;

self.subgraph_store
.set_manifest_raw_yaml(&graft.base, yaml)
.await?;
// Makes sure the raw manifest is cached in the subgraph store
let _raw_manifest = subgraph_manifest::load_raw_subgraph_manifest(
&logger,
&*self.subgraph_store,
&*self.link_resolver,
&graft.base,
)
.await?;
}
}

Expand Down
93 changes: 93 additions & 0 deletions core/src/subgraph_manifest.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use graph::{
cheap_clone::CheapClone as _,
components::{
link_resolver::{LinkResolver, LinkResolverContext},
store::{StoreError, SubgraphStore},
},
data::subgraph::DeploymentHash,
};
use slog::{debug, Logger};

pub(super) async fn load_raw_subgraph_manifest(
logger: &Logger,
subgraph_store: &dyn SubgraphStore,
link_resolver: &dyn LinkResolver,
hash: &DeploymentHash,
) -> Result<serde_yaml::Mapping, Error> {
if let Some(raw_manifest) =
subgraph_store
.raw_manifest(hash)
.await
.map_err(|e| Error::LoadManifest {
hash: hash.cheap_clone(),
source: anyhow::Error::from(e),
})?
{
debug!(logger, "Loaded raw manifest from the subgraph store");
return Ok(raw_manifest);
}

debug!(logger, "Loading raw manifest using link resolver");

let link_resolver =
link_resolver
.for_manifest(&hash.to_string())
.map_err(|e| Error::CreateLinkResolver {
hash: hash.cheap_clone(),
source: e,
})?;

let file_bytes = link_resolver
.cat(
&LinkResolverContext::new(hash, logger),
&hash.to_ipfs_link(),
)
.await
.map_err(|e| Error::LoadManifest {
hash: hash.cheap_clone(),
source: e,
})?;

let raw_manifest: serde_yaml::Mapping =
serde_yaml::from_slice(&file_bytes).map_err(|e| Error::ParseManifest {
hash: hash.cheap_clone(),
source: e,
})?;

subgraph_store
.set_raw_manifest_once(hash, &raw_manifest)
.await
.map_err(|e| Error::StoreManifest {
hash: hash.cheap_clone(),
source: e,
})?;

Ok(raw_manifest)
}

#[derive(Debug, thiserror::Error)]
pub(super) enum Error {
#[error("failed to create link resolver for '{hash}': {source:#}")]
CreateLinkResolver {
hash: DeploymentHash,
source: anyhow::Error,
},

#[error("failed to load manifest for '{hash}': {source:#}")]
LoadManifest {
hash: DeploymentHash,
source: anyhow::Error,
},

#[error("failed to parse manifest for '{hash}': {source:#}")]
ParseManifest {
hash: DeploymentHash,
source: serde_yaml::Error,
},

#[error("failed to store manifest for '{hash}': {source:#}")]
StoreManifest {
hash: DeploymentHash,
source: StoreError,
},
}
59 changes: 20 additions & 39 deletions core/src/subgraph_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use graph::{
amp,
cheap_clone::CheapClone as _,
components::{
link_resolver::{LinkResolver, LinkResolverContext},
link_resolver::LinkResolver,
metrics::subgraph::SubgraphCountMetric,
store::DeploymentLocator,
store::{DeploymentLocator, SubgraphStore},
subgraph::SubgraphInstanceManager,
},
log::factory::LoggerFactory,
Expand All @@ -16,6 +16,8 @@ use parking_lot::RwLock;
use slog::{debug, error};
use tokio_util::sync::CancellationToken;

use super::subgraph_manifest;

/// Starts and stops subgraph deployments.
///
/// For each subgraph deployment, checks the subgraph processing kind
Expand All @@ -27,6 +29,7 @@ use tokio_util::sync::CancellationToken;
pub struct SubgraphProvider {
logger_factory: LoggerFactory,
count_metrics: Arc<SubgraphCountMetric>,
subgraph_store: Arc<dyn SubgraphStore>,
link_resolver: Arc<dyn LinkResolver>,

/// Stops active subgraph start request tasks.
Expand Down Expand Up @@ -54,12 +57,14 @@ impl SubgraphProvider {
/// # Arguments
/// - `logger_factory`: Creates loggers for each subgraph deployment start/stop request
/// - `count_metrics`: Tracks the number of started subgraph deployments
/// - `subgraph_store`: Loads subgraph manifests to determine the subgraph processing kinds
/// - `link_resolver`: Loads subgraph manifests to determine the subgraph processing kinds
/// - `cancel_token`: Stops active subgraph start request tasks
/// - `instance_managers`: Contains the enabled subgraph instance managers
pub fn new(
logger_factory: &LoggerFactory,
count_metrics: Arc<SubgraphCountMetric>,
subgraph_store: Arc<dyn SubgraphStore>,
link_resolver: Arc<dyn LinkResolver>,
cancel_token: CancellationToken,
instance_managers: SubgraphInstanceManagers,
Expand All @@ -74,6 +79,7 @@ impl SubgraphProvider {
Self {
logger_factory,
count_metrics,
subgraph_store,
link_resolver,
cancel_token,
instance_managers,
Expand All @@ -94,30 +100,17 @@ impl SubgraphProvider {
) -> Result<(), Error> {
let logger = self.logger_factory.subgraph_logger(&loc);

let link_resolver = self
.link_resolver
.for_manifest(&loc.hash.to_string())
.map_err(|e| Error::CreateLinkResolver {
loc: loc.cheap_clone(),
source: e,
})?;

let file_bytes = link_resolver
.cat(
&LinkResolverContext::new(&loc.hash, &logger),
&loc.hash.to_ipfs_link(),
)
.await
.map_err(|e| Error::LoadManifest {
loc: loc.cheap_clone(),
source: e,
})?;

let raw_manifest: serde_yaml::Mapping =
serde_yaml::from_slice(&file_bytes).map_err(|e| Error::ParseManifest {
loc: loc.cheap_clone(),
source: e,
})?;
let raw_manifest = subgraph_manifest::load_raw_subgraph_manifest(
&logger,
&*self.subgraph_store,
&*self.link_resolver,
&loc.hash,
)
.await
.map_err(|e| Error::LoadManifest {
loc: loc.cheap_clone(),
source: e,
})?;

let subgraph_kind = SubgraphProcessingKind::from_manifest(&raw_manifest);
self.assignments.set_subgraph_kind(&loc, subgraph_kind);
Expand Down Expand Up @@ -220,22 +213,10 @@ impl SubgraphInstanceManager for SubgraphProvider {
/// Enumerates all possible errors of the subgraph provider.
#[derive(Debug, thiserror::Error)]
enum Error {
#[error("failed to create link resolver for '{loc}': {source:#}")]
CreateLinkResolver {
loc: DeploymentLocator,
source: anyhow::Error,
},

#[error("failed to load manifest for '{loc}': {source:#}")]
LoadManifest {
loc: DeploymentLocator,
source: anyhow::Error,
},

#[error("failed to parse manifest for '{loc}': {source:#}")]
ParseManifest {
loc: DeploymentLocator,
source: serde_yaml::Error,
source: subgraph_manifest::Error,
},

#[error("failed to get instance manager for '{loc}' with kind '{subgraph_kind}'")]
Expand Down
Loading