Skip to content

Commit

Permalink
fix(su): su assign now checking regular messages along with baselayer #…
Browse files Browse the repository at this point in the history
  • Loading branch information
VinceJuliano committed May 21, 2024
1 parent 2e20820 commit 5dd3eae
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 10 deletions.
77 changes: 76 additions & 1 deletion servers/su/src/domain/clients/gateway.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::domain::config::AoConfig;
use crate::domain::core::dal::{Gateway, NetworkInfo, TxStatus};
use crate::domain::core::dal::{Gateway, GatewayTx, NetworkInfo, TxStatus};
use arweave_rs::network::NetworkInfoClient;
use async_trait::async_trait;
use reqwest::{Client, Url};
use serde_derive::Deserialize;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{sleep, Duration};
Expand All @@ -17,6 +18,8 @@ pub struct ArweaveGateway {
pub enum GatewayErrorType {
CheckHeadError(String),
StatusError(String),
GraphQLError(String),
JsonParseError(String),
}

impl From<GatewayErrorType> for String {
Expand All @@ -25,6 +28,36 @@ impl From<GatewayErrorType> for String {
}
}

/*
Right now we dont need all the fields
but later we can add to these types to
pull more data from gql responses
*/
#[derive(Deserialize, Debug, Clone)]
struct Node {
id: String,
}

#[derive(Deserialize, Debug)]
struct Edge {
node: Node,
}

#[derive(Deserialize, Debug)]
struct Transactions {
edges: Vec<Edge>,
}

#[derive(Deserialize, Debug)]
struct Data {
transactions: Transactions,
}

#[derive(Deserialize, Debug)]
struct GraphQLResponse {
data: Data,
}

impl ArweaveGateway {
pub async fn new() -> Result<Self, String> {
let network_info = ArweaveGateway::network_info_fetch().await?;
Expand Down Expand Up @@ -169,4 +202,46 @@ impl Gateway for ArweaveGateway {
))
}
}

async fn gql_tx(&self, tx_id: &String) -> Result<GatewayTx, String> {
let config = AoConfig::new(Some("su".to_string())).expect("Failed to read configuration");
let gateway_url = config.gateway_url;
let client = Client::new();

let query = serde_json::json!({
"query": format!(
"query {{ transactions (ids: [\"{}\"]){{ edges {{ node {{ id }} }} }} }}",
tx_id
),
"variables": {}
});

let query_string = serde_json::to_string(&query)
.map_err(|e| GatewayErrorType::GraphQLError(e.to_string()))?;

let response = client
.post(format!("{}/graphql", gateway_url))
.header("Content-Type", "application/json")
.body(query_string)
.send()
.await
.map_err(|e| GatewayErrorType::GraphQLError(e.to_string()))?;

if response.status().is_success() {
let body: GraphQLResponse = response
.json()
.await
.map_err(|e| GatewayErrorType::JsonParseError(e.to_string()))?;

if let Some(edge) = body.data.transactions.edges.get(0) {
Ok(GatewayTx {
id: edge.node.clone().id,
})
} else {
Err("Transaction not found".to_string())
}
} else {
Err(format!("Failed to fetch transaction: {}", response.status()).to_string())
}
}
}
7 changes: 4 additions & 3 deletions servers/su/src/domain/clients/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl StoreClient {
let database_url = config.database_url;
let database_read_url = match config.database_read_url {
Some(u) => u,
None => database_url.clone()
None => database_url.clone(),
};
let manager = ConnectionManager::<PgConnection>::new(database_url);
let read_manager = ConnectionManager::<PgConnection>::new(database_read_url);
Expand All @@ -84,9 +84,10 @@ impl StoreClient {
.test_on_check_out(true)
.build(read_manager)
.map_err(|_| {
StoreErrorType::DatabaseError("Failed to initialize read connection pool.".to_string())
StoreErrorType::DatabaseError(
"Failed to initialize read connection pool.".to_string(),
)
})?;


Ok(StoreClient { pool, read_pool })
}
Expand Down
4 changes: 1 addition & 3 deletions servers/su/src/domain/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ impl AoConfig {
};
let database_read_url = match env::var("DATABASE_READ_URL") {
Ok(val) => Some(val),
Err(_e) => {
None
}
Err(_e) => None,
};
Ok(AoConfig {
database_url: env::var("DATABASE_URL")?,
Expand Down
17 changes: 15 additions & 2 deletions servers/su/src/domain/core/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,15 +268,22 @@ impl<'a> Builder<'a> {
)),
}
}
None => Ok(()),
None => {
/*
If this throws an error then the tx
is not available on the gateway
*/
self.gateway.gql_tx(&tx_id).await?;
Ok(())
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::domain::core::dal::NetworkInfo;
use crate::domain::core::dal::{GatewayTx, NetworkInfo};
use async_trait::async_trait;
use std::sync::Arc;

Expand All @@ -300,6 +307,12 @@ mod tests {
number_of_confirmations: 0,
})
}

async fn gql_tx(&self, _tx_id: &String) -> Result<GatewayTx, String> {
Ok(GatewayTx {
id: "id".to_string(),
})
}
}

struct MockSigner;
Expand Down
6 changes: 6 additions & 0 deletions servers/su/src/domain/core/dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,17 @@ pub struct TxStatus {
pub number_of_confirmations: i32,
}

#[derive(Deserialize)]
pub struct GatewayTx {
pub id: String,
}

#[async_trait]
pub trait Gateway: Send + Sync {
async fn check_head(&self, tx_id: String) -> Result<bool, String>;
async fn network_info(&self) -> Result<NetworkInfo, String>;
async fn status(&self, tx_id: &String) -> Result<TxStatus, String>;
async fn gql_tx(&self, tx_id: &String) -> Result<GatewayTx, String>;
}

pub trait Wallet: Send + Sync {
Expand Down
2 changes: 1 addition & 1 deletion servers/su/src/domain/core/flows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,4 +280,4 @@ pub async fn health(deps: Arc<Deps>) -> Result<String, String> {
}
Err(e) => Err(format!("{:?}", e)),
}
}
}

0 comments on commit 5dd3eae

Please sign in to comment.