Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-744-Review-4 #565

Open
wants to merge 10 commits into
base: GH-744-Review-4-Base
Choose a base branch
from
30 changes: 17 additions & 13 deletions node/src/accountant/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1119,7 +1119,7 @@ mod tests {
use std::sync::Mutex;
use std::time::Duration;
use std::vec;
use web3::types::TransactionReceipt;
use crate::blockchain::blockchain_interface::blockchain_interface_web3::lower_level_interface_web3::{TransactionBlock, TxReceipt, TxStatus};

impl Handler<AssertionsMessage<Accountant>> for Accountant {
type Result = ();
Expand Down Expand Up @@ -3799,11 +3799,13 @@ mod tests {
.build();
let subject_addr = subject.start();
let transaction_hash_1 = make_tx_hash(4545);
let mut transaction_receipt_1 = TransactionReceipt::default();
transaction_receipt_1.transaction_hash = transaction_hash_1;
transaction_receipt_1.status = Some(U64::from(1)); //success
transaction_receipt_1.block_number = Some(U64::from(100));
transaction_receipt_1.block_hash = Some(Default::default());
let transaction_receipt_1 = TxReceipt {
transaction_hash: transaction_hash_1,
status: TxStatus::Succeeded(TransactionBlock {
block_hash: Default::default(),
block_number: U64::from(100),
}),
};
let fingerprint_1 = PendingPayableFingerprint {
rowid: 5,
timestamp: from_time_t(200_000_000),
Expand All @@ -3813,11 +3815,13 @@ mod tests {
process_error: None,
};
let transaction_hash_2 = make_tx_hash(3333333);
let mut transaction_receipt_2 = TransactionReceipt::default();
transaction_receipt_2.transaction_hash = transaction_hash_2;
transaction_receipt_2.status = Some(U64::from(1)); //success
transaction_receipt_2.block_number = Some(U64::from(200));
transaction_receipt_2.block_hash = Some(Default::default());
let transaction_receipt_2 = TxReceipt {
transaction_hash: transaction_hash_2,
status: TxStatus::Succeeded(TransactionBlock {
block_hash: Default::default(),
block_number: U64::from(200),
}),
};
let fingerprint_2 = PendingPayableFingerprint {
rowid: 10,
timestamp: from_time_t(199_780_000),
Expand All @@ -3829,11 +3833,11 @@ mod tests {
let msg = ReportTransactionReceipts {
fingerprints_with_receipts: vec![
(
TransactionReceiptResult::RpcResponse(transaction_receipt_1.into()),
TransactionReceiptResult::RpcResponse(transaction_receipt_1),
fingerprint_1.clone(),
),
(
TransactionReceiptResult::RpcResponse(transaction_receipt_2.into()),
TransactionReceiptResult::RpcResponse(transaction_receipt_2),
fingerprint_2.clone(),
),
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ mod tests {

use masq_lib::logger::Logger;
use masq_lib::test_utils::logging::{init_test_logging, TestLogHandler};
use web3::types::U256;
use masq_lib::test_utils::utils::TEST_DEFAULT_CHAIN;
use web3::types::U256;

fn blockchain_agent_null_constructor_works<C>(constructor: C)
where
Expand Down Expand Up @@ -192,5 +192,4 @@ mod tests {
assert_eq!(result, TEST_DEFAULT_CHAIN);
assert_error_log(test_name, "get_chain")
}

}
96 changes: 38 additions & 58 deletions node/src/accountant/scanners/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ use crate::accountant::scanners::scanners_utils::payable_scanner_utils::{
separate_errors, separate_rowids_and_hashes, PayableThresholdsGauge,
PayableThresholdsGaugeReal, PayableTransactingErrorEnum, PendingPayableMetadata,
};
use crate::accountant::scanners::scanners_utils::pending_payable_scanner_utils::{
elapsed_in_ms, handle_status_with_failure, handle_status_with_success,
PendingPayableScanReport,
};
use crate::accountant::scanners::scanners_utils::pending_payable_scanner_utils::{handle_none_receipt, handle_status_with_failure, handle_status_with_success, PendingPayableScanReport};
use crate::accountant::scanners::scanners_utils::receivable_scanner_utils::balance_and_age;
use crate::accountant::PendingPayableId;
use crate::accountant::{
Expand Down Expand Up @@ -654,49 +651,28 @@ impl PendingPayableScanner {
msg: ReportTransactionReceipts,
logger: &Logger,
) -> PendingPayableScanReport {
fn handle_none_receipt(
mut scan_report: PendingPayableScanReport,
payable: PendingPayableFingerprint,
error_msg: String,
logger: &Logger,
) -> PendingPayableScanReport {
debug!(logger,
"Interpreting a receipt for transaction {:?} but {}; attempt {}, {}ms since sending",
payable.hash, error_msg, payable.attempt,elapsed_in_ms(payable.timestamp)
);

scan_report
.still_pending
.push(PendingPayableId::new(payable.rowid, payable.hash));
scan_report
}

let scan_report = PendingPayableScanReport::default();
msg.fingerprints_with_receipts.into_iter().fold(
scan_report,
|scan_report_so_far, (receipt_result, fingerprint)| match receipt_result {
TransactionReceiptResult::RpcResponse(tx_receipt) => {
match tx_receipt.status {
TxStatus::Pending => {
handle_none_receipt(
scan_report_so_far,
fingerprint,
"none was given".to_string(),
logger,
)
}
TxStatus::Failed => {
handle_status_with_failure(scan_report_so_far, fingerprint, logger)
}
TxStatus::Succeeded(_) => {
handle_status_with_success(scan_report_so_far, fingerprint, logger)
}
TransactionReceiptResult::RpcResponse(tx_receipt) => match tx_receipt.status {
TxStatus::Pending => handle_none_receipt(
scan_report_so_far,
fingerprint,
"none was given",
logger,
),
TxStatus::Failed => {
handle_status_with_failure(scan_report_so_far, fingerprint, logger)
}
}
TxStatus::Succeeded(_) => {
handle_status_with_success(scan_report_so_far, fingerprint, logger)
}
},
TransactionReceiptResult::LocalError(e) => handle_none_receipt(
scan_report_so_far,
fingerprint,
format!("failed due to {}", e),
&format!("failed due to {}", e),
logger,
),
},
Expand Down Expand Up @@ -891,10 +867,10 @@ impl ReceivableScanner {
),
}
} else {
let mut txn = self
.receivable_dao
.as_mut()
.more_money_received(received_payments_msg.timestamp, &received_payments_msg.transactions);
let mut txn = self.receivable_dao.as_mut().more_money_received(
received_payments_msg.timestamp,
&received_payments_msg.transactions,
);
let new_start_block = received_payments_msg.new_start_block;
match self
.persistent_configuration
Expand Down Expand Up @@ -1139,7 +1115,7 @@ mod tests {
use std::time::{Duration, SystemTime};
use web3::types::{TransactionReceipt, H256};
use web3::Error;
use crate::blockchain::blockchain_interface::blockchain_interface_web3::lower_level_interface_web3::{TransactionReceiptResult, TxReceipt, TxStatus};
use crate::blockchain::blockchain_interface::blockchain_interface_web3::lower_level_interface_web3::{TransactionBlock, TransactionReceiptResult, TxReceipt, TxStatus};

#[test]
fn scanners_struct_can_be_constructed_with_the_respective_scanners() {
Expand Down Expand Up @@ -2504,9 +2480,9 @@ mod tests {
};
let msg = ReportTransactionReceipts {
fingerprints_with_receipts: vec![(
TransactionReceiptResult::RpcResponse(TxReceipt{
TransactionReceiptResult::RpcResponse(TxReceipt {
transaction_hash: hash,
status: TxStatus::Pending
status: TxStatus::Pending,
}),
fingerprint.clone(),
)],
Expand Down Expand Up @@ -2827,11 +2803,13 @@ mod tests {
.pending_payable_dao(pending_payable_dao)
.build();
let transaction_hash_1 = make_tx_hash(4545);
let mut transaction_receipt_1 = TransactionReceipt::default();
transaction_receipt_1.transaction_hash = transaction_hash_1;
transaction_receipt_1.status = Some(U64::from(1)); //success
transaction_receipt_1.block_number = Some(U64::from(1234));
transaction_receipt_1.block_hash = Some(Default::default());
let transaction_receipt_1 = TxReceipt {
transaction_hash: transaction_hash_1,
status: TxStatus::Succeeded(TransactionBlock {
block_hash: Default::default(),
block_number: U64::from(1234),
}),
};
let fingerprint_1 = PendingPayableFingerprint {
rowid: 5,
timestamp: from_time_t(200_000_000),
Expand All @@ -2841,11 +2819,13 @@ mod tests {
process_error: None,
};
let transaction_hash_2 = make_tx_hash(1234);
let mut transaction_receipt_2 = TransactionReceipt::default();
transaction_receipt_2.transaction_hash = transaction_hash_2;
transaction_receipt_2.status = Some(U64::from(1)); //success
transaction_receipt_2.block_number = Some(U64::from(2345));
transaction_receipt_2.block_hash = Some(Default::default());
let transaction_receipt_2 = TxReceipt {
transaction_hash: transaction_hash_2,
status: TxStatus::Succeeded(TransactionBlock {
block_hash: Default::default(),
block_number: U64::from(2345),
}),
};
let fingerprint_2 = PendingPayableFingerprint {
rowid: 10,
timestamp: from_time_t(199_780_000),
Expand All @@ -2857,11 +2837,11 @@ mod tests {
let msg = ReportTransactionReceipts {
fingerprints_with_receipts: vec![
(
TransactionReceiptResult::RpcResponse(transaction_receipt_1.into()),
TransactionReceiptResult::RpcResponse(transaction_receipt_1),
fingerprint_1.clone(),
),
(
TransactionReceiptResult::RpcResponse(transaction_receipt_2.into()),
TransactionReceiptResult::RpcResponse(transaction_receipt_2),
fingerprint_2.clone(),
),
],
Expand Down
21 changes: 21 additions & 0 deletions node/src/accountant/scanners/scanners_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,27 @@ pub mod pending_payable_scanner_utils {
scan_report.failures.push(fingerprint.into());
scan_report
}

pub fn handle_none_receipt(
mut scan_report: PendingPayableScanReport,
payable: PendingPayableFingerprint,
error_msg: &str,
logger: &Logger,
) -> PendingPayableScanReport {
debug!(
logger,
"Interpreting a receipt for transaction {:?} but {}; attempt {}, {}ms since sending",
payable.hash,
error_msg,
payable.attempt,
elapsed_in_ms(payable.timestamp)
);

scan_report
.still_pending
.push(PendingPayableId::new(payable.rowid, payable.hash));
scan_report
}
}

pub mod receivable_scanner_utils {
Expand Down
59 changes: 32 additions & 27 deletions node/src/blockchain/blockchain_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,8 @@ impl BlockchainBridge {
format!("Error while retrieving transactions: {:?}", e)
})
.and_then(move |retrieved_blockchain_transactions| {
received_payments_subs.try_send(ReceivedPayments {
received_payments_subs
.try_send(ReceivedPayments {
timestamp: SystemTime::now(),
new_start_block: retrieved_blockchain_transactions.new_start_block,
response_skeleton_opt: msg.response_skeleton_opt,
Expand Down Expand Up @@ -400,33 +401,32 @@ impl BlockchainBridge {
.process_transaction_receipts(transaction_hashes)
.map_err(move |e| e.to_string())
.and_then(move |transaction_receipts_results| {
let length = transaction_receipts_results.len();
let mut transactions_found = 0;
for transaction_receipt in &transaction_receipts_results {
if let TransactionReceiptResult::RpcResponse(tx_receipt) = transaction_receipt {
if let TxStatus::Succeeded(_) = tx_receipt.status {
transactions_found += 1;
let (successful_count, failed_count, pending_count) = transaction_receipts_results
.iter()
.fold((0, 0, 0), |(success, fail, pending), transaction_receipt| {
match transaction_receipt {
TransactionReceiptResult::RpcResponse(tx_receipt) => match tx_receipt.status {
TxStatus::Failed => (success, fail + 1, pending),
TxStatus::Pending => (success, fail, pending + 1),
TxStatus::Succeeded(_) => (success + 1, fail, pending),
},
TransactionReceiptResult::LocalError(_)=> (success, fail, pending + 1),
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a couple of points about possibilities for minor optimisations.

  • line 401: move is redundant
  • if length != transactions_found { coupled with "Aborting scanning; {} transactions succeed and {} transactions failed" is controversial. It's because the scan hasn't differed from a successful one in the length of the process (you do the same amount of work to figure out all the transactions were successful as you do if there are some pending or failed transactions), therefore speaking of "aborting" is misinterpreting the truth. We didn't abort. We completed everything.

However there is more than that: you cannot count the succeeded transactions only and then assume that the rest of them have failed with no exception. That's not right. Be aware that the rest is a mix of legitimately failed transactions, yes, but also those still pending. I'm asking, do you think it's appropriate having the pending transactions counted in the failures? Because that's what you're doing in the debug!() log down here.

If you still think we'd benefit from having that log, I'd recommend to keep track of all three categories: Confirmed, Pending, Failed.

  • inside the function process_transaction_receipts(), there is a passage in the chain of Future-devoted methods that says .map_err(|e| e) which is a redundant call with no effect, you can delete that line
  • in the impl of get_transaction_receipt_in_batch(), you can see this snipped:
    let _ = hash_vec.into_iter().map(|hash| {
            self.web3_batch.eth().transaction_receipt(hash);
        });

Honestly, I'm surprised that this code even does anything because I've been taught that iterators in Rust are "lazy" and therefore their code isn't executed until a certain terminal method is reached and performed. Most of the time this method is the renowned collect(), then fold(), partition() and also for_each().
As you have used none of them in here, it makes me wonder how you got your code working, assuming this has proper test coverage. Anyway, without having to argue over this, we can easily put end to this question with the replacement of .map() by .for_each(). This swap will also rule out the assignment to an anonymous variable since for_each() is not allowed to return a value, naturally: It's meant to allow side effects only. I hope you'll be able to make out what I instruct you for.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this change! 😃

I really appreciated your insights on tracking the different transaction statuses and the iterator usage. I went ahead and implemented those suggestions.

As for the move and the redundant error mapping, I decided to leave those as is to keep the original function signature intact.

}
}
});
debug!(logger, "Scan results: Successful: {successful_count}, Pending: {pending_count}, Failed: {failed_count}");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think you could pack this whole fold for the debug into a separate function.

I've got this in my mind:

You can use the trick with

logger.debug(||{
 <your function returns a String (the debug message)>
})

Therefore:

fn transaction_receipts_results_debug(transaction_receipts_results: <The proper type>){
       logger.debug({
              let (successful_count, failed_count, pending_count) = transaction_receipts_results
                       .iter()
                       .fold((0, 0, 0), |(success, fail, pending), transaction_receipt| {
                               match transaction_receipt {
                                    TransactionReceiptResult::RpcResponse(tx_receipt) => match tx_receipt.status {
                                        TxStatus::Failed => (success, fail + 1, pending),
                                        TxStatus::Pending => (success, fail, pending + 1),
                                        TxStatus::Succeeded(_) => (success + 1, fail, pending),
                                    },
                                   TransactionReceiptResult::LocalError(_)=> (success, fail, pending + 1),
                              }
              format!("Scan results: Successful: {successful_count}, Pending: {pending_count}, Failed: {failed_count}");
      })
}

A praise for that nice use of .fold(). 😉


let pairs = transaction_receipts_results
.into_iter()
.zip(msg.pending_payable.into_iter())
.collect_vec();

accountant_recipient
.try_send(ReportTransactionReceipts {
fingerprints_with_receipts: pairs,
response_skeleton_opt: msg.response_skeleton_opt,
})
.expect("Accountant is dead");
if length != transactions_found {
debug!(
logger,
"Aborting scanning; {} transactions succeed and {} transactions failed",
transactions_found,
length - transactions_found
);
};

Ok(())
}),
)
Expand Down Expand Up @@ -583,7 +583,7 @@ mod tests {
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
use web3::types::{TransactionReceipt, H160};
use crate::blockchain::blockchain_interface::blockchain_interface_web3::lower_level_interface_web3::TxReceipt;
use crate::blockchain::blockchain_interface::blockchain_interface_web3::lower_level_interface_web3::{TransactionBlock, TxReceipt};

impl Handler<AssertionsMessage<Self>> for BlockchainBridge {
type Result = ();
Expand Down Expand Up @@ -1198,9 +1198,10 @@ mod tests {
pending_payable_fingerprint_1
),
(
TransactionReceiptResult::RpcResponse(TxReceipt{
TransactionReceiptResult::RpcResponse(TxReceipt {
transaction_hash: hash_2,
status: TxStatus::Pending }),
status: TxStatus::Pending
}),
pending_payable_fingerprint_2
),
],
Expand Down Expand Up @@ -1326,11 +1327,13 @@ mod tests {
amount: 7879,
process_error: None,
};
let mut transaction_receipt = TransactionReceipt::default();
transaction_receipt.block_number = Some(block_number);
transaction_receipt.block_hash = Some(Default::default());
transaction_receipt.contract_address = Some(contract_address);
transaction_receipt.status = Some(U64::from(1));
let transaction_receipt = TxReceipt {
transaction_hash: Default::default(),
status: TxStatus::Succeeded(TransactionBlock {
block_hash: Default::default(),
block_number,
}),
};
let blockchain_interface = make_blockchain_interface_web3(port);
let system = System::new("test_transaction_receipts");
let mut subject = BlockchainBridge::new(
Expand Down Expand Up @@ -1367,7 +1370,7 @@ mod tests {
ReportTransactionReceipts {
fingerprints_with_receipts: vec![
(TransactionReceiptResult::RpcResponse(TxReceipt{ transaction_hash: hash_1, status: TxStatus::Pending }), fingerprint_1),
(TransactionReceiptResult::RpcResponse(transaction_receipt.into()), fingerprint_2),
(TransactionReceiptResult::RpcResponse(transaction_receipt), fingerprint_2),
(TransactionReceiptResult::RpcResponse(TxReceipt{ transaction_hash: hash_3, status: TxStatus::Pending }), fingerprint_3),
(TransactionReceiptResult::LocalError("RPC error: Error { code: ServerError(429), message: \"The requests per second (RPS) of your requests are higher than your plan allows.\", data: None }".to_string()), fingerprint_4)
],
Expand All @@ -1377,7 +1380,9 @@ mod tests {
}),
}
);
TestLogHandler::new().exists_log_containing("DEBUG: BlockchainBridge: Aborting scanning; 1 transactions succeed and 3 transactions failed");
TestLogHandler::new().exists_log_containing(
"DEBUG: BlockchainBridge: Scan results: Successful: 1, Pending: 3, Failed: 0",
);
}

#[test]
Expand Down
Loading