Commit

chore: Used unbounded channel

Weijun-H committed Feb 10, 2025
1 parent 4263b0f commit 846afdd
Showing 2 changed files with 15 additions and 15 deletions.
14 changes: 8 additions & 6 deletions datafusion/physical-plan/src/repartition/distributor_channels.rs
@@ -82,11 +82,12 @@ pub fn channels<T>(
 pub fn tokio_channels<T>(
     n: usize,
 ) -> (
-    Vec<tokio::sync::mpsc::Sender<T>>,
-    Vec<tokio::sync::mpsc::Receiver<T>>,
+    Vec<tokio::sync::mpsc::UnboundedSender<T>>,
+    Vec<tokio::sync::mpsc::UnboundedReceiver<T>>,
 ) {
-    // only used for OnDemandRepartitionExec, so no need for unbounded capacity
-    let (senders, receivers) = (0..n).map(|_| tokio::sync::mpsc::channel(2)).unzip();
+    let (senders, receivers) = (0..n)
+        .map(|_| tokio::sync::mpsc::unbounded_channel())
+        .unzip();
     (senders, receivers)
 }

@@ -103,8 +104,9 @@ pub fn partition_aware_channels<T>(
     (0..n_in).map(|_| channels(n_out)).unzip()
 }
 
-type OnDemandPartitionAwareSenders<T> = Vec<Vec<tokio::sync::mpsc::Sender<T>>>;
-type OnDemandPartitionAwareReceivers<T> = Vec<Vec<tokio::sync::mpsc::Receiver<T>>>;
+type OnDemandPartitionAwareSenders<T> = Vec<Vec<tokio::sync::mpsc::UnboundedSender<T>>>;
+type OnDemandPartitionAwareReceivers<T> =
+    Vec<Vec<tokio::sync::mpsc::UnboundedReceiver<T>>>;
 
 pub fn on_demand_partition_aware_channels<T>(
     n_in: usize,
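
The body of `on_demand_partition_aware_channels` is collapsed in this view. Given its signature and the sibling `partition_aware_channels` shown above, a plausible sketch is one row of `tokio_channels(n_out)` per input partition (an assumption, not the verified body):

pub fn on_demand_partition_aware_channels<T>(
    n_in: usize,
    n_out: usize,
) -> (
    OnDemandPartitionAwareSenders<T>,
    OnDemandPartitionAwareReceivers<T>,
) {
    // One unbounded channel per (input partition, output partition) pair,
    // mirroring how `partition_aware_channels` maps `channels(n_out)`
    // over `0..n_in`.
    (0..n_in).map(|_| tokio_channels(n_out)).unzip()
}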
16 changes: 7 additions & 9 deletions datafusion/physical-plan/src/repartition/on_demand_repartition.rs
@@ -97,8 +97,8 @@ type PartitionChannels = (Vec<Sender<usize>>, Vec<Receiver<usize>>);
 /// └─────────────────┘ Distribute data to the output partitions
 ///
 /// ```
-type OnDemandDistributionSender = tokio::sync::mpsc::Sender<MaybeBatch>;
-type OnDemandDistributionReceiver = tokio::sync::mpsc::Receiver<MaybeBatch>;
+type OnDemandDistributionSender = tokio::sync::mpsc::UnboundedSender<MaybeBatch>;
+type OnDemandDistributionReceiver = tokio::sync::mpsc::UnboundedReceiver<MaybeBatch>;
 
 type OnDemandInputPartitionsToCurrentPartitionSender = Vec<OnDemandDistributionSender>;
 type OnDemandInputPartitionsToCurrentPartitionReceiver =
@@ -631,10 +631,8 @@ impl OnDemandRepartitionExec {
         metrics: OnDemandRepartitionMetrics,
         context: Arc<TaskContext>,
     ) -> Result<()> {
-        let num_output_partition = partitioning.partition_count();
         // initialize buffer channel so that we can pre-fetch from input
-        let (buffer_tx, mut buffer_rx) =
-            tokio::sync::mpsc::channel(num_output_partition * 2);
+        let (buffer_tx, mut buffer_rx) = tokio::sync::mpsc::channel(2);
         // execute the child operator in a separate task
         // that pushes batches into buffer channel with limited capacity
         let processing_task = SpawnedTask::spawn(Self::process_input(
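
Note that the pre-fetch buffer between the input task and the distributor stays bounded: its capacity just becomes a fixed 2 instead of scaling with the output partition count, presumably because the pre-fetch depth need not grow with fan-out. So the input is still read with backpressure even though the per-partition output channels are now unbounded. A minimal sketch of that pre-fetch pattern, using plain `tokio::spawn` as a stand-in for DataFusion's `SpawnedTask` (illustrative, not the commit's code):

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // capacity 2 == "pre-fetch at most two items ahead of the consumer"
    let (buffer_tx, mut buffer_rx) = mpsc::channel::<u64>(2);

    // stand-in for `Self::process_input` running in a spawned task
    let producer = tokio::spawn(async move {
        for batch in 0..10u64 {
            // `send` suspends here once two items are already buffered;
            // this is the backpressure the bounded channel provides
            if buffer_tx.send(batch).await.is_err() {
                break; // consumer hung up
            }
        }
    });

    while let Some(batch) = buffer_rx.recv().await {
        println!("distribute batch {batch}");
    }
    producer.await.unwrap();
}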
@@ -683,7 +681,7 @@
             if let Some((tx, reservation)) = output_channels.get_mut(&partition) {
                 reservation.lock().try_grow(size)?;
 
-                if tx.send(Some(Ok(batch))).await.is_err() {
+                if tx.send(Some(Ok(batch))).is_err() {
                     // If the other end has hung up, it was an early shutdown (e.g. LIMIT)
                     reservation.lock().shrink(size);
                     output_channels.remove(&partition);
@@ -737,7 +735,7 @@
                         "Join Error".to_string(),
                         Box::new(DataFusionError::External(Box::new(Arc::clone(&e)))),
                     ));
-                    tx.send(Some(err)).await.ok();
+                    tx.send(Some(err)).ok();
                 }
             }
             // Error from running input task
@@ -748,14 +746,14 @@
                 for (_, tx) in txs {
                     // wrap it because need to send error to all output partitions
                     let err = Err(DataFusionError::from(&e));
-                    tx.send(Some(err)).await.ok();
+                    tx.send(Some(err)).ok();
                 }
             }
             // Input task completed successfully
             Ok(Ok(())) => {
                 // notify each output partition that this input partition has no more data
                 for (_, tx) in txs {
-                    tx.send(None).await.ok();
+                    tx.send(None).ok();
                 }
             }
         }
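Across the send sites above, the unbounded senders carry `MaybeBatch` values, where (per the surrounding code) `Some(Ok(batch))` is data, `Some(Err(e))` propagates a failure, and `None` tells an output partition that this input is exhausted. A hedged sketch of that three-state protocol, with `String` standing in for the real batch and error types:

use tokio::sync::mpsc;

// Simplified stand-in for `MaybeBatch = Option<Result<RecordBatch>>`.
type MaybeItem = Option<Result<String, String>>;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<MaybeItem>();

    // Producer side: data, then the end-of-input sentinel. `send` is
    // synchronous, so `.ok()` simply ignores a hung-up receiver.
    tx.send(Some(Ok("batch-0".into()))).ok();
    tx.send(None).ok();

    // Consumer side: drain until the sentinel (or channel close).
    while let Some(msg) = rx.recv().await {
        match msg {
            Some(Ok(batch)) => println!("got {batch}"),
            Some(Err(e)) => eprintln!("upstream error: {e}"),
            None => break, // this input partition is done
        }
    }
}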
