Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vince juliano/deephash 1111 #1123

Merged
merged 2 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 71 additions & 15 deletions servers/su/src/domain/clients/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,9 @@ impl From<GatewayErrorType> for String {
}
}

/*
Right now we dont need all the fields
but later we can add to these types to
pull more data from gql responses
*/
#[derive(Deserialize, Debug, Clone)]
struct Node {
id: String,
}

#[derive(Deserialize, Debug)]
struct Edge {
node: Node,
node: GatewayTx,
}

#[derive(Deserialize, Debug)]
Expand Down Expand Up @@ -219,14 +209,82 @@ impl Gateway for ArweaveGateway {
}
}

/// Fetches the raw transaction data for `tx_id` from the configured
/// Arweave gateway (`GET <arweave_url>/raw/<tx_id>`).
///
/// Returns the response body bytes on HTTP success, otherwise an error
/// string describing the failure (config/URL/transport/status).
async fn raw(&self, tx_id: &String) -> Result<Vec<u8>, String> {
    // Configuration is re-read on every call; a failure here is fatal.
    let config = AoConfig::new(Some("su".to_string())).expect("Failed to read configuration");

    // Parse the gateway base URL up front so a bad config fails early.
    let base_url = Url::parse(&config.arweave_url).map_err(|e| format!("{}", e))?;

    let endpoint = base_url
        .join(&format!("raw/{}", tx_id))
        .map_err(|e| GatewayErrorType::StatusError(e.to_string()))?;

    let response = Client::new()
        .get(endpoint)
        .send()
        .await
        .map_err(|e| GatewayErrorType::StatusError(e.to_string()))?;

    // Guard clause: any non-2xx status is reported as an error string.
    if !response.status().is_success() {
        return Err(format!(
            "Failed to get status. Status code: {}",
            response.status()
        ));
    }

    let body = response
        .bytes()
        .await
        .map_err(|e| GatewayErrorType::StatusError(e.to_string()))?;
    Ok(body.to_vec())
}

async fn gql_tx(&self, tx_id: &String) -> Result<GatewayTx, String> {
let config = AoConfig::new(Some("su".to_string())).expect("Failed to read configuration");
let graphql_url = config.graphql_url;
let client = Client::new();

/*
id
signature
anchor
owner {
address
key
}
tags {
name
value
}
recipient
*/

let query = serde_json::json!({
"query": format!(
"query {{ transactions (ids: [\"{}\"]){{ edges {{ node {{ id }} }} }} }}",
"query {{
transactions(ids: [\"{}\"]) {{
edges {{
node {{
id
signature
anchor
owner {{
address
key
}}
tags {{
name
value
}}
recipient
}}
}}
}}
}}",
tx_id
),
"variables": {}
Expand All @@ -250,9 +308,7 @@ impl Gateway for ArweaveGateway {
.map_err(|e| GatewayErrorType::JsonParseError(e.to_string()))?;

if let Some(edge) = body.data.transactions.edges.get(0) {
Ok(GatewayTx {
id: edge.node.clone().id,
})
Ok(edge.node.clone())
} else {
Err("Transaction not found".to_string())
}
Expand Down
68 changes: 66 additions & 2 deletions servers/su/src/domain/clients/local_store/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ impl LocalStoreClient {
("process_ordering".to_string(), opts_index.clone()),
("message".to_string(), opts_index.clone()),
("message_ordering".to_string(), opts_index.clone()),
("deep_hash".to_string(), opts_index.clone()),
]
}

Expand Down Expand Up @@ -162,6 +163,13 @@ impl LocalStoreClient {
))
}

/// Builds the index-db key under which a message's deep hash is recorded,
/// namespaced by process: `deep_hash:<process_id>:<deep_hash>`.
fn deep_hash_key(&self, process_id: &String, deep_hash: &String) -> Result<String, StoreErrorType> {
    let key = format!("deep_hash:{}:{}", process_id, deep_hash);
    Ok(key)
}

