diff --git a/servers/su/src/domain/clients/gateway.rs b/servers/su/src/domain/clients/gateway.rs index c9fdd4644..07aacd29f 100644 --- a/servers/su/src/domain/clients/gateway.rs +++ b/servers/su/src/domain/clients/gateway.rs @@ -28,19 +28,9 @@ impl From for String { } } -/* - Right now we dont need all the fields - but later we can add to these types to - pull more data from gql responses -*/ -#[derive(Deserialize, Debug, Clone)] -struct Node { - id: String, -} - #[derive(Deserialize, Debug)] struct Edge { - node: Node, + node: GatewayTx, } #[derive(Deserialize, Debug)] @@ -219,14 +209,82 @@ impl Gateway for ArweaveGateway { } } + async fn raw(&self, tx_id: &String) -> Result, String> { + let config = AoConfig::new(Some("su".to_string())).expect("Failed to read configuration"); + let arweave_url = config.arweave_url; + + let url = match Url::parse(&arweave_url) { + Ok(u) => u, + Err(e) => return Err(format!("{}", e)), + }; + + let client = Client::new(); + + let response = client + .get( + url.join(&format!("raw/{}", tx_id)) + .map_err(|e| GatewayErrorType::StatusError(e.to_string()))?, + ) + .send() + .await + .map_err(|e| GatewayErrorType::StatusError(e.to_string()))?; + + if response.status().is_success() { + let body = response + .bytes() + .await + .map_err(|e| GatewayErrorType::StatusError(e.to_string()))?; + Ok(body.to_vec()) + } else { + Err(format!( + "Failed to get status. Status code: {}", + response.status() + )) + } + } + async fn gql_tx(&self, tx_id: &String) -> Result { let config = AoConfig::new(Some("su".to_string())).expect("Failed to read configuration"); let graphql_url = config.graphql_url; let client = Client::new(); + /* + id + signature + anchor + owner { + address + key + } + tags { + name + value + } + recipient + */ + let query = serde_json::json!({ "query": format!( - "query {{ transactions (ids: [\"{}\"]){{ edges {{ node {{ id }} }} }} }}", + "query {{ + transactions(ids: [\"{}\"]) {{ + edges {{ + node {{ + id + signature + anchor + owner {{ + address + key + }} + tags {{ + name + value + }} + recipient + }} + }} + }} + }}", tx_id ), "variables": {} @@ -250,9 +308,7 @@ impl Gateway for ArweaveGateway { .map_err(|e| GatewayErrorType::JsonParseError(e.to_string()))?; if let Some(edge) = body.data.transactions.edges.get(0) { - Ok(GatewayTx { - id: edge.node.clone().id, - }) + Ok(edge.node.clone()) } else { Err("Transaction not found".to_string()) } diff --git a/servers/su/src/domain/clients/local_store/store.rs b/servers/su/src/domain/clients/local_store/store.rs index ae598a5f6..8fc867a39 100644 --- a/servers/su/src/domain/clients/local_store/store.rs +++ b/servers/su/src/domain/clients/local_store/store.rs @@ -111,6 +111,7 @@ impl LocalStoreClient { ("process_ordering".to_string(), opts_index.clone()), ("message".to_string(), opts_index.clone()), ("message_ordering".to_string(), opts_index.clone()), + ("deep_hash".to_string(), opts_index.clone()), ] } @@ -162,6 +163,13 @@ impl LocalStoreClient { )) } + fn deep_hash_key(&self, process_id: &String, deep_hash: &String) -> Result { + Ok(format!( + "deep_hash:{}:{}", + process_id, deep_hash + )) + } + /* This is the core method of this program used for querying message ranges for the /processid @@ -220,7 +228,7 @@ impl LocalStoreClient { if let Some(ref to_str) = to { if let Ok(to_timestamp) = to_str.parse::() { if timestamp > to_timestamp { - has_next_page = true; + has_next_page = false; break; } } @@ -288,6 +296,7 @@ impl DataStore for LocalStoreClient { &self, message: &Message, bundle_in: &[u8], + deep_hash: Option<&String>, ) -> Result { let message_id = message.message_id()?; let assignment_id = message.assignment_id()?; @@ -314,6 +323,19 @@ impl DataStore for LocalStoreClient { let assignment_key = self.msg_assignment_key(&assignment_id); self.file_db.put(assignment_key.as_bytes(), bundle_in)?; + let cf = self.index_db.cf_handle("deep_hash").ok_or_else(|| { + StoreErrorType::DatabaseError("Column family 'message_ordering' not found".to_string()) + })?; + + match deep_hash { + Some(dh) => { + let deep_hash_key = self.deep_hash_key(&message.process_id()?, dh)?; + self.index_db + .put_cf(cf, deep_hash_key.as_bytes(), message.process_id()?.as_bytes())?; + }, + None => () + }; + Ok("Message saved".to_string()) } @@ -399,6 +421,23 @@ impl DataStore for LocalStoreClient { } } + async fn check_existing_deep_hash(&self, process_id: &String, deep_hash: &String) -> Result<(), StoreErrorType> { + let cf = self.index_db.cf_handle("deep_hash").ok_or_else(|| { + StoreErrorType::DatabaseError("Column family 'deep_hash' not found".to_string()) + })?; + + let deep_hash_key = self.deep_hash_key(process_id, deep_hash)?; + match self.index_db.get_cf(cf, deep_hash_key) { + Ok(dh) => { + match dh { + Some(_) => return Err(StoreErrorType::MessageExists("Deep hash already exists".to_string())), + None => return Ok(()) + } + }, + Err(_) => return Ok(()) + } + } + /* Message list retrieval for the /processid query, this returns a paginated list of messages @@ -424,7 +463,7 @@ impl DataStore for LocalStoreClient { */ let include_process = process_in.assignment.is_some() && match from { - Some(ref from_nonce) => from_nonce == &process_in.nonce()?.to_string(), + Some(ref _from_timestamp) => false, /* No 'from' means it's the first page */ @@ -441,6 +480,26 @@ impl DataStore for LocalStoreClient { actual_limit -= 1; } + /* + handles an edge case where "to" is the message right + after the process, and limit is 1 + */ + if include_process && actual_limit == 0 { + match to { + Some(t) => { + let timestamp: i64 = t.parse()?; + if timestamp == process_in.timestamp()? { + return Ok(PaginatedMessages::from_messages(messages, false)?); + } else if timestamp > process_in.timestamp()? { + return Ok(PaginatedMessages::from_messages(messages, true)?); + } + }, + None => { + return Ok(PaginatedMessages::from_messages(messages, false)?); + } + } + } + let (paginated_keys, has_next_page) = self .fetch_message_range(process_id, from, to, &Some(actual_limit)) .await?; @@ -453,6 +512,11 @@ impl DataStore for LocalStoreClient { for (_, assignment_id) in paginated_keys { let assignment_key = self.msg_assignment_key(&assignment_id); + /* + It is possible the file isnt finished saving and + available on the file db yet that is why this retry loop + is here. + */ for _ in 0..10 { if let Some(message_data) = self.file_db.get(assignment_key.as_bytes())? { let message: Message = Message::from_bytes(message_data)?; diff --git a/servers/su/src/domain/clients/local_store/tests.rs b/servers/su/src/domain/clients/local_store/tests.rs index a4dfbd51d..61f351f86 100644 --- a/servers/su/src/domain/clients/local_store/tests.rs +++ b/servers/su/src/domain/clients/local_store/tests.rs @@ -67,7 +67,7 @@ mod tests { let message_bundle = create_test_message_bundle(); let test_message = Message::from_bytes(message_bundle.clone())?; - client.save_message(&test_message, &message_bundle).await?; + client.save_message(&test_message, &message_bundle, None).await?; let retrieved_message = client.get_message(&test_message.assignment.id)?; assert_eq!(retrieved_message.assignment.id, test_message.assignment.id); @@ -86,7 +86,7 @@ mod tests { // Save all messages for bundle in message_bundles.iter() { let test_message = Message::from_bytes(bundle.clone())?; - client.save_message(&test_message, &bundle).await?; + client.save_message(&test_message, &bundle, None).await?; } // Retrieve messages and check nonce order and continuity @@ -124,7 +124,7 @@ mod tests { for bundle in message_bundles.iter() { let test_message = Message::from_bytes(bundle.clone())?; - client.save_message(&test_message, &bundle).await?; + client.save_message(&test_message, &bundle, None).await?; } // Case 1: Default parameters @@ -202,7 +202,7 @@ mod tests { // Save half of the messages for bundle in message_bundles.iter().take(message_bundles.len() / 2) { let test_message = Message::from_bytes(bundle.clone())?; - client.save_message(&test_message, &bundle).await?; + client.save_message(&test_message, &bundle, None).await?; } let (process_bundle_2, message_bundles_2) = bundle_list_2(); @@ -212,19 +212,19 @@ mod tests { // Save half of the messages of next process for bundle in message_bundles_2.iter().take(message_bundles_2.len() / 2) { let test_message = Message::from_bytes(bundle.clone())?; - client.save_message(&test_message, &bundle).await?; + client.save_message(&test_message, &bundle, None).await?; } // Save second half of messages for the first process for bundle in message_bundles.iter().skip(message_bundles.len() / 2) { let test_message = Message::from_bytes(bundle.clone())?; - client.save_message(&test_message, &bundle).await?; + client.save_message(&test_message, &bundle, None).await?; } // Save second half of messages for the second process for bundle in message_bundles_2.iter().skip(message_bundles_2.len() / 2) { let test_message = Message::from_bytes(bundle.clone())?; - client.save_message(&test_message, &bundle).await?; + client.save_message(&test_message, &bundle, None).await?; } // Retrieve messages and check length, nonce order, and continuity diff --git a/servers/su/src/domain/clients/store.rs b/servers/su/src/domain/clients/store.rs index 80a441501..d4aa34406 100644 --- a/servers/su/src/domain/clients/store.rs +++ b/servers/su/src/domain/clients/store.rs @@ -814,10 +814,21 @@ impl DataStore for StoreClient { } } + async fn check_existing_deep_hash(&self, process_id: &String, deep_hash: &String) -> Result<(), StoreErrorType> { + if self.bytestore.is_ready() { + match self.bytestore.deep_hash_exists(process_id, deep_hash) { + true => return Err(StoreErrorType::MessageExists("Deep hash already exists".to_string())), + false => return Ok(()) + } + } + Ok(()) + } + async fn save_message( &self, message: &Message, bundle_in: &[u8], + deep_hash: Option<&String>, ) -> Result { use super::schema::messages::dsl::*; let conn = &mut self.get_conn()?; @@ -853,6 +864,15 @@ impl DataStore for StoreClient { message.timestamp()?.to_string(), bundle_in.to_vec(), )?; + match deep_hash { + Some(dh) => { + bytestore.save_deep_hash( + &message.process_id()?, + dh + )?; + }, + None => () + }; } Ok("saved".to_string()) } @@ -1541,6 +1561,62 @@ mod bytestore { false } } + + pub fn save_deep_hash( + &self, + process_id: &String, + deep_hash: &String + ) -> Result<(), String> { + let key = format!( + "deephash___{}___{}", + process_id, + deep_hash + ).into_bytes(); + + let value = format!( + "{}", + process_id + ).into_bytes(); + + let db = match self.db.read() { + Ok(r) => r, + Err(_) => return Err("Failed to acquire read lock".into()), + }; + + if let Some(ref db) = *db { + db.put(key, value) + .map_err(|e| format!("Failed to write to RocksDB: {:?}", e))?; + Ok(()) + } else { + Err("Database is not initialized".into()) + } + } + + pub fn deep_hash_exists( + &self, + process_id: &String, + deep_hash: &String, + ) -> bool { + let key = format!( + "deephash___{}___{}", + process_id, + deep_hash + ).into_bytes(); + + let db = match self.db.read() { + Ok(r) => r, + Err(_) => return false, + }; + + if let Some(ref db) = *db { + match db.get(&key) { + Ok(Some(_)) => true, + _ => false, + } + } else { + false + } + } } } diff --git a/servers/su/src/domain/config.rs b/servers/su/src/domain/config.rs index 749f0892f..804a98605 100644 --- a/servers/su/src/domain/config.rs +++ b/servers/su/src/domain/config.rs @@ -39,6 +39,8 @@ pub struct AoConfig { pub use_local_store: bool, pub su_file_db_dir: String, pub su_index_db_dir: String, + + pub enable_deep_hash_checks: bool, } fn get_db_dirs() -> (String, String) { @@ -139,6 +141,10 @@ impl AoConfig { Err(_e) => false, }; let (su_file_db_dir, su_index_db_dir) = get_db_dirs(); + let enable_deep_hash_checks = match env::var("ENABLE_DEEP_HASH_CHECKS") { + Ok(val) => val == "true", + Err(_e) => false, + }; Ok(AoConfig { database_url: env::var("DATABASE_URL")?, database_read_url, @@ -161,6 +167,7 @@ impl AoConfig { use_local_store, su_file_db_dir, su_index_db_dir, + enable_deep_hash_checks }) } } @@ -175,4 +182,7 @@ impl Config for AoConfig { fn enable_process_assignment(&self) -> bool { self.enable_process_assignment.clone() } + fn enable_deep_hash_checks(&self) -> bool { + self.enable_deep_hash_checks.clone() + } } diff --git a/servers/su/src/domain/core/builder.rs b/servers/su/src/domain/core/builder.rs index 3d277ec80..02f2f2dfc 100644 --- a/servers/su/src/domain/core/builder.rs +++ b/servers/su/src/domain/core/builder.rs @@ -1,17 +1,15 @@ use std::sync::Arc; use super::tags::Tag; -use dashmap::DashMap; use super::bytes::{ByteErrorType, DataBundle, DataItem}; -use super::dal::{Gateway, Log, ScheduleProvider, Signer, TxStatus}; +use super::dal::{Gateway, Log, ScheduleProvider, Signer, TxStatus, GatewayTx}; use super::json::Process; pub struct Builder<'a> { gateway: Arc, signer: Arc, - logger: &'a Arc, - cache: Arc>>, + logger: &'a Arc } pub struct BuildResult { @@ -52,8 +50,7 @@ impl<'a> Builder<'a> { Ok(Builder { gateway, signer, - logger, - cache: Arc::new(DashMap::new()), + logger }) } @@ -205,12 +202,7 @@ impl<'a> Builder<'a> { tx_id: &String, process: &Process, base_layer: &Option, - ) -> Result<(), BuilderErrorType> { - // Check if the result is in the DashMap cache - if let Some(cached_result) = self.cache.get(tx_id) { - return cached_result.clone(); - } - + ) -> Result, BuilderErrorType> { // Process the assignment verification let result = match base_layer { Some(_) => { @@ -230,23 +222,20 @@ impl<'a> Builder<'a> { }; match status.number_of_confirmations { - n if n >= threshold => Ok(()), + n if n >= threshold => Ok(None), _ => Err(BuilderErrorType::BuilderError( "Not enough confirmations to assign".to_string(), )), } } None => { - self.gateway.gql_tx(&tx_id).await?; - Ok(()) + Ok(Some(self.gateway.gql_tx(&tx_id).await?)) } }; - // Store the result in the DashMap cache - self.cache.insert(tx_id.clone(), result.clone()); - result } + } #[cfg(test)] @@ -280,8 +269,16 @@ mod tests { async fn gql_tx(&self, _tx_id: &String) -> Result { Ok(GatewayTx { id: "id".to_string(), + signature: "sig".to_string(), + anchor: None, + tags: vec![], + recipient: None, }) } + + async fn raw(&self, _tx_id: &String) -> Result, String> { + Ok(vec![]) + } } struct MockSigner; diff --git a/servers/su/src/domain/core/bytes.rs b/servers/su/src/domain/core/bytes.rs index e7bf1187a..e30bc7a9a 100644 --- a/servers/su/src/domain/core/bytes.rs +++ b/servers/su/src/domain/core/bytes.rs @@ -38,6 +38,12 @@ impl From for ByteErrorType { } } +impl From for ByteErrorType { + fn from(error: base64_url::base64::DecodeError) -> Self { + ByteErrorType::ByteError(format!("Byte error: {:?}", error)) + } +} + #[derive(Clone)] pub struct DataBundle { pub items: Vec, @@ -326,7 +332,7 @@ impl DataItem { Ok(DataItem { signature_type: SignerMap::Arweave, signature: vec![], - owner: owner, + owner, target, anchor, tags, @@ -529,6 +535,73 @@ impl DataItem { Ok(b) } + /* + Utilized for deduplicating incoming messages even + if they have the same id + */ + pub fn deep_hash(&mut self) -> Result { + let data_chunk = match &mut self.data { + Data::None => DeepHashChunk::Chunk(Bytes::new()), + Data::Bytes(data) => DeepHashChunk::Chunk(data.clone().into()) + }; + + let encoded_tags = if !self.tags.is_empty() { + self.tags.encode()? + } else { + Bytes::default() + }; + + let deep_hash_vec = deep_hash_sync(DeepHashChunk::Chunks(vec![ + DeepHashChunk::Chunk(DATAITEM_AS_BUFFER.into()), + DeepHashChunk::Chunk(ONE_AS_BUFFER.into()), + DeepHashChunk::Chunk(ONE_AS_BUFFER.into()), + // this is where the owner normally would be + DeepHashChunk::Chunk(Bytes::new()), + DeepHashChunk::Chunk(self.target.to_vec().into()), + DeepHashChunk::Chunk(self.anchor.to_vec().into()), + DeepHashChunk::Chunk(encoded_tags.clone()), + data_chunk, + ]))?; + + Ok(base64_url::encode(&deep_hash_vec)) + } + + pub fn deep_hash_fields( + target: Option, + anchor: Option, + tags: Vec, + data: Vec + ) -> Result { + let target_chunk = match target { + None => DeepHashChunk::Chunk(Bytes::new()), + Some(t) => DeepHashChunk::Chunk(base64_url::decode(&t)?.into()) + }; + + let anchor_chunk = match anchor { + None => DeepHashChunk::Chunk(Bytes::new()), + Some(a) => DeepHashChunk::Chunk(base64_url::decode(&a)?.into()) + }; + + let encoded_tags = if !tags.is_empty() { + tags.encode()? + } else { + Bytes::default() + }; + + let deep_hash_vec = deep_hash_sync(DeepHashChunk::Chunks(vec![ + DeepHashChunk::Chunk(DATAITEM_AS_BUFFER.into()), + DeepHashChunk::Chunk(ONE_AS_BUFFER.into()), + DeepHashChunk::Chunk(ONE_AS_BUFFER.into()), + DeepHashChunk::Chunk(Bytes::new()), + target_chunk, + anchor_chunk, + DeepHashChunk::Chunk(encoded_tags.clone()), + DeepHashChunk::Chunk(Bytes::from(data)), + ]))?; + + Ok(base64_url::encode(&deep_hash_vec)) + } + pub fn raw_id(&self) -> Vec { let mut hasher = Sha256::new(); hasher.update(&self.signature); diff --git a/servers/su/src/domain/core/dal.rs b/servers/su/src/domain/core/dal.rs index 4a3edb0fc..f7b400a44 100644 --- a/servers/su/src/domain/core/dal.rs +++ b/servers/su/src/domain/core/dal.rs @@ -4,6 +4,7 @@ use serde::Deserialize; pub use super::bytes::DataItem; pub use super::json::{JsonErrorType, Message, PaginatedMessages, Process}; pub use super::router::{ProcessScheduler, Scheduler}; +pub use super::tags::Tag; /* Interfaces for core dependencies. Implement these traits @@ -22,9 +23,13 @@ pub struct TxStatus { pub number_of_confirmations: i32, } -#[derive(Deserialize)] +#[derive(Deserialize, Debug, Clone)] pub struct GatewayTx { pub id: String, + pub signature: String, + pub anchor: Option, + pub tags: Vec, + pub recipient: Option, } #[async_trait] @@ -33,6 +38,7 @@ pub trait Gateway: Send + Sync { async fn network_info(&self) -> Result; async fn status(&self, tx_id: &String) -> Result; async fn gql_tx(&self, tx_id: &String) -> Result; + async fn raw(&self, tx_id: &String) -> Result, String>; } pub trait Wallet: Send + Sync { @@ -62,6 +68,7 @@ pub trait Config: Send + Sync { fn mode(&self) -> String; fn scheduler_list_path(&self) -> String; fn enable_process_assignment(&self) -> bool; + fn enable_deep_hash_checks(&self) -> bool; } #[derive(Debug)] @@ -121,6 +128,7 @@ pub trait DataStore: Send + Sync { &self, message: &Message, bundle_in: &[u8], + deep_hash: Option<&String>, ) -> Result; async fn get_messages( &self, @@ -135,6 +143,7 @@ pub trait DataStore: Send + Sync { process_id_in: &str, ) -> Result, StoreErrorType>; fn check_existing_message(&self, message_id: &String) -> Result<(), StoreErrorType>; + async fn check_existing_deep_hash(&self, process_id: &String, deep_hash: &String) -> Result<(), StoreErrorType>; } #[async_trait] diff --git a/servers/su/src/domain/core/flows.rs b/servers/su/src/domain/core/flows.rs index b76232207..56fb9fcdf 100644 --- a/servers/su/src/domain/core/flows.rs +++ b/servers/su/src/domain/core/flows.rs @@ -9,6 +9,7 @@ use simd_json::to_string as simd_to_string; use super::builder::Builder; use super::json::{Message, Process}; use super::scheduler; +use super::bytes::DataItem; use super::dal::{ Config, CoreMetrics, DataStore, Gateway, Log, RouterDataStore, Signer, Uploader, Wallet, @@ -132,7 +133,8 @@ pub async fn write_item( set cache that gets set before the lock is released */ if let Some(ref item) = data_item { - deps.data_store.check_existing_message(&item.id())? + deps.data_store.check_existing_message(&item.id())?; + }; deps.logger.log(format!("checked for message existence- {}", &target_id)); @@ -148,7 +150,11 @@ pub async fn write_item( deps.logger.log(format!("incrememted scheduler - {}", &target_id)); - // XOR, if we have one of these, we must have both. + /* + XOR, if we have one of these, we must have both. + The else if condition here contains the flow for a + POST of an assignment + */ if process_id.is_some() ^ assign.is_some() { return Err("If sending assign or process-id, you must send both.".to_string()); } else if let (Some(process_id), Some(assign)) = (process_id.clone(), assign.clone()) { @@ -162,18 +168,49 @@ pub async fn write_item( .await?; let process = deps.data_store.get_process(&process_id).await?; - builder + + let gateway_tx = match builder .verify_assignment(&assign, &process, &base_layer) - .await?; + .await? { + Some(g) => g, + None => return Err("Invalid gateway tx for assignming".to_string()) + }; + /* + If this is an assignment of an AO Message, + check for a duplicate deep hash and throw + an error if we find one + */ + let deep_hash = match &base_layer { + Some(_) => None, + None => { + let tx_data = deps.gateway.raw(&assign).await?; + let dh = DataItem::deep_hash_fields( + gateway_tx.recipient, + gateway_tx.anchor, + gateway_tx.tags, + tx_data, + ) + .map_err(|_| "Unable to calculate deep hash".to_string())?; + + if deps.config.enable_deep_hash_checks() { + deps.data_store + .check_existing_deep_hash(&process_id, &dh) + .await?; + } + + Some(dh) + } + }; + let aid = assignment.id(); let return_aid = assignment.id(); let build_result = builder.bundle_items(vec![assignment]).await?; let message = Message::from_bundle(&build_result.bundle)?; deps.data_store - .save_message(&message, &build_result.binary) + .save_message(&message, &build_result.binary, deep_hash.as_ref()) .await?; - deps.logger.log(format!("saved message - {:?}", &message)); + deps.logger.log(format!("saved message")); /* we set the id of the previous assignment @@ -192,6 +229,11 @@ pub async fn write_item( return id_res(&deps, return_aid, start_top_level); } + /* + The rest of this function handles writing a Process + or a Message data item. + */ + let data_item = match data_item { Some(d) => d, None => return Err("Unable to parse data item".to_string()), @@ -204,124 +246,146 @@ pub async fn write_item( return Err("Data-Protocol tag not present".to_string()); } - deps.logger.log(format!("tags cloned - {}", &target_id)); + let type_tag = match type_tag { + Some(t) => t, + None => return Err("Invalid Type Tag".to_string()) + }; - if let Some(type_tag) = type_tag { - if type_tag.value == "Process" { - let mod_tag_exists = tags.iter().any(|tag| tag.name == "Module"); - let sched_tag_exists = tags.iter().any(|tag| tag.name == "Scheduler"); + if type_tag.value == "Process" { + let mod_tag_exists = tags.iter().any(|tag| tag.name == "Module"); + let sched_tag_exists = tags.iter().any(|tag| tag.name == "Scheduler"); - if !mod_tag_exists || !sched_tag_exists { - return Err( - "Required Module and Scheduler tags for Process type not present".to_string(), - ); - } + if !mod_tag_exists || !sched_tag_exists { + return Err( + "Required Module and Scheduler tags for Process type not present".to_string(), + ); + } - /* - If we dont enable_process_assignment, the - su will follow the old flow and not generate - an assignment for the process. + /* + If we dont enable_process_assignment, the + su will follow the old flow and not generate + an assignment for the process. - As a result, no process will be returned - in the messages list either, and the Nonce - will start at 0 for the first message - */ - if deps.config.enable_process_assignment() { - match data_item.tags().iter().find(|tag| tag.name == "On-Boot") { - Some(boot_tag) => match boot_tag.value.as_str() { - "Data" => (), - tx_id => { - if !deps.gateway.check_head(tx_id.to_string()).await? { - return Err("Invalid tx id for On-Boot tag".to_string()); - } + As a result, no process will be returned + in the messages list either, and the Nonce + will start at 0 for the first message + */ + if deps.config.enable_process_assignment() { + match data_item.tags().iter().find(|tag| tag.name == "On-Boot") { + Some(boot_tag) => match boot_tag.value.as_str() { + "Data" => (), + tx_id => { + if !deps.gateway.check_head(tx_id.to_string()).await? { + return Err("Invalid tx id for On-Boot tag".to_string()); } - }, - None => (), - }; - - deps.logger.log(format!("boot load check complete - {}", &target_id)); - - let assignment = builder - .gen_assignment(None, data_item.id(), &next_schedule_info, &None) - .await?; - - deps.logger.log(format!("assignment generated - {}", &target_id)); - - let aid = assignment.id(); - let did = data_item.id(); - let build_result = builder.bundle_items(vec![assignment, data_item]).await?; - - deps.logger.log(format!("data bundled - {}", &target_id)); - - let process = Process::from_bundle(&build_result.bundle)?; - deps.data_store - .save_process(&process, &build_result.binary)?; - deps.logger.log(format!("saved process - {:?}", &process)); - - deps.scheduler - .commit(&mut *schedule_info, &next_schedule_info, did, aid); - drop(schedule_info); - - deps.logger.log(format!("scheduler committed cloned - {}", &target_id)); - - upload(&deps, build_result.binary.to_vec()).await?; + } + }, + None => (), + }; - deps.logger.log(format!("upload triggered - {}", &target_id)); - return id_res(&deps, process.process.process_id.clone(), start_top_level); - } else { - let build_result = builder.build_process(input, &next_schedule_info).await?; - let process = Process::from_bundle_no_assign( - &build_result.bundle, - &build_result.bundle_data_item, - )?; - deps.data_store - .save_process(&process, &build_result.binary)?; - deps.logger.log(format!("saved process - {:?}", &process)); - - /* - We dont commit and schedule info change here - because the process is not getting a Nonce. - However we dont drop the lock until the Process - is successfully saved to the database - */ - drop(schedule_info); - - upload(&deps, build_result.binary.to_vec()).await?; - return id_res(&deps, process.process.process_id.clone(), start_top_level); - } - } else if type_tag.value == "Message" { let assignment = builder - .gen_assignment( - Some(data_item.id()), - data_item.target(), - &next_schedule_info, - &None, - ) + .gen_assignment(None, data_item.id(), &next_schedule_info, &None) .await?; let aid = assignment.id(); - let dtarget = data_item.target(); + let did = data_item.id(); let build_result = builder.bundle_items(vec![assignment, data_item]).await?; - let message = Message::from_bundle(&build_result.bundle)?; + + let process = Process::from_bundle(&build_result.bundle)?; deps.data_store - .save_message(&message, &build_result.binary) - .await?; - deps.logger.log(format!("saved message - {:?}", &message)); + .save_process(&process, &build_result.binary)?; - /* - we set the id of the previous assignment - for the next message to be able to use - in its Hash Chain - */ deps.scheduler - .commit(&mut *schedule_info, &next_schedule_info, dtarget, aid); + .commit(&mut *schedule_info, &next_schedule_info, did, aid); drop(schedule_info); upload(&deps, build_result.binary.to_vec()).await?; - return id_res(&deps, message.message_id()?, start_top_level); + + return id_res(&deps, process.process.process_id.clone(), start_top_level); } else { - return Err("Type tag not present".to_string()); + let build_result = builder.build_process(input, &next_schedule_info).await?; + let process = Process::from_bundle_no_assign( + &build_result.bundle, + &build_result.bundle_data_item, + )?; + deps.data_store + .save_process(&process, &build_result.binary)?; + deps.logger.log(format!("saved process")); + + /* + We dont commit and schedule info change here + because the process is not getting a Nonce. + However we dont drop the lock until the Process + is successfully saved to the database + */ + drop(schedule_info); + + upload(&deps, build_result.binary.to_vec()).await?; + return id_res(&deps, process.process.process_id.clone(), start_top_level); } + } else if type_tag.value == "Message" { + let assignment = builder + .gen_assignment( + Some(data_item.id()), + data_item.target(), + &next_schedule_info, + &None, + ) + .await?; + + let aid = assignment.id(); + let dtarget = data_item.target(); + + let deep_hash = match tags.iter().find(|tag| tag.name == "From-Process") { + /* + If the Message contains a From-Process tag it is + a pushed message so we should dedupe it, otherwise + it is a user message and we should not + */ + Some(_) => { + let mut mutable_item = data_item.clone(); + let deep_hash = match mutable_item.deep_hash() { + Ok(d) => d, + Err(_) => return Err("Unable to calculate deep hash".to_string()) + }; + + /* + Throw an error if we detect a duplicated pushed + message + */ + if deps.config.enable_deep_hash_checks() { + deps.data_store + .check_existing_deep_hash(&dtarget, &deep_hash) + .await?; + } + + Some(deep_hash) + }, + None => { + None + } + }; + + let build_result = builder.bundle_items(vec![assignment, data_item]).await?; + let message = Message::from_bundle(&build_result.bundle)?; + + deps.data_store + .save_message(&message, &build_result.binary, deep_hash.as_ref()) + .await?; + + deps.logger.log(format!("saved message")); + + /* + we set the id of the previous assignment + for the next message to be able to use + in its Hash Chain + */ + deps.scheduler + .commit(&mut *schedule_info, &next_schedule_info, dtarget, aid); + drop(schedule_info); + + upload(&deps, build_result.binary.to_vec()).await?; + return id_res(&deps, message.message_id()?, start_top_level); } else { return Err("Type tag not present".to_string()); } diff --git a/servers/su/su b/servers/su/su index 703b5177a..d5f7c220d 100644 Binary files a/servers/su/su and b/servers/su/su differ