diff --git a/utils/frame/remote-externalities/src/lib.rs b/utils/frame/remote-externalities/src/lib.rs
index 761f3c8859046..d846b34a00973 100644
--- a/utils/frame/remote-externalities/src/lib.rs
+++ b/utils/frame/remote-externalities/src/lib.rs
@@ -42,7 +42,7 @@ pub use sp_io::TestExternalities;
 use sp_runtime::{traits::Block as BlockT, StateVersion};
 use spinners::{Spinner, Spinners};
 use std::{
-	cmp::max,
+	cmp::{max, min},
 	fs,
 	ops::{Deref, DerefMut},
 	path::{Path, PathBuf},
@@ -360,7 +360,8 @@ where
 	const PARALLEL_REQUESTS: usize = 4;
 	const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10;
 	const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50;
-	const INITIAL_BATCH_SIZE: usize = 5000;
+	const REQUEST_DURATION_TARGET: Duration = Duration::from_secs(2);
+	const INITIAL_BATCH_SIZE: usize = 10;
 	// nodes by default will not return more than 1000 keys per request
 	const DEFAULT_KEY_DOWNLOAD_PAGE: u32 = 1000;
 	const KEYS_PAGE_MAX_RETRIES: usize = 12;
@@ -521,6 +522,7 @@ where
 				.insert(method, params.clone())
 				.map_err(|_| "Invalid batch method and/or params")?
 		}
+		let request_started = Instant::now();
 		let batch_response = match client.batch_request::<Option<StorageData>>(batch).await {
 			Ok(batch_response) => batch_response,
 			Err(e) => {
@@ -530,20 +532,39 @@ where
 
 				log::debug!(
 					target: LOG_TARGET,
-					"Batch request failed, trying again with smaller batch size. {}",
+					"Batch request failed, resetting batch size to 1. Error: {}",
 					e.to_string()
 				);
 
-				return Self::get_storage_data_dynamic_batch_size(
-					client,
-					payloads,
-					max(1, (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize),
-					bar,
-				)
-				.await
+				// Request timed out or server errored. Try to get things moving again by starting
+				// again with just 1 item.
+				return Self::get_storage_data_dynamic_batch_size(client, payloads, 1, bar).await
 			},
 		};
 
+		// Request succeeded. Decide whether to increase or decrease the batch size for the next
+		// request, depending on whether the elapsed time was greater or less than the target.
+		let request_duration = request_started.elapsed();
+		let next_batch_size = if request_duration > Self::REQUEST_DURATION_TARGET {
+			max(1, (batch_size as f32 * Self::BATCH_SIZE_DECREASE_FACTOR) as usize)
+		} else {
+			// Increase the batch size by *at most* the number of remaining payloads
+			min(
+				payloads.len(),
+				// Increase the batch size by *at least* 1
+				max(
+					batch_size + 1,
+					(batch_size as f32 * Self::BATCH_SIZE_INCREASE_FACTOR) as usize,
+				),
+			)
+		};
+
+		log::debug!(
+			target: LOG_TARGET,
+			"Request duration: {:?} Target duration: {:?} Last batch size: {} Next batch size: {}",
+			request_duration, Self::REQUEST_DURATION_TARGET, batch_size, next_batch_size
+		);
+
 		// Collect the data from this batch
 		let mut data: Vec<Option<StorageData>> = vec![];
 		let batch_response_len = batch_response.len();
@@ -560,7 +581,7 @@ where
 		let mut rest = Self::get_storage_data_dynamic_batch_size(
 			client,
 			remaining_payloads,
-			max(batch_size + 1, (batch_size as f32 * Self::BATCH_SIZE_INCREASE_FACTOR) as usize),
+			next_batch_size,
 			bar,
 		)
 		.await?;
@@ -1345,7 +1366,7 @@ mod remote_tests {
 			.execute_with(|| {});
 	}
 
-	#[tokio::test(flavor = "multi_thread")]
+	#[tokio::test]
 	async fn can_create_snapshot() {
 		const CACHE: &'static str = "can_create_snapshot";
 		init_logger();
@@ -1369,7 +1390,7 @@
 			.filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
 			.collect::<Vec<_>>();
 
-		let snap: Snapshot<Block> = Builder::<Block>::new().load_snapshot(CACHE.into()).unwrap();
+		let snap: Snapshot<Block> = Snapshot::load(&PathBuf::from(CACHE)).unwrap();
 		assert!(matches!(snap, Snapshot { raw_storage, .. } if raw_storage.len() > 0));
 		assert!(to_delete.len() == 1);
@@ -1401,7 +1422,7 @@
 			.filter(|p| p.path().file_name().unwrap_or_default() == CACHE)
 			.collect::<Vec<_>>();
 
-		let snap: Snapshot<Block> = Builder::<Block>::new().load_snapshot(CACHE.into()).unwrap();
+		let snap: Snapshot<Block> = Snapshot::load(&PathBuf::from(CACHE)).unwrap();
 		assert!(matches!(snap, Snapshot { raw_storage, .. } if raw_storage.len() > 0));
 		assert!(to_delete.len() == 1);
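
For reviewers who want to poke at the new sizing rule in isolation, here is a minimal, self-contained sketch of the policy the hunks above introduce. The three constants and the decrease/increase branches are lifted from the diff; the free function `next_batch_size` and the `main` driver are illustrative scaffolding only, not part of the crate.

```rust
use std::{
	cmp::{max, min},
	time::Duration,
};

// Constants copied from the diff.
const BATCH_SIZE_INCREASE_FACTOR: f32 = 1.10;
const BATCH_SIZE_DECREASE_FACTOR: f32 = 0.50;
const REQUEST_DURATION_TARGET: Duration = Duration::from_secs(2);

/// Next batch size, per the diff's rule: halve (to at least 1) when the last
/// request overshot the 2s target, otherwise grow by ~10% (and by at least 1),
/// capped at the number of payloads still left to download.
fn next_batch_size(batch_size: usize, request_duration: Duration, remaining: usize) -> usize {
	if request_duration > REQUEST_DURATION_TARGET {
		max(1, (batch_size as f32 * BATCH_SIZE_DECREASE_FACTOR) as usize)
	} else {
		min(
			remaining,
			max(batch_size + 1, (batch_size as f32 * BATCH_SIZE_INCREASE_FACTOR) as usize),
		)
	}
}

fn main() {
	// Fast responses ramp up from the new INITIAL_BATCH_SIZE of 10...
	assert_eq!(next_batch_size(10, Duration::from_millis(500), 100_000), 11);
	assert_eq!(next_batch_size(100, Duration::from_millis(500), 100_000), 110);
	// ...a response slower than the 2s target halves the batch...
	assert_eq!(next_batch_size(110, Duration::from_secs(3), 100_000), 55);
	// ...and the batch never outgrows the remaining payloads.
	assert_eq!(next_batch_size(100, Duration::from_millis(500), 42), 42);
}
```

Note the two guards in the growth branch: `min(payloads.len(), ...)` keeps the batch from outgrowing the remaining work, and `max(batch_size + 1, ...)` guarantees forward progress even when a 10% increase of a small batch truncates back to no change.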