/*
This is the core method of this program used
for querying message ranges for the /processid
Expand Down Expand Up @@ -220,7 +228,7 @@ impl LocalStoreClient {
if let Some(ref to_str) = to {
if let Ok(to_timestamp) = to_str.parse::<i64>() {
if timestamp > to_timestamp {
has_next_page = true;
has_next_page = false;
break;
}
}
Expand Down Expand Up @@ -288,6 +296,7 @@ impl DataStore for LocalStoreClient {
&self,
message: &Message,
bundle_in: &[u8],
deep_hash: Option<&String>,
) -> Result<String, StoreErrorType> {
let message_id = message.message_id()?;
let assignment_id = message.assignment_id()?;
Expand All @@ -314,6 +323,19 @@ impl DataStore for LocalStoreClient {
let assignment_key = self.msg_assignment_key(&assignment_id);
self.file_db.put(assignment_key.as_bytes(), bundle_in)?;

let cf = self.index_db.cf_handle("deep_hash").ok_or_else(|| {
StoreErrorType::DatabaseError("Column family 'message_ordering' not found".to_string())
})?;

match deep_hash {
Some(dh) => {
let deep_hash_key = self.deep_hash_key(&message.process_id()?, dh)?;
self.index_db
.put_cf(cf, deep_hash_key.as_bytes(), message.process_id()?.as_bytes())?;
},
None => ()
};

Ok("Message saved".to_string())
}

Expand Down Expand Up @@ -399,6 +421,23 @@ impl DataStore for LocalStoreClient {
}
}

/// Checks whether a deep hash has already been stored for this process.
///
/// Returns `Err(MessageExists)` when the key is present in the
/// `deep_hash` column family, and `Ok(())` otherwise. A read error from
/// the index db is deliberately treated the same as "not found".
async fn check_existing_deep_hash(&self, process_id: &String, deep_hash: &String) -> Result<(), StoreErrorType> {
    let cf = self.index_db.cf_handle("deep_hash").ok_or_else(|| {
        StoreErrorType::DatabaseError("Column family 'deep_hash' not found".to_string())
    })?;

    let key = self.deep_hash_key(process_id, deep_hash)?;

    // Only a successful read that finds a value counts as a duplicate;
    // Ok(None) and Err(_) both fall through to Ok(()).
    if let Ok(Some(_)) = self.index_db.get_cf(cf, key) {
        return Err(StoreErrorType::MessageExists(
            "Deep hash already exists".to_string(),
        ));
    }

    Ok(())
}

/*
Message list retrieval for the /processid
query, this returns a paginated list of messages
Expand All @@ -424,7 +463,7 @@ impl DataStore for LocalStoreClient {
*/
let include_process = process_in.assignment.is_some()
&& match from {
Some(ref from_nonce) => from_nonce == &process_in.nonce()?.to_string(),
Some(ref _from_timestamp) => false,
/*
No 'from' means it's the first page
*/
Expand All @@ -441,6 +480,26 @@ impl DataStore for LocalStoreClient {
actual_limit -= 1;
}

/*
handles an edge case where "to" is the message right
after the process, and limit is 1
*/
if include_process && actual_limit == 0 {
match to {
Some(t) => {
let timestamp: i64 = t.parse()?;
if timestamp == process_in.timestamp()? {
return Ok(PaginatedMessages::from_messages(messages, false)?);
} else if timestamp > process_in.timestamp()? {
return Ok(PaginatedMessages::from_messages(messages, true)?);
}
},
None => {
return Ok(PaginatedMessages::from_messages(messages, false)?);
}
}
}

let (paginated_keys, has_next_page) = self
.fetch_message_range(process_id, from, to, &Some(actual_limit))
.await?;
Expand All @@ -453,6 +512,11 @@ impl DataStore for LocalStoreClient {
for (_, assignment_id) in paginated_keys {
let assignment_key = self.msg_assignment_key(&assignment_id);

        /*
          It is possible the file isn't finished saving and
          available in the file db yet; that is why this retry
          loop is here.
        */
for _ in 0..10 {
if let Some(message_data) = self.file_db.get(assignment_key.as_bytes())? {
let message: Message = Message::from_bytes(message_data)?;
Expand Down
14 changes: 7 additions & 7 deletions servers/su/src/domain/clients/local_store/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ mod tests {
let message_bundle = create_test_message_bundle();
let test_message = Message::from_bytes(message_bundle.clone())?;

client.save_message(&test_message, &message_bundle).await?;
client.save_message(&test_message, &message_bundle, None).await?;
let retrieved_message = client.get_message(&test_message.assignment.id)?;

assert_eq!(retrieved_message.assignment.id, test_message.assignment.id);
Expand All @@ -86,7 +86,7 @@ mod tests {
// Save all messages
for bundle in message_bundles.iter() {
let test_message = Message::from_bytes(bundle.clone())?;
client.save_message(&test_message, &bundle).await?;
client.save_message(&test_message, &bundle, None).await?;
}

// Retrieve messages and check nonce order and continuity
Expand Down Expand Up @@ -124,7 +124,7 @@ mod tests {

for bundle in message_bundles.iter() {
let test_message = Message::from_bytes(bundle.clone())?;
client.save_message(&test_message, &bundle).await?;
client.save_message(&test_message, &bundle, None).await?;
}

// Case 1: Default parameters
Expand Down Expand Up @@ -202,7 +202,7 @@ mod tests {
// Save half of the messages
for bundle in message_bundles.iter().take(message_bundles.len() / 2) {
let test_message = Message::from_bytes(bundle.clone())?;
client.save_message(&test_message, &bundle).await?;
client.save_message(&test_message, &bundle, None).await?;
}

let (process_bundle_2, message_bundles_2) = bundle_list_2();
Expand All @@ -212,19 +212,19 @@ mod tests {
// Save half of the messages of next process
for bundle in message_bundles_2.iter().take(message_bundles_2.len() / 2) {
let test_message = Message::from_bytes(bundle.clone())?;
client.save_message(&test_message, &bundle).await?;
client.save_message(&test_message, &bundle, None).await?;
}

// Save second half of messages for the first process
for bundle in message_bundles.iter().skip(message_bundles.len() / 2) {
let test_message = Message::from_bytes(bundle.clone())?;
client.save_message(&test_message, &bundle).await?;
client.save_message(&test_message, &bundle, None).await?;
}

// Save second half of messages for the second process
for bundle in message_bundles_2.iter().skip(message_bundles_2.len() / 2) {
let test_message = Message::from_bytes(bundle.clone())?;
client.save_message(&test_message, &bundle).await?;
client.save_message(&test_message, &bundle, None).await?;
}

// Retrieve messages and check length, nonce order, and continuity
Expand Down
Loading