
bug: Fix memory reservation and allocation problems for SortExec #14644

Merged

Conversation

Member

@Kontinuation Kontinuation commented Feb 13, 2025

Which issue does this PR close?

Rationale for this change

I had a hard time making DataFusion Comet work on cloud instances with 4GB of memory per CPU core, partially because DataFusion is very likely to allocate more memory than it reserves and run into OOM, or to run into various kinds of memory reservation failures. When the partition to process is larger than available memory, we expect spilling to kick in so the query runs to completion, but instead we got tons of failures. We found that operators involving SortExec, such as sort-merge join, trigger the aforementioned problems frequently.

#10073 reports that SortExec may allocate 2X more memory than it reserves (see "the second problem" in the issue), and we found that this contributed to most of the OOM cases we encountered when using Comet. We have also found several other SortExec-related problems that are critical for our memory-limited use cases, and this PR tries to address some of them.

What changes are included in this PR?

This PR contains several fixes:

  1. Don't try_collect the result of merging all at once. We consume the merged batches one after another and reserve memory for each batch. Once a reservation fails, we switch to "spill mode" and write all future batches into the spill file. This resolves the 2X memory allocation problem ("the second problem") reported by Memory account not adding up in SortExec #10073, as well as this comment: External sorting not working for (maybe only for string columns??) #12136 (comment)
  2. shrink_to_fit every sorted batch to reduce the memory footprint of sorted batches; otherwise sorted string arrays may take up to 2X the original space in the worst case, due to the exponential growth of the MutableBuffer storing variable-length binary values. shrink_to_fit is a no-op for the primitive-type columns returned by take_arrays since they already have the right capacity, and benchmarking showed no significant performance regression for non-primitive types such as string arrays, so I think it is a good change. This resolves "the first problem" reported by Memory account not adding up in SortExec #10073.
  3. Reserve more memory for ingested batches to leave some room for merging. This PR reserves 2X memory for each batch, which works for most of the queries in the sort-tpch benchmark (all except Q5 and Q8). Users still have to configure sort_spill_reservation_bytes when the reserved memory is not big enough for merging. I don't think it is a good change, but it is the only solution I can think of to compensate for the extra memory used by the row representation of sorted columns.

The problems with SortExec are not easy to solve without introducing significant changes to the overall design; change 3 of this PR is mostly a bandaid solution. I believe the implementation needs to be revamped to make all the memory reservation/spilling behave correctly.
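The spill-mode switch in fix 1 can be sketched as a toy model (this is not the actual DataFusion code: `drain_merged`, the `Vec<u8>` "batches", and the byte budget are stand-ins for the merged stream, RecordBatches, and the memory reservation):

```rust
// Sketch of fix 1: consume merged batches one at a time and reserve memory
// per batch; after the first failed reservation, write all remaining batches
// to the spill file instead of collecting everything in memory at once.
fn drain_merged(
    merged: impl Iterator<Item = Vec<u8>>, // stand-in for the merged batch stream
    budget: usize,                         // stand-in for the memory pool limit
) -> (Vec<Vec<u8>>, Vec<Vec<u8>>) {
    let mut reserved = 0usize;
    let mut in_mem = Vec::new();
    let mut spilled = Vec::new(); // stand-in for writes to an IPC spill file
    let mut spill_mode = false;
    for batch in merged {
        if !spill_mode && reserved + batch.len() <= budget {
            reserved += batch.len(); // reservation succeeded: keep in memory
            in_mem.push(batch);
        } else {
            spill_mode = true; // one failed reservation: spill all future batches
            spilled.push(batch);
        }
    }
    (in_mem, spilled)
}

fn main() {
    // Five 10-byte batches against a 25-byte budget: two stay in memory,
    // the rest are spilled once the reservation fails.
    let batches: Vec<Vec<u8>> = (0..5).map(|_| vec![0u8; 10]).collect();
    let (in_mem, spilled) = drain_merged(batches.into_iter(), 25);
    println!("in_mem = {}, spilled = {}", in_mem.len(), spilled.len());
}
```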

Are these changes tested?

Yes. It passes all the tests.

Are there any user-facing changes?

Users may find that the sort operator is more likely to spill when running with memory constraints. The old configurations they used to make the sort operator work may not be optimal after applying this PR. For instance, a user may have configured a very large sort_spill_reservation_bytes to make merging work, but this PR reduces the optimal value of sort_spill_reservation_bytes for the same workload.

@github-actions github-actions bot added physical-expr Physical Expressions core Core DataFusion crate labels Feb 13, 2025
Fix batch memory consumption growth after sorting;
Reserve memory more aggressively to compensate for memory needed for merging.
@Kontinuation Kontinuation force-pushed the pr-fix-sort-mem-reservation-and-usage branch from 2f7f403 to 8cc9aea Compare February 13, 2025 13:18
Comment on lines 191 to +201
 let value = self.sort.expr.evaluate(batch)?;
 let array = value.into_array(batch.num_rows())?;
+let size_in_mem = array.get_buffer_memory_size();
 let array = array.as_any().downcast_ref::<T>().expect("field values");
-Ok(ArrayValues::new(self.sort.options, array))
+let mut array_reservation = self.reservation.new_empty();
+array_reservation.try_grow(size_in_mem)?;
+Ok(ArrayValues::new(
+    self.sort.options,
+    array,
+    array_reservation,
+))
Member Author

I think this reservation is needed.

When the sort expression is a simple column reference, the array simply reuses the buffers in batch; this is the case where the reservation is not needed. However, the sort expression can be a complex expression such as l_linenumber + 1, and the result of evaluation takes additional space in that case. Always reserving for the array is a more conservative approach that prevents allocations from overshooting the limit.
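A minimal sketch of the distinction (plain `Arc<Vec<i64>>` standing in for an Arrow ArrayRef; `reference_column` and `evaluate_plus_one` are hypothetical helpers, not DataFusion APIs):

```rust
use std::sync::Arc;

// A bare column reference shares the batch's existing buffer, while an
// expression like `l_linenumber + 1` materializes a fresh buffer that would
// go unaccounted for without its own reservation.
fn reference_column(column: &Arc<Vec<i64>>) -> Arc<Vec<i64>> {
    Arc::clone(column) // no new allocation: same buffer as the batch
}

fn evaluate_plus_one(column: &Arc<Vec<i64>>) -> Arc<Vec<i64>> {
    Arc::new(column.iter().map(|v| v + 1).collect()) // fresh allocation
}

fn main() {
    let column: Arc<Vec<i64>> = Arc::new((0..1024).collect());
    let shared = reference_column(&column);
    let evaluated = evaluate_plus_one(&column);
    // Only the evaluated array adds memory that a reservation must cover.
    println!("shares buffer: {}", Arc::ptr_eq(&column, &shared));
    println!("new buffer:    {}", !Arc::ptr_eq(&column, &evaluated));
}
```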

Comment on lines -372 to +359
-.with_reservation(self.reservation.new_empty())
+.with_reservation(self.merge_reservation.new_empty())
Member Author

@Kontinuation Kontinuation Feb 13, 2025

I believe we should use merge_reservation here, because the allocations happening in the stream built here are for merging.

@Kontinuation Kontinuation marked this pull request as ready for review February 13, 2025 13:27
Contributor

@comphead comphead left a comment

Thanks @Kontinuation this looks incredibly helpful
cc @tustvold @Dandandan @crepererum

@comphead
Contributor

@andygrove cc as this ticket is directly related to Comet working on cloud instances

// write sorted batches to disk when the memory is insufficient.
let mut spill_writer: Option<IPCWriter> = None;
// Leave at least 1/3 of the spill reservation for sorting/merging the next batch.
// Here the 1/3 is simply an arbitrarily chosen number.
Contributor

I'm thinking, should we make this number configurable as a DataFusion parameter? I feel that depending on the data this number can fluctuate. It might be different for short, plain data vs. deeply nested wide rows.

Member Author

I found that SortPreservingMergeStream never reserves memory after producing the first merged batch when merging in-memory batches, so there's no need to reserve additional space for merging in this place. I have removed the additional memory reservation in a new commit.

@2010YOUY01
Contributor

I had a hard time making DataFusion Comet work on cloud instances with 4GB of memory per CPU core, partially because DataFusion is very likely to allocate more memory than it reserves and run into OOM, or to run into various kinds of memory reservation failures.

Yes, this feature is quite bug-prone; perhaps we should mark it as experimental to prevent people from using it in production. Thank you so much for the efforts.

Here are my thoughts on the changes

  1. Don't try_collect the result of merging all at once. We consume the merged batches one after another and reserve memory for each batch. Once a reservation fails, we switch to "spill mode" and write all future batches into the spill file. This resolves the 2X memory allocation problem ("the second problem") reported by Memory account not adding up in SortExec #10073, as well as this comment: External sorting not working for (maybe only for string columns??) #12136 (comment)
  3. Reserve more memory for ingested batches to leave some room for merging. This PR reserves 2X memory for each batch, which works for most of the queries in the sort-tpch benchmark (all except Q5 and Q8). Users still have to configure sort_spill_reservation_bytes when the reserved memory is not big enough for merging. I don't think it is a good change, but it is the only solution I can think of to compensate for the extra memory used by the row representation of sorted columns.

The 2X memory problem is: specify a query to run under a 100M memory limit, and the measured physical memory usage is 200M. (though the reality is even worse than 2X 🤦🏼, see #14142)
This is caused by the following, when the first OOM happens:

  • there are already 1X batches in memory
  • then they are sorted and merged all at once, meaning the original batches must be held until all sorted runs are generated (now the memory footprint is already 2X)
  • in the merging phase, the additional col->row conversion consumes some extra memory (2X+)

I get the high-level idea of the more conservative memory accounting in point 3: it also accounts for merged batches. But I got lost in the memory estimation details of the implementation (especially whether the 2X estimation amplification is only for merged batches, or also for intermediate Rows). Could you explain with a concrete example how everything is calculated? (for example, there is an ExternalSorter with a 100M memory budget, and it will consume 10 batches, each 20M in size; how is the memory estimation calculated in each step?)

  1. shrink_to_fit every sorted batch to reduce the memory footprint of sorted batches; otherwise sorted string arrays may take up to 2X the original space in the worst case, due to the exponential growth of the MutableBuffer storing variable-length binary values. shrink_to_fit is a no-op for the primitive-type columns returned by take_arrays since they already have the right capacity, and benchmarking showed no significant performance regression for non-primitive types such as string arrays, so I think it is a good change. This resolves "the first problem" reported by Memory account not adding up in SortExec #10073.

I think the buffer resizing mechanism is not doubling each time; the default policy allocates new constant-size buffers https://docs.rs/arrow-array/54.1.0/src/arrow_array/builder/generic_bytes_view_builder.rs.html#120-122, so this change might not help

@Kontinuation
Member Author

  1. shrink_to_fit every sorted batch to reduce the memory footprint of sorted batches; otherwise sorted string arrays may take up to 2X the original space in the worst case, due to the exponential growth of the MutableBuffer storing variable-length binary values. shrink_to_fit is a no-op for the primitive-type columns returned by take_arrays since they already have the right capacity, and benchmarking showed no significant performance regression for non-primitive types such as string arrays, so I think it is a good change. This resolves "the first problem" reported by Memory account not adding up in SortExec #10073.

I think the buffer resizing mechanism is not doubling each time; the default policy allocates new constant-size buffers https://docs.rs/arrow-array/54.1.0/src/arrow_array/builder/generic_bytes_view_builder.rs.html#120-122, so this change might not help

Actually it helps. I have added a new test case test_sort_spill_utf8_strings. It will fail after removing the shrink_to_fit calls.

Here is where the 2X buffer growth comes from:

  1. sort_batch calls take_arrays, which calls take_bytes for string columns
  2. take_bytes allocates a MutableBuffer for storing strings taken from the input array
  3. take_bytes calls the extend_from_slice method of the values mutable buffer to append strings to the buffer, which in turn calls reserve to grow its space
  4. reserve grows the size exponentially by a factor of 2
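The effect of steps 3 and 4 can be reproduced with plain Rust (a `Vec<u8>` standing in for Arrow's MutableBuffer; `buffer_capacities` is a hypothetical helper): repeated `extend_from_slice` calls grow the capacity exponentially, so the final capacity can overshoot the data length, and `shrink_to_fit` releases the slack.

```rust
// Append many variable-length strings, as take_bytes does for string columns,
// then compare capacity before and after shrink_to_fit.
fn buffer_capacities() -> (usize, usize, usize) {
    let mut buf: Vec<u8> = Vec::new();
    for i in 0..100_000 {
        // extend_from_slice calls reserve, which grows capacity exponentially.
        buf.extend_from_slice(format!("string-{i}").as_bytes());
    }
    let before = buf.capacity(); // capacity after exponential growth
    buf.shrink_to_fit();         // release the unused slack
    (buf.len(), before, buf.capacity())
}

fn main() {
    let (len, before, after) = buffer_capacities();
    println!("len = {len}, capacity before = {before}, after = {after}");
}
```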

@Kontinuation
Member Author

I get the high-level idea of the more conservative memory accounting in point 3: it also accounts for merged batches. But I got lost in the memory estimation details of the implementation (especially whether the 2X estimation amplification is only for merged batches, or also for intermediate Rows). Could you explain with a concrete example how everything is calculated? (for example, there is an ExternalSorter with a 100M memory budget, and it will consume 10 batches, each 20M in size; how is the memory estimation calculated in each step?)

The 2X amplification is mainly for intermediate Rows, not for merged batches.

Let's assume that each batch is 10 MB and we have a 100 MB memory budget. The following diagram shows how the memory consumption reaches 100 MB when performing a merge.

[diagram: datafusion-sort-merge.drawio]

Here is the detailed explanation:

  1. We reserve 2X memory for each batch on insertion, so when in_mem_batches holds 5 batches consuming 50 MB of memory, we have already reserved 100 MB. The next insertion will trigger a merge, and possibly a spill.
  2. When the merge happens, each batch in in_mem_batches is sorted individually using sort_batch and fed into StreamingMergeBuilder to build a SortPreservingMergeStream. The batches are taken away from in_mem_batches, and each original batch is dropped immediately after its sorted batch is retrieved. We assume that the sorted batches have the same size as the original batches.
  3. SortPreservingMergeStream polls one batch from each sorted stream and creates a row representation or sorted array representation for each batch. The sorted batches are saved into in_progress and the row/sorted array representations are saved into cursors. We assume that the row representation or sorted array has the same size as the sorted batch. Now we have consumed 100 MB.
  4. SortPreservingMergeStream produces merged batches. We can assume that the overall memory consumption remains unchanged during this process, but we certainly need to reserve memory for the merged batches. Each time we poll a merged batch from SortPreservingMergeStream, we try reserving memory for it. If the reservation fails, all future merged batches polled from the merged stream are written directly to the spill file.
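Step 1 of the walkthrough can be sketched numerically (a toy model under the stated assumptions: 10 MB batches, 100 MB budget, 2X reservation per batch; `batches_before_merge` is a hypothetical helper, not DataFusion code):

```rust
const BUDGET: usize = 100 * 1024 * 1024; // 100 MB memory budget
const BATCH: usize = 10 * 1024 * 1024;   // each ingested batch is 10 MB

// Reserve 2 * batch_size per insertion: 1X for the batch itself and 1X
// headroom for the row representation built during merging. Count how many
// batches fit before a reservation fails and a merge/spill is triggered.
fn batches_before_merge() -> usize {
    let mut reserved = 0usize;
    let mut batches = 0usize;
    loop {
        if reserved + 2 * BATCH > BUDGET {
            break; // reservation would fail: trigger merge, possibly spill
        }
        reserved += 2 * BATCH;
        batches += 1;
    }
    batches
}

fn main() {
    // 5 batches (50 MB of data) exhaust the 100 MB of reservations.
    println!("batches before merge: {}", batches_before_merge());
}
```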

@Kontinuation
Member Author

BTW, the 2X amplification for intermediate Rows seems conservative, but it is still not enough for some queries. The following query creates Rows larger than the original batches, and it will fail when reserving memory for the Rows:

-- sort-tpch Q5: 3 sort keys {(INTEGER, 7), (BIGINT, 10k), (BIGINT, 1.5M)} + no payload column

SELECT l_linenumber, l_suppkey, l_orderkey
FROM lineitem
ORDER BY l_linenumber, l_suppkey, l_orderkey

This can be worked around by configuring a larger sort_spill_reservation_bytes.

@Kontinuation
Member Author

I had another interesting observation: spilling sort can be faster than memory unbounded sort in datafusion.

I tried running sort-tpch Q3 using this PR with #14642 cherry-picked onto it, and configured parquet.schema_force_view_types = false to mitigate #12136 (comment). Here are the test results obtained on a cloud instance with Intel(R) Xeon(R) Platinum 8269CY CPU @ 2.50GHz CPU:

$./target/release/dfbench sort-tpch --iterations 1 --path benchmarks/data/tpch_sf10 --memory-limit 1000M -q 3 -n1
Q3 iteration 0 took 93339.0 ms and returned 59986052 rows
Q3 avg time: 93339.00 ms
$./target/release/dfbench sort-tpch --iterations 1 --path benchmarks/data/tpch_sf10 --memory-limit 500M -q 3 -n1
Q3 iteration 0 took 81831.2 ms and returned 59986052 rows
Q3 avg time: 81831.18 ms
$./target/release/dfbench sort-tpch --iterations 1 --path benchmarks/data/tpch_sf10 --memory-limit 200M -q 3 -n1
Q3 iteration 0 took 77046.4 ms and returned 59986052 rows
Q3 avg time: 77046.36 ms
$./target/release/dfbench sort-tpch --iterations 1 --path benchmarks/data/tpch_sf10 -q 3 -n1
Q3 iteration 0 took 170416.1 ms and returned 59986052 rows
Q3 avg time: 170416.10 ms

When running without a memory limit, we end up merging tons of small sorted streams, which seems bad for performance. The memory limit forces us to merge before ingesting all the batches, so we do several smaller merges first and a final merge at the end to produce the result set. Coalescing batches into larger streams before merging seems like a good idea.

@kazuyukitanimura
Contributor

Not directly related to the point of this PR, but regarding
I had a hard time making DataFusion Comet work on cloud instances with 4GB memory per CPU core, partially because DataFusion is very likely to allocates more memory than reserved and run into OOM, or run into various kinds of memory reservation failures

I hope apache/datafusion-comet#1369 helps. It may cause more spilling, but I expect the chance of OOM is reduced. Accurate tracking of memory is still important; otherwise mis-tracking will still cause OOM.

@zhuqi-lucas
Contributor

zhuqi-lucas commented Feb 15, 2025

use arrow::array::{RecordBatch, StringBuilder};
use arrow_schema::{DataType, Field, Schema};
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use futures::TryStreamExt;
use std::sync::Arc;

#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
pub async fn main() {
    build_parquet();

    let env = RuntimeEnvBuilder::new()
        .with_disk_manager(DiskManagerConfig::default())
        .with_memory_pool(Arc::new(FairSpillPool::new(100 * 1024 * 1024)))
        .build_arc()
        .unwrap();

    let mut config = SessionConfig::new().with_sort_spill_reservation_bytes(32 * 1024 * 1024);
    config.options_mut().execution.parquet.schema_force_view_types = false;

    let ctx = SessionContext::new_with_config_rt(config, env);

    ctx.register_parquet(
        "big_strings",
        "/tmp/big_strings.parquet",
        ParquetReadOptions::default(),
    )
    .await
    .unwrap();

    let sql = "SELECT * FROM big_strings ORDER BY strings";
    println!("Sorting strings");
    ctx.sql(sql)
        .await
        .unwrap()
        .execute_stream()
        .await
        .unwrap()
        .try_for_each(|_| std::future::ready(Ok(())))
        .await
        .unwrap();
}

fn build_parquet() {
    if std::fs::File::open("/tmp/big_strings.parquet").is_ok() {
        println!("Using existing file at /tmp/big_strings.parquet");
        return;
    }
    println!("Generating test file at /tmp/big_strings.parquet");
    let file = std::fs::File::create("/tmp/big_strings.parquet").unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "strings",
        DataType::Utf8,
        false,
    )]));
    let mut writer = ArrowWriter::try_new(file, schema.clone(), None).unwrap();

    for batch_idx in 0..100 {
        println!("Generating batch {} of 100", batch_idx);
        let mut string_array_builder =
            StringBuilder::with_capacity(1024 * 1024, 1024 * 1024 * 3 * 14);
        for i in 0..(1024 * 1024) {
            string_array_builder
                .append_value(format!("string-{}string-{}string-{}", i, i, i));
        }
        let array = Arc::new(string_array_builder.finish());
        let batch = RecordBatch::try_new(schema.clone(), vec![array]).unwrap();
        writer.write(&batch).unwrap();
    }
    writer.close().unwrap();
}
called `Result::unwrap()` on an `Err` value: ResourcesExhausted("Failed to allocate additional 353536 bytes for ExternalSorterMerge[1] with 22948928 bytes already allocated for this reservation - 127190 bytes remain available for the total pool")

Thank you @kazuyukitanimura for the PR. I applied it to try to fix the test, but the above test still fails for me; I am not sure if I am missing something.

@Kontinuation
Member Author

Thank you @kazuyukitanimura for the PR. I applied it to try to fix the test, but the above test still fails for me; I am not sure if I am missing something.

There are 2 problems:

  1. The DataSourceExec may have many partitions, and the SortExec on each partition only gets a fair share of the 100MB pool, so each partition won't get enough memory to operate. This is still the case even with worker_threads = 1. If you still want to sort a 4.2GB parquet file using 100MB of memory, you can set .with_target_partitions(1) in your session config.
  2. 100MB is not enough for the final merging with spill-reads. There will be roughly 200 spill files generated after ingesting all the batches, and the size of a typical batch for this workload is 352380 bytes. The memory needed for merging will be 200 * (352380 bytes) * 2 > 100MB. The merging phase is unspillable, so it requires a minimum amount of memory to operate. Raising the memory limit to 200MB will work for this particular workload.

One possible fix for problem 2 is to use a smaller batch size when writing batches to spill files, so that the unspillable memory required for the final spill-read merging is smaller. Or we simply leave this problem as is and require the user to raise the memory limit.
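A back-of-the-envelope check of the numbers in problem 2 (`merge_memory_needed` is a hypothetical helper expressing the estimate above, not a DataFusion function):

```rust
// ~200 spill files, one ~352380-byte batch buffered per file, times the 2X
// amplification for the row representation, exceeds the 100 MB pool.
fn merge_memory_needed(spill_files: usize, batch_bytes: usize) -> usize {
    spill_files * batch_bytes * 2
}

fn main() {
    let needed = merge_memory_needed(200, 352_380); // ~140.9 MB
    let pool = 100 * 1024 * 1024;                   // 100 MB memory limit
    println!("needed = {needed} bytes, pool = {pool} bytes");
}
```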

@zhuqi-lucas
Contributor

Thank you @kazuyukitanimura for the PR. I applied it to try to fix the test, but the above test still fails for me; I am not sure if I am missing something.

There are 2 problems:

  1. The DataSourceExec may have many partitions, and the SortExec on each partition only gets a fair share of the 100MB pool, so each partition won't get enough memory to operate. This is still the case even with worker_threads = 1. If you still want to sort a 4.2GB parquet file using 100MB of memory, you can set .with_target_partitions(1) in your session config.
  2. 100MB is not enough for the final merging with spill-reads. There will be roughly 200 spill files generated after ingesting all the batches, and the size of a typical batch for this workload is 352380 bytes. The memory needed for merging will be 200 * (352380 bytes) * 2 > 100MB. The merging phase is unspillable, so it requires a minimum amount of memory to operate. Raising the memory limit to 200MB will work for this particular workload.

One possible fix for problem 2 is to use a smaller batch size when writing batches to spill files, so that the unspillable memory required for the final spill-read merging is smaller. Or we simply leave this problem as is and require the user to raise the memory limit.

Thank you @Kontinuation for the good explanation; it makes sense to me, I will try it.

And for problem 2, is it possible to introduce a spillable merging phase? Would it be safer?

self.sort_or_spill_in_mem_batches().await?;
// We've already freed more than half of reserved memory,
// so we can grow the reservation again. There's nothing we can do
// if this try_grow fails.
Contributor

@zhuqi-lucas zhuqi-lucas Feb 15, 2025

Reserve more memory for ingested batches to leave some room for merging. This PR reserves 2X memory for each batch, which works for most of the queries in the sort-tpch benchmark (all except Q5 and Q8). Users still have to configure sort_spill_reservation_bytes when the reserved memory is not big enough for merging. I don't think it is a good change, but it is the only solution I can think of to compensate for the extra memory used by the row representation of sorted columns.

Is it possible to make the merging phase also spillable?

Member Author

We could implement a more complicated multi-stage merging phase that merges a small portion of the streams each time and performs multiple rounds of merges to produce the final merged stream. However, this approach is quite complex and out of scope for this PR. I believe there should be dedicated discussions around this problem.

Contributor

I agree, we can add more dedicated discussions around this problem.

Contributor

We could implement a more complicated multi-stage merging phase that merges a small portion of the streams each time and performs multiple rounds of merges to produce the final merged stream. However, this approach is quite complex and out of scope for this PR. I believe there should be dedicated discussions around this problem.

I agree we should implement such a feature as a dedicated, follow on PR / Project.

@zhuqi-lucas or @Kontinuation, could one of you file a ticket to track the work? I think it is especially important to highlight which cases the current code won't handle well.

It sounds like there are several issues:

  1. Ensuring we can always spill data (now spilling will sometimes fail if we run out of memory to sort the batches in)
  2. Ensuring that we can always merge the data that was spilled, even if it has a really wide fanout (like 1000s of spill files)

I think problem 1 could be solved by potentially spilling unsorted batches (and then sorting them separately). This would be less efficient (we read/write some tuples twice) but would work.

Problem 2 could use the multi-pass merge that @Kontinuation describes
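The multi-pass merge idea can be sketched at a high level (a toy model; `merge_passes` and `fan_in` are hypothetical names, not DataFusion code): if more spill files exist than the memory budget allows merging at once, merge them in groups, write each group's result back as a new sorted run, and repeat until one run remains.

```rust
// Count how many merge passes are needed to reduce `runs` sorted runs to one,
// merging at most `fan_in` runs per merge.
fn merge_passes(mut runs: usize, fan_in: usize) -> usize {
    assert!(fan_in >= 2);
    let mut passes = 0;
    while runs > 1 {
        runs = (runs + fan_in - 1) / fan_in; // ceiling division: runs left after a pass
        passes += 1;
    }
    passes
}

fn main() {
    // 1000 spill files with a fan-in of 10 need 3 passes: 1000 -> 100 -> 10 -> 1.
    println!("passes: {}", merge_passes(1000, 10));
}
```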

Contributor

Thank you @alamb for double-checking; I created a follow-up issue:
#14692

We can use it to track and further discuss the potential complete solution for spilling data in sort.

@zhuqi-lucas
Contributor


Updated: it works after changing to 1 partition and increasing the memory limit.

// data types due to exponential growth when building the sort columns. We shrink the columns
// to prevent memory reservation failures, as well as excessive memory allocation when running
// merges in `SortPreservingMergeStream`.
columns.iter_mut().for_each(|c| {
Contributor

@zhuqi-lucas zhuqi-lucas Feb 15, 2025

Will it be expensive to do this for all columns of each batch, or can we narrow it down to exactly the columns that need to shrink?

Contributor

It seems you already benchmarked it; maybe we can also add the benchmark code.

Member Author

@Kontinuation Kontinuation Feb 15, 2025

This shrink_to_fit is basically a no-op for primitive-type columns produced by take_arrays because their internal buffer already has the right capacity; it only performs reallocations for columns with variable-length data. So I don't think there's a need to cherry-pick which columns to shrink.
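The contrast can be sketched with a plain `Vec` standing in for the Arrow buffers (`capacity_after_shrink` is a hypothetical helper): a buffer allocated with its exact final capacity, like the primitive columns from take_arrays, has no slack for shrink_to_fit to release, while one grown push-by-push overshoots.

```rust
// Returns (capacity before shrink, capacity after shrink) for a buffer of
// `n` elements, either preallocated exactly or grown incrementally.
fn capacity_after_shrink(preallocate: bool, n: usize) -> (usize, usize) {
    let mut buf: Vec<u64> = if preallocate {
        Vec::with_capacity(n) // exact capacity up front, like take_arrays output
    } else {
        Vec::new()            // grown by amortized doubling, like variable-length data
    };
    for i in 0..n as u64 {
        buf.push(i);
    }
    let before = buf.capacity();
    buf.shrink_to_fit();
    (before, buf.capacity())
}

fn main() {
    // Exact preallocation: shrink_to_fit has nothing to release.
    println!("preallocated: {:?}", capacity_after_shrink(true, 1000));
    // Incremental growth: capacity overshoots; shrinking releases the slack.
    println!("grown:        {:?}", capacity_after_shrink(false, 1000));
}
```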

I've also benchmarked sorting using utf8 columns and have not observed significant performance overhead:

merge sorted utf8 low cardinality
                        time:   [3.8824 ms 3.8903 ms 3.8987 ms]
                        change: [-2.8027% -2.1227% -1.5573%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 4 outliers among 100 measurements (4.00%)
  3 (3.00%) high mild
  1 (1.00%) high severe

sort merge utf8 low cardinality
                        time:   [4.2295 ms 4.2360 ms 4.2430 ms]
                        change: [+0.5975% +0.8722% +1.1242%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 15 outliers among 100 measurements (15.00%)
  2 (2.00%) low mild
  5 (5.00%) high mild
  8 (8.00%) high severe

sort utf8 low cardinality
                        time:   [6.4265 ms 6.4369 ms 6.4483 ms]
                        change: [-0.3276% -0.0658% +0.1908%] (p = 0.62 > 0.05)
                        No change in performance detected.
Found 7 outliers among 100 measurements (7.00%)
  5 (5.00%) high mild
  2 (2.00%) high severe

sort partitioned utf8 low cardinality
                        time:   [343.87 µs 347.16 µs 351.07 µs]
                        change: [-1.1291% +0.3066% +1.8160%] (p = 0.68 > 0.05)
                        No change in performance detected.
Found 15 outliers among 100 measurements (15.00%)
  5 (5.00%) high mild
  10 (10.00%) high severe

merge sorted utf8 high cardinality
                        time:   [5.9968 ms 6.0083 ms 6.0207 ms]
                        change: [-1.9398% -1.6215% -1.2928%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
  7 (7.00%) high mild
  1 (1.00%) high severe

sort merge utf8 high cardinality
                        time:   [6.4266 ms 6.4399 ms 6.4558 ms]
                        change: [-0.5594% -0.2292% +0.1020%] (p = 0.19 > 0.05)
                        No change in performance detected.
Found 6 outliers among 100 measurements (6.00%)
  2 (2.00%) high mild
  4 (4.00%) high severe

sort utf8 high cardinality
                        time:   [7.7403 ms 7.7541 ms 7.7693 ms]
                        change: [-2.7779% -2.1541% -1.6176%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 6 outliers among 100 measurements (6.00%)
  2 (2.00%) high mild
  4 (4.00%) high severe

sort partitioned utf8 high cardinality
                        time:   [364.21 µs 370.21 µs 376.41 µs]
                        change: [+2.2461% +4.2833% +6.3333%] (p = 0.00 < 0.05)
                        Performance has regressed.

The sort, sort_tpch, and tpch10 benchmarks also showed little difference in performance.

Comparing main and fix-sort-mem-usage-reserve-mem-for-sort-merging
--------------------
Benchmark sort.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query        ┃       main ┃ fix-sort-mem-usage-reserve-mem-for-sort-merging ┃    Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Qsort utf8   │ 28514.66ms │                                      28451.16ms │ no change │
│ Qsort int    │ 29749.57ms │                                      29879.78ms │ no change │
│ Qsort        │ 28608.24ms │                                      29085.31ms │ no change │
│ decimal      │            │                                                 │           │
│ Qsort        │ 31013.98ms │                                      31126.24ms │ no change │
│ integer      │            │                                                 │           │
│ tuple        │            │                                                 │           │
│ Qsort utf8   │ 28925.23ms │                                      29281.38ms │ no change │
│ tuple        │            │                                                 │           │
│ Qsort mixed  │ 30579.63ms │                                      30550.25ms │ no change │
│ tuple        │            │                                                 │           │
└──────────────┴────────────┴─────────────────────────────────────────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ Benchmark Summary                                              ┃             ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ Total Time (main)                                              │ 177391.31ms │
│ Total Time (fix-sort-mem-usage-reserve-mem-for-sort-merging)   │ 178374.12ms │
│ Average Time (main)                                            │  29565.22ms │
│ Average Time (fix-sort-mem-usage-reserve-mem-for-sort-merging) │  29729.02ms │
│ Queries Faster                                                 │           0 │
│ Queries Slower                                                 │           0 │
│ Queries with No Change                                         │           6 │
└────────────────────────────────────────────────────────────────┴─────────────┘
--------------------
Benchmark sort_tpch.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query        ┃     main ┃ fix-sort-mem-usage-reserve-mem-for-sort-merging ┃    Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Q1           │ 187.27ms │                                        187.75ms │ no change │
│ Q2           │ 154.92ms │                                        157.82ms │ no change │
│ Q3           │ 885.18ms │                                        893.74ms │ no change │
│ Q4           │ 184.50ms │                                        189.54ms │ no change │
│ Q5           │ 315.13ms │                                        322.19ms │ no change │
│ Q6           │ 335.00ms │                                        338.65ms │ no change │
│ Q7           │ 584.88ms │                                        594.44ms │ no change │
│ Q8           │ 452.66ms │                                        460.51ms │ no change │
│ Q9           │ 472.15ms │                                        475.38ms │ no change │
│ Q10          │ 681.58ms │                                        685.07ms │ no change │
└──────────────┴──────────┴─────────────────────────────────────────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                                              ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main)                                              │ 4253.28ms │
│ Total Time (fix-sort-mem-usage-reserve-mem-for-sort-merging)   │ 4305.10ms │
│ Average Time (main)                                            │  425.33ms │
│ Average Time (fix-sort-mem-usage-reserve-mem-for-sort-merging) │  430.51ms │
│ Queries Faster                                                 │         0 │
│ Queries Slower                                                 │         0 │
│ Queries with No Change                                         │        10 │
└────────────────────────────────────────────────────────────────┴───────────┘
--------------------
Benchmark sort_tpch_sf10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       main ┃ fix-sort-mem-usage-reserve-mem-for-sort-merging ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1           │  2617.74ms │                                       2652.65ms │     no change │
│ Q2           │  2019.64ms │                                       2034.08ms │     no change │
│ Q3           │ 10748.55ms │                                      11028.78ms │     no change │
│ Q4           │  2565.69ms │                                       2581.39ms │     no change │
│ Q5           │  3182.88ms │                                       3226.93ms │     no change │
│ Q6           │  3379.76ms │                                       3432.35ms │     no change │
│ Q7           │  7200.46ms │                                       7245.30ms │     no change │
│ Q8           │  4932.09ms │                                       5133.81ms │     no change │
│ Q9           │  5488.64ms │                                       5473.89ms │     no change │
│ Q10          │ 18188.22ms │                                      17129.05ms │ +1.06x faster │
└──────────────┴────────────┴─────────────────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                              ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main)                                              │ 60323.67ms │
│ Total Time (fix-sort-mem-usage-reserve-mem-for-sort-merging)   │ 59938.23ms │
│ Average Time (main)                                            │  6032.37ms │
│ Average Time (fix-sort-mem-usage-reserve-mem-for-sort-merging) │  5993.82ms │
│ Queries Faster                                                 │          1 │
│ Queries Slower                                                 │          0 │
│ Queries with No Change                                         │          9 │
└────────────────────────────────────────────────────────────────┴────────────┘
--------------------
Benchmark tpch_sf10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃      main ┃ fix-sort-mem-usage-reserve-mem-for-sort-merging ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 1     │ 1003.40ms │                                        974.06ms │    no change │
│ QQuery 2     │  142.90ms │                                        142.05ms │    no change │
│ QQuery 3     │  437.35ms │                                        429.21ms │    no change │
│ QQuery 4     │  218.20ms │                                        219.31ms │    no change │
│ QQuery 5     │  638.99ms │                                        633.81ms │    no change │
│ QQuery 6     │  152.49ms │                                        151.94ms │    no change │
│ QQuery 7     │  937.33ms │                                        952.74ms │    no change │
│ QQuery 8     │  690.88ms │                                        675.75ms │    no change │
│ QQuery 9     │ 1055.28ms │                                       1039.38ms │    no change │
│ QQuery 10    │  621.41ms │                                        632.68ms │    no change │
│ QQuery 11    │   93.62ms │                                        100.54ms │ 1.07x slower │
│ QQuery 12    │  321.36ms │                                        329.27ms │    no change │
│ QQuery 13    │  442.88ms │                                        434.09ms │    no change │
│ QQuery 14    │  252.07ms │                                        252.79ms │    no change │
│ QQuery 15    │  419.63ms │                                        414.17ms │    no change │
│ QQuery 16    │  106.30ms │                                        107.51ms │    no change │
│ QQuery 17    │ 1088.73ms │                                       1083.62ms │    no change │
│ QQuery 18    │ 1795.68ms │                                       1785.46ms │    no change │
│ QQuery 19    │  462.31ms │                                        458.10ms │    no change │
│ QQuery 20    │  403.54ms │                                        428.10ms │ 1.06x slower │
│ QQuery 21    │ 1453.76ms │                                       1454.77ms │    no change │
│ QQuery 22    │  158.43ms │                                        151.23ms │    no change │
└──────────────┴───────────┴─────────────────────────────────────────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                              ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main)                                              │ 12896.55ms │
│ Total Time (fix-sort-mem-usage-reserve-mem-for-sort-merging)   │ 12850.56ms │
│ Average Time (main)                                            │   586.21ms │
│ Average Time (fix-sort-mem-usage-reserve-mem-for-sort-merging) │   584.12ms │
│ Queries Faster                                                 │          0 │
│ Queries Slower                                                 │          2 │
│ Queries with No Change                                         │         20 │
└────────────────────────────────────────────────────────────────┴────────────┘

Contributor

It makes sense.

// We'll gradually collect the sorted stream into self.in_mem_batches, or directly
// write sorted batches to disk when the memory is insufficient.
let mut spill_writer: Option<IPCWriter> = None;
while let Some(batch) = sorted_stream.next().await {
Contributor

This is a good point to avoid OOM for try_collect().

@2010YOUY01
Contributor

Here is the detailed explanation:

  1. We reserve 2X memory for each batch on insertion, so when in_mem_batches holds 5 batches consuming 50 MB of memory, we have already reserved 100 MB. The next insertion will trigger a merge, and possibly a spill.
  2. When a merge happens, each batch in in_mem_batches is sorted individually using sort_batch and fed into StreamingMergeBuilder to build a SortPreservingMergeStream. The batches are taken away from in_mem_batches, and the original batches are dropped immediately after retrieving a sorted batch. We assume that the sorted batches have the same size as the original batches.
  3. SortPreservingMergeStream polls one batch from each sorted stream and creates a row representation or sorted array representation for each batch. The sorted batches are saved into in_progress and the row/sorted array representations are saved into cursors. We assume that the row representation or sorted array has the same size as the sorted batch. Now we have consumed 100 MB.
  4. SortPreservingMergeStream produces merged batches. We can assume that the overall memory consumption remains unchanged during this process, and certainly we need to reserve memory for merged batches. Each time we poll a merged batch from SortPreservingMergeStream, we try reserving memory for it. If the reservation fails, all future merged batches polled from the merged stream are written directly to the spill file.
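The switch described in point 4 can be sketched as a simple loop. This is a minimal model with a toy `Reservation` type and batch sizes as plain numbers, not the actual DataFusion API:

```rust
// Toy stand-in for a memory reservation: growing fails once the limit would
// be exceeded.
struct Reservation {
    used: usize,
    limit: usize,
}

impl Reservation {
    fn try_grow(&mut self, bytes: usize) -> Result<(), ()> {
        if self.used + bytes > self.limit {
            Err(())
        } else {
            self.used += bytes;
            Ok(())
        }
    }
}

/// Consume merged batches one by one, reserving memory for each. Once a
/// reservation fails, switch to "spill mode" and route every remaining batch
/// to the spill file instead of keeping it in memory.
fn drain_merged_stream(
    batch_sizes: &[usize],
    reservation: &mut Reservation,
) -> (Vec<usize>, Vec<usize>) {
    let mut in_mem = Vec::new();
    let mut spilled = Vec::new();
    let mut spill_mode = false;
    for &size in batch_sizes {
        if !spill_mode && reservation.try_grow(size).is_err() {
            spill_mode = true; // no room left: all future batches go to disk
        }
        if spill_mode {
            spilled.push(size);
        } else {
            in_mem.push(size);
        }
    }
    (in_mem, spilled)
}

fn main() {
    let mut res = Reservation { used: 0, limit: 100 };
    let (in_mem, spilled) = drain_merged_stream(&[40, 40, 40, 40], &mut res);
    assert_eq!(in_mem, vec![40, 40]);
    assert_eq!(spilled, vec![40, 40]);
    println!("kept {} batches in memory, spilled {}", in_mem.len(), spilled.len());
}
```

In the real code the reservation failure comes from the memory pool, and spilled batches go through an IPCWriter as in the snippet quoted earlier in this thread.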

Thanks for the nice diagram, this explanation is super clear

Regarding point 3, I thought of an edge case that can cause the Rows to be way larger than 1X:
During merging, only the order key is currently converted to row format.

  • If select c1,c2 from t order by c1, c2, all columns will be converted to Row, and the estimation is close to 1X
  • If select c1, ... c10 from t order by c1, only 1 column will be converted, and the extra Row overhead is 0.1X. (I think it is okay to over-estimate to be conservative)

Edge case: let's say the input is a deduplicated StringViewArray (like a 10k-row batch with only 100 distinct values, where the payload content is stored without duplication and the array elements just reference the payload range). After converting to Row format, every row will be materialized, so the Row format will have a 100X expansion.
I think we need some mechanism to deal with this kind of edge case; perhaps this also applies to the dictionary representation
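A back-of-envelope sketch of this edge case, with hypothetical sizes (10k rows, 100 distinct values, an assumed 64-byte average string):

```rust
/// Bytes of string payload held by a deduplicated view array vs. the Row
/// format, under the simplifying assumption of a fixed average string length.
fn payload_bytes(rows: usize, distinct: usize, avg_len: usize) -> (usize, usize) {
    // View array: each distinct payload is stored once, plus a 16-byte view
    // struct per row referencing it.
    let array = distinct * avg_len + rows * 16;
    // Row format: every row materializes a full copy of its string.
    let row_format = rows * avg_len;
    (array, row_format)
}

fn main() {
    let (array, row_format) = payload_bytes(10_000, 100, 64);
    println!("view array ≈ {} bytes, row format ≈ {} bytes", array, row_format);
    // The string payload itself expands by rows / distinct = 100x.
    assert_eq!(row_format / (100 * 64), 100);
}
```

Even counting the 16-byte views, the array here is ~166 KB while the rows are ~640 KB, so a size estimate derived from the array would be several times too low; with longer strings the views become negligible and the gap approaches the full 100x.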

For point 4, does the memory budget to hold merged batches come from sort_spill_reservation_bytes? Small sorted runs and converted rows should have taken up all the memory space at this stage.

@Kontinuation
Member Author

Kontinuation commented Feb 15, 2025

Edge case: let's say the input is a deduplicated StringViewArray (like a 10k-row batch with only 100 distinct values, where the payload content is stored without duplication and the array elements just reference the payload range). After converting to Row format, every row will be materialized, so the Row format will have a 100X expansion. I think we need some mechanism to deal with this kind of edge case; perhaps this also applies to the dictionary representation

I agree that the current implementation uses a very rough estimation, and it could be way off from the actual memory consumption.

A better approach is to sort and generate the row representation of the batch right after ingesting it; then we would know the exact size of the sorted batches and their row representations held in memory. The merge phase for handling spilling could simply take this data and perform merging without reserving more memory. However, this conflicts with some of the optimizations we did in the past:

For point 4, are the memory budget to hold merged batches come from sort_spill_reservation_bytes? Small sorted runs, and converted rows should have taken up all memory spaces at this stage.

Yes. It may come from sort_spill_reservation_bytes, or the reduced memory usage after per-batch sorting because of the fetch option.

Contributor

@alamb alamb left a comment


First of all, thank you so much @Kontinuation and @zhuqi-lucas - I think this PR is very nicely coded, and commented and is quite clear to read.

I think the only other thing left in my mind is to add a few more fuzz tests, but otherwise this is nice

@2010YOUY01 made a nice set of tests too to verify that the memory accounting was accurate, but they are currently disabled. Maybe we can also run them here too 🤔

cc @kazuyukitanimura
cc @westonpace as you filed #10073

self.sort_or_spill_in_mem_batches().await?;
// We've already freed more than half of reserved memory,
// so we can grow the reservation again. There's nothing we can do
// if this try_grow fails.
Contributor


We can implement a more complicated multi-stage merging phase which merges a small portion of streams each time, and perform multiple rounds of merges to produce the final merged stream. However this approach is quite complex and out of the scope of this PR. I believe that there should be dedicated discussions around this problem.

I agree we should implement such a feature as a dedicated, follow on PR / Project.

@zhuqi-lucas or @Kontinuation is there one of you can file a ticket to track the work. I think especially highlighting what cases the current code won't work well is important.

It sounds like there are several issues:

  1. ensuring we can always spill data (now spilling will sometimes fail if we run out of memory to sort the batches in)
  2. Ensuring that we can always merge the data that was spilled, even if it had a really wide fanout (like 1000 of spill files)

I think problem 1 could be solved by potentially spilling unsorted batches (and then sorting them separately). This would be less efficient (we would read/write some tuples twice) but would work.

Problem 2 could use the multi-pass merge that @Kontinuation describes
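For reference, the memory/IO trade-off of such a multi-pass merge can be sketched like this (the fan-in and file counts are hypothetical, not a committed design):

```rust
/// Number of merge rounds needed when at most `fan_in` spill files are merged
/// at a time, with each round's output written back to disk until one sorted
/// run remains.
fn merge_rounds(mut files: usize, fan_in: usize) -> usize {
    assert!(fan_in >= 2);
    let mut rounds = 0;
    while files > 1 {
        files = (files + fan_in - 1) / fan_in; // each group of fan_in files -> 1 file
        rounds += 1;
    }
    rounds
}

fn main() {
    // 1000 spill files with a fan-in of 10: 1000 -> 100 -> 10 -> 1, i.e. 3
    // rounds, while holding only ~10 in-flight batches per round instead of
    // ~1000 for a single-pass merge.
    assert_eq!(merge_rounds(1000, 10), 3);
    assert_eq!(merge_rounds(200, 16), 2);
    println!("ok");
}
```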

Contributor

@2010YOUY01 2010YOUY01 left a comment


I think it's a good high-level design to solve the 2X memory issue in spilling sort queries. I also checked the implementation, overall it looks good to me, I left some nit-picking suggestions.
Now I think the TODOs before merging are adding physical memory consumption validation, and file issues for known tasks.

Validate physical consumption

In theory, I think this PR can solve the 2X memory problem for memory-limited sort queries, but still can't solve 2X memory issue if running without memory limit and spilling enabled.
Examples:

  • This PR can't solve
    select * from t consumes 2GB memory to fully materialize the output, in the best case select * from t order by c1 consumes around 2GB memory, given sorting is an O(1) space operation. The current situation is it consumes 4GB+ (more than 2 times).
  • This PR can solve
    If you specify the memory limit to 1GB, the actual physical memory consumption would be more than 2GB. The ideal actual consumption is around 1GB.

Problem

I tried one query and this PR is not working as expected: I ran one query under a 5GB memory limit (select * without order requires 7GB) but it's still consuming around 10GB, could you double check? I suspect we missed some details.
I made a mistake, there is no problem: the result to check is measured RSS < memory limit + result size

Reproducer

Under datafusion-cli, run

/usr/bin/time -l cargo run --release -- --mem-pool-type fair -m 5G -f '/Users/yongting/Code/scripts/external_sort.sql'

SQL script (have to modify tpch_sf10 path), I did not include string columns because I think there are some known issues.

CREATE EXTERNAL TABLE IF NOT EXISTS lineitem (
        l_orderkey BIGINT,
        l_partkey BIGINT,
        l_suppkey BIGINT,
        l_linenumber INTEGER,
        l_quantity DECIMAL(15, 2),
        l_extendedprice DECIMAL(15, 2),
        l_discount DECIMAL(15, 2),
        l_tax DECIMAL(15, 2),
        l_returnflag VARCHAR,
        l_linestatus VARCHAR,
        l_shipdate DATE,
        l_commitdate DATE,
        l_receiptdate DATE,
        l_shipinstruct VARCHAR,
        l_shipmode VARCHAR,
        l_comment VARCHAR
) STORED AS parquet
LOCATION '/Users/yongting/Code/datafusion/benchmarks/data/tpch_sf10/lineitem';

-- selected all non-varchar columns
SELECT l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_shipdate, l_commitdate, l_receiptdate FROM lineitem ORDER BY l_orderkey;

Result is around 10GB:

 10703093760  maximum resident set size

Once fixed, I think the following memory bounds can be set smaller (e.g. *3 -> *1.5) for regression tests.

Known issues

I think this PR already made improvements on the current status, so those issues only need to be tracked.
Update: filed #14748 and apache/arrow-rs#7151

self.reservation.free();
self.spills.push(spill_file);
} else {
self.in_mem_batches.push(batch);
Contributor


I think adding another field in_mem_batches_sorted for this purpose can slightly improve readability

// Release the memory reserved for merge back to the pool so
-// there is some left when `in_memo_sort_stream` requests an
+// there is some left when `in_mem_sort_stream` requests an
// allocation.
Contributor


Suggested change
-// allocation.
+// allocation. At the end of this function, memory will be reserved again for the next spill.

I found this pre-reserved memory confusing when reading this code for the first time, so I would like to make it more clear.

@@ -612,6 +659,20 @@ impl ExternalSorter {
}
}

/// Estimate how much memory is needed to sort a `RecordBatch`.
Contributor


I suggest explicitly pointing out that the extra memory is reserved for the potential Row conversion of the original array, which is used to speed up comparisons in sorting and merging.

@Kontinuation
Member Author

Kontinuation commented Feb 16, 2025

This PR can't solve
select * from t consumes 2GB memory to fully materialize the output, in the best case select * from t order by c1 consumes around 2GB memory, given sorting is an O(1) space operation. The current situation is it consumes 4GB+ (more than 2 times).

I think this is by design. Physical operators should not reserve memory for the batches they produce. It is the parent operator's duty to reserve memory for the batches fetched from child operators if it needs to hold them in memory for a long time.

Problem
I tried one query and this PR is not working as expected, I specified one query to run under 5GB memory (select * without order requires 7GB) but it's still consuming around 10GB, could you double check? I suspect we missed some details.

It is a problem of datafusion-cli. If datafusion-cli decides to hold all the result batches in memory, it should create a memory consumer for itself and reserve memory for the result batches.

datafusion-cli calls collect to hold the entire result set in memory before displaying it; this is unnecessary when maxrows is not unlimited. I've tried the following code to replace the collect call, and the maximum resident set size was reduced to 4.8 GB:

// Bounded stream; collected results are printed after all input consumed.
let schema = physical_plan.schema();
let mut stream = execute_stream(physical_plan, task_ctx.clone())?;
let mut results = vec![];
while let Some(batch) = stream.next().await {
    let batch = batch?;
    results.push(batch);
    if let MaxRows::Limited(max_rows) = print_options.maxrows {
        if results.len() >= max_rows {
            break;
        }
    }
}
adjusted.into_inner().print_batches(schema, &results, now)?;

@2010YOUY01
Contributor

Sorry, I mistakenly edited your original reply @Kontinuation, I'm trying to revert it back.

@2010YOUY01
Contributor

It is a problem of datafusion-cli. If datafusion-cli decides to hold all the result batches in memory, it should create a memory consumer for itself and reserve memory for the result batches.

Yes, you are right and the result is good. I made a mistake about the expected behavior: it should be measured RSS < memory limit + result size. I've tested several smaller memory limit and the result is correct.

@zhuqi-lucas
Contributor

This PR can't solve
select * from t consumes 2GB memory to fully materialize the output, in the best case select * from t order by c1 consumes around 2GB memory, given sorting is an O(1) space operation. The current situation is it consumes 4GB+ (more than 2 times).

I think this is by design. The physical operators should not reserve memory for the batches it produces. It is the parent operator's duty to reserve memory for the batches fetched from children operators if it needs to hold the batches in memory for a long time.

Problem
I tried one query and this PR is not working as expected, I specified one query to run under 5GB memory (select * without order requires 7GB) but it's still consuming around 10GB, could you double check? I suspect we missed some details.

It is a problem of datafusion-cli. If datafusion-cli decides to hold all the result batches in memory, it should create a memory consumer for itself and reserve memory for the result batches.

datafusion-cli calls collect to hold the entire result set in memory before displaying it, this is unnecessary when maxrows is not unlimited. I've tried the following code to replace the collect call, and the maximum resident set size has reduced to 4.8 GB:

// Bounded stream; collected results are printed after all input consumed.
let schema = physical_plan.schema();
let mut stream = execute_stream(physical_plan, task_ctx.clone())?;
let mut results = vec![];
while let Some(batch) = stream.next().await {
    let batch = batch?;
    results.push(batch);
    if let MaxRows::Limited(max_rows) = print_options.maxrows {
        if results.len() >= max_rows {
            break;
        }
    }
}
adjusted.into_inner().print_batches(schema, &results, now)?;

Good finding, maybe we can file an issue to improve the datafusion-cli memory usage and reservation.

@Kontinuation Kontinuation force-pushed the pr-fix-sort-mem-reservation-and-usage branch from 2e87dbb to babe5cd Compare February 16, 2025 13:02
@alamb alamb mentioned this pull request Feb 16, 2025
@westonpace
Member

westonpace commented Feb 18, 2025

Thanks for the ping and thanks for working on this! This is an important feature for us (for training secondary indices on string columns) so I'm very thankful to see the effort 😄

I tried the reproducer from #10073 on this branch (babe5cd) and wasn't able to get it to pass and so I agree it doesn't seem to address all issues. However, what you're describing does seem to address the problems that I was seeing and so I think it probably is making good progress.

I also tried setting session_config.options_mut().execution.parquet.schema_force_view_types = false; and various values of sort_spill_reservation_bytes but didn't have much luck.

@zhuqi-lucas
Contributor

zhuqi-lucas commented Feb 18, 2025

Hi @westonpace, I think the problem is that we need to set the partition count and also increase the memory limit for your case:

1. Set the partition count to 1:
 .with_target_partitions(1)

2. Increase the memory limit, for example to 300MB

I can share code that passes for your case; also clean up the previously built parquet file and retry:

use arrow::array::{RecordBatch, StringBuilder};
use arrow_schema::{DataType, Field, Schema};
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use futures::TryStreamExt;
use std::sync::Arc;

#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
pub async fn main() {
    build_parquet();

    let env = RuntimeEnvBuilder::new()
        .with_disk_manager(DiskManagerConfig::default())
        .with_memory_pool(Arc::new(FairSpillPool::new(300 * 1024 * 1024)))
        .build_arc()
        .unwrap();

    let mut config = SessionConfig::new()
        .with_sort_spill_reservation_bytes(32 * 1024 * 1024)
        .with_target_partitions(1);
    config.options_mut().execution.parquet.schema_force_view_types = false;

    let ctx = SessionContext::new_with_config_rt(config, env);

    ctx.register_parquet(
        "big_strings",
        "/tmp/big_strings.parquet",
        ParquetReadOptions::default(),
    )
        .await
        .unwrap();

    let sql = "SELECT * FROM big_strings ORDER BY strings";
    println!("Sorting strings");
    ctx.sql(sql)
        .await
        .unwrap()
        .execute_stream()
        .await
        .unwrap()
        .try_for_each(|_| std::future::ready(Ok(())))
        .await
        .unwrap();
}

fn build_parquet() {
    if std::fs::File::open("/tmp/big_strings.parquet").is_ok() {
        println!("Using existing file at /tmp/big_strings.parquet");
        return;
    }
    println!("Generating test file at /tmp/big_strings.parquet");
    let file = std::fs::File::create("/tmp/big_strings.parquet").unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "strings",
        DataType::Utf8,
        false,
    )]));
    let mut writer = ArrowWriter::try_new(file, schema.clone(), None).unwrap();

    for batch_idx in 0..100 {
        println!("Generating batch {} of 100", batch_idx);
        let mut string_array_builder =
            StringBuilder::with_capacity(1024 * 1024, 1024 * 1024 * 3 * 14);
        for i in 0..(1024 * 1024) {
            string_array_builder
                .append_value(format!("string-{}string-{}string-{}", i, i, i));
        }
        let array = Arc::new(string_array_builder.finish());
        let batch = RecordBatch::try_new(schema.clone(), vec![array]).unwrap();
        writer.write(&batch).unwrap();
    }
    writer.close().unwrap();
}

Detailed reasoning here:

#14644 (comment)

  1. The DataSourceExec may have many partitions, and each SortExec on that partition will only get a fair share of the 100MB pool, so each partition won't get enough memory to operate. This is still the case even with worker_threads = 1. If you still want to sort 4.2GB parquet file using 100MBs of memory, you can set .with_target_partitions(1) in your session config.

  2. 100MB is not enough for the final merging with spill-reads. There will be roughly 200 spill files generated after ingesting all the batches, the size of a typical batch for this workload is 352380 bytes. The memory needed for merging will be 200 * (352380 bytes) * 2 > 100MB. Merging phase is unspillable so it requires a minimum amount of memory to operate. Raising the memory limit to 200MB will work for this particular workload.
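The two numbers above can be checked with a quick sketch (the 8-partition figure is an assumed target partition count, used only for illustration):

```rust
const MB: u64 = 1024 * 1024;

/// Fair share each SortExec partition can reserve from a fair-spill pool.
fn per_partition_share(pool_bytes: u64, partitions: u64) -> u64 {
    pool_bytes / partitions
}

/// Rough merge-phase requirement: one in-flight batch per spill file,
/// doubled for the row/cursor representation.
fn merge_memory_needed(spill_files: u64, batch_bytes: u64) -> u64 {
    spill_files * batch_bytes * 2
}

fn main() {
    // With e.g. 8 target partitions, a 100 MB fair pool leaves each SortExec
    // only ~12.5 MB to work with.
    assert_eq!(per_partition_share(100 * MB, 8), 100 * MB / 8);

    // ~200 spill files of ~352,380-byte batches need ~134 MB for the final,
    // unspillable merge: more than 100 MB, less than 200 MB.
    let needed = merge_memory_needed(200, 352_380);
    println!("merge needs ≈ {} MB", needed / MB);
    assert!(needed > 100 * MB && needed < 200 * MB);
}
```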

And for future work we can have a complete spill solution tracking here:

#14692

Contributor

@2010YOUY01 2010YOUY01 left a comment


Thanks again for the nice work.

@alamb
Contributor

alamb commented Feb 18, 2025

This PR can't solve
select * from t consumes 2GB memory to fully materialize the output, in the best case select * from t order by c1 consumes around 2GB memory, given sorting is an O(1) space operation. The current situation is it consumes 4GB+ (more than 2 times).

I think this is by design. The physical operators should not reserve memory for the batches it produces. It is the parent operator's duty to reserve memory for the batches fetched from children operators if it needs to hold the batches in memory for a long time.

This is a great observation and I think holds true in practice

However I don't think it is reflected in the documentation here
https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/trait.MemoryPool.html

Would someone be willing to update those docs?

@alamb
Contributor

alamb commented Feb 18, 2025

Problem
I tried one query and this PR is not working as expected, I specified one query to run under 5GB memory (select * without order requires 7GB) but it's still consuming around 10GB, could you double check? I suspect we missed some details.

It is a problem of datafusion-cli. If datafusion-cli decides to hold all the result batches in memory, it should create a memory consumer for itself and reserve memory for the result batches.

datafusion-cli calls collect to hold the entire result set in memory before displaying it, this is unnecessary when maxrows is not unlimited. I've tried the following code to replace the collect call, and the maximum resident set size has reduced to 4.8 GB:

This is also another great observation that it would be great to get captured in a ticket

@alamb
Contributor

alamb commented Feb 18, 2025

Thanks everyone -- this is a great example of collaboration and what the collective efforts of many contributors can do. I love it.

@alamb left a comment

❤️ 🦾

@westonpace

100MB is not enough for the final merge with spill-reads. There will be roughly 200 spill files generated after ingesting all the batches, and a typical batch for this workload is 352,380 bytes. The memory needed for merging will be 200 × 352,380 bytes × 2 > 100MB. The merge phase is unspillable, so it requires a minimum amount of memory to operate. Raising the memory limit to 200MB will work for this particular workload.

Thank you for this. You are correct; with this addition my test passes. #14692 sounds awesome, but I think we can also go far with these current fixes in the meantime 🚀
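The estimate quoted above checks out as a back-of-the-envelope calculation (the ×2 factor for one batch being read per spill file plus the output batch being built is taken from the comment itself):

```rust
// Minimum unspillable memory for the final merge: one in-flight batch per
// spill file, doubled per the estimate in the comment above.
fn main() {
    let spill_files: u64 = 200;
    let batch_bytes: u64 = 352_380;
    let needed = spill_files * batch_bytes * 2; // 140_952_000 bytes ≈ 134 MiB

    assert!(needed > 100 * 1024 * 1024); // a 100MB limit cannot merge
    assert!(needed < 200 * 1024 * 1024); // a 200MB limit is sufficient
    println!("merge needs ~{} MiB", needed / (1024 * 1024));
}
```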

@zhuqi-lucas

Problem
I tried one query and this PR is not working as expected. I ran a query under a 5GB memory limit (select * without order requires 7GB), but it's still consuming around 10GB; could you double-check? I suspect we missed some details.

It is a problem of datafusion-cli. If datafusion-cli decides to hold all the result batches in memory, it should create a memory consumer for itself and reserve memory for the result batches.
datafusion-cli calls collect to hold the entire result set in memory before displaying it; this is unnecessary when maxrows is not unlimited. I've tried the following code to replace the collect call, and the maximum resident set size dropped to 4.8 GB:

This is also another great observation that it would be great to get captured in a ticket

Created a follow-up for this improvement:
#14751

@comphead left a comment

thanks @Kontinuation @zhuqi-lucas @2010YOUY01 @alamb and everyone, awesome team effort as Andrew mentioned. I think the tests and experience from this ticket would be beneficial for #14510

Labels
core Core DataFusion crate physical-expr Physical Expressions

Successfully merging this pull request may close these issues.

Memory account not adding up in SortExec