diff --git a/Cargo.lock b/Cargo.lock
index ecdef04ec..dad45e913 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4703,9 +4703,9 @@ dependencies = [
 
 [[package]]
 name = "thegraph-graphql-http"
-version = "0.2.4"
+version = "0.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ae3b675ae2fd6e213fa1b428ba44009b309338b6e9b7e6205a674ccecd5d67d4"
+checksum = "80bff1bad9a7c3b210876b5601460cab05d8fa6e8f52481472427ed18bc33975"
 dependencies = [
  "async-trait",
  "reqwest",
diff --git a/Cargo.toml b/Cargo.toml
index 708c3e170..2e47821cd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -54,9 +54,7 @@ thegraph-core = { version = "0.9.0", features = [
     "attestation",
     "serde",
 ] }
-thegraph-graphql-http = { version = "0.2.1", features = [
-    "http-client-reqwest",
-] }
+thegraph-graphql-http = { version = "0.3.2", features = ["reqwest"] }
 thiserror = "2.0.2"
 tokio = { version = "1.38.0", features = [
     "macros",
diff --git a/src/client_query.rs b/src/client_query.rs
index eea2f441b..760cf67e2 100644
--- a/src/client_query.rs
+++ b/src/client_query.rs
@@ -36,7 +36,7 @@ use crate::{
     indexing_performance,
     metrics::{with_metric, METRICS},
     middleware::RequestId,
-    network::{self, DeploymentError, Indexing, IndexingId, ResolvedSubgraphInfo, SubgraphError},
+    network::{DeploymentError, Indexing, IndexingId, ResolvedSubgraphInfo, SubgraphError},
     reports,
 };
 
@@ -464,7 +464,7 @@ fn build_candidates_list(
     blocks_per_minute: u64,
     block_requirements: &BlockRequirements,
     subgraph_versions: &[DeploymentId],
-    indexings: HashMap<IndexingId, Result<Indexing, ResolutionError>>,
+    indexings: HashMap<IndexingId, Result<Indexing, UnavailableReason>>,
 ) -> (
     Vec<Candidate<IndexerId, CandidateMetadata>>,
     BTreeMap<IndexerId, IndexerError>,
@@ -492,7 +492,7 @@ fn build_candidates_list(
         let indexing = match indexing {
             Ok(indexing) => indexing,
             Err(err) => {
-                candidates_errors.insert(indexing_id.indexer, err.into());
+                candidates_errors.insert(indexing_id.indexer, IndexerError::Unavailable(err));
                 continue;
             }
         };
@@ -630,33 +630,6 @@ fn blocks_behind(seconds_behind: u32, blocks_per_minute: u64) -> u64 {
     ((seconds_behind as f64 / 60.0) * blocks_per_minute as f64) as u64
 }
 
-impl From<network::ResolutionError> for IndexerError {
-    fn from(err: network::ResolutionError) -> Self {
-        match err {
-            network::ResolutionError::Unavailable(reason) => {
-                let reason = match reason {
-                    network::UnavailableReason::Blocked(reason) => {
-                        UnavailableReason::Blocked(reason)
-                    }
-                    reason @ network::UnavailableReason::IndexerServiceVersionBelowMin
-                    | reason @ network::UnavailableReason::GraphNodeVersionBelowMin => {
-                        UnavailableReason::NotSupported(reason.to_string())
-                    }
-                    reason @ network::UnavailableReason::IndexerResolutionError { .. }
-                    | reason @ network::UnavailableReason::IndexingProgressNotFound => {
-                        UnavailableReason::NoStatus(reason.to_string())
-                    }
-                };
-                IndexerError::Unavailable(reason)
-            }
-            network::ResolutionError::Internal(err) => {
-                tracing::error!(error = ?err, "internal error");
-                IndexerError::Unavailable(UnavailableReason::Internal(err))
-            }
-        }
-    }
-}
-
 pub async fn handle_indexer_query(
     State(ctx): State<Context>,
     Extension(auth): Extension<AuthSettings>,
@@ -680,7 +653,7 @@ pub async fn handle_indexer_query(
         .get(&indexing_id)
         .ok_or_else(|| Error::NoIndexers)?
         .as_ref()
-        .map_err(|err| bad_indexers(err.clone().into()))?;
+        .map_err(|err| bad_indexers(IndexerError::Unavailable(err.clone())))?;
 
     let (latest_block, blocks_per_minute) = {
         let chain = ctx.chains.chain(&subgraph.chain);
diff --git a/src/errors.rs b/src/errors.rs
index 87c5cb0b6..92ed3105d 100644
--- a/src/errors.rs
+++ b/src/errors.rs
@@ -105,6 +105,12 @@ pub enum UnavailableReason {
     Internal(&'static str),
 }
 
+impl UnavailableReason {
+    pub fn invalid_url() -> Self {
+        UnavailableReason::NoStatus("invalid URL".to_string())
+    }
+}
+
 #[derive(Clone, Debug, PartialEq, Eq)]
 pub struct MissingBlockError {
     pub missing: Option<BlockNumber>,
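Note: with the `From<network::ResolutionError>` impl deleted from src/client_query.rs, `UnavailableReason` is shared directly between the network module and the client-query error path, and the new `UnavailableReason::invalid_url()` helper gives URL failures a uniform `NoStatus` shape. A minimal sketch of a call site under that convention (the function below is hypothetical, not part of this change):

    // Hypothetical helper: derive an indexer's status endpoint, mapping a
    // bad base URL onto the shared `UnavailableReason` error type.
    fn status_endpoint(base: &url::Url) -> Result<url::Url, UnavailableReason> {
        base.join("status").map_err(|_| UnavailableReason::invalid_url())
    }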
diff --git a/src/indexers.rs b/src/indexers.rs
deleted file mode 100644
index 4fb022b96..000000000
--- a/src/indexers.rs
+++ /dev/null
@@ -1,7 +0,0 @@
-pub use urls::*;
-
-pub mod cost_models;
-pub mod indexing_progress;
-pub mod public_poi;
-mod urls;
-pub mod version;
diff --git a/src/indexers/cost_models.rs b/src/indexers/cost_models.rs
deleted file mode 100644
index 353953ab3..000000000
--- a/src/indexers/cost_models.rs
+++ /dev/null
@@ -1,112 +0,0 @@
-use serde::Deserialize;
-use thegraph_core::DeploymentId;
-use thegraph_graphql_http::{
-    graphql::{Document, IntoDocument, IntoDocumentWithVariables},
-    http_client::{RequestError, ReqwestExt, ResponseError},
-};
-
-use super::urls::CostUrl;
-
-const COST_MODEL_QUERY_DOCUMENT: &str = r#"
-    query costModels($deployments: [String!]!) {
-        costModels(deployments: $deployments) {
-            deployment
-            model
-        }
-    }
-"#;
-
-/// Errors that can occur while fetching the cost models.
-#[derive(Clone, Debug, thiserror::Error)]
-pub enum Error {
-    /// The request failed.
-    #[error("request error: {0}")]
-    Request(String),
-
-    /// Invalid response.
-    ///
-    /// The response could not be deserialized or is missing required fields.
-    #[error("invalid response: {0}")]
-    InvalidResponse(String),
-
-    /// The response did not contain any cost model information.
-    #[error("empty response")]
-    EmptyResponse,
-}
-
-/// Send a request to the indexer to get the cost models of the given deployments.
-pub async fn send_request(
-    client: &reqwest::Client,
-    url: CostUrl,
-    deployments: impl IntoIterator<Item = &DeploymentId>,
-) -> Result<Vec<CostModelSource>, Error> {
-    let resp = client
-        .post(url.into_inner())
-        .send_graphql::<Response>(Request::new(deployments))
-        .await
-        .map_err(|err| match err {
-            RequestError::RequestSerializationError(..) => {
-                unreachable!("request serialization should not fail")
-            }
-            RequestError::RequestSendError(..) | RequestError::ResponseRecvError(..) => {
-                Error::Request(err.to_string())
-            }
-            RequestError::ResponseDeserializationError { .. } => {
-                Error::InvalidResponse(err.to_string())
-            }
-        })?
-        .map_err(|err| match err {
-            ResponseError::Failure { .. } => Error::Request(err.to_string()),
-            ResponseError::Empty => Error::EmptyResponse,
-        })?;
-
-    Ok(resp.cost_models)
-}
-
-/// The request type for the cost model query.
-///
-/// This is a GraphQL query that fetches the cost models for a set of deployments.
-///
-/// See [`COST_MODEL_QUERY_DOCUMENT`] for the query document.
-#[derive(Debug, Clone)]
-struct Request {
-    document: Document,
-    vars_deployments: Vec<String>,
-}
-
-impl Request {
-    /// Create a new cost model query request.
-    pub fn new<'a>(deployments: impl IntoIterator<Item = &'a DeploymentId>) -> Self {
-        let deployments = deployments
-            .into_iter()
-            .map(|item| item.to_string())
-            .collect();
-        Self {
-            document: COST_MODEL_QUERY_DOCUMENT.into_document(),
-            vars_deployments: deployments,
-        }
-    }
-}
-
-impl IntoDocumentWithVariables for Request {
-    type Variables = serde_json::Value;
-
-    fn into_document_with_variables(self) -> (Document, Self::Variables) {
-        (
-            self.document,
-            serde_json::json!({ "deployments": self.vars_deployments }),
-        )
-    }
-}
-
-#[derive(Deserialize)]
-#[serde(rename_all = "camelCase")]
-struct Response {
-    cost_models: Vec<CostModelSource>,
-}
-
-#[derive(Debug, Deserialize)]
-pub struct CostModelSource {
-    pub deployment: DeploymentId,
-    pub model: String,
-}
diff --git a/src/indexers/indexing_progress.rs b/src/indexers/indexing_progress.rs
deleted file mode 100644
index cd963cbf4..000000000
--- a/src/indexers/indexing_progress.rs
+++ /dev/null
@@ -1,202 +0,0 @@
-use serde::Deserialize;
-use serde_with::serde_as;
-use thegraph_core::{alloy::primitives::BlockNumber, DeploymentId};
-use thegraph_graphql_http::{
-    graphql::{Document, IntoDocument, IntoDocumentWithVariables},
-    http_client::{RequestError, ReqwestExt as _, ResponseError},
-};
-
-use super::urls::StatusUrl;
-
-const INDEXING_PROGRESS_QUERY_DOCUMENT: &str = r#"
-    query indexingProgress($deployments: [String!]!) {
-        indexingStatuses(subgraphs: $deployments) {
-            subgraph
-            chains {
-                network
-                latestBlock { number }
-                earliestBlock { number }
-            }
-        }
-    }"#;
-
-/// Errors that can occur while fetching the indexing progress.
-#[derive(Clone, Debug, thiserror::Error)]
-pub enum Error {
-    /// The request failed.
-    #[error("request error: {0}")]
-    Request(String),
-
-    /// Invalid response.
-    ///
-    /// The response could not be deserialized or is missing required fields.
-    #[error("invalid response: {0}")]
-    InvalidResponse(String),
-
-    /// The response did not contain any progress information.
-    #[error("empty response")]
-    EmptyResponse,
-}
-
-/// Send a request to the indexer to get the indexing status of the given deployments.
-pub async fn send_request(
-    client: &reqwest::Client,
-    url: StatusUrl,
-    deployments: impl IntoIterator<Item = &DeploymentId>,
-) -> Result<Vec<IndexingStatusResponse>, Error> {
-    let resp = client
-        .post(url.into_inner())
-        .send_graphql::<Response>(Request::new(deployments))
-        .await
-        .map_err(|err| match err {
-            RequestError::RequestSerializationError(..) => {
-                unreachable!("request serialization should not fail")
-            }
-            RequestError::RequestSendError(..) | RequestError::ResponseRecvError(..) => {
-                Error::Request(err.to_string())
-            }
-            RequestError::ResponseDeserializationError { .. } => {
-                Error::InvalidResponse(err.to_string())
-            }
-        })?
-        .map_err(|err| match err {
-            ResponseError::Failure { .. } => Error::Request(err.to_string()),
-            ResponseError::Empty => Error::EmptyResponse,
-        })?;
-
-    if resp.indexing_statuses.is_empty() {
-        return Err(Error::EmptyResponse);
-    }
-
-    Ok(resp.indexing_statuses)
-}
-
-/// The request type for the indexing progress query.
-///
-/// This type is used to construct the GraphQL request document and variables for the indexing
-/// progress query.
-///
-/// See [`INDEXING_PROGRESS_QUERY_DOCUMENT`] for the query document.
-#[derive(Debug, Clone)]
-struct Request {
-    document: Document,
-    vars_deployments: Vec<String>,
-}
-
-impl Request {
-    /// Create a new indexing progress query request.
-    fn new<'a>(deployments: impl IntoIterator<Item = &'a DeploymentId>) -> Self {
-        let deployments = deployments
-            .into_iter()
-            .map(|item| item.to_string())
-            .collect();
-        Self {
-            document: INDEXING_PROGRESS_QUERY_DOCUMENT.into_document(),
-            vars_deployments: deployments,
-        }
-    }
-}
-
-impl IntoDocumentWithVariables for Request {
-    type Variables = serde_json::Value;
-
-    fn into_document_with_variables(self) -> (Document, Self::Variables) {
-        (
-            self.document,
-            serde_json::json!({ "deployments": self.vars_deployments }),
-        )
-    }
-}
-
-#[derive(Debug, Deserialize)]
-#[serde(rename_all = "camelCase")]
-struct Response {
-    indexing_statuses: Vec<IndexingStatusResponse>,
-}
-
-#[derive(Debug, Deserialize)]
-pub struct IndexingStatusResponse {
-    #[serde(rename = "subgraph")]
-    pub deployment_id: DeploymentId,
-    pub chains: Vec<ChainStatus>,
-}
-
-#[derive(Debug, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct ChainStatus {
-    pub latest_block: Option<BlockStatus>,
-    pub earliest_block: Option<BlockStatus>,
-}
-
-#[serde_as]
-#[derive(Debug, Deserialize)]
-pub struct BlockStatus {
-    #[serde_as(as = "serde_with::DisplayFromStr")]
-    pub number: BlockNumber,
-}
-
-#[cfg(test)]
-mod tests {
-    use super::Response;
-
-    #[test]
-    fn deserialize_response() {
-        //* Given
-        let response = serde_json::json!({
-            "indexingStatuses": [
-                {
-                    "subgraph": "QmZTy9EJHu8rfY9QbEk3z1epmmvh5XHhT2Wqhkfbyt8k9Z",
-                    "chains": [
-                        {
-                            "network": "rinkeby",
-                            "latestBlock": {
-                                "number": "10164818",
-                                "hash": "0xaa94881130ba16c28cc90a5a880b117bdc90b6b11e9cde0c78804cdb93cc9e85"
-                            },
-                            "earliestBlock": {
-                                "number": "7559999",
-                                "hash": "0x0"
-                            }
-                        }
-                    ]
-                },
-                {
-                    "subgraph": "QmSLQfPFcz2pKRJZUH16Sk26EFpRgdxTYGnMiKvWgKRM2a",
-                    "chains": [
-                        {
-                            "network": "rinkeby"
-                        }
-                    ]
-                }
-            ]
-        });
-
-        //* When
-        let response = serde_json::from_value(response);
-
-        //* Then
-        let response: Response = response.expect("deserialization failed");
-
-        assert_eq!(response.indexing_statuses.len(), 2);
-        let status1 = &response.indexing_statuses[0];
-        let status2 = &response.indexing_statuses[1];
-
-        // Status 1
-        assert_eq!(
-            status1.deployment_id.to_string(),
-            "QmZTy9EJHu8rfY9QbEk3z1epmmvh5XHhT2Wqhkfbyt8k9Z"
-        );
-        assert_eq!(status1.chains.len(), 1);
-        assert!(status1.chains[0].latest_block.is_some());
-        assert!(status1.chains[0].earliest_block.is_some());
-
-        // Status 2
-        assert_eq!(
-            status2.deployment_id.to_string(),
-            "QmSLQfPFcz2pKRJZUH16Sk26EFpRgdxTYGnMiKvWgKRM2a"
-        );
-        assert_eq!(status2.chains.len(), 1);
-        assert!(status2.chains[0].latest_block.is_none());
-        assert!(status2.chains[0].earliest_block.is_none());
-    }
-}
diff --git a/src/indexers/public_poi.rs b/src/indexers/public_poi.rs
deleted file mode 100644
index 1a5a07350..000000000
--- a/src/indexers/public_poi.rs
+++ /dev/null
@@ -1,189 +0,0 @@
-use serde::{Deserialize, Serialize};
-use serde_with::{serde_as, DisplayFromStr};
-use thegraph_core::{alloy::primitives::BlockNumber, DeploymentId, ProofOfIndexing};
-use thegraph_graphql_http::{
-    graphql::{Document, IntoDocument, IntoDocumentWithVariables},
-    http_client::{RequestError, ReqwestExt, ResponseError},
-};
-
-use super::urls::StatusUrl;
-
-const PUBLIC_PROOF_OF_INDEXING_QUERY_DOCUMENT: &str = r#"
-    query publicPois($requests: [PublicProofOfIndexingRequest!]!) {
-        publicProofsOfIndexing(requests: $requests) {
-            deployment
-            proofOfIndexing
-            block { number }
-        }
-    }"#;
-
-/// Errors that can occur while fetching the indexer's public POIs.
-#[derive(Clone, Debug, thiserror::Error)]
-pub enum Error {
-    /// The request failed.
-    #[error("request error: {0}")]
-    Request(String),
-
-    /// Invalid response.
-    ///
-    /// The response could not be deserialized or is missing required fields.
-    #[error("invalid response: {0}")]
-    InvalidResponse(String),
-
-    /// The response did not contain any public POIs.
-    #[error("empty response")]
-    EmptyResponse,
-}
-
-#[derive(Clone, Debug, Serialize)]
-#[serde(rename_all = "camelCase")]
-struct PublicProofOfIndexingRequest {
-    deployment: DeploymentId,
-    block_number: BlockNumber,
-}
-
-impl From<(DeploymentId, BlockNumber)> for PublicProofOfIndexingRequest {
-    fn from((deployment, block_number): (DeploymentId, BlockNumber)) -> Self {
-        Self {
-            deployment,
-            block_number,
-        }
-    }
-}
-
-/// Send a request to the indexer to get the Public POIs of the given deployment-block number pairs.
-pub async fn send_request(
-    client: &reqwest::Client,
-    url: StatusUrl,
-    pois: impl IntoIterator<Item = &(DeploymentId, BlockNumber)>,
-) -> Result<Vec<PublicProofOfIndexingResult>, Error> {
-    let resp = client
-        .post(url.into_inner())
-        .send_graphql::<Response>(Request::new(pois))
-        .await
-        .map_err(|err| match err {
-            RequestError::RequestSerializationError(..) => {
-                unreachable!("request serialization should not fail")
-            }
-            RequestError::RequestSendError(..) | RequestError::ResponseRecvError(..) => {
-                Error::Request(err.to_string())
-            }
-            RequestError::ResponseDeserializationError { .. } => {
-                Error::InvalidResponse(err.to_string())
-            }
-        })?
-        .map_err(|err| match err {
-            ResponseError::Failure { .. } => Error::Request(err.to_string()),
-            ResponseError::Empty => Error::EmptyResponse,
-        })?;
-
-    if resp.public_proofs_of_indexing.is_empty() {
-        return Err(Error::EmptyResponse);
-    }
-
-    Ok(resp.public_proofs_of_indexing)
-}
-
-#[derive(Clone, Debug)]
-pub struct Request {
-    document: Document,
-    var_requests: Vec<PublicProofOfIndexingRequest>,
-}
-
-impl Request {
-    pub fn new<'a>(requests: impl IntoIterator<Item = &'a (DeploymentId, BlockNumber)>) -> Self {
-        Self {
-            document: PUBLIC_PROOF_OF_INDEXING_QUERY_DOCUMENT.into_document(),
-            var_requests: requests.into_iter().copied().map(Into::into).collect(),
-        }
-    }
-}
-
-impl IntoDocumentWithVariables for Request {
-    type Variables = serde_json::Value;
-
-    fn into_document_with_variables(self) -> (Document, Self::Variables) {
-        (
-            self.document,
-            serde_json::json!({ "requests": self.var_requests }),
-        )
-    }
-}
-
-#[derive(Debug, Deserialize)]
-#[serde(rename_all = "camelCase")]
-struct Response {
-    public_proofs_of_indexing: Vec<PublicProofOfIndexingResult>,
-}
-
-#[derive(Debug, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct PublicProofOfIndexingResult {
-    pub deployment: DeploymentId,
-    pub block: PartialBlockPtr,
-    pub proof_of_indexing: Option<ProofOfIndexing>,
-}
-
-#[serde_as]
-#[derive(Debug, Deserialize)]
-pub struct PartialBlockPtr {
-    #[serde_as(as = "DisplayFromStr")]
-    pub number: BlockNumber,
-}
-
-#[cfg(test)]
-mod tests {
-    use thegraph_core::{deployment_id, proof_of_indexing as poi};
-
-    use super::Response;
-
-    #[test]
-    fn deserialize_public_pois_response() {
-        //* Given
-        let response = r#"{
-            "publicProofsOfIndexing": [
-                {
-                    "deployment": "QmeYTH2fK2wv96XvnCGH2eyKFE8kmRfo53zYVy5dKysZtH",
-                    "proofOfIndexing": "0xba8a057796a81e013789789996551bb5b2920fb9947334db956992f7098bd287",
-                    "block": {
-                        "number": "123"
-                    }
-                },
-                {
-                    "deployment": "QmawxQJ5U1JvgosoFVDyAwutLWxrckqVmBTQxaMaKoj3Lw",
-                    "block": {
-                        "number": "456"
-                    }
-                }
-            ]
-        }"#;
-
-        //* When
-        let response = serde_json::from_str::<Response>(response);
-
-        //* Then
-        let response = response.expect("deserialization failed");
-
-        assert_eq!(response.public_proofs_of_indexing.len(), 2);
-        assert_eq!(
-            response.public_proofs_of_indexing[0].deployment,
-            deployment_id!("QmeYTH2fK2wv96XvnCGH2eyKFE8kmRfo53zYVy5dKysZtH")
-        );
-        assert_eq!(
-            response.public_proofs_of_indexing[0].proof_of_indexing,
-            Some(poi!(
-                "ba8a057796a81e013789789996551bb5b2920fb9947334db956992f7098bd287"
-            ))
-        );
-        assert_eq!(response.public_proofs_of_indexing[0].block.number, 123);
-        assert_eq!(
-            response.public_proofs_of_indexing[1].deployment,
-            deployment_id!("QmawxQJ5U1JvgosoFVDyAwutLWxrckqVmBTQxaMaKoj3Lw")
-        );
-        assert_eq!(
-            response.public_proofs_of_indexing[1].proof_of_indexing,
-            None
-        );
-        assert_eq!(response.public_proofs_of_indexing[1].block.number, 456);
-    }
-}
diff --git a/src/indexers/urls.rs b/src/indexers/urls.rs
deleted file mode 100644
index 2b0599cfd..000000000
--- a/src/indexers/urls.rs
+++ /dev/null
@@ -1,110 +0,0 @@
-use std::borrow::Borrow;
-
-use url::Url;
-
-/// Builds the URL to query the version of the indexer.
-///
-/// # Panics
-/// The function panics if the URL cannot be built.
-pub fn version_url<U: Borrow<Url>>(url: U) -> VersionUrl {
-    let url = url
-        .borrow()
-        .join("version/")
-        .expect("failed to build indexer version URL");
-    VersionUrl(url)
-}
-
-/// Builds the URL to the status endpoint of the indexer.
-///
-/// # Panics
-/// The function panics if the URL cannot be built.
-pub fn status_url<U: Borrow<Url>>(url: U) -> StatusUrl {
-    let url = url
-        .borrow()
-        .join("status/")
-        .expect("failed to build indexer status URL");
-    StatusUrl(url)
-}
-
-/// Builds the URL to the cost model endpoint of the indexer.
-///
-/// # Panics
-/// The function panics if the URL cannot be built.
-pub fn cost_url<U: Borrow<Url>>(url: U) -> CostUrl {
-    let url = url
-        .borrow()
-        .join("cost/")
-        .expect("failed to build indexer cost URL");
-    CostUrl(url)
-}
-
-/// Newtype wrapper around `Url` to provide type safety.
-macro_rules! url_new_type {
-    ($name:ident) => {
-        /// Newtype wrapper around `Url` to provide type safety.
-        #[derive(Clone, PartialEq, Eq, Hash)]
-        pub struct $name(Url);
-
-        impl $name {
-            /// Return the internal representation.
-            pub(super) fn into_inner(self) -> Url {
-                self.0
-            }
-        }
-
-        impl std::fmt::Display for $name {
-            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-                std::fmt::Display::fmt(&self.0, f)
-            }
-        }
-
-        impl std::fmt::Debug for $name {
-            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-                std::fmt::Display::fmt(&self, f)
-            }
-        }
-
-        impl AsRef<Url> for $name {
-            fn as_ref(&self) -> &Url {
-                &self.0
-            }
-        }
-
-        impl std::ops::Deref for $name {
-            type Target = Url;
-
-            fn deref(&self) -> &Self::Target {
-                &self.0
-            }
-        }
-    };
-}
-
-url_new_type!(VersionUrl);
-url_new_type!(StatusUrl);
-url_new_type!(CostUrl);
-
-#[cfg(test)]
-mod tests {
-    use url::Url;
-
-    use super::{cost_url, status_url, version_url};
-
-    /// Ensure the different URL builder functions accept owned and borrowed URL parameters.
-    #[test]
-    fn check_url_builders() {
-        let url = Url::parse("http://localhost:8020").expect("Invalid URL");
-
-        // Version URL
-        let _ = version_url(&url);
-        let _ = version_url(url.clone());
-
-        // Status URL
-        let _ = status_url(&url);
-        let _ = status_url(url.clone());
-
-        // Cost URL
-        let _ = cost_url(&url);
-        let _ = cost_url(url.clone());
-    }
-}
diff --git a/src/indexers/version.rs b/src/indexers/version.rs
deleted file mode 100644
index 18e8c61c1..000000000
--- a/src/indexers/version.rs
+++ /dev/null
@@ -1,64 +0,0 @@
-use semver::Version;
-use serde::Deserialize;
-use thegraph_graphql_http::http_client::ReqwestExt as _;
-
-use super::urls::{StatusUrl, VersionUrl};
-
-/// Fetches the version of the indexer service at the given URL.
-///
-/// This function sends a GET request to the indexer service's `/version` endpoint.
-pub async fn fetch_indexer_service_version(
-    client: &reqwest::Client,
-    url: VersionUrl,
-) -> anyhow::Result<Version> {
-    let response = client
-        .get(url.into_inner())
-        .send()
-        .await?
-        .json::<IndexerVersion>()
-        .await?;
-    Ok(response.version)
-}
-
-/// Fetches the version of the graph-node service at the given URL.
-///
-/// Sends a POST request to the graph-node service's `/status` endpoint with the query
-/// `"{ version { version } }"`.
-pub async fn fetch_graph_node_version(
-    client: &reqwest::Client,
-    url: StatusUrl,
-) -> anyhow::Result<Version> {
-    let query = "{ version { version } }";
-    let response: GraphNodeVersion = client.post(url.into_inner()).send_graphql(query).await??;
-    Ok(response.version.version)
-}
-
-#[derive(Debug, Deserialize)]
-struct IndexerVersion {
-    version: Version,
-}
-
-#[derive(Debug, Deserialize)]
-struct GraphNodeVersion {
-    version: IndexerVersion,
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn deserialize_indexer_version_json() {
-        //* Given
-        let json = serde_json::json!({
-            "version": "0.1.0"
-        });
-
-        //* When
-        let res = serde_json::from_value(json);
-
-        //* Then
-        let version: IndexerVersion = res.expect("Failed to deserialize IndexerVersion");
-        assert_eq!(version.version, Version::new(0, 1, 0));
-    }
-}
diff --git a/src/main.rs b/src/main.rs
index 1a0764e98..6d6675b22 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -12,7 +12,6 @@ mod exchange_rate;
 mod graphql;
 mod http_ext;
 mod indexer_client;
-mod indexers;
 mod indexing_performance;
 mod metrics;
 mod middleware;
@@ -21,8 +20,6 @@ mod receipts;
 mod reports;
 mod subgraph_studio;
 mod time;
-#[allow(dead_code)]
-mod ttl_hash_map;
 mod unattestable_errors;
 
 use std::{
diff --git a/src/network.rs b/src/network.rs
index bb26294ca..547886869 100644
--- a/src/network.rs
+++ b/src/network.rs
@@ -1,19 +1,30 @@
-//! Ad-hoc implementation of the network resolution service for the Graph Gateway. This service
-//! provides information about the subgraphs (and subgraph deployments) registered in the network
-//! smart contract, as well as the indexers that are indexing them.
-
-pub use errors::{DeploymentError, ResolutionError, SubgraphError, UnavailableReason};
-pub use internal::{Indexing, IndexingId};
+pub use errors::{DeploymentError, SubgraphError};
 pub use service::{NetworkService, ResolvedSubgraphInfo};
+pub use snapshot::{Indexing, IndexingId};
+use thegraph_graphql_http::graphql::{IntoDocument as _, IntoDocumentWithVariables};
 
-mod config;
+pub mod cost_model;
 mod errors;
-pub mod indexer_host_resolver;
-pub mod indexer_indexing_cost_model_resolver;
-pub mod indexer_indexing_poi_blocklist;
-pub mod indexer_indexing_poi_resolver;
-pub mod indexer_indexing_progress_resolver;
-pub mod indexer_version_resolver;
-pub mod internal;
+pub mod host_filter;
+mod indexer_processing;
+pub mod indexing_progress;
+pub mod poi_filter;
+mod pre_processing;
 pub mod service;
+mod snapshot;
 pub mod subgraph_client;
+mod subgraph_processing;
+pub mod version_filter;
+
+pub struct GraphQlRequest {
+    pub document: String,
+    pub variables: serde_json::Value,
+}
+impl IntoDocumentWithVariables for GraphQlRequest {
+    type Variables = serde_json::Value;
+    fn into_document_with_variables(
+        self,
+    ) -> (thegraph_graphql_http::graphql::Document, Self::Variables) {
+        (self.document.into_document(), self.variables)
+    }
+}
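Note: `GraphQlRequest` is a small adapter onto thegraph-graphql-http 0.3's `IntoDocumentWithVariables`, letting modules send ad-hoc GraphQL documents without defining a request type per query. A sketch of the intended call pattern, mirroring the usage visible in cost_model.rs below (the endpoint URL and response shape are illustrative):

    use thegraph_graphql_http::http_client::ReqwestExt as _;

    #[derive(serde::Deserialize)]
    struct Response {
        version: serde_json::Value,
    }

    async fn example(http: &reqwest::Client, url: url::Url) -> anyhow::Result<Response> {
        // `send_graphql` yields nested results (transport error, then GraphQL
        // response error), hence the double `?`.
        let resp = http
            .post(url)
            .send_graphql::<Response>(GraphQlRequest {
                document: "{ version { version } }".to_string(),
                variables: serde_json::json!({}),
            })
            .await??;
        Ok(resp)
    }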
diff --git a/src/network/config.rs b/src/network/config.rs
deleted file mode 100644
index 9e78d5204..000000000
--- a/src/network/config.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-use semver::Version;
-
-/// The minimum version requirements for the indexer.
-#[derive(Debug, Clone)]
-pub struct VersionRequirements {
-    /// The minimum indexer version.
-    pub min_indexer_service_version: Version,
-    /// The minimum graph node version.
-    pub min_graph_node_version: Version,
-}
-
-impl Default for VersionRequirements {
-    fn default() -> Self {
-        Self {
-            min_indexer_service_version: Version::new(0, 0, 0),
-            min_graph_node_version: Version::new(0, 0, 0),
-        }
-    }
-}
diff --git a/src/network/indexer_indexing_cost_model_resolver.rs b/src/network/cost_model.rs
similarity index 51%
rename from src/network/indexer_indexing_cost_model_resolver.rs
rename to src/network/cost_model.rs
index 95c9919c8..b1d30af7f 100644
--- a/src/network/indexer_indexing_cost_model_resolver.rs
+++ b/src/network/cost_model.rs
@@ -1,48 +1,24 @@
-//! Resolves the cost models for the indexers' deployments.
-//!
-//! The cost models are fetched from the indexer's cost URL.
-
-use std::{collections::HashMap, time::Duration};
+use std::collections::HashMap;
 
 use thegraph_core::DeploymentId;
+use thegraph_graphql_http::http_client::ReqwestExt;
 use url::Url;
 
-use crate::{indexers, indexers::cost_models::CostModelSource};
+use crate::network::GraphQlRequest;
 
-/// Resolve the indexers' cost models sources and compile them into cost models.
 pub struct CostModelResolver {
-    client: reqwest::Client,
-    timeout: Duration,
+    http: reqwest::Client,
     cache: parking_lot::Mutex<HashMap<DeploymentId, u128>>,
 }
 
 impl CostModelResolver {
-    pub fn new(client: reqwest::Client, timeout: Duration) -> Self {
+    pub fn new(http: reqwest::Client) -> Self {
         Self {
-            client,
-            timeout,
+            http,
             cache: Default::default(),
         }
     }
 
-    async fn fetch_cost_model_sources(
-        &self,
-        url: &Url,
-        indexings: &[DeploymentId],
-    ) -> anyhow::Result<Vec<CostModelSource>> {
-        let indexer_cost_url = indexers::cost_url(url);
-        tokio::time::timeout(
-            self.timeout,
-            indexers::cost_models::send_request(&self.client, indexer_cost_url, indexings),
-        )
-        .await?
-        .map_err(Into::into)
-    }
-
-    /// Fetches the cost model sources for the given deployments from the indexer.
-    ///
-    /// Returns a map of deployment IDs to the retrieved cost model sources. If certain deployment
-    /// ID's cost model fetch fails, the corresponding value in the map is `None`.
     pub async fn resolve(
         &self,
         url: &Url,
@@ -51,7 +27,7 @@ impl CostModelResolver {
         let sources = match self.fetch_cost_model_sources(url, indexings).await {
             Ok(sources) => sources,
             Err(cost_model_err) => {
-                tracing::debug!(%url, %cost_model_err);
+                tracing::debug!(%cost_model_err);
                 return self.cache.lock().clone();
             }
         };
@@ -59,12 +35,53 @@ impl CostModelResolver {
         // Only support cost models of the form `default => x;`.
         let cost_models: HashMap<DeploymentId, u128> = sources
             .into_iter()
-            .filter_map(|src| Some((src.deployment, parse_simple_cost_model(&src.model)?)))
+            .filter_map(|(deployment, src)| Some((deployment, parse_simple_cost_model(&src)?)))
             .collect();
 
         *self.cache.lock() = cost_models.clone();
 
         cost_models
     }
+
+    async fn fetch_cost_model_sources(
+        &self,
+        url: &Url,
+        deployments: &[DeploymentId],
+    ) -> anyhow::Result<HashMap<DeploymentId, String>> {
+        // ref: df8e647b-1e6e-422a-8846-dc9ee7e0dcc2
+        let url = url.join("cost").unwrap();
+
+        let query = r#"
+            query costModels($deployments: [String!]!) {
+                costModels(deployments: $deployments) {
+                    deployment
+                    model
+                }
+            }
+        "#;
+        #[derive(serde::Deserialize)]
+        #[serde(rename_all = "camelCase")]
+        struct Response {
+            cost_models: Vec<CostModelSource>,
+        }
+        #[derive(serde::Deserialize)]
+        pub struct CostModelSource {
+            pub deployment: DeploymentId,
+            pub model: Option<String>,
+        }
+        let resp = self
+            .http
+            .post(url)
+            .send_graphql::<Response>(GraphQlRequest {
+                document: query.to_string(),
+                variables: serde_json::json!({ "deployments": deployments }),
+            })
+            .await??;
+        Ok(resp
+            .cost_models
+            .into_iter()
+            .filter_map(|CostModelSource { deployment, model }| Some((deployment, model?)))
+            .collect())
+    }
 }
 
 fn parse_simple_cost_model(src: &str) -> Option<u128> {
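Note: the body of `parse_simple_cost_model` is outside this hunk; per the comment above it only needs to accept the `default => x;` form. A plausible sketch under that assumption (the return type and GRT-to-wei scaling are assumptions, not confirmed by this diff):

    // Hypothetical parser for cost models of the shape `default => 0.01;`.
    fn parse_simple_cost_model_sketch(src: &str) -> Option<u128> {
        let rest = src.trim().strip_prefix("default")?.trim_start();
        let rest = rest.strip_prefix("=>")?.trim();
        let fee: f64 = rest.strip_suffix(';').unwrap_or(rest).trim().parse().ok()?;
        // Scale the GRT fee to 18-decimal wei.
        Some((fee * 1e18) as u128)
    }

    // parse_simple_cost_model_sketch("default => 0.01;") == Some(10_000_000_000_000_000)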
- #[error("internal error: {0}")] - Internal(&'static str), -} - -#[derive(Debug, Clone, thiserror::Error)] -pub enum UnavailableReason { - #[error("blocked ({0})")] - Blocked(String), - #[error("{0}")] - IndexerResolutionError(&'static str), - #[error("indexer service version below the minimum required")] - IndexerServiceVersionBelowMin, - #[error("graph node version below the minimum required")] - GraphNodeVersionBelowMin, - #[error("indexing progress not found")] - IndexingProgressNotFound, -} - -impl From for ResolutionError { - fn from(error: IndexingError) -> Self { - match error { - IndexingError::Indexer(err) => { - let reason = match err { - IndexerInfoResolutionError::BlockedHost => { - UnavailableReason::Blocked("host".to_string()) - } - IndexerInfoResolutionError::HostResolutionFailed(err) => { - tracing::debug!(error=?err, "host resolution failed"); - - let reason = match err { - HostResolutionError::InvalidUrl(_) => "invalid indexer URL", - HostResolutionError::Dns(_) => "indexer URL DNS resolution failed", - HostResolutionError::Timeout => { - "indexer URL DNS resolution failed (timeout)" - } - }; - UnavailableReason::IndexerResolutionError(reason) - } - IndexerInfoResolutionError::IndexerServiceVersionResolutionFailed(err) => { - let reason = match err { - VersionResolutionError::FetchError(_) => { - "indexer service version resolution failed" - } - VersionResolutionError::Timeout => { - "indexer service version resolution failed (timeout)" - } - }; - UnavailableReason::IndexerResolutionError(reason) - } - IndexerInfoResolutionError::IndexerServiceVersionBelowMin(..) => { - UnavailableReason::IndexerServiceVersionBelowMin - } - IndexerInfoResolutionError::GraphNodeVersionResolutionFailed(err) => { - tracing::debug!(error=?err, "graph node version resolution failed"); - - let reason = match err { - VersionResolutionError::FetchError(_) => { - "graph node version resolution failed" - } - VersionResolutionError::Timeout => { - "graph node version resolution failed (timeout)" - } - }; - UnavailableReason::IndexerResolutionError(reason) - } - IndexerInfoResolutionError::GraphNodeVersionBelowMin(..) => { - UnavailableReason::GraphNodeVersionBelowMin - } - }; - ResolutionError::Unavailable(reason) - } - IndexingError::Indexing(err) => { - let reason = match err { - IndexingInfoResolutionError::Blocked(reason) => { - UnavailableReason::Blocked(reason) - } - IndexingInfoResolutionError::IndexingProgressNotFound => { - UnavailableReason::IndexingProgressNotFound - } - }; - ResolutionError::Unavailable(reason) - } - IndexingError::Internal(reason) => ResolutionError::Internal(reason), - } - } -} - -/// Indexing error. -#[derive(Clone, Debug, thiserror::Error)] -pub enum IndexingError { - #[error(transparent)] - Indexer(#[from] IndexerInfoResolutionError), - - #[error(transparent)] - Indexing(#[from] IndexingInfoResolutionError), - - #[error("internal error: {0}")] - Internal(&'static str), -} - -/// Errors when processing the indexer information. 
-#[derive(Clone, Debug, thiserror::Error)] -pub enum IndexerInfoResolutionError { - #[error("indexer host blocked")] - BlockedHost, - #[error("indexer host resolution failed: {0}")] - HostResolutionFailed(#[from] HostResolutionError), - #[error("indexer service version resolution failed: {0}")] - IndexerServiceVersionResolutionFailed(VersionResolutionError), - #[error("service version {0} below the minimum required {1}")] - IndexerServiceVersionBelowMin(Version, Version), - #[error("graph node version resolution failed: {0}")] - #[allow(dead_code)] // TODO: Remove once the graph node version requirement is enforced - GraphNodeVersionResolutionFailed(VersionResolutionError), - #[error("graph node version {0} below the minimum required {1}")] - GraphNodeVersionBelowMin(Version, Version), -} - -/// Error when processing the indexer's indexing information. -#[derive(Clone, Debug, thiserror::Error)] -pub enum IndexingInfoResolutionError { - /// The indexing has been blocked by the public POIs blocklist. - #[error("indexing blocked: {0}")] - Blocked(String), - /// The indexing progress information was not found. - #[error("indexing progress information not found")] - IndexingProgressNotFound, -} diff --git a/src/network/host_filter.rs b/src/network/host_filter.rs new file mode 100644 index 000000000..88804073c --- /dev/null +++ b/src/network/host_filter.rs @@ -0,0 +1,72 @@ +use std::{ + collections::{HashMap, HashSet}, + net::IpAddr, + time::Duration, +}; + +use hickory_resolver::TokioAsyncResolver as DnsResolver; +use ipnetwork::IpNetwork; +use parking_lot::RwLock; +use url::{Host, Url}; + +use crate::errors::UnavailableReason; + +pub struct HostFilter { + blocklist: HashSet, + resolver: DnsResolver, + cache: RwLock>>, +} + +impl HostFilter { + pub fn new(blocklist: HashSet) -> anyhow::Result { + Ok(Self { + blocklist, + resolver: DnsResolver::tokio_from_system_conf()?, + cache: Default::default(), + }) + } + + pub async fn check(&self, url: &Url) -> Result<(), UnavailableReason> { + if self.blocklist.is_empty() { + return Ok(()); + } + + let host_str = url.host_str().ok_or_else(UnavailableReason::invalid_url)?; + let cached_lookup = { + let cache = self.cache.read(); + cache.get(host_str).cloned() + }; + let lookup = match cached_lookup { + Some(lookup) => lookup, + None => { + let host = url.host().ok_or_else(UnavailableReason::invalid_url)?; + let lookup = match host { + Host::Ipv4(ip) => vec![IpAddr::V4(ip)], + Host::Ipv6(ip) => vec![IpAddr::V6(ip)], + Host::Domain(host) => self.resolve_host(host).await.map_err(|_| { + UnavailableReason::NoStatus("unable to resolve host".to_string()) + })?, + }; + self.cache + .write() + .insert(host_str.to_string(), lookup.clone()); + lookup + } + }; + + if lookup + .into_iter() + .any(|ip| self.blocklist.iter().any(|net| net.contains(ip))) + { + return Err(UnavailableReason::Blocked("bad host".to_string())); + } + + Ok(()) + } + + async fn resolve_host(&self, host: &str) -> anyhow::Result> { + let lookup = + tokio::time::timeout(Duration::from_secs(5), self.resolver.lookup_ip(host)).await??; + Ok(lookup.into_iter().collect()) + } +} diff --git a/src/network/indexer_host_resolver.rs b/src/network/indexer_host_resolver.rs deleted file mode 100644 index f41df9902..000000000 --- a/src/network/indexer_host_resolver.rs +++ /dev/null @@ -1,119 +0,0 @@ -//! Resolves the IP address of a URL host. -//! -//! This module provides a resolver for URL hosts. The resolver caches the results of host -//! resolution to avoid repeated DNS lookups. 
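Note: `HostFilter` folds the old `HostResolver` + blocklist pair (deleted below) into a single check that resolves the URL's host, caches the lookup, and reports matches as the shared `UnavailableReason`. A sketch of how a caller might wire it up (the CIDR and URL are illustrative):

    // Hypothetical wiring: block a private range, then vet an indexer URL.
    async fn example() -> anyhow::Result<()> {
        let blocklist: std::collections::HashSet<ipnetwork::IpNetwork> =
            std::iter::once("10.0.0.0/8".parse()?).collect();
        let filter = HostFilter::new(blocklist)?;

        let url: url::Url = "http://indexer.example:7600/".parse()?;
        match filter.check(&url).await {
            Ok(()) => { /* host resolved and is outside every blocked network */ }
            Err(reason) => tracing::debug!(%reason, "indexer host rejected"),
        }
        Ok(())
    }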
-use std::{borrow::Borrow, collections::HashMap, net::IpAddr, time::Duration}; - -use hickory_resolver::{error::ResolveError, TokioAsyncResolver as DnsResolver}; -use parking_lot::RwLock; -use url::{Host, Url}; - -/// Error that can occur during URL host resolution. -#[derive(Clone, Debug, thiserror::Error)] -pub enum ResolutionError { - /// The URL is invalid. - /// - /// For example, the URL does not contain a host. - #[error("invalid URL: {0}")] - InvalidUrl(String), - - /// Failed to resolve the host. - /// - /// This error occurs when the host could not be resolved to an IP address. - /// - /// This is a wrapper around [`ResolveError`]. - #[error("dns resolution error: {0}")] - Dns(#[from] ResolveError), - - /// Resolution timed out. - #[error("timeout")] - Timeout, -} - -impl ResolutionError { - /// Create a new [`ResolutionError::InvalidUrl`] error from an invalid URL error. - pub fn invalid_url(reason: E) -> Self { - Self::InvalidUrl(reason.to_string()) - } -} - -/// A resolver for URL hosts. -/// -/// This resolver caches the results of host resolution to avoid repeated DNS lookups. -pub struct HostResolver { - inner: DnsResolver, - cache: RwLock, ResolutionError>>>, - timeout: Duration, -} - -impl HostResolver { - pub fn new(timeout: Duration) -> anyhow::Result { - Ok(Self { - inner: DnsResolver::tokio_from_system_conf()?, - cache: Default::default(), - timeout, - }) - } - - /// Resolve the IP address of the given domain with a timeout. - async fn resolve_domain(&self, domain: &str) -> Result, ResolutionError> { - tokio::time::timeout(self.timeout, self.inner.lookup_ip(domain)) - .await - .map_err(|_| ResolutionError::Timeout)? - .map_err(Into::into) - .map(FromIterator::from_iter) - } - - /// Gets the cached DNS resolution result for the given host. - /// - /// This method locks the cache in read mode and returns the cached information. - fn get_from_cache(&self, host: &str) -> Option, ResolutionError>> { - let cache_read = self.cache.read(); - cache_read.get(host).cloned() - } - - /// Updates the cache with the given DNS resolution result. - /// - /// This method locks the cache in write mode and updates the cache with the given progress - /// information. - fn update_cache(&self, host: &str, res: Result, ResolutionError>) { - let mut cache_write = self.cache.write(); - cache_write.insert(host.to_owned(), res); - } - - /// Resolve the IP address of the given URL. - /// - /// The URL is resolved to an IP address. The result is cached so that subsequent calls with the - /// same URL will return the same result. 
- pub async fn resolve_url>( - &self, - url: U, - ) -> Result, ResolutionError> { - let url = url.borrow(); - - // Check if the result is already cached, otherwise resolve the URLs' associated IP - // addresses - let host_str = url - .host_str() - .ok_or(ResolutionError::invalid_url("no host"))?; - - match self.get_from_cache(host_str) { - Some(state) => state, - None => { - // Resolve the URL IP addresses - let host = url.host().ok_or(ResolutionError::invalid_url("no host"))?; - - let resolution = match host { - Host::Ipv4(ip) => Ok(vec![IpAddr::V4(ip)]), - Host::Ipv6(ip) => Ok(vec![IpAddr::V6(ip)]), - Host::Domain(domain) => self.resolve_domain(domain).await, - }; - - // Cache the result - self.update_cache(host_str, resolution.clone()); - - resolution - } - } - } -} diff --git a/src/network/indexer_indexing_poi_blocklist.rs b/src/network/indexer_indexing_poi_blocklist.rs deleted file mode 100644 index 1543f872f..000000000 --- a/src/network/indexer_indexing_poi_blocklist.rs +++ /dev/null @@ -1,83 +0,0 @@ -//! This module contains the [`PoiBlocklist`] struct, which is used to block indexers based -//! on their Proof of Indexing (POI) information. -//! -//! Given a list of blocked POIs, the blocklist checks if an indexer reports any of them as public -//! POIs. If a match is found, the indexer is blocked for the associated deployment ID. -//! -//! The blocklist caches the blocklist state for each indexer, so that subsequent checks against the -//! same indexer are fast. The cached entries are considered expired after a given TTL. - -use std::collections::{HashMap, HashSet}; - -use thegraph_core::{alloy::primitives::BlockNumber, DeploymentId, ProofOfIndexing}; - -use crate::config::BlockedPoi; - -/// A blocklist based on the Proof of Indexing (POI) of indexers. -#[derive(Default)] -pub struct PoiBlocklist { - blocklist: HashMap>, -} - -impl PoiBlocklist { - pub fn new(conf: Vec) -> Self { - let mut blocklist: HashMap> = - Default::default(); - for info in conf.into_iter() { - blocklist - .entry(info.deployment) - .or_default() - .push((info.block_number, info.public_poi.into())); - } - Self { blocklist } - } - - pub fn is_empty(&self) -> bool { - self.blocklist.is_empty() - } - - /// Get a list of POIs metadata that are affected. - /// - /// If none of the deployments are affected, an empty list is returned. This allows to avoid - /// querying the indexer for POIs if none of its deployments is affected. - pub fn affected_pois_metadata<'a>( - &self, - deployments: impl IntoIterator, - ) -> Vec<(DeploymentId, BlockNumber)> { - deployments - .into_iter() - .flat_map(|deployment| { - self.blocklist.get(deployment).into_iter().flat_map(|pois| { - pois.iter() - .map(|(block_number, _)| (*deployment, *block_number)) - }) - }) - .collect() - } - - /// Return deployments with blocked POIs. - pub fn check( - &self, - pois: HashMap<(DeploymentId, BlockNumber), ProofOfIndexing>, - ) -> HashSet { - pois.iter() - .filter(|((deployment_id, block_number), poi)| { - self.check_poi(*deployment_id, *block_number, **poi) - }) - .map(|((deployment_id, _), _)| *deployment_id) - .collect() - } - - /// Check if the POI is in the blocklist. 
- fn check_poi( - &self, - deployment: DeploymentId, - block_number: BlockNumber, - poi: ProofOfIndexing, - ) -> bool { - match self.blocklist.get(&deployment) { - None => false, - Some(blocked_pois) => blocked_pois.contains(&(block_number, poi)), - } - } -} diff --git a/src/network/indexer_indexing_poi_resolver.rs b/src/network/indexer_indexing_poi_resolver.rs deleted file mode 100644 index 666f844f7..000000000 --- a/src/network/indexer_indexing_poi_resolver.rs +++ /dev/null @@ -1,209 +0,0 @@ -//! A resolver for the Proof of Indexing (POI) of indexers. -//! -//! The resolver fetches the public POIs of indexers based on the given POIs metadata. It caches the -//! results of these requests to avoid making the same request multiple times. -//! -//! The cache has a TTL of 20 minutes. Entries are considered expired after this time causing the -//! resolver to fetch the public POIs of the indexer again. - -use std::{collections::HashMap, time::Duration}; - -use parking_lot::RwLock; -use thegraph_core::{alloy::primitives::BlockNumber, DeploymentId, ProofOfIndexing}; -use url::Url; - -use crate::{ - indexers, indexers::public_poi::Error as PublicPoiFetchError, ttl_hash_map::TtlHashMap, -}; - -/// The number of Public POI queries in a single request. -const POIS_PER_REQUEST_BATCH_SIZE: usize = 10; - -/// Error that can occur during POI resolution. -#[derive(Clone, Debug, thiserror::Error)] -pub enum ResolutionError { - /// An error occurred while fetching the Public POIs of the indexer. - /// - /// This includes network errors, timeouts, and deserialization errors. - #[error("fetch error: {0}")] - FetchError(#[from] PublicPoiFetchError), - - /// Resolution timed out. - #[error("timeout")] - Timeout, -} - -/// A resolver for the Proof of Indexing (POI) of indexers. Results are cached for some TTL to avoid -/// making the same request multiple times. -#[allow(clippy::type_complexity)] -pub struct PoiResolver { - client: reqwest::Client, - cache: RwLock>, - timeout: Duration, -} - -impl PoiResolver { - /// Create a new [`PoiResolver`] with the given timeout and cache TTL. - pub fn new(client: reqwest::Client, timeout: Duration, cache_ttl: Duration) -> Self { - Self { - client, - timeout, - cache: RwLock::new(TtlHashMap::with_ttl(cache_ttl)), - } - } - - /// Fetch the public POIs of the indexer based on the given POIs metadata. - async fn fetch_indexer_public_pois( - &self, - url: &Url, - pois: &[(DeploymentId, BlockNumber)], - ) -> HashMap<(DeploymentId, BlockNumber), Result> { - let status_url = indexers::status_url(url); - let res = tokio::time::timeout( - self.timeout, - send_requests(&self.client, status_url, pois, POIS_PER_REQUEST_BATCH_SIZE), - ) - .await; - - match res { - Ok(res) => res - .into_iter() - .map(|(meta, result)| (meta, result.map_err(Into::into))) - .collect(), - // If the request timed out, return a timeout error for all deployment-block number pairs - Err(_) => pois - .iter() - .map(|meta| (*meta, Err(ResolutionError::Timeout))) - .collect(), - } - } - - /// Gets the cached Public POIs information for the given deployment-block number pairs. - /// - /// This method locks the cache in read mode and returns the cached information. 
- fn get_from_cache<'a>( - &self, - url: &str, - keys: impl IntoIterator, - ) -> HashMap<(DeploymentId, BlockNumber), ProofOfIndexing> { - let cache_read = self.cache.read(); - let mut result = HashMap::new(); - - for key in keys { - match cache_read.get(&(url.to_owned(), *key)) { - Some(value) => { - result.insert(*key, *value); - } - None => continue, - } - } - - result - } - - /// Updates the cache with the given Public POIs information. - /// - /// This method locks the cache in write mode and updates the cache with the given progress - /// information. - fn update_cache<'a>( - &self, - url: &str, - data: impl IntoIterator, - ) { - let mut cache_write = self.cache.write(); - for (key, value) in data { - cache_write.insert((url.to_owned(), *key), *value); - } - } - - /// Resolve the public POIs of the indexer based on the given POIs metadata. - /// - /// If the public POIs of the indexer are already in the cache, the resolver returns them. - pub async fn resolve( - &self, - url: &Url, - poi_requests: &[(DeploymentId, BlockNumber)], - ) -> HashMap<(DeploymentId, BlockNumber), ProofOfIndexing> { - let url_string = url.to_string(); - let mut results = self.get_from_cache(&url_string, poi_requests); - let missing_requests: Vec<(DeploymentId, BlockNumber)> = poi_requests - .iter() - .filter(|r| !results.contains_key(r)) - .cloned() - .collect(); - if missing_requests.is_empty() { - return results; - } - - let fetched: HashMap<(DeploymentId, BlockNumber), ProofOfIndexing> = self - .fetch_indexer_public_pois(url, &missing_requests) - .await - .into_iter() - .filter_map(|(key, result)| match result { - Ok(poi) => Some((key, poi)), - Err(poi_fetch_err) => { - tracing::warn!(%poi_fetch_err, ?key); - None - } - }) - .collect(); - self.update_cache(&url_string, &fetched); - results.extend(fetched); - results - } -} - -/// Send requests to the indexer to get the Public POIs of the given deployment-block number pairs. -/// -/// Given a list of deployment-block number pairs, the function sends requests to the indexer to get -/// the Public POIs of the indexers. The function batches the queries into groups of `batch_size` -/// and sends them in a single request. All requests are sent concurrently to the indexer. The -/// function returns a map of deployment-block number pairs to the Public POIs of the indexers, or -/// an error if the request failed. -async fn send_requests( - client: &reqwest::Client, - url: indexers::StatusUrl, - poi_requests: &[(DeploymentId, BlockNumber)], - batch_size: usize, -) -> HashMap<(DeploymentId, BlockNumber), Result> { - // Batch the POI queries into groups of `batch_size` - let request_batches = poi_requests.chunks(batch_size); - - // Create a request for each batch - let requests = request_batches.map(|batch| { - let url = url.clone(); - async move { - // Request the indexings' POIs - let response = indexers::public_poi::send_request(client, url.clone(), batch).await; - - let result = match response { - Err(err) => { - // If the request failed, mark all deployment-block number pairs in the batch as - // failed. 
- return batch - .iter() - .map(|meta| (*meta, Err(err.clone()))) - .collect::>(); - } - Ok(res) => res, - }; - - // Construct a map of deployment IDs to responses - result - .into_iter() - .filter_map(|res| { - Some(( - (res.deployment, res.block.number), - Ok(res.proof_of_indexing?), - )) - }) - .collect::>() - } - }); - - // Send all requests concurrently - let responses = futures::future::join_all(requests).await; - - // Merge the responses into a single map - responses.into_iter().flatten().collect() -} diff --git a/src/network/indexer_indexing_progress_resolver.rs b/src/network/indexer_indexing_progress_resolver.rs deleted file mode 100644 index 3a9dc9c2a..000000000 --- a/src/network/indexer_indexing_progress_resolver.rs +++ /dev/null @@ -1,177 +0,0 @@ -//! A resolver that fetches the indexing progress of deployments from an indexer's status URL. - -use std::{collections::HashMap, time::Duration}; - -use parking_lot::{Mutex, RwLock}; -use thegraph_core::{alloy::primitives::BlockNumber, DeploymentId}; -use url::Url; - -use crate::{ - indexers, - indexers::indexing_progress::{ChainStatus, Error as IndexingProgressFetchError}, -}; - -/// The number of deployments indexing progress to query in a single request. -const INDEXINGS_PER_REQUEST_BATCH_SIZE: usize = 100; - -/// An error that occurred while resolving the indexer's progress. -#[derive(Clone, Debug, thiserror::Error)] -pub enum ResolutionError { - /// An error occurred while fetching the indexer progress. - /// - /// This includes network errors, timeouts, and deserialization errors. - #[error("fetch error: {0}")] - FetchError(#[from] IndexingProgressFetchError), - - /// The resolution timed out. - #[error("timeout")] - Timeout, -} - -/// The indexing progress information of a deployment on a chain. -#[derive(Debug, Clone)] -pub struct IndexingProgressInfo { - /// The latest block number indexed by the indexer. - pub latest_block: BlockNumber, - /// The earliest block number indexed by the indexer. - pub min_block: Option, -} - -/// A resolver that fetches the indexing progress of deployments from an indexer's status URL. -pub struct IndexingProgressResolver { - client: reqwest::Client, - timeout: Duration, - cache: RwLock>>>, -} - -impl IndexingProgressResolver { - pub fn new(client: reqwest::Client, timeout: Duration) -> Self { - Self { - client, - timeout, - cache: Default::default(), - } - } - - /// Fetches the indexing progress of the given deployments from the indexer's status URL. - async fn fetch_indexing_progress( - &self, - url: &Url, - indexings: &[DeploymentId], - ) -> HashMap, ResolutionError>> { - let status_url = indexers::status_url(url); - let res = tokio::time::timeout( - self.timeout, - send_requests( - &self.client, - status_url, - indexings, - INDEXINGS_PER_REQUEST_BATCH_SIZE, - ), - ) - .await; - - match res { - Ok(res) => res - .into_iter() - .map(|(deployment_id, result)| (deployment_id, result.map_err(Into::into))) - .collect(), - // If the request timed out, return a timeout error for all deployments - Err(_) => indexings - .iter() - .map(|deployment_id| (*deployment_id, Err(ResolutionError::Timeout))) - .collect(), - } - } - - /// Resolves the indexing progress of the given deployments. - /// - /// The function fetches the indexing progress of the given deployments from the indexer's - /// status URL. If the fetch fails, the function returns the cached data for the failed - /// deployments. If the fetch succeeds, the function updates the cache with the fetched data. 
- /// - /// Returns a map of deployment IDs to their indexing progress information. - pub async fn resolve( - &self, - url: &Url, - indexer_deployments: &[DeploymentId], - ) -> HashMap { - let url_string = url.to_string(); - let results = self.fetch_indexing_progress(url, indexer_deployments).await; - - let mut outer_cache = self.cache.read(); - if !outer_cache.contains_key(&url_string) { - drop(outer_cache); - self.cache - .write() - .insert(url_string.clone(), Default::default()); - outer_cache = self.cache.read(); - } - - let mut cache = outer_cache.get(&url_string).unwrap().lock(); - for (deployment, result) in results { - let status = result.ok().and_then(|chains| { - let chain = chains.first()?; - Some(IndexingProgressInfo { - latest_block: chain.latest_block.as_ref().map(|block| block.number)?, - min_block: chain.earliest_block.as_ref().map(|block| block.number), - }) - }); - if let Some(status) = status { - cache.insert(deployment, status); - } - } - cache.clone() - } -} - -/// Sends requests to the indexer's status URL to fetch the indexing progress of deployments. -/// -/// Given a list of deployment IDs, the function groups them into batches of a given size and sends -/// all requests concurrently. If one request fails, the function marks all deployments in the batch -/// as failed. The function returns a map of deployment IDs to the indexing progress information. -async fn send_requests( - client: &reqwest::Client, - url: indexers::StatusUrl, - indexings: &[DeploymentId], - batch_size: usize, -) -> HashMap, IndexingProgressFetchError>> { - // Group the deployments into batches of `batch_size` - let request_batches = indexings.chunks(batch_size); - - // Create a request for each batch - let requests = request_batches.map(|batch| { - let url = url.clone(); - async move { - // Request the indexing progress - let response = - indexers::indexing_progress::send_request(client, url.clone(), batch).await; - - let result = match response { - Err(err) => { - // If the request failed, mark all deployment IDs in the batch as failed - return batch - .iter() - .map(|deployment_id| (*deployment_id, Err(err.clone()))) - .collect::>(); - } - Ok(res) => res, - }; - - // Construct a map of deployment IDs to responses - result - .into_iter() - .filter(|response| { - batch.contains(&response.deployment_id) && !response.chains.is_empty() - }) - .map(|response| (response.deployment_id, Ok(response.chains))) - .collect::>() - } - }); - - // Send all requests concurrently - let responses = futures::future::join_all(requests).await; - - // Merge the responses into a single map - responses.into_iter().flatten().collect() -} diff --git a/src/network/internal/indexer_processing.rs b/src/network/indexer_processing.rs similarity index 55% rename from src/network/internal/indexer_processing.rs rename to src/network/indexer_processing.rs index f3aa7509a..6c710001e 100644 --- a/src/network/internal/indexer_processing.rs +++ b/src/network/indexer_processing.rs @@ -1,31 +1,21 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use custom_debug::CustomDebug; -use ipnetwork::IpNetwork; -use semver::Version; use thegraph_core::{alloy::primitives::BlockNumber, AllocationId, DeploymentId, IndexerId}; use tracing::Instrument; use url::Url; -use super::InternalState; use crate::{ config::BlockedIndexer, - network::{ - config::VersionRequirements, - errors::{IndexerInfoResolutionError, IndexingInfoResolutionError}, - indexer_host_resolver::HostResolver, - 
indexer_indexing_poi_blocklist::PoiBlocklist, - indexer_indexing_poi_resolver::PoiResolver, - indexer_indexing_progress_resolver::IndexingProgressResolver, - indexer_version_resolver::VersionResolver, - }, + errors::UnavailableReason, + network::{indexing_progress::IndexingProgressResolver, service::InternalState}, }; /// Internal representation of the indexer pre-processed information. /// /// This is not the final representation of the indexer. #[derive(CustomDebug)] -pub(super) struct IndexerRawInfo { +pub struct IndexerRawInfo { /// The indexer's ID. pub id: IndexerId, /// The indexer's URL. @@ -43,7 +33,7 @@ pub(super) struct IndexerRawInfo { /// /// This is not the final representation of the indexer. #[derive(CustomDebug)] -pub(super) struct IndexerInfo { +pub struct IndexerInfo { /// The indexer's ID. pub id: IndexerId, /// The indexer's URL. @@ -54,20 +44,15 @@ pub(super) struct IndexerInfo { /// The total amount of tokens staked by the indexer. pub staked_tokens: u128, - /// The indexer's "indexer service" version. - pub indexer_service_version: Version, - /// The indexer's "graph node" version. - pub graph_node_version: Version, - /// The indexer's indexings information. - pub indexings: HashMap>, + pub indexings: HashMap>, } /// Internal representation of the indexer's indexing information. /// /// This is not the final representation of the indexer's indexing information. #[derive(Clone, Debug, Eq, PartialEq)] -pub(super) struct IndexingRawInfo { +pub struct IndexingRawInfo { /// The largest allocation. pub largest_allocation: AllocationId, /// The total amount of tokens allocated. @@ -79,7 +64,7 @@ pub(super) struct IndexingRawInfo { /// This type uses the "type-state" pattern to represent the different states of the indexing /// information: unresolved, partially resolved (with indexing progress) and completely resolved. #[derive(Debug)] -pub(super) struct IndexingInfo { +pub struct IndexingInfo { /// The largest allocation. pub largest_allocation: AllocationId, @@ -135,13 +120,13 @@ impl IndexingInfo { } } -pub(super) type ResolvedIndexingInfo = IndexingInfo; +pub type ResolvedIndexingInfo = IndexingInfo; -pub(super) type ResolvedIndexerInfo = IndexerInfo; +pub type ResolvedIndexerInfo = IndexerInfo; /// Internal representation of the indexing's progress information. #[derive(Clone, Debug)] -pub(super) struct IndexingProgress { +pub struct IndexingProgress { /// The latest indexed block. pub latest_block: BlockNumber, /// The minimum indexed block. @@ -149,10 +134,10 @@ pub(super) struct IndexingProgress { } /// Process the fetched network topology information. 
-pub(super) async fn process_info( +pub async fn process_info( state: &InternalState, indexers: &HashMap, -) -> HashMap> { +) -> HashMap> { let processed_info = { let indexers_iter_fut = indexers.iter().map(move |(indexer_id, indexer)| { // Instrument the indexer processing span @@ -160,56 +145,16 @@ pub(super) async fn process_info( "indexer processing", indexer.id = ?indexer.id, indexer.url = %indexer.url, - indexer.indexer_service_version = tracing::field::Empty, - indexer.graph_node_version = tracing::field::Empty, ); tracing::trace!(parent: &indexer_span, "processing"); async move { - // Check if the indexer's host is in the host blocklist - // - // If the indexer host cannot be resolved or is in the blocklist, the indexer must - // be marked as unhealthy - if let Err(err) = resolve_and_check_indexer_blocked_by_host_blocklist( - &state.indexer_host_resolver, - &state.indexer_host_blocklist, - &indexer.url, - ) - .await - { - tracing::debug!(%err); + if let Err(err) = state.indexer_host_filter.check(&indexer.url).await { + return (*indexer_id, Err(err)); + } + if let Err(err) = state.indexer_version_filter.check(&indexer.url).await { return (*indexer_id, Err(err)); } - - // Check if the indexer's reported versions are supported - // - // If the versions cannot be resolved or are not supported, the indexer must be - // marked as unhealthy - let (indexer_service_version, graph_node_version) = - match resolve_and_check_indexer_blocked_by_version( - &state.indexer_version_resolver, - &state.indexer_version_requirements, - &indexer.url, - ) - .await - { - Ok(versions) => versions, - Err(err) => { - tracing::debug!(%err); - return (*indexer_id, Err(err)); - } - }; - - // Update the span information with the resolved versions - tracing::Span::current() - .record( - "indexer.indexer_service_version", - tracing::field::display(&indexer_service_version), - ) - .record( - "indexer.graph_node_version", - tracing::field::display(&graph_node_version), - ); let blocklist = state.indexer_blocklist.get(&*indexer.id); // Resolve the indexer's indexings information @@ -227,8 +172,6 @@ pub(super) async fn process_info( id: indexer.id, url: indexer.url.clone(), staked_tokens: indexer.staked_tokens, - indexer_service_version, - graph_node_version, indexings, }), ) @@ -243,79 +186,13 @@ pub(super) async fn process_info( FromIterator::from_iter(processed_info) } -/// Resolve and check if the indexer's host is in the host blocklist. -/// -/// - If the indexer's host is not resolvable: the indexer is BLOCKED. -/// - If the host blocklist was not configured: the indexer is ALLOWED. -/// - If the indexer's host is in the blocklist: the indexer is BLOCKED. -async fn resolve_and_check_indexer_blocked_by_host_blocklist( - resolver: &HostResolver, - blocklist: &HashSet, - url: &Url, -) -> Result<(), IndexerInfoResolutionError> { - // Resolve the indexer's URL, if it fails (or times out), the indexer must be BLOCKED - let addrs = resolver.resolve_url(url).await?; - - if addrs - .iter() - .any(|addr| blocklist.iter().any(|net| net.contains(*addr))) - { - return Err(IndexerInfoResolutionError::BlockedHost); - } - - Ok(()) -} - -/// Resolve and check if the indexer's reported versions are supported. 
-async fn resolve_and_check_indexer_blocked_by_version( - resolver: &VersionResolver, - version_requirements: &VersionRequirements, - url: &Url, -) -> Result<(Version, Version), IndexerInfoResolutionError> { - // Resolve the indexer's service version - let indexer_service_version = resolver - .resolve_indexer_service_version(url) - .await - .map_err(IndexerInfoResolutionError::IndexerServiceVersionResolutionFailed)?; - - // Check if the indexer's service version is supported - if indexer_service_version < version_requirements.min_indexer_service_version { - return Err(IndexerInfoResolutionError::IndexerServiceVersionBelowMin( - indexer_service_version, - version_requirements.min_indexer_service_version.clone(), - )); - } - - // Resolve the indexer's graph node version, with a timeout - let graph_node_version = match resolver.resolve_graph_node_version(url).await { - Err(err) => { - // TODO: After more graph nodes support reporting their version, - // we should assume they are on the minimum version if we can't - // get the version. - tracing::trace!("graph-node version resolution failed: {err}"); - version_requirements.min_graph_node_version.clone() - } - Ok(result) => result, - }; - - // Check if the indexer's graph node version is supported - if graph_node_version < version_requirements.min_graph_node_version { - return Err(IndexerInfoResolutionError::GraphNodeVersionBelowMin( - graph_node_version, - version_requirements.min_graph_node_version.clone(), - )); - } - - Ok((indexer_service_version, graph_node_version)) -} - /// Process the indexer's indexings information. async fn process_indexer_indexings( state: &InternalState, url: &Url, indexings: HashMap<DeploymentId, IndexingRawInfo>, blocklist: Option<&BlockedIndexer>, -) -> HashMap<DeploymentId, Result<ResolvedIndexingInfo, IndexingInfoResolutionError>> { +) -> HashMap<DeploymentId, Result<ResolvedIndexingInfo, UnavailableReason>> { let mut indexer_indexings: HashMap, _>> = indexings .into_iter() .map(|(id, info)| (id, Ok(info.into()))) .collect(); @@ -325,35 +202,31 @@ async fn process_indexer_indexings( None => (), Some(blocklist) if blocklist.deployments.is_empty() => { for entry in indexer_indexings.values_mut() { - *entry = Err(IndexingInfoResolutionError::Blocked( - blocklist.reason.clone(), - )); + *entry = Err(UnavailableReason::Blocked(blocklist.reason.clone())); } } Some(blocklist) => { for deployment in &blocklist.deployments { indexer_indexings.insert( *deployment, - Err(IndexingInfoResolutionError::Blocked( - blocklist.reason.clone(), - )), + Err(UnavailableReason::Blocked(blocklist.reason.clone())), ); } } }; + // ref: df8e647b-1e6e-422a-8846-dc9ee7e0dcc2 + let status_url = url.join("status").unwrap(); + // Keep track of the healthy indexers, so we efficiently resolve the indexer's indexings that // are not marked as unhealthy in a previous resolution step let mut healthy_indexer_indexings = indexer_indexings.keys().copied().collect::<Vec<_>>(); // Check if the indexer's indexings should be blocked by POI - let blocked_indexings_by_poi = resolve_and_check_indexer_indexings_blocked_by_poi( - &state.poi_blocklist, - &state.poi_resolver, - url, - &healthy_indexer_indexings, - ) - .await; + let blocked_indexings_by_poi = state + .indexer_poi_filter + .blocked_deployments(&status_url, &healthy_indexer_indexings) + .await; // Remove the blocked indexings from the healthy indexers list healthy_indexer_indexings.retain(|id| !blocked_indexings_by_poi.contains(id)); @@ -364,7 +237,7 @@ async fn process_indexer_indexings( .map(|(id, res)| { let info = res.and_then(|info| { if blocked_indexings_by_poi.contains(&id) { - Err(IndexingInfoResolutionError::Blocked("bad POI".into())) +
Err(UnavailableReason::Blocked("bad PoI".to_string())) } else { Ok(info) } @@ -377,7 +250,7 @@ async fn process_indexer_indexings( // Resolve the indexer's indexing progress information let mut indexing_progress = resolve_indexer_progress( &state.indexing_progress_resolver, - url, + &status_url, &healthy_indexer_indexings, ) .await; @@ -400,7 +273,9 @@ async fn process_indexer_indexings( _ => { return ( id, - Err(IndexingInfoResolutionError::IndexingProgressNotFound), + Err(UnavailableReason::NoStatus( + "failed to get indexing progress".to_string(), + )), ); } }; @@ -429,37 +304,13 @@ async fn process_indexer_indexings( .collect() } -/// Resolve and check if any of the indexer's indexings should be blocked by POI. -async fn resolve_and_check_indexer_indexings_blocked_by_poi( - blocklist: &PoiBlocklist, - resolver: &PoiResolver, - url: &Url, - indexings: &[DeploymentId], -) -> HashSet { - if blocklist.is_empty() { - return Default::default(); - } - - // Get the list of affected POIs to resolve for the indexer's deployments - // If none of the deployments are affected, the indexer must be ALLOWED - let indexer_affected_pois = blocklist.affected_pois_metadata(indexings); - if indexer_affected_pois.is_empty() { - return Default::default(); - } - - // Resolve the indexer public POIs for the affected deployments - let poi_result = resolver.resolve(url, &indexer_affected_pois).await; - - blocklist.check(poi_result) -} - /// Resolve the indexer's progress information. async fn resolve_indexer_progress( resolver: &IndexingProgressResolver, - url: &Url, + status_url: &Url, indexings: &[DeploymentId], -) -> HashMap> { - let mut progress_info = resolver.resolve(url, indexings).await; +) -> HashMap> { + let mut progress_info = resolver.resolve(status_url, indexings).await; // Get the progress information for each indexing indexings @@ -473,7 +324,9 @@ async fn resolve_indexer_progress( latest_block: info.latest_block, min_block: info.min_block, }) - .ok_or(IndexingInfoResolutionError::IndexingProgressNotFound), + .ok_or_else(|| { + UnavailableReason::NoStatus("failed to get indexing progress".to_string()) + }), ) }) .collect::>() diff --git a/src/network/indexer_version_resolver.rs b/src/network/indexer_version_resolver.rs deleted file mode 100644 index ddf534ffb..000000000 --- a/src/network/indexer_version_resolver.rs +++ /dev/null @@ -1,166 +0,0 @@ -//! Indexer versions resolver. -//! -//! The resolver is responsible for fetching the versions of the indexer service and graph-node -//! services. If the version takes more than the timeout to resolve, the resolver will return an -//! error. -//! -//! The resolver will perform better if the client provided has a connection pool with the different -//! indexers, as it will be able to reuse already established connections. - -use std::{collections::HashMap, sync::Arc, time::Duration}; - -use parking_lot::RwLock; -use semver::Version; -use url::Url; - -use crate::indexers; - -/// The error that can occur while resolving the indexer versions. -#[derive(Clone, Debug, thiserror::Error)] -pub enum ResolutionError { - /// An error occurred while querying the indexer version. - #[error("fetch error: {0}")] - FetchError(String), - - /// The resolution timed out. - #[error("timeout")] - Timeout, -} - -/// The indexer versions resolver. -/// -/// The resolver is responsible for fetching the versions of the indexer service and graph-node -/// services. If the version takes more than the timeout to resolve, the resolver will return an -/// error. 
-#[derive(Clone)] -pub struct VersionResolver { - /// The indexer client. - /// - /// Providing a client with a connection pool with the different indexers will reduce - /// significantly the time to resolve the versions as the resolver will be able to reuse - /// already established connections. - client: reqwest::Client, - - /// The indexer service version resolution timeout. - indexer_service_version_resolution_timeout: Duration, - /// The indexer graph-node version resolution timeout. - graph_node_version_resolution_timeout: Duration, - - /// Cache for the resolved indexer service versions. - indexer_service_version_cache: Arc>>, - /// Cache for the resolved indexer graph-node versions. - graph_node_version_cache: Arc>>, -} - -impl VersionResolver { - pub fn new(client: reqwest::Client, timeout: Duration) -> Self { - Self { - client, - indexer_service_version_resolution_timeout: timeout, - graph_node_version_resolution_timeout: timeout, - indexer_service_version_cache: Default::default(), - graph_node_version_cache: Default::default(), - } - } - - /// Fetches the indexer service version from the given URL. - async fn fetch_indexer_service_version( - &self, - url: &indexers::VersionUrl, - ) -> Result { - tokio::time::timeout( - self.indexer_service_version_resolution_timeout, - indexers::version::fetch_indexer_service_version(&self.client, url.clone()), - ) - .await - .map_err(|_| ResolutionError::Timeout)? - .map_err(|err| ResolutionError::FetchError(err.to_string())) - } - - /// Fetches the indexer graph-node version from the given URL. - async fn fetch_graph_node_version( - &self, - url: &indexers::StatusUrl, - ) -> Result { - tokio::time::timeout( - self.graph_node_version_resolution_timeout, - indexers::version::fetch_graph_node_version(&self.client, url.clone()), - ) - .await - .map_err(|_| ResolutionError::Timeout)? - .map_err(|err| ResolutionError::FetchError(err.to_string())) - } - - /// Resolves the indexer service version. - /// - /// The version resolution time is upper-bounded by the configured timeout. - pub async fn resolve_indexer_service_version( - &self, - url: &Url, - ) -> Result { - let url_string = url.to_string(); - let version_url = indexers::version_url(url); - - let version = match self.fetch_indexer_service_version(&version_url).await { - Ok(version) => version, - Err(err) => { - tracing::debug!( - version_url = url_string, - error = ?err, - "indexer service version resolution failed" - ); - - // Try to get the version from the cache, otherwise return the fetch error - let cache = self.indexer_service_version_cache.read(); - return if let Some(version) = cache.get(&url_string) { - Ok(version.clone()) - } else { - Err(err) - }; - } - }; - - // Update the cache with the resolved version - { - let mut cache = self.indexer_service_version_cache.write(); - cache.insert(url_string, version.clone()); - } - - Ok(version) - } - - /// Resolves the indexer graph-node version. - /// - /// The version resolution time is upper-bounded by the configured timeout. 
- pub async fn resolve_graph_node_version(&self, url: &Url) -> Result { - let url_string = url.to_string(); - let status_url = indexers::status_url(url); - - let version = match self.fetch_graph_node_version(&status_url).await { - Ok(version) => version, - Err(err) => { - tracing::debug!( - version_url = url_string, - error = ?err, - "indexer graph-node version resolution failed" - ); - - // Try to get the version from the cache, otherwise return the fetch error - let cache = self.graph_node_version_cache.read(); - return if let Some(version) = cache.get(&url_string) { - Ok(version.clone()) - } else { - Err(err) - }; - } - }; - - // Update the cache with the resolved version - { - let mut cache = self.graph_node_version_cache.write(); - cache.insert(url_string, version.clone()); - } - - Ok(version) - } -} diff --git a/src/network/indexing_progress.rs b/src/network/indexing_progress.rs new file mode 100644 index 000000000..fdf1df9ea --- /dev/null +++ b/src/network/indexing_progress.rs @@ -0,0 +1,201 @@ +use std::collections::HashMap; + +use parking_lot::{Mutex, RwLock}; +use serde_with::serde_as; +use thegraph_core::{alloy::primitives::BlockNumber, DeploymentId}; +use thegraph_graphql_http::http_client::ReqwestExt; +use url::Url; + +use super::GraphQlRequest; + +#[derive(Debug, Clone)] +pub struct IndexingProgressInfo { + pub latest_block: BlockNumber, + pub min_block: Option, +} + +pub struct IndexingProgressResolver { + http: reqwest::Client, + cache: RwLock>>>, +} + +impl IndexingProgressResolver { + pub fn new(http: reqwest::Client) -> Self { + Self { + http, + cache: Default::default(), + } + } + + pub async fn resolve( + &self, + status_url: &Url, + deployments: &[DeploymentId], + ) -> HashMap { + let url_string = status_url.to_string(); + + let results = send_requests(&self.http, status_url, deployments).await; + + let mut outer_cache = self.cache.read(); + if !outer_cache.contains_key(&url_string) { + drop(outer_cache); + self.cache + .write() + .insert(url_string.clone(), Default::default()); + outer_cache = self.cache.read(); + } + + let mut cache = outer_cache.get(&url_string).unwrap().lock(); + for (deployment, status) in results { + cache.insert(deployment, status); + } + cache.clone() + } +} + +async fn send_requests( + http: &reqwest::Client, + status_url: &Url, + indexings: &[DeploymentId], +) -> HashMap { + let request_batches = indexings.chunks(100); + let requests = request_batches.map(|batch| { + let status_url = status_url.clone(); + async move { + let result = send_request(http, status_url.clone(), batch).await; + match result { + Ok(response) => response, + Err(indexing_progress_err) => { + tracing::debug!(%indexing_progress_err); + Default::default() + } + } + } + }); + + let responses = futures::future::join_all(requests).await; + responses.into_iter().flatten().collect() +} + +async fn send_request( + client: &reqwest::Client, + status_url: Url, + deployments: &[DeploymentId], +) -> anyhow::Result> { + let query = r#" + query indexingProgress($deployments: [String!]!) 
{ + indexingStatuses(subgraphs: $deployments) { + subgraph + chains { + network + latestBlock { number } + earliestBlock { number } + } + } + }"#; + let response = client + .post(status_url) + .send_graphql::(GraphQlRequest { + document: query.to_string(), + variables: serde_json::json!({ "deployments": deployments }), + }) + .await??; + let statuses = response + .indexing_statuses + .into_iter() + .filter(|info| info.chains.len() == 1) + .filter_map(|info| { + let chain = &info.chains[0]; + let status = IndexingProgressInfo { + latest_block: chain.latest_block.as_ref().map(|block| block.number)?, + min_block: chain.earliest_block.as_ref().map(|block| block.number), + }; + Some((info.deployment_id, status)) + }) + .collect(); + Ok(statuses) +} + +#[derive(Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +struct Response { + indexing_statuses: Vec, +} +#[derive(Debug, serde::Deserialize)] +pub struct IndexingStatusResponse { + #[serde(rename = "subgraph")] + pub deployment_id: DeploymentId, + pub chains: Vec, +} +#[derive(Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ChainStatus { + pub latest_block: Option, + pub earliest_block: Option, +} +#[serde_as] +#[derive(Debug, serde::Deserialize)] +pub struct BlockStatus { + #[serde_as(as = "serde_with::DisplayFromStr")] + pub number: BlockNumber, +} + +#[cfg(test)] +mod tests { + use super::Response; + + #[test] + fn deserialize_response() { + let response = serde_json::json!({ + "indexingStatuses": [ + { + "subgraph": "QmZTy9EJHu8rfY9QbEk3z1epmmvh5XHhT2Wqhkfbyt8k9Z", + "chains": [ + { + "network": "rinkeby", + "latestBlock": { + "number": "10164818", + "hash": "0xaa94881130ba16c28cc90a5a880b117bdc90b6b11e9cde0c78804cdb93cc9e85" + }, + "earliestBlock": { + "number": "7559999", + "hash": "0x0" + } + } + ] + }, + { + "subgraph": "QmSLQfPFcz2pKRJZUH16Sk26EFpRgdxTYGnMiKvWgKRM2a", + "chains": [ + { + "network": "rinkeby" + } + ] + } + ] + }); + + let response = serde_json::from_value(response); + let response: Response = response.expect("deserialization failed"); + + assert_eq!(response.indexing_statuses.len(), 2); + let status1 = &response.indexing_statuses[0]; + let status2 = &response.indexing_statuses[1]; + + assert_eq!( + status1.deployment_id.to_string(), + "QmZTy9EJHu8rfY9QbEk3z1epmmvh5XHhT2Wqhkfbyt8k9Z" + ); + assert_eq!(status1.chains.len(), 1); + assert!(status1.chains[0].latest_block.is_some()); + assert!(status1.chains[0].earliest_block.is_some()); + + assert_eq!( + status2.deployment_id.to_string(), + "QmSLQfPFcz2pKRJZUH16Sk26EFpRgdxTYGnMiKvWgKRM2a" + ); + assert_eq!(status2.chains.len(), 1); + assert!(status2.chains[0].latest_block.is_none()); + assert!(status2.chains[0].earliest_block.is_none()); + } +} diff --git a/src/network/internal.rs b/src/network/internal.rs deleted file mode 100644 index 4f6e9905f..000000000 --- a/src/network/internal.rs +++ /dev/null @@ -1,69 +0,0 @@ -use std::{collections::HashMap, time::Duration}; - -use thegraph_core::{DeploymentId, IndexerId, SubgraphId}; - -use self::indexer_processing::IndexerRawInfo; -pub use self::{ - snapshot::{Indexing, IndexingId, NetworkTopologySnapshot}, - state::InternalState, - subgraph_processing::{AllocationInfo, DeploymentInfo, SubgraphInfo}, -}; -use super::{subgraph_client::Client as SubgraphClient, DeploymentError, SubgraphError}; - -mod indexer_processing; -mod pre_processing; -mod snapshot; -mod state; -mod subgraph_processing; - -/// Fetch the network topology information from the graph network subgraph. 
-pub async fn fetch_update( - network: &PreprocessedNetworkInfo, - state: &InternalState, -) -> NetworkTopologySnapshot { - // Process network topology information - let indexers_info = indexer_processing::process_info(state, &network.indexers).await; - snapshot::new_from( - indexers_info, - network.subgraphs.clone(), - network.deployments.clone(), - ) -} - -pub struct PreprocessedNetworkInfo { - subgraphs: HashMap>, - deployments: HashMap>, - indexers: HashMap, -} - -/// Fetch the subgraphs information from the graph network subgraph and performs pre-processing -/// steps, i.e., validation and conversion into the internal representation. -/// -/// 1. Fetch the subgraphs information from the graph network subgraph. -/// 2. Validate and convert the subgraphs fetched info into the internal representation. -/// -/// If the fetch fails or the response is empty, an error is returned. -/// -/// Invalid info is filtered out before converting into the internal representation. -pub async fn fetch_and_preprocess_subgraph_info( - client: &mut SubgraphClient, - timeout: Duration, -) -> anyhow::Result { - // Fetch the subgraphs information from the graph network subgraph - let data = tokio::time::timeout(timeout, client.fetch()).await??; - anyhow::ensure!(!data.is_empty(), "empty subgraph response"); - - // Pre-process (validate and convert) the fetched subgraphs information - let indexers = pre_processing::into_internal_indexers_raw_info(data.iter()); - let subgraphs = pre_processing::into_internal_subgraphs_raw_info(data.into_iter()); - let deployments = pre_processing::into_internal_deployments_raw_info(subgraphs.values()); - - let subgraphs = subgraph_processing::process_subgraph_info(subgraphs); - let deployments = subgraph_processing::process_deployments_info(deployments); - - Ok(PreprocessedNetworkInfo { - subgraphs, - deployments, - indexers, - }) -} diff --git a/src/network/internal/state.rs b/src/network/internal/state.rs deleted file mode 100644 index bd70737cf..000000000 --- a/src/network/internal/state.rs +++ /dev/null @@ -1,28 +0,0 @@ -use std::collections::{BTreeMap, HashSet}; - -use ipnetwork::IpNetwork; -use thegraph_core::alloy::primitives::Address; - -use crate::{ - config::BlockedIndexer, - network::{ - config::VersionRequirements as IndexerVersionRequirements, - indexer_host_resolver::HostResolver, - indexer_indexing_cost_model_resolver::CostModelResolver, - indexer_indexing_poi_blocklist::PoiBlocklist, indexer_indexing_poi_resolver::PoiResolver, - indexer_indexing_progress_resolver::IndexingProgressResolver, - indexer_version_resolver::VersionResolver, - }, -}; - -pub struct InternalState { - pub indexer_blocklist: BTreeMap, - pub indexer_host_resolver: HostResolver, - pub indexer_host_blocklist: HashSet, - pub indexer_version_requirements: IndexerVersionRequirements, - pub indexer_version_resolver: VersionResolver, - pub poi_blocklist: PoiBlocklist, - pub poi_resolver: PoiResolver, - pub indexing_progress_resolver: IndexingProgressResolver, - pub cost_model_resolver: CostModelResolver, -} diff --git a/src/network/poi_filter.rs b/src/network/poi_filter.rs new file mode 100644 index 000000000..c1e17f23c --- /dev/null +++ b/src/network/poi_filter.rs @@ -0,0 +1,279 @@ +use std::{ + collections::{HashMap, HashSet}, + time::Duration, +}; + +use serde_with::serde_as; +use thegraph_core::{alloy::primitives::BlockNumber, DeploymentId, ProofOfIndexing}; +use thegraph_graphql_http::http_client::ReqwestExt; +use tokio::time::Instant; +use url::Url; + +use super::GraphQlRequest; + +pub struct 
PoiFilter { + http: reqwest::Client, + blocklist: HashMap>, + cache: parking_lot::RwLock>, +} + +struct IndexerEntry { + pois: HashMap<(DeploymentId, BlockNumber), ProofOfIndexing>, + last_refresh: Instant, +} + +impl PoiFilter { + pub fn new( + http: reqwest::Client, + blocklist: HashMap>, + ) -> Self { + Self { + http, + blocklist, + cache: Default::default(), + } + } + + pub async fn blocked_deployments( + &self, + status_url: &Url, + deployments: &[DeploymentId], + ) -> HashSet { + let requests: Vec<(DeploymentId, BlockNumber)> = self + .blocklist + .iter() + .filter(|(deployment, _)| deployments.contains(deployment)) + .flat_map(|(deployment, entries)| { + entries.iter().map(|(block, _)| (*deployment, *block)) + }) + .collect(); + let pois = self.resolve(status_url, requests).await; + + deployments + .iter() + .filter(|deployment| match self.blocklist.get(deployment) { + None => false, + Some(blocklist) => blocklist.iter().any(|(block, poi)| { + pois.get(&(**deployment, *block)) + .map(|poi_| poi == poi_) + .unwrap_or(true) + }), + }) + .cloned() + .collect() + } + + /// Fetch public PoIs, such that results are cached indefinitely and refreshed every 20 minutes. + async fn resolve( + &self, + status_url: &Url, + requests: Vec<(DeploymentId, BlockNumber)>, + ) -> HashMap<(DeploymentId, BlockNumber), ProofOfIndexing> { + let url_string = status_url.to_string(); + + let mut results: HashMap<(DeploymentId, BlockNumber), ProofOfIndexing> = Default::default(); + let mut refresh = false; + { + let cache = self.cache.read(); + if let Some(indexer) = cache.get(&url_string.to_string()) { + refresh = indexer.last_refresh.elapsed() > Duration::from_secs(20 * 60); + for key in &requests { + if let Some(poi) = indexer.pois.get(key) { + results.insert(*key, *poi); + } + } + } + } + + let updates: Vec<(DeploymentId, u64)> = if refresh { + requests.clone() + } else { + requests + .iter() + .filter(|r| !results.contains_key(r)) + .cloned() + .collect() + }; + if updates.is_empty() { + return results; + } + let fetched: HashMap<(DeploymentId, BlockNumber), ProofOfIndexing> = + send_requests(&self.http, status_url, updates).await; + { + let now = Instant::now(); + let mut cache = self.cache.write(); + let indexer = cache.entry(url_string).or_insert_with(|| IndexerEntry { + pois: Default::default(), + last_refresh: now, + }); + indexer.last_refresh = now; + for (key, value) in &fetched { + indexer.pois.insert(*key, *value); + } + } + results.extend(fetched); + + results + } +} + +async fn send_requests( + http: &reqwest::Client, + status_url: &Url, + requests: Vec<(DeploymentId, BlockNumber)>, +) -> HashMap<(DeploymentId, BlockNumber), ProofOfIndexing> { + let request_batches = requests.chunks(10); + let requests = request_batches.map(|batch| { + let status_url = status_url.clone(); + async move { + let result = send_request(http, status_url.clone(), batch).await; + match result { + Ok(response) => response, + Err(poi_fetch_err) => { + tracing::debug!(%poi_fetch_err); + Default::default() + } + } + } + }); + + let responses = futures::future::join_all(requests).await; + responses + .into_iter() + .flatten() + .filter_map(|response| { + Some(( + (response.deployment, response.block.number), + response.proof_of_indexing?, + )) + }) + .collect() +} + +async fn send_request( + http: &reqwest::Client, + status_url: Url, + pois: &[(DeploymentId, BlockNumber)], +) -> anyhow::Result> { + let query = r#" + query publicPois($requests: [PublicProofOfIndexingRequest!]!) 
{ + publicProofsOfIndexing(requests: $requests) { + deployment + proofOfIndexing + block { number } + } + }"#; + let request = GraphQlRequest { + document: query.to_string(), + variables: serde_json::json!({ "requests": pois.iter().map(|(deployment, block)| serde_json::json!({ + "deployment": deployment, + "blockNumber": block, + })).collect::>() }), + }; + let response = http + .post(status_url) + .send_graphql::(request) + .await??; + Ok(response.public_proofs_of_indexing) +} + +#[derive(Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +struct Response { + public_proofs_of_indexing: Vec, +} +#[derive(Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct PublicProofOfIndexingResponse { + pub deployment: DeploymentId, + pub block: PartialBlockPtr, + pub proof_of_indexing: Option, +} +#[serde_as] +#[derive(Debug, serde::Deserialize)] +pub struct PartialBlockPtr { + #[serde_as(as = "serde_with::DisplayFromStr")] + pub number: BlockNumber, +} + +#[cfg(test)] +mod tests { + use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::Duration, + }; + + use thegraph_core::{ + alloy::{hex, primitives::FixedBytes}, + DeploymentId, + }; + use url::Url; + + use crate::init_logging; + + #[tokio::test] + async fn poi_filter() { + init_logging("poi_filter", false); + + type ResponsePoi = Arc>>; + let deployment: DeploymentId = "QmUzRg2HHMpbgf6Q4VHKNDbtBEJnyp5JWCh2gUX9AV6jXv" + .parse() + .unwrap(); + let bad_poi = + hex!("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef").into(); + let response_poi: ResponsePoi = Arc::new(parking_lot::Mutex::new(bad_poi)); + let request_count = Arc::new(parking_lot::Mutex::new(0)); + + let route = { + let response_poi = response_poi.clone(); + let request_count = request_count.clone(); + axum::routing::post(move || async move { + *request_count.lock() += 1; + axum::Json(serde_json::json!({ + "data": { + "publicProofsOfIndexing": [ + { + "deployment": deployment, + "proofOfIndexing": response_poi.lock().to_string(), + "block": { "number": "0" } + } + ] + } + })) + }) + }; + let app = axum::Router::new().route("/status", route); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let indexer_url: Url = + format!("http://127.0.0.1:{}", listener.local_addr().unwrap().port()) + .parse() + .unwrap(); + eprintln!("listening on {indexer_url}"); + tokio::spawn(async move { + axum::serve(listener, app).await.unwrap(); + }); + + let blocklist = HashMap::from([(deployment, vec![(0, bad_poi.into())])]); + let poi_filter = super::PoiFilter::new(reqwest::Client::new(), blocklist); + + let status_url = indexer_url.join("status").unwrap(); + let assert_blocked = |blocked: Vec| async { + let result = poi_filter + .blocked_deployments(&status_url, &[deployment]) + .await; + assert_eq!(result, HashSet::from_iter(blocked)); + }; + + assert_blocked(vec![deployment]).await; + assert_eq!(*request_count.lock(), 1); + *response_poi.lock() = + hex!("1111111111111111111111111111111111111111111111111111111111111111").into(); + assert_blocked(vec![deployment]).await; + assert_eq!(*request_count.lock(), 1); + tokio::time::pause(); + tokio::time::advance(Duration::from_secs(20 * 60)).await; + assert_blocked(vec![]).await; + assert_eq!(*request_count.lock(), 2); + } +} diff --git a/src/network/internal/pre_processing.rs b/src/network/pre_processing.rs similarity index 97% rename from src/network/internal/pre_processing.rs rename to src/network/pre_processing.rs index f13e4435f..9bbb25458 100644 --- 
a/src/network/internal/pre_processing.rs +++ b/src/network/pre_processing.rs @@ -5,13 +5,12 @@ use thegraph_core::{AllocationId, DeploymentId, IndexerId, SubgraphId}; use url::Url; use crate::network::{ - internal::{ - indexer_processing::{IndexerRawInfo, IndexingRawInfo}, - subgraph_processing::{DeploymentRawInfo, SubgraphRawInfo, SubgraphVersionRawInfo}, - AllocationInfo, - }, + indexer_processing::{IndexerRawInfo, IndexingRawInfo}, subgraph_client, subgraph_client::types::SubgraphVersion, + subgraph_processing::{ + AllocationInfo, DeploymentRawInfo, SubgraphRawInfo, SubgraphVersionRawInfo, + }, }; pub fn into_internal_indexers_raw_info<'a>( diff --git a/src/network/service.rs b/src/network/service.rs index 68c635f81..c5927896d 100644 --- a/src/network/service.rs +++ b/src/network/service.rs @@ -11,27 +11,27 @@ use ipnetwork::IpNetwork; use semver::Version; use thegraph_core::{ alloy::primitives::{Address, BlockNumber}, - DeploymentId, SubgraphId, + DeploymentId, IndexerId, SubgraphId, }; use tokio::{sync::watch, time::MissedTickBehavior}; use super::{ - config::VersionRequirements, + cost_model::CostModelResolver, errors::{DeploymentError, SubgraphError}, - indexer_host_resolver::HostResolver, - indexer_indexing_cost_model_resolver::CostModelResolver, - indexer_indexing_poi_blocklist::PoiBlocklist, - indexer_indexing_poi_resolver::PoiResolver, - indexer_indexing_progress_resolver::IndexingProgressResolver, - indexer_version_resolver::VersionResolver, - internal::{ - fetch_and_preprocess_subgraph_info, fetch_update, Indexing, IndexingId, InternalState, - NetworkTopologySnapshot, PreprocessedNetworkInfo, - }, + host_filter::HostFilter, + indexer_processing::{self, IndexerRawInfo}, + indexing_progress::IndexingProgressResolver, + poi_filter::PoiFilter, + snapshot::{self, Indexing, IndexingId, NetworkTopologySnapshot}, subgraph_client::Client as SubgraphClient, - ResolutionError, + subgraph_processing::{DeploymentInfo, SubgraphInfo}, + version_filter::{MinimumVersionRequirements, VersionFilter}, +}; +use crate::{ + config::{BlockedIndexer, BlockedPoi}, + errors::UnavailableReason, + network::{pre_processing, subgraph_processing}, }; -use crate::config::{BlockedIndexer, BlockedPoi}; /// Subgraph resolution information returned by the [`NetworkService`]. pub struct ResolvedSubgraphInfo { @@ -46,7 +46,7 @@ pub struct ResolvedSubgraphInfo { /// Subgraph versions, in descending order. pub versions: Vec, /// A list of [`Indexing`]s for the resolved subgraph versions. 
- pub indexings: HashMap<IndexingId, Result<Indexing, ResolutionError>>, + pub indexings: HashMap<IndexingId, Result<Indexing, UnavailableReason>>, } impl ResolvedSubgraphInfo { @@ -109,19 +109,12 @@ impl NetworkService { let subgraph_chain = subgraph.chain.clone(); let subgraph_start_block = subgraph.start_block; - let indexings = subgraph - .indexings - .clone() - .into_iter() - .map(|(id, res)| (id, res.map_err(|err| err.into()))) - .collect(); - Ok(Some(ResolvedSubgraphInfo { chain: subgraph_chain, start_block: subgraph_start_block, subgraphs: vec![subgraph.id], versions: subgraph.versions.clone(), - indexings, + indexings: subgraph.indexings.clone(), })) } @@ -145,12 +138,7 @@ impl NetworkService { let deployment_start_block = deployment.start_block; let subgraphs = deployment.subgraphs.iter().copied().collect::<Vec<_>>(); - let indexings = deployment - .indexings - .clone() - .into_iter() - .map(|(id, res)| (id, res.map_err(|err| err.into()))) - .collect(); + let indexings = deployment.indexings.clone(); Ok(Some(ResolvedSubgraphInfo { chain: deployment_chain, @@ -174,7 +162,7 @@ impl NetworkService { } pub fn spawn( - http_client: reqwest::Client, + http: reqwest::Client, subgraph_client: SubgraphClient, min_indexer_service_version: Version, min_graph_node_version: Version, @@ -182,45 +170,62 @@ pub fn spawn( indexer_host_blocklist: HashSet<IpNetwork>, poi_blocklist: Vec<BlockedPoi>, ) -> NetworkService { + let poi_blocklist = poi_blocklist + .iter() + .map(|entry| &entry.deployment) + .collect::<HashSet<_>>() + .into_iter() + .map(|deployment| { + ( + *deployment, + poi_blocklist + .iter() + .filter(|entry| &entry.deployment == deployment) + .map(|entry| (entry.block_number, entry.public_poi.into())) + .collect::<Vec<_>>(), + ) + }) + .collect(); let internal_state = InternalState { indexer_blocklist, - indexer_host_resolver: HostResolver::new(Duration::from_secs(5)) + indexer_host_filter: HostFilter::new(indexer_host_blocklist) .expect("failed to create host resolver"), - indexer_host_blocklist, - indexer_version_requirements: VersionRequirements { - min_indexer_service_version, - min_graph_node_version, - }, - indexer_version_resolver: VersionResolver::new(http_client.clone(), Duration::from_secs(5)), - poi_blocklist: PoiBlocklist::new(poi_blocklist), - poi_resolver: PoiResolver::new( - http_client.clone(), - Duration::from_secs(5), - Duration::from_secs(20 * 60), + indexer_version_filter: VersionFilter::new( + http.clone(), + MinimumVersionRequirements { + indexer_service: min_indexer_service_version, + graph_node: min_graph_node_version, + }, ), - indexing_progress_resolver: IndexingProgressResolver::new( - http_client.clone(), - Duration::from_secs(25), - ), - cost_model_resolver: CostModelResolver::new(http_client.clone(), Duration::from_secs(5)), + indexer_poi_filter: PoiFilter::new(http.clone(), poi_blocklist), + indexing_progress_resolver: IndexingProgressResolver::new(http.clone()), + cost_model_resolver: CostModelResolver::new(http.clone()), }; - let update_interval = Duration::from_secs(60); - let network = spawn_updater_task(subgraph_client, internal_state, update_interval); + let network = spawn_updater_task(subgraph_client, internal_state); NetworkService { network } } +pub struct InternalState { + pub indexer_blocklist: BTreeMap<Address, BlockedIndexer>, + pub indexer_host_filter: HostFilter, + pub indexer_version_filter: VersionFilter, + pub indexer_poi_filter: PoiFilter, + pub indexing_progress_resolver: IndexingProgressResolver, + pub cost_model_resolver: CostModelResolver, +} + /// Spawn a background task to fetch the network topology information from the graph network /// subgraph at regular intervals fn spawn_updater_task( mut
subgraph_client: SubgraphClient, state: InternalState, - update_interval: Duration, ) -> watch::Receiver<NetworkTopologySnapshot> { let (tx, rx) = watch::channel(Default::default()); tokio::spawn(async move { let mut network_info: Option<PreprocessedNetworkInfo> = None; + let update_interval = Duration::from_secs(30); let mut timer = tokio::time::interval(update_interval); timer.set_missed_tick_behavior(MissedTickBehavior::Delay); @@ -253,3 +258,55 @@ fn spawn_updater_task( rx } + +/// Fetch the subgraphs information from the graph network subgraph and perform pre-processing +/// steps, i.e., validation and conversion into the internal representation. +/// +/// 1. Fetch the subgraphs information from the graph network subgraph. +/// 2. Validate and convert the fetched subgraphs' info into the internal representation. +/// +/// If the fetch fails or the response is empty, an error is returned. +/// +/// Invalid info is filtered out before converting into the internal representation. +pub async fn fetch_and_preprocess_subgraph_info( + client: &mut SubgraphClient, + timeout: Duration, +) -> anyhow::Result<PreprocessedNetworkInfo> { + // Fetch the subgraphs information from the graph network subgraph + let data = tokio::time::timeout(timeout, client.fetch()).await??; + anyhow::ensure!(!data.is_empty(), "empty subgraph response"); + + // Pre-process (validate and convert) the fetched subgraphs information + let indexers = pre_processing::into_internal_indexers_raw_info(data.iter()); + let subgraphs = pre_processing::into_internal_subgraphs_raw_info(data.into_iter()); + let deployments = pre_processing::into_internal_deployments_raw_info(subgraphs.values()); + + let subgraphs = subgraph_processing::process_subgraph_info(subgraphs); + let deployments = subgraph_processing::process_deployments_info(deployments); + + Ok(PreprocessedNetworkInfo { + subgraphs, + deployments, + indexers, + }) +} + +/// Fetch the network topology information from the graph network subgraph. +pub async fn fetch_update( + network: &PreprocessedNetworkInfo, + state: &InternalState, +) -> NetworkTopologySnapshot { + // Process network topology information + let indexers_info = indexer_processing::process_info(state, &network.indexers).await; + snapshot::new_from( + indexers_info, + network.subgraphs.clone(), + network.deployments.clone(), + ) +} + +pub struct PreprocessedNetworkInfo { + subgraphs: HashMap<SubgraphId, Result<SubgraphInfo, SubgraphError>>, + deployments: HashMap<DeploymentId, Result<DeploymentInfo, DeploymentError>>, + indexers: HashMap<IndexerId, IndexerRawInfo>, +} diff --git a/src/network/internal/snapshot.rs b/src/network/snapshot.rs similarity index 88% rename from src/network/internal/snapshot.rs rename to src/network/snapshot.rs index bcad301e7..21244642f 100644 --- a/src/network/internal/snapshot.rs +++ b/src/network/snapshot.rs @@ -1,5 +1,3 @@ -//! Entities that are used to represent the network topology. - use std::{ collections::{HashMap, HashSet}, fmt::Debug, @@ -7,16 +5,18 @@ use std::{ use custom_debug::CustomDebug; -use semver::Version; use thegraph_core::{ alloy::primitives::BlockNumber, AllocationId, DeploymentId, IndexerId, SubgraphId, }; use url::Url; -use super::{DeploymentInfo, SubgraphInfo}; -use crate::network::{ - errors::{DeploymentError, IndexerInfoResolutionError, IndexingError, SubgraphError}, - internal::indexer_processing::ResolvedIndexerInfo, +use crate::{ + errors::UnavailableReason, + network::{ + errors::{DeploymentError, SubgraphError}, + indexer_processing::ResolvedIndexerInfo, + subgraph_processing::{DeploymentInfo, SubgraphInfo}, + }, }; /// The [`IndexingId`] struct represents the unique identifier of an indexing.
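// Example sketch: the updater above publishes each refreshed snapshot through a
// `tokio::sync::watch` channel, so readers always observe the latest complete
// value without blocking the refresh loop. A minimal, self-contained version of
// that pattern, where `fetch_snapshot` is a hypothetical stand-in for the
// fetch-and-process steps:

use std::time::Duration;
use tokio::{sync::watch, time::MissedTickBehavior};

fn spawn_updater<T: Default + Send + Sync + 'static>(
    mut fetch_snapshot: impl FnMut() -> T + Send + 'static,
) -> watch::Receiver<T> {
    let (tx, rx) = watch::channel(T::default());
    tokio::spawn(async move {
        // Same cadence as the diff: refresh every 30 seconds, and if a tick is
        // missed (e.g. a slow fetch), delay rather than burst.
        let mut timer = tokio::time::interval(Duration::from_secs(30));
        timer.set_missed_tick_behavior(MissedTickBehavior::Delay);
        loop {
            timer.tick().await;
            // Stop refreshing once every receiver has been dropped.
            if tx.send(fetch_snapshot()).is_err() {
                break;
            }
        }
    });
    rx
}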
@@ -86,11 +86,6 @@ pub struct Indexer { #[debug(with = std::fmt::Display::fmt)] pub url: Url, - /// The indexer's "indexer service" version. - pub indexer_service_version: Version, - /// The indexer's "graph node" version. - pub graph_node_version: Version, - /// The indexer's staked tokens. pub staked_tokens: u128, } @@ -114,7 +109,7 @@ pub struct Subgraph { /// The subgraph's indexings. /// /// A table holding all the known indexings for the subgraph. - pub indexings: HashMap<IndexingId, Result<Indexing, IndexingError>>, + pub indexings: HashMap<IndexingId, Result<Indexing, UnavailableReason>>, } #[derive(Debug, Clone)] @@ -134,7 +129,7 @@ pub struct Deployment { /// The deployment's indexings. /// /// A table holding all the known indexings for the deployment. - pub indexings: HashMap<IndexingId, Result<Indexing, IndexingError>>, + pub indexings: HashMap<IndexingId, Result<Indexing, UnavailableReason>>, } /// A snapshot of the network topology. @@ -148,7 +143,7 @@ pub struct NetworkTopologySnapshot { /// Construct the [`NetworkTopologySnapshot`] from the indexers and subgraphs information. pub fn new_from( - indexers_info: HashMap<IndexerId, Result<ResolvedIndexerInfo, IndexerInfoResolutionError>>, + indexers_info: HashMap<IndexerId, Result<ResolvedIndexerInfo, UnavailableReason>>, subgraphs_info: HashMap<SubgraphId, Result<SubgraphInfo, SubgraphError>>, deployments_info: HashMap<DeploymentId, Result<DeploymentInfo, DeploymentError>>, ) -> NetworkTopologySnapshot { @@ -162,8 +157,6 @@ pub fn new_from( let indexer = Indexer { id: info.id, url: info.url.clone(), - indexer_service_version: info.indexer_service_version.clone(), - graph_node_version: info.graph_node_version.clone(), staked_tokens: info.staked_tokens, }; @@ -204,10 +197,7 @@ pub fn new_from( /// Construct the subgraphs table row. fn construct_subgraphs_table_row( subgraph_info: SubgraphInfo, - indexers: &HashMap< IndexerId, Result<(ResolvedIndexerInfo, Arc<Indexer>), IndexerInfoResolutionError>, >, + indexers: &HashMap<IndexerId, Result<(ResolvedIndexerInfo, Arc<Indexer>), UnavailableReason>>, ) -> Result<Subgraph, SubgraphError> { let versions = subgraph_info.versions; let version_ids = versions.iter().map(|v| v.deployment_id).collect(); @@ -277,10 +267,7 @@ fn construct_subgraphs_table_row( /// Construct the deployments table row. fn construct_deployments_table_row( deployment_info: DeploymentInfo, - indexers: &HashMap< IndexerId, Result<(ResolvedIndexerInfo, Arc<Indexer>), IndexerInfoResolutionError>, >, + indexers: &HashMap<IndexerId, Result<(ResolvedIndexerInfo, Arc<Indexer>), UnavailableReason>>, ) -> Result<Deployment, DeploymentError> { let deployment_id = deployment_info.id; let deployment_manifest_chain = deployment_info.manifest_network; @@ -317,15 +304,12 @@ fn construct_deployments_table_row( /// If the indexer reported an error for the indexing, the row is constructed with the error. fn construct_indexings_table_row( indexing_id: IndexingId, - indexers: &HashMap< IndexerId, Result<(ResolvedIndexerInfo, Arc<Indexer>), IndexerInfoResolutionError>, >, -) -> (IndexingId, Result<Indexing, IndexingError>) { + indexers: &HashMap<IndexerId, Result<(ResolvedIndexerInfo, Arc<Indexer>), UnavailableReason>>, +) -> (IndexingId, Result<Indexing, UnavailableReason>) { // If the indexer reported an error, bail out. let (indexer_info, indexer) = match indexers.get(&indexing_id.indexer).as_ref() { Some(Ok(indexer)) => indexer, - Some(Err(err)) => return (indexing_id, Err(err.clone().into())), + Some(Err(err)) => return (indexing_id, Err(err.clone())), None => { // Log this error as it should not happen. tracing::error!( @@ -336,7 +320,7 @@ fn construct_indexings_table_row( return ( indexing_id, - Err(IndexingError::Internal("indexer not found")), + Err(UnavailableReason::Internal("indexer not found")), ); } }; @@ -344,7 +328,7 @@ fn construct_indexings_table_row( // If the indexer's indexing info is not found or failed to resolve, bail out.
let indexing_info = match indexer_info.indexings.get(&indexing_id.deployment) { Some(Ok(info)) => info, - Some(Err(err)) => return (indexing_id, Err(err.clone().into())), + Some(Err(err)) => return (indexing_id, Err(err.clone())), None => { // Log this error as it should not happen. tracing::error!( @@ -355,7 +339,7 @@ fn construct_indexings_table_row( return ( indexing_id, - Err(IndexingError::Internal("indexing info not found")), + Err(UnavailableReason::Internal("indexing info not found")), ); } }; diff --git a/src/network/internal/subgraph_processing.rs b/src/network/subgraph_processing.rs similarity index 96% rename from src/network/internal/subgraph_processing.rs rename to src/network/subgraph_processing.rs index 456cd5376..16a821b01 100644 --- a/src/network/internal/subgraph_processing.rs +++ b/src/network/subgraph_processing.rs @@ -8,7 +8,7 @@ use crate::network::errors::{DeploymentError, SubgraphError}; /// /// This is not the final representation of the subgraph. #[derive(Debug, Clone)] -pub(super) struct SubgraphRawInfo { +pub struct SubgraphRawInfo { pub id: SubgraphId, pub versions: Vec, } @@ -17,7 +17,7 @@ pub(super) struct SubgraphRawInfo { /// /// This is not the final representation of the subgraph version. #[derive(Debug, Clone)] -pub(super) struct SubgraphVersionRawInfo { +pub struct SubgraphVersionRawInfo { pub deployment: DeploymentRawInfo, } @@ -25,7 +25,7 @@ pub(super) struct SubgraphVersionRawInfo { /// /// This is not the final representation of the deployment. #[derive(Debug, Clone)] -pub(super) struct DeploymentRawInfo { +pub struct DeploymentRawInfo { pub id: DeploymentId, pub manifest_network: String, pub manifest_start_block: BlockNumber, @@ -75,7 +75,7 @@ pub struct AllocationInfo { /// Process the fetched subgraphs' information. /// /// - If the subgraph has no allocations, [`SubgraphError::NoAllocations`] is returned. -pub(super) fn process_subgraph_info( +pub fn process_subgraph_info( subgraphs: HashMap, ) -> HashMap> { subgraphs @@ -144,7 +144,7 @@ fn check_subgraph_has_allocations(subgraph: &SubgraphRawInfo) -> Result<(), Subg /// Process the fetched deployments' information. /// /// - If the deployment has no allocations, [`DeploymentError::NoAllocations`] is returned. 
-pub(super) fn process_deployments_info( +pub fn process_deployments_info( deployments: HashMap, ) -> HashMap> { deployments diff --git a/src/network/version_filter.rs b/src/network/version_filter.rs new file mode 100644 index 000000000..7d9d1d564 --- /dev/null +++ b/src/network/version_filter.rs @@ -0,0 +1,120 @@ +use std::collections::HashMap; + +use parking_lot::RwLock; +use semver::Version; +use thegraph_graphql_http::http_client::ReqwestExt; +use url::Url; + +use crate::errors::UnavailableReason; + +pub struct MinimumVersionRequirements { + pub indexer_service: Version, + pub graph_node: Version, +} + +pub struct VersionFilter { + http: reqwest::Client, + min_versions: MinimumVersionRequirements, + indexer_service_version_cache: RwLock>, + graph_node_version_cache: RwLock>, +} + +impl VersionFilter { + pub fn new(http: reqwest::Client, min_versions: MinimumVersionRequirements) -> Self { + Self { + http, + min_versions, + indexer_service_version_cache: Default::default(), + graph_node_version_cache: Default::default(), + } + } + + pub async fn check(&self, url: &Url) -> Result<(), UnavailableReason> { + let indexer_service_version = self.fetch_indexer_service_version(url).await?; + if indexer_service_version < self.min_versions.indexer_service { + return Err(UnavailableReason::NotSupported( + "indexer-service version below minimum".to_string(), + )); + } + + // TODO: After more graph nodes support reporting their version, we should assume they are + // on the minimum version if we can't get the version. + match self.fetch_graph_node_version(url).await { + Ok(version) if version < self.min_versions.graph_node => { + return Err(UnavailableReason::NotSupported( + "graph-node version below minimum".to_string(), + )) + } + _ => (), + }; + + Ok(()) + } + + async fn fetch_indexer_service_version(&self, url: &Url) -> Result { + if let Some(version) = self.indexer_service_version_cache.read().get(url.as_str()) { + return Ok(version.clone()); + } + + let url = url + .join("version") + .map_err(|_| UnavailableReason::invalid_url())?; + #[derive(Debug, serde::Deserialize)] + struct Response { + version: Version, + } + let version = self + .http + .get(url.clone()) + .send() + .await + .map_err(|_| UnavailableReason::NoStatus("indexer not available".to_string()))? + .json::() + .await + .map_err(|_| UnavailableReason::NoStatus("indexer not available".to_string()))? + .version; + + self.indexer_service_version_cache + .write() + .insert(url.to_string(), version.clone()); + + Ok(version) + } + + async fn fetch_graph_node_version(&self, url: &Url) -> Result { + if let Some(version) = self.graph_node_version_cache.read().get(url.as_str()) { + return Ok(version.clone()); + } + + let url = url + .join("status") + .map_err(|_| UnavailableReason::invalid_url())?; + #[derive(Debug, serde::Deserialize)] + struct Response { + version: InnerVersion, + } + #[derive(Debug, serde::Deserialize)] + struct InnerVersion { + version: Version, + } + let version = self + .http + .post(url.clone()) + .send_graphql::("{ version { version } }") + .await + .map_err(|err| { + UnavailableReason::NoStatus(format!("failed to query graph-node version: {err}")) + })? + .map_err(|err| { + UnavailableReason::NoStatus(format!("failed to query graph-node version: {err}")) + })? 
+ .version + .version; + + self.graph_node_version_cache + .write() + .insert(url.to_string(), version.clone()); + + Ok(version) + } +} diff --git a/src/ttl_hash_map.rs b/src/ttl_hash_map.rs deleted file mode 100644 index 01269015d..000000000 --- a/src/ttl_hash_map.rs +++ /dev/null @@ -1,396 +0,0 @@ -//! A hashmap with entries that expire after a given TTL. -//! -//!
-//! The hashmap expired entries are not automatically removed. You must call -//! [`cleanup`](TtlHashMap::cleanup) to remove the expired entries and release the unused memory. -//!
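// Example sketch of the semantics documented above, using only the map's own
// API: a read sees an entry strictly within its TTL, the expired entry still
// occupies memory afterwards, and `cleanup` is what actually releases it.

use std::time::Duration;

fn ttl_semantics_demo() {
    let mut map = TtlHashMap::with_ttl(Duration::from_millis(5));
    map.insert("key", 1337);
    assert_eq!(map.get(&"key"), Some(&1337));

    std::thread::sleep(Duration::from_millis(10));
    assert_eq!(map.get(&"key"), None); // expired for readers...
    assert_eq!(map.len_all(), 1); // ...but still stored internally

    map.cleanup(); // drop expired entries and shrink the allocation
    assert!(map.is_empty());
    assert_eq!(map.len_all(), 0);
}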
-use std::{ - collections::HashMap, - time::{Duration, Instant}, -}; - -/// The default TTL for entries is [`Duration::MAX`]. -pub const DEFAULT_TTL: Duration = Duration::MAX; - -/// A hashmap with entries that expire after a given TTL. -#[derive(Clone)] -pub struct TtlHashMap { - ttl: Duration, - inner: HashMap, -} - -impl Default for TtlHashMap { - fn default() -> Self { - Self { - ttl: DEFAULT_TTL, - inner: Default::default(), - } - } -} - -impl TtlHashMap { - /// Create a new hashmap with the default TTL (see [`DEFAULT_TTL`]). - pub fn new() -> Self { - Default::default() - } - - /// Create a new hashmap with the given TTL. - pub fn with_ttl(ttl: Duration) -> Self { - Self { - ttl, - inner: Default::default(), - } - } - - /// Create a new hashmap with the given TTL and capacity. - pub fn with_ttl_and_capacity(ttl: Duration, capacity: usize) -> Self { - Self { - ttl, - inner: HashMap::with_capacity(capacity), - } - } -} - -impl TtlHashMap -where - K: Eq + std::hash::Hash, -{ - /// Insert a key-value pair into the hashmap. - /// - /// If the key already exists, the value is updated and the old value is returned. - /// Otherwise, `None` is returned. - pub fn insert(&mut self, key: K, value: V) -> Option { - let now = Instant::now(); - self.inner - .insert(key, (now, value)) - .and_then(|(timestamp, value)| { - if timestamp.elapsed() < self.ttl { - Some(value) - } else { - None - } - }) - } - - /// Get the value associated with the key. - /// - /// If the key is found and the entry has not expired, the value is returned. Otherwise, - /// `None` is returned. - #[must_use] - pub fn get(&self, key: &K) -> Option<&V> { - self.inner.get(key).and_then(|(timestamp, value)| { - if timestamp.elapsed() < self.ttl { - Some(value) - } else { - None - } - }) - } - - /// Remove the key and its associated value from the hashmap. - /// - /// If the key is found and the entry has not expired, the value is returned. Otherwise, - /// `None` is returned. - pub fn remove(&mut self, key: &K) -> Option { - self.inner.remove(key).and_then(|(timestamp, value)| { - if timestamp.elapsed() < self.ttl { - Some(value) - } else { - None - } - }) - } - - /// Returns the number of elements in the hashmap. - /// - /// This is the number of non-expired entries. - #[must_use] - pub fn len(&self) -> usize { - self.inner - .iter() - .filter(|(_, (timestamp, _))| timestamp.elapsed() < self.ttl) - .count() - } - - /// Returns whether the hashmap is empty. - /// - /// This is true if the hashmap is actually empty, or all its entries are expired. - #[must_use] - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// Returns the number of elements in the hashmap, including the expired ones. - /// - /// This is the number of all entries, expired and non-expired. - #[must_use] - pub fn len_all(&self) -> usize { - self.inner.len() - } - - /// Returns the current capacity of the hashmap. - #[must_use] - pub fn capacity(&self) -> usize { - self.inner.capacity() - } - - /// Shrinks the capacity of the hashmap as much as possible. - pub fn shrink_to_fit(&mut self) { - self.inner.shrink_to_fit(); - } - - /// Clear the hashmap, removing all entries. - pub fn clear(&mut self) { - self.inner.clear(); - } - - /// Cleanup the hashmap, removing all expired entries. - /// - /// After removing all expired entries, the inner hashmap is shrunk to fit the new capacity, - /// releasing the unused memory. 
- pub fn cleanup(&mut self) { - // Remove all expired entries - self.inner - .retain(|_, (timestamp, _)| timestamp.elapsed() < self.ttl); - - // Shrink the inner hashmap to fit the new size - self.shrink_to_fit(); - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn insert_and_get_an_item() { - //* Given - let mut ttl_hash_map = TtlHashMap::new(); - - let key = "item"; - let value = 1337; - - //* When - ttl_hash_map.insert(key, value); - - //* Then - assert_eq!(ttl_hash_map.get(&key), Some(&value)); - assert_eq!(ttl_hash_map.len(), 1); - } - - #[test] - fn get_none_if_no_item_is_present() { - //* Given - let ttl_hash_map = TtlHashMap::<&str, ()>::new(); - - let key = "item"; - - //* When - let value = ttl_hash_map.get(&key); - - //* Then - assert_eq!(value, None); - } - - #[test] - fn get_none_if_the_item_is_expired() { - //* Given - let mut ttl_hash_map = TtlHashMap::with_ttl(Duration::from_millis(5)); - - let key = "item"; - let value = 1337; - - // Pre-populate the map - ttl_hash_map.insert(key, value); - - //* When - // Wait for the TTL to expire - std::thread::sleep(Duration::from_millis(10)); - - //* Then - assert_eq!(ttl_hash_map.get(&key), None); - assert_eq!(ttl_hash_map.len(), 0); - } - - #[test] - fn report_the_correct_length() { - //* Given - let mut ttl_hash_map = TtlHashMap::with_ttl(Duration::from_millis(5)); - - let key_1 = "expired_item_1"; - let value_1 = 1337; - - let key_2 = "expired_item_2"; - let value_2 = 42; - - let key_3 = "non_expired_item"; - let value_3 = 69; - - // Pre-populate the map - ttl_hash_map.insert(key_1, value_1); - ttl_hash_map.insert(key_2, value_2); - - //* When - // Wait for the TTL to expire - std::thread::sleep(Duration::from_millis(10)); - - // Insert a new item with different key - ttl_hash_map.insert(key_3, value_3); - - //* Then - // One non-expired item and two expired items - assert_eq!(ttl_hash_map.len(), 1); - assert_eq!(ttl_hash_map.len_all(), 3); - assert!(!ttl_hash_map.is_empty()); - } - - #[test] - fn insert_an_item_and_return_the_old_value_if_not_expired() { - //* Given - let mut ttl_hash_map = TtlHashMap::new(); - - let key = "item"; - let old_value = 1337; - let new_value = 42; - - // Pre-populate the map - ttl_hash_map.insert(key, old_value); - - //* When - let returned_old_value = ttl_hash_map.insert(key, new_value); - - //* Then - assert_eq!(returned_old_value, Some(old_value)); - assert_eq!(ttl_hash_map.get(&key), Some(&new_value)); - assert_eq!(ttl_hash_map.len(), 1); - assert_eq!(ttl_hash_map.len_all(), 1); - assert!(!ttl_hash_map.is_empty()); - } - - #[test] - fn insert_an_item_and_return_none_if_the_old_value_is_expired() { - //* Given - let mut ttl_hash_map = TtlHashMap::with_ttl(Duration::from_millis(5)); - - let key = "item"; - let old_value = 1337; - let new_value = 42; - - // Pre-populate the map - ttl_hash_map.insert(key, old_value); - - //* When - // Wait for the TTL to expire - std::thread::sleep(Duration::from_millis(10)); - - let returned_old_value = ttl_hash_map.insert(key, new_value); - - //* Then - assert_eq!(returned_old_value, None); - assert_eq!(ttl_hash_map.get(&key), Some(&new_value)); - assert_eq!(ttl_hash_map.len(), 1); - assert_eq!(ttl_hash_map.len_all(), 1); - assert!(!ttl_hash_map.is_empty()); - } - - #[test] - fn remove_an_item_and_return_it_if_not_expired() { - //* Given - let mut ttl_hash_map = TtlHashMap::new(); - - let key = "item"; - let value = 1337; - - // Pre-populate the map - ttl_hash_map.insert(key, value); - - //* When - let removed_value = ttl_hash_map.remove(&key); - - 
//* Then - assert_eq!(removed_value, Some(value)); - assert_eq!(ttl_hash_map.get(&key), None); - assert_eq!(ttl_hash_map.len(), 0); - assert_eq!(ttl_hash_map.len_all(), 0); - assert!(ttl_hash_map.is_empty()); - } - - #[test] - fn remove_an_item_and_return_none_if_expired() { - //* Given - let mut ttl_hash_map = TtlHashMap::with_ttl(Duration::from_millis(5)); - - let key = "item"; - let value = 1337; - - // Pre-populate the map - ttl_hash_map.insert(key, value); - - //* When - // Wait for the TTL to expire - std::thread::sleep(Duration::from_millis(10)); - - let removed_value = ttl_hash_map.remove(&key); - - //* Then - assert_eq!(removed_value, None); - assert_eq!(ttl_hash_map.get(&key), None); - assert_eq!(ttl_hash_map.len(), 0); - assert_eq!(ttl_hash_map.len_all(), 0); - assert!(ttl_hash_map.is_empty()); - } - - #[test] - fn clear_the_hashmap() { - //* Given - let mut ttl_hash_map = TtlHashMap::with_ttl_and_capacity(Duration::from_millis(5), 5); - - let key_1 = "item_1"; - let value_1 = 1337; - - let key_2 = "item_2"; - let value_2 = 42; - - // Pre-populate the map - ttl_hash_map.insert(key_1, value_1); - ttl_hash_map.insert(key_2, value_2); - - //* When - ttl_hash_map.clear(); - - //* Then - assert_eq!(ttl_hash_map.len(), 0); - assert_eq!(ttl_hash_map.len_all(), 0); - assert!(ttl_hash_map.is_empty()); - } - - #[test] - fn cleanup_the_hashmap_and_shrink_to_fit() { - //* Given - let mut ttl_hash_map = TtlHashMap::with_ttl_and_capacity(Duration::from_millis(5), 5); - - let key = "non_expired_item"; - let value = 69; - - // Pre-populate the map with 100 items - for i in 0..100 { - ttl_hash_map.insert(format!("expired_item_{i}"), i); - } - - //* When - // Wait for the TTL to expire - std::thread::sleep(Duration::from_millis(10)); - - // Insert a new item with different key - ttl_hash_map.insert(key.to_string(), value); - - // Remove expired entries and shrink the hashmap to fit the new size - ttl_hash_map.cleanup(); - - //* Then - assert_eq!(ttl_hash_map.len(), 1); - assert_eq!(ttl_hash_map.len_all(), 1); - assert!(ttl_hash_map.capacity() < 100); - assert!(!ttl_hash_map.is_empty()); - } -}
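// Example sketch: `IndexingProgressResolver::resolve` (src/network/indexing_progress.rs
// above) lazily creates one cache slot per indexer via a read -> write -> read
// sequence on the outer `RwLock`, so the write lock is held only when a slot is
// missing. The same pattern in isolation, assuming `parking_lot` locks:

use std::collections::HashMap;

use parking_lot::{Mutex, RwLock, RwLockReadGuard};

fn get_or_create_slot<'a, V: Default>(
    cache: &'a RwLock<HashMap<String, Mutex<V>>>,
    key: &str,
) -> RwLockReadGuard<'a, HashMap<String, Mutex<V>>> {
    let mut outer = cache.read();
    if !outer.contains_key(key) {
        // Upgrade to a write lock just long enough to create the slot;
        // `entry().or_default()` keeps this correct if another writer races us.
        drop(outer);
        cache.write().entry(key.to_string()).or_default();
        outer = cache.read();
    }
    outer
}

// Callers then lock only the per-indexer slot, as `resolve` does:
//
//     let slot = get_or_create_slot(&cache, status_url.as_str());
//     let mut per_indexer = slot.get(status_url.as_str()).unwrap().lock();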