Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fatxpool: event streams moved to view domain #7545

Merged
merged 12 commits into from
Feb 19, 2025
Merged
9 changes: 9 additions & 0 deletions prdoc/pr_7545.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
title: '`fatxpool`: event streams moved to view domain'
doc:
- audience: Node Dev
description: |-
This pull request refactors the transaction pool `graph` module by renaming components for better clarity and decouples `graph` module from `view` module related specifics.
This PR does not introduce changes in the logic.
crates:
- name: sc-transaction-pool
bump: minor
2 changes: 1 addition & 1 deletion substrate/client/transaction-pool/benches/basics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ fn uxt(transfer: TransferData) -> Extrinsic {
ExtrinsicBuilder::new_bench_call(transfer).build()
}

fn bench_configured(pool: Pool<TestApi>, number: u64, api: Arc<TestApi>) {
fn bench_configured(pool: Pool<TestApi, ()>, number: u64, api: Arc<TestApi>) {
let source = TimedTransactionSource::new_external(false);
let mut futures = Vec::new();
let mut tags = Vec::new();
Expand Down
4 changes: 3 additions & 1 deletion substrate/client/transaction-pool/src/common/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

//! Testing related primitives for internal usage in this crate.

use crate::graph::{BlockHash, ChainApi, ExtrinsicFor, NumberFor, Pool, RawExtrinsicFor};
use crate::graph::{BlockHash, ChainApi, ExtrinsicFor, NumberFor, RawExtrinsicFor};
use codec::Encode;
use parking_lot::Mutex;
use sc_transaction_pool_api::error;
Expand All @@ -36,6 +36,8 @@ use substrate_test_runtime::{
ExtrinsicBuilder, Hashing, RuntimeCall, Transfer, TransferData, H256,
};

type Pool<Api> = crate::graph::Pool<Api, ()>;

pub(crate) const INVALID_NONCE: u64 = 254;

/// Test api that implements [`ChainApi`].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ pub enum DroppedReason<Hash> {
}

/// Dropped-logic related event from the single view.
pub type ViewStreamEvent<C> = crate::graph::TransactionStatusEvent<ExtrinsicHash<C>, BlockHash<C>>;
pub type ViewStreamEvent<C> =
crate::fork_aware_txpool::view::TransactionStatusEvent<ExtrinsicHash<C>, BlockHash<C>>;

/// Dropped-logic stream of events coming from the single view.
type ViewStream<C> = Pin<Box<dyn futures::Stream<Item = ViewStreamEvent<C>> + Send>>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ where
};

if let Ok((Some(best_tree_route), Some(best_view))) = best_result {
let tmp_view: View<ChainApi> =
let (tmp_view, _, _): (View<ChainApi>, _, _) =
View::new_from_other(&best_view, &HashAndNumber { hash: at, number: block_number });

let mut all_extrinsics = vec![];
Expand Down Expand Up @@ -1085,26 +1085,28 @@ where
?tree_route,
"build_new_view"
);
let mut view = if let Some(origin_view) = origin_view {
let mut view = View::new_from_other(&origin_view, at);
if !tree_route.retracted().is_empty() {
view.pool.clear_recently_pruned();
}
view
} else {
debug!(
target: LOG_TARGET,
?at,
"creating non-cloned view"
);
View::new(
self.api.clone(),
at.clone(),
self.options.clone(),
self.metrics.clone(),
self.is_validator.clone(),
)
};
let (mut view, view_dropped_stream, view_aggregated_stream) =
if let Some(origin_view) = origin_view {
let (mut view, view_dropped_stream, view_aggragated_stream) =
View::new_from_other(&origin_view, at);
if !tree_route.retracted().is_empty() {
view.pool.clear_recently_pruned();
}
(view, view_dropped_stream, view_aggragated_stream)
} else {
debug!(
target: LOG_TARGET,
?at,
"creating non-cloned view"
);
View::new(
self.api.clone(),
at.clone(),
self.options.clone(),
self.metrics.clone(),
self.is_validator.clone(),
)
};

let start = Instant::now();
// 1. Capture all import notification from the very beginning, so first register all
Expand All @@ -1114,15 +1116,13 @@ where
view.pool.validated_pool().import_notification_stream().boxed(),
);

self.view_store.dropped_stream_controller.add_view(
view.at.hash,
view.pool.validated_pool().create_dropped_by_limits_stream().boxed(),
);
self.view_store
.dropped_stream_controller
.add_view(view.at.hash, view_dropped_stream.boxed());

self.view_store.listener.add_view_aggregated_stream(
view.at.hash,
view.pool.validated_pool().create_aggregated_stream().boxed(),
);
self.view_store
.listener
.add_view_aggregated_stream(view.at.hash, view_aggregated_stream.boxed());
// sync the transactions statuses and referencing views in all the listeners with newly
// cloned view.
view.pool.validated_pool().retrigger_notifications();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@
//! [`MultiViewListener`]: crate::fork_aware_txpool::multi_view_listener::MultiViewListener
//! [`Pool`]: crate::graph::Pool
//! [`Watcher`]: crate::graph::watcher::Watcher
//! [`AggregatedStream`]: crate::graph::AggregatedStream
//! [`AggregatedStream`]: crate::fork_aware_txpool::view::AggregatedStream
//! [`Options`]: crate::graph::Options
//! [`vp::import_notification_stream`]: ../graph/validated_pool/struct.ValidatedPool.html#method.import_notification_stream
//! [`vp::enforce_limits`]: ../graph/validated_pool/struct.ValidatedPool.html#method.enforce_limits
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@

use crate::{
common::tracing_log_xt::log_xt_trace,
fork_aware_txpool::stream_map_util::next_event,
graph::{self, BlockHash, ExtrinsicHash, TransactionStatusEvent},
fork_aware_txpool::{stream_map_util::next_event, view::TransactionStatusEvent},
graph::{self, BlockHash, ExtrinsicHash},
LOG_TARGET,
};
use futures::{Future, FutureExt, Stream, StreamExt};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,9 @@ mod tests {
let api = Arc::new(TestApi::default());
let block0 = api.expect_hash_and_number(0);

let view = Arc::new(View::new(
api.clone(),
block0,
Default::default(),
Default::default(),
false.into(),
));
let view = Arc::new(
View::new(api.clone(), block0, Default::default(), Default::default(), false.into()).0,
);
let queue = Arc::new(RevalidationQueue::new());

let uxt = uxt(Transfer {
Expand Down
Loading
Loading