Skip to content

Commit

Permalink
Add support for new subgraph IDs (#87)
Browse files Browse the repository at this point in the history
A upgrade to the GNS contracts will change the format of subgraph IDs to
be base58-encoded keccak hashes of the accoount ID and sequence ID that
make up the current subgraph IDs. To support the new IDs, and maintain
backward compatibility, All subgraph IDs will be parsed and stored in
the new ID format.
  • Loading branch information
Theodus authored Dec 3, 2021
1 parent a926e27 commit fb4f39f
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 40 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ secp256k1 = "0.20"
serde = "1.0"
serde_json = {version = "1.0", features = ["raw_value"]}
serde_yaml = "0.8"
sha3 = "0.9"
single = "1"
structopt = "0.3"
structopt-derive = "0.4"
Expand Down
9 changes: 6 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,11 @@ async fn handle_subgraph_query_inner(
data.inputs.clone(),
);
let url_params = request.match_info();
let subgraph = if let Some(name) = url_params.get("subgraph_id") {
Subgraph::Name(name.into())
let subgraph = if let Some(id) = url_params
.get("subgraph_id")
.and_then(|id| id.parse::<SubgraphID>().ok())
{
Subgraph::ID(id)
} else if let Some(deployment) = url_params
.get("deployment_id")
.and_then(|id| SubgraphDeploymentID::from_ipfs_hash(&id))
Expand Down Expand Up @@ -484,7 +487,7 @@ async fn handle_subgraph_query_inner(
fee: result.query.fee,
domain: domain.to_string(),
subgraph: match subgraph {
Subgraph::Name(name) => Some(name),
Subgraph::ID(id) => Some(id.to_string()),
Subgraph::Deployment(_) => None,
},
});
Expand Down
89 changes: 82 additions & 7 deletions src/prelude/bytes.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use bs58;
use serde::{self, Deserialize, Deserializer, Serialize, Serializer};
use sha3::{Digest as _, Keccak256};
pub use std::{fmt, str::FromStr};

macro_rules! bytes_wrapper {
Expand Down Expand Up @@ -56,8 +58,64 @@ macro_rules! bytes_wrapper {

bytes_wrapper!(pub, Address, 20, "HexStr");
bytes_wrapper!(pub, Bytes32, 32, "HexStr");
bytes_wrapper!(pub, SubgraphID, 32);
bytes_wrapper!(pub, SubgraphDeploymentID, 32);

impl FromStr for SubgraphID {
type Err = InvalidSubgraphID;
fn from_str(s: &str) -> Result<Self, Self::Err> {
fn parse_v1(s: &str) -> Option<[u8; 32]> {
// Attempt to decode v1 format: '0x' <hex account_id> '-' <decimal sequence_id>
let (account_id, sequence_id) = s.strip_prefix("0x").and_then(|s| s.split_once("-"))?;
let account = account_id.parse::<Address>().ok()?;
// Assuming u256 big-endian, since that's the word-size of the EVM
let mut sequence_word = [0u8; 32];
let sequence_number = sequence_id.parse::<u64>().ok()?.to_be_bytes();
sequence_word[24..].copy_from_slice(&sequence_number);
let hash: [u8; 32] = Keccak256::default()
.chain(account.as_ref())
.chain(&sequence_word)
.finalize()
.into();
Some(hash)
}
fn parse_v2(s: &str) -> Option<[u8; 32]> {
// Attempt to decode v2 format: base58 of sha256 hash
let mut hash = [0u8; 32];
let len = bs58::decode(s).into(&mut hash).ok()?;
if len != hash.len() {
return None;
}
Some(hash)
}
if let Some(v2) = parse_v2(s) {
return Ok(v2.into());
}
parse_v1(s).map(Into::into).ok_or(InvalidSubgraphID)
}
}

impl fmt::Display for SubgraphID {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", bs58::encode(self.as_ref()).into_string())
}
}

impl fmt::Debug for SubgraphID {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self)
}
}

#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct InvalidSubgraphID;

impl fmt::Display for InvalidSubgraphID {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Invalid Subgraph ID")
}
}

impl SubgraphDeploymentID {
pub fn from_ipfs_hash(hash: &str) -> Option<Self> {
let mut decoded = [0u8; 34];
Expand All @@ -73,28 +131,28 @@ impl SubgraphDeploymentID {
}

impl FromStr for SubgraphDeploymentID {
type Err = BadIPFSHash;
type Err = InvalidIPFSHash;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Self::from_ipfs_hash(s).ok_or(BadIPFSHash)
Self::from_ipfs_hash(s).ok_or(InvalidIPFSHash)
}
}

impl fmt::Debug for SubgraphDeploymentID {
impl fmt::Display for SubgraphDeploymentID {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.ipfs_hash())
}
}

impl fmt::Display for SubgraphDeploymentID {
impl fmt::Debug for SubgraphDeploymentID {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:?}", self)
write!(f, "{}", self)
}
}

#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct BadIPFSHash;
pub struct InvalidIPFSHash;

impl fmt::Display for BadIPFSHash {
impl fmt::Display for InvalidIPFSHash {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Invalid IPFS hash")
}
Expand All @@ -104,6 +162,23 @@ impl fmt::Display for BadIPFSHash {
mod tests {
use super::*;

#[tokio::test]
async fn subgraph_id_encoding() {
let bytes = hex::decode("67486e65165b1474898247760a4b852d70d95782c6325960e5b6b4fd82fed1bd")
.unwrap();
let v1 = "0xdeadbeef678b513255cea949017921c8c9f6ef82-1";
let v2 = "7xB3yxxD8okmq4dZPky3eP1nYRgLfZrwMyUQBGo32t4U";

let id1 = v1.parse::<SubgraphID>().unwrap();
let id2 = v2.parse::<SubgraphID>().unwrap();

assert_eq!(*id1, bytes.as_slice());
assert_eq!(&id1.to_string(), v2);
assert_eq!(*id2, bytes.as_slice());
assert_eq!(&id2.to_string(), v2);
assert_eq!(id1, id2);
}

#[tokio::test]
async fn subgraph_deployment_id_encoding() {
let ipfs_hash = "QmWmyoMoctfbAaiEs2G46gpeUmhqFRDW6KWo64y5r581Vz";
Expand Down
22 changes: 12 additions & 10 deletions src/query_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use uuid::Uuid;

#[derive(Clone, Debug)]
pub enum Subgraph {
Name(String),
ID(SubgraphID),
Deployment(SubgraphDeploymentID),
}

Expand Down Expand Up @@ -158,14 +158,14 @@ pub struct Config {
#[derive(Clone)]
pub struct Inputs {
pub indexers: Arc<Indexers>,
pub current_deployments: Eventual<Ptr<HashMap<String, SubgraphDeploymentID>>>,
pub current_deployments: Eventual<Ptr<HashMap<SubgraphID, SubgraphDeploymentID>>>,
pub deployment_indexers: Eventual<Ptr<HashMap<SubgraphDeploymentID, im::Vector<Address>>>>,
}

pub struct InputWriters {
pub indexer_inputs: indexer_selection::InputWriters,
pub indexers: Arc<Indexers>,
pub current_deployments: EventualWriter<Ptr<HashMap<String, SubgraphDeploymentID>>>,
pub current_deployments: EventualWriter<Ptr<HashMap<SubgraphID, SubgraphDeploymentID>>>,
pub deployment_indexers:
EventualWriter<Ptr<HashMap<SubgraphDeploymentID, im::Vector<Address>>>>,
}
Expand Down Expand Up @@ -194,7 +194,7 @@ impl Inputs {

pub struct QueryEngine<I: IndexerInterface + Clone + Send> {
indexers: Arc<Indexers>,
current_deployments: Eventual<Ptr<HashMap<String, SubgraphDeploymentID>>>,
current_deployments: Eventual<Ptr<HashMap<SubgraphID, SubgraphDeploymentID>>>,
deployment_indexers: Eventual<Ptr<HashMap<SubgraphDeploymentID, im::Vector<Address>>>>,
subgraph_info: SubgraphInfoMap,
block_resolvers: Arc<HashMap<String, BlockResolver>>,
Expand Down Expand Up @@ -233,19 +233,21 @@ impl<I: IndexerInterface + Clone + Send + 'static> QueryEngine<I> {
);
let api_key = query.api_key.key.clone();
let query_start = Instant::now();
let name_timer = if let Subgraph::Name(subgraph_name) = &query.subgraph {
with_metric(&METRICS.subgraph_name_duration, &[subgraph_name], |hist| {
hist.start_timer()
})
let name_timer = if let Subgraph::ID(id) = &query.subgraph {
with_metric(
&METRICS.subgraph_name_duration,
&[&id.to_string()],
|hist| hist.start_timer(),
)
} else {
None
};
let deployment = match &query.subgraph {
Subgraph::Deployment(deployment) => deployment.clone(),
Subgraph::Name(name) => self
Subgraph::ID(id) => self
.current_deployments
.value_immediate()
.and_then(|map| map.get(name).cloned())
.and_then(|map| map.get(id).cloned())
.ok_or_else(|| QueryEngineError::SubgraphNotFound)?,
};
if !query.api_key.deployments.is_empty() && !query.api_key.deployments.contains(&deployment)
Expand Down
45 changes: 27 additions & 18 deletions src/query_engine/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ struct TopologyConfig {
struct NetworkTopology {
name: String,
blocks: Vec<BlockPointer>,
subgraphs: BTreeMap<String, SubgraphTopology>,
subgraphs: BTreeMap<SubgraphID, SubgraphTopology>,
}

#[derive(Clone, Debug)]
struct SubgraphTopology {
name: String,
id: SubgraphID,
deployments: Vec<DeploymentTopology>,
}

Expand Down Expand Up @@ -183,19 +183,28 @@ impl Topology {
}

let invalid_network = self.gen_str(*self.config.networks.end() + 1);
let invalid_subgraph = self.gen_str(*self.config.subgraphs.end() + 1);
let invalid_subgraph = "FzFAtwLSSzxhug4sEvUeusLxFdjAS3ByYqgseUYkFpkN"
.parse::<SubgraphID>()
.unwrap();
let rng = &mut self.rng;
let mut network = choose_name(rng, self.networks.keys(), &invalid_network);
let mut subgraph = self
.networks
.get(&network)
.map(|net| choose_name(rng, net.subgraphs.keys(), &invalid_subgraph))
.unwrap_or(invalid_subgraph.clone());
.map(|net| {
*net.subgraphs
.keys()
.copied()
.collect::<Vec<SubgraphID>>()
.choose(rng)
.unwrap_or(&invalid_subgraph)
})
.unwrap_or(invalid_subgraph);
if self.flip_coin(32) {
network = invalid_network;
}
if self.flip_coin(32) {
subgraph = invalid_subgraph;
subgraph = invalid_subgraph.clone();
}
let query = if self.flip_coin(32) { "?" } else { BASIC_QUERY };
ClientQuery {
Expand All @@ -204,7 +213,7 @@ impl Topology {
query: query.into(),
variables: None,
network,
subgraph: Subgraph::Name(subgraph),
subgraph: Subgraph::ID(subgraph),
}
}

Expand All @@ -220,24 +229,24 @@ impl Topology {
}
for _ in 0..self.gen_len(self.config.subgraphs.clone(), 32) {
let subgraph = self.gen_subgraph();
network.subgraphs.insert(subgraph.name.clone(), subgraph);
network.subgraphs.insert(subgraph.id, subgraph);
}
network
}

fn gen_subgraph(&mut self) -> SubgraphTopology {
let mut name = self.gen_str(log_2(*self.config.networks.end()));
let mut id = self.gen_bytes().into();
// TODO: For now, subgraph names must be unique across networks
while self
.subgraphs()
.iter()
.any(|(_, subgraph)| subgraph.name == name)
.any(|(_, subgraph)| subgraph.id == id)
{
name = self.gen_str(log_2(*self.config.networks.end()));
id = self.gen_bytes().into();
}

let mut subgraph = SubgraphTopology {
name,
id,
deployments: Vec::new(),
};
for _ in 1..self.gen_len(self.config.deployments.clone(), 32) {
Expand Down Expand Up @@ -330,7 +339,7 @@ impl Topology {
}
}

fn subgraph(&self, network: &str, subgraph: &str) -> Option<&SubgraphTopology> {
fn subgraph(&self, network: &str, subgraph: &SubgraphID) -> Option<&SubgraphTopology> {
self.networks
.get(network)
.and_then(|net| net.subgraphs.get(subgraph))
Expand Down Expand Up @@ -412,7 +421,7 @@ impl Topology {
self.inputs.current_deployments.write(Ptr::new(
self.subgraphs()
.into_iter()
.filter_map(|(_, subgraph)| Some((subgraph.name, subgraph.deployments.last()?.id)))
.filter_map(|(_, subgraph)| Some((subgraph.id, subgraph.deployments.last()?.id)))
.collect(),
));
self.inputs.deployment_indexers.write(Ptr::new(
Expand Down Expand Up @@ -440,12 +449,12 @@ impl Topology {
let mut trace = Vec::new();
trace.push(format!("{:#?}", query));
trace.push(format!("{:#?}", result));
let subgraph_name = match &query.subgraph {
Subgraph::Name(name) => name,
let subgraph_id = match &query.subgraph {
Subgraph::ID(id) => id,
Subgraph::Deployment(_) => panic!("Unexpected SubgraphDeploymentID"),
};
// Return SubgraphNotFound if the subgraph does not exist.
let subgraph = match self.subgraph(&query.network, subgraph_name) {
let subgraph = match self.subgraph(&query.network, subgraph_id) {
Some(subgraph) => subgraph,
None => {
if let Err(QueryEngineError::SubgraphNotFound) = result {
Expand All @@ -455,7 +464,7 @@ impl Topology {
// expected. In the future there must be a check that the query network matches
// with the expected network of the subgraph.
if self.subgraphs().iter().all(|(network, subgraph)| {
(network.name != query.network) || (&subgraph.name != subgraph_name)
(network.name != query.network) || (&subgraph.id != subgraph_id)
}) {
return Ok(());
}
Expand Down
4 changes: 2 additions & 2 deletions src/sync_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ struct CurrentDeployments;

fn parse_current_deployments(
data: current_deployments::ResponseData,
) -> Option<Ptr<HashMap<String, SubgraphDeploymentID>>> {
) -> Option<Ptr<HashMap<SubgraphID, SubgraphDeploymentID>>> {
use current_deployments::{CurrentDeploymentsData, ResponseData};
let values = match data {
ResponseData {
Expand All @@ -404,7 +404,7 @@ fn parse_current_deployments(
};
let parsed = values.into_iter().filter_map(|value| {
Some((
value.subgraph,
value.subgraph.parse::<SubgraphID>().ok()?,
SubgraphDeploymentID::from_ipfs_hash(&value.deployment)?,
))
});
Expand Down

0 comments on commit fb4f39f

Please sign in to comment.