This repository has been archived by the owner on May 9, 2024. It is now read-only.

[Join] Inline and parallelize tbb in getAllTableColumnFragments. #616

Merged: 1 commit merged into main on Oct 6, 2023

Conversation

Devjiu
Contributor

@Devjiu Devjiu commented Aug 3, 2023

This commit refactors and simplifies the method getAllTableColumnFragments and adds some parallelization.

Partially resolves: #574

Signed-off-by: Dmitrii Makarenko [email protected]

@Devjiu Devjiu force-pushed the dmitriim/remove_copies branch from 6cfba25 to 40fd548 Compare August 7, 2023 14:22
@Devjiu Devjiu changed the title Dmitriim/remove copies [Join] Remove redundant copies. Aug 7, 2023
@Devjiu Devjiu force-pushed the dmitriim/remove_copies branch from ab0e572 to eb8042f Compare August 8, 2023 14:38
@Devjiu Devjiu force-pushed the dmitriim/remove_copies branch 3 times, most recently from 1e7b776 to 32b48b4 Compare August 9, 2023 16:12
@Devjiu
Contributor Author

Devjiu commented Aug 17, 2023

Will be rebased over #623 --- Done

@Devjiu Devjiu force-pushed the dmitriim/remove_copies branch 3 times, most recently from 13a559f to 93f9e17 Compare August 24, 2023 17:03
@Devjiu
Contributor Author

Devjiu commented Aug 24, 2023

I was wrong here; it's interference between two issues.

Note: verify the continue/break issue; for some reason using continue causes the following failures:

[ RUN      ] Select.Subqueries
/localdisk/dmitriim/hdk/omniscidb/Tests/ArrowSQLRunner/SQLiteComparator.cpp:66: Failure
Expected equality of these values:
  connector.getNumRows()
    Which is: 2
  omnisci_results->rowCount()
    Which is: 1
CPU: SELECT CASE WHEN test.x IN (SELECT x FROM test_inner) THEN x ELSE NULL END AS c, COUNT(*) AS n FROM test WHERE y > 40 GROUP BY c ORDER BY n DESC;
/localdisk/dmitriim/hdk/omniscidb/Tests/ArrowSQLRunner/SQLiteComparator.cpp:100: Failure
Expected equality of these values:
  ref_val
    Which is: 10
  omnisci_val
    Which is: 0
CPU: SELECT COUNT(*) FROM test WHERE str IN (SELECT DISTINCT str FROM test_inner) AND x IN (SELECT DISTINCT x FROM test_inner);
/localdisk/dmitriim/hdk/omniscidb/Tests/ArrowSQLRunner/SQLiteComparator.cpp:100: Failure
Expected equality of these values:
  ref_val
    Which is: 10
  omnisci_val
    Which is: 0
CPU: SELECT COUNT(*) FROM test WHERE str IN (SELECT DISTINCT str FROM test_inner) AND x IN (SELECT x FROM test_inner);
/localdisk/dmitriim/hdk/omniscidb/Tests/ArrowSQLRunner/SQLiteComparator.cpp:100: Failure
Expected equality of these values:
  ref_val
    Which is: 10
  omnisci_val
    Which is: 0
CPU: SELECT COUNT(*) FROM test WHERE str IN (SELECT str FROM test_inner) AND x IN (SELECT x FROM test_inner);
[  FAILED  ] Select.Subqueries (7655 ms)
[ RUN      ] Select.Joins_Arrays
[       OK ] Select.Joins_Arrays (465 ms)
[ RUN      ] Select.Joins_Fixed_Size_Array_Multi_Frag
[       OK ] Select.Joins_Fixed_Size_Array_Multi_Frag (690 ms)
[ RUN      ] Select.Joins_EmptyTable
[       OK ] Select.Joins_EmptyTable (180 ms)
[ RUN      ] Select.Joins_Fragmented_SelfJoin_And_LoopJoin
[       OK ] Select.Joins_Fragmented_SelfJoin_And_LoopJoin (409 ms)
[ RUN      ] Select.Joins_ImplicitJoins
[       OK ] Select.Joins_ImplicitJoins (1647 ms)
[ RUN      ] Select.Joins_DifferentIntegerTypes
[       OK ] Select.Joins_DifferentIntegerTypes (48 ms)
[ RUN      ] Select.Joins_FilterPushDown
[       OK ] Select.Joins_FilterPushDown (1818 ms)
[ RUN      ] Select.Joins_InnerJoin_TwoTables
[       OK ] Select.Joins_InnerJoin_TwoTables (1643 ms)
[ RUN      ] Select.Joins_InnerJoin_AtLeastThreeTables
/localdisk/dmitriim/hdk/omniscidb/Tests/ArrowSQLRunner/SQLiteComparator.cpp:66: Failure
Expected equality of these values:
  connector.getNumRows()
    Which is: 2
  omnisci_results->rowCount()
    Which is: 1
CPU: SELECT a.x, b.x FROM test_inner a JOIN test_inner b ON a.x = b.x ORDER BY a.x;
[  FAILED  ] Select.Joins_InnerJoin_AtLeastThreeTables (1589 ms)

@Devjiu Devjiu force-pushed the dmitriim/remove_copies branch from 93f9e17 to 235508d Compare August 24, 2023 22:33
@Devjiu
Contributor Author

Devjiu commented Aug 25, 2023

This should be split into two PRs; also, the new CPU implementation fill_hash_join_buff_bucketized_cpu should be moved into new source files, HashJoinRuntimeCpu.cpp/h.

Some overall details are in #574 (comment).

@Devjiu
Contributor Author

Devjiu commented Aug 25, 2023

Benchmark status (with non-lazy):

 ./_launcher/solution.R --solution=pyhdk --task=join --nrow=1e8
[1] "./pyhdk/join-pyhdk.py"
# join-pyhdk.py
pyhdk data_name:  J1_1e8_NA_0_0
loading datasets J1_1e8_NA_0_0, J1_1e8_1e2_0_0, J1_1e8_1e5_0_0, J1_1e8_1e8_0_0
Using fragment size 32000000
100000000
100
100000
100000000
joining...
(89997128, 9)
(89997128, 9)
(89995511, 11)
(89995511, 11)
(100000000, 11)
(100000000, 11)
(89995511, 11)
(89995511, 11)
(90000000, 13)
(90000000, 13)
joining finished, took 73s
   on_disk               question run time_sec
1    FALSE     small inner on int   1    2.227
2    FALSE     small inner on int   2    1.914
3    FALSE    medium inner on int   1    2.315
4    FALSE    medium inner on int   2    2.167
5    FALSE    medium outer on int   1    2.257
6    FALSE    medium outer on int   2    2.475
7    FALSE medium inner on factor   1    2.227
8    FALSE medium inner on factor   2    2.240
9    FALSE       big inner on int   1    4.320
10   FALSE       big inner on int   2    3.970

 ./_launcher/solution.R --solution=pyhdk --task=join --nrow=1e8
[1] "./pyhdk/join-pyhdk.py"
# join-pyhdk.py
pyhdk data_name:  J1_1e8_NA_0_0
loading datasets J1_1e8_NA_0_0, J1_1e8_1e2_0_0, J1_1e8_1e5_0_0, J1_1e8_1e8_0_0
Using fragment size 4000000
100000000
100
100000
100000000
joining...
(89997128, 9)
(89997128, 9)
(89995511, 11)
(89995511, 11)
(100000000, 11)
(100000000, 11)
(89995511, 11)
(89995511, 11)
(90000000, 13)
(90000000, 13)
joining finished, took 59s
   on_disk               question run time_sec
1    FALSE     small inner on int   1    1.077
2    FALSE     small inner on int   2    0.765
3    FALSE    medium inner on int   1    1.063
4    FALSE    medium inner on int   2    1.017
5    FALSE    medium outer on int   1    0.704
6    FALSE    medium outer on int   2    0.726
7    FALSE medium inner on factor   1    1.056
8    FALSE medium inner on factor   2    1.004
9    FALSE       big inner on int   1    1.506
10   FALSE       big inner on int   2    1.293

@Devjiu Devjiu force-pushed the dmitriim/remove_copies branch 2 times, most recently from b81d607 to a6dbaa4 Compare September 13, 2023 12:50
@Devjiu
Contributor Author

Devjiu commented Sep 13, 2023

Rebased over #663. Let's merge #663 first.

@Devjiu Devjiu marked this pull request as ready for review September 13, 2023 13:07
@Devjiu Devjiu requested a review from ienkovich September 13, 2023 13:07
@Devjiu
Contributor Author

Devjiu commented Sep 13, 2023

export FRAGMENT_SIZE=4000000
/localdisk/dmitriim/benchmarks/db-benchmark ⑂master* $ ./_launcher/solution.R --solution=pyhdk --task=join --nrow=1e7
[1] "./pyhdk/join-pyhdk.py"
# join-pyhdk.py
pyhdk data_name:  J1_1e7_NA_0_0
loading datasets J1_1e7_NA_0_0, J1_1e7_1e1_0_0, J1_1e7_1e4_0_0, J1_1e7_1e7_0_0
Using fragment size 4000000
10000000
10
10000
10000000
joining...
(8998860, 9)
(8998860, 9)
(8998412, 11)
(8998412, 11)
(10000000, 11)
(10000000, 11)
(8998412, 11)
(8998412, 11)
(9000000, 13)
(9000000, 13)
joining finished, took 9s
   on_disk               question run time_sec
1    FALSE     small inner on int   1    0.346
2    FALSE     small inner on int   2    0.304
3    FALSE    medium inner on int   1    0.409
4    FALSE    medium inner on int   2    0.373
5    FALSE    medium outer on int   1    0.274
6    FALSE    medium outer on int   2    0.284
7    FALSE medium inner on factor   1    0.354
8    FALSE medium inner on factor   2    0.320
9    FALSE       big inner on int   1    0.774
10   FALSE       big inner on int   2    0.482

./_launcher/solution.R --solution=pyhdk --task=join --nrow=1e8
[1] "./pyhdk/join-pyhdk.py"
# join-pyhdk.py
pyhdk data_name:  J1_1e8_NA_0_0
loading datasets J1_1e8_NA_0_0, J1_1e8_1e2_0_0, J1_1e8_1e5_0_0, J1_1e8_1e8_0_0
Using fragment size 4000000
100000000
100
100000
100000000
joining...
(89997128, 9)
(89997128, 9)
(89995511, 11)
(89995511, 11)
(100000000, 11)
(100000000, 11)
(89995511, 11)
(89995511, 11)
(90000000, 13)
(90000000, 13)
joining finished, took 64s
   on_disk               question run time_sec
1    FALSE     small inner on int   1    1.009
2    FALSE     small inner on int   2    0.903
3    FALSE    medium inner on int   1    0.970
4    FALSE    medium inner on int   2    0.971
5    FALSE    medium outer on int   1    0.754
6    FALSE    medium outer on int   2    0.840
7    FALSE medium inner on factor   1    1.086
8    FALSE medium inner on factor   2    0.999
9    FALSE       big inner on int   1    1.501
10   FALSE       big inner on int   2    1.338

./_launcher/solution.R --solution=pyhdk --task=join --nrow=1e9
[1] "./pyhdk/join-pyhdk.py"
# join-pyhdk.py
pyhdk data_name:  J1_1e9_NA_0_0
loading datasets J1_1e9_NA_0_0, J1_1e9_1e3_0_0, J1_1e9_1e6_0_0, J1_1e9_1e9_0_0
Using fragment size 4000000
1000000000
1000
1000000
1000000000
joining...
(899999033, 9)
[thread 984394 also had an error][thread 983253 also had an error][thread 984245 also had an error][thread 986622 also had an error]



[thread 988259 also had an error][thread 985551 also had an error]

[thread 982395 also had an error]
#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x00007f7723a7c2d6, pid=858290, tid=984071
#
# JRE version: OpenJDK Runtime Environment (20.0) (build 20-internal-adhoc..src)
# Java VM: OpenJDK 64-Bit Server VM (20-internal-adhoc..src, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, g1 gc, linux-amd64)
# Problematic frame:
# C  [libResultSet.so+0xbb2d6]  ResultSet::getRowAt[abi:cxx11](unsigned long, bool, bool, bool, std::vector<bool, std::allocator<bool> > const&) const+0x1d6
#
# Core dump will be written. Default location: Core dumps may be processed with "/usr/share/apport/apport -p%p -s%s -c%c -d%d -P%P -u%u -g%g -- %E" (or dumping to /localdisk/dmitriim/benchmarks/db-benchmark/core.858290)
#
# An error report file with more information is saved as:
# /localdisk/dmitriim/benchmarks/db-benchmark/hs_err_pid858290.log
#
# If you would like to submit a bug report, please visit:
#   https://bugreport.java.com/bugreport/crash.jsp
#
Aborted (core dumped)

@Devjiu
Contributor Author

Devjiu commented Sep 14, 2023

Judging by #616 (comment), large joins are failing in all backends.

https://duckdb.org/2023/04/14/h2oai.html

@ienkovich
Contributor

Judging by #616 (comment), large joins are failing in all backends.

It looks like it's simply due to the fact that the machine doesn't have enough memory for such a big join. It has just 160GB of memory and IIUC each of the joined tables is 50GB.

@Devjiu Devjiu force-pushed the dmitriim/remove_copies branch from a6dbaa4 to 3e26fa2 Compare September 15, 2023 16:06
@Devjiu Devjiu mentioned this pull request Sep 15, 2023
Contributor

@ienkovich ienkovich left a comment

Could you please explain what redundant copies you actually remove? To me, it looks like some inlining + parallelization using TBB.

The varlen part of the new code looks completely dysfunctional (the size to copy in write_ptrs would be some negative int cast to size_t and should cause a SEGFAULT on memcpy). Is it actually ever triggered in our tests? I feel like we don't actually support whole-column fetch for varlen data.

@@ -2899,6 +2899,9 @@ def if_then_else(self, cond, true_val, false_val):
"""
return self._builder.if_then_else(cond, true_val, false_val)

def clear_cache(self):
Contributor

Please avoid unrelated change.

@@ -73,6 +73,7 @@ struct JoinColumnIterator {
DEVICE FORCE_INLINE JoinColumnIterator& operator++() {
index += step;
index_inside_chunk += step;
// this loop is made to find index_of_chunk by total index of element
Contributor

Please avoid unrelated changes.

const auto fragments_it = all_tables_fragments.find({db_id, table_id});
CHECK(fragments_it != all_tables_fragments.end());

Contributor

What is this vertical space added for?

Contributor Author

For visual indentation; will remove.

auto merged_results =
ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags);
const auto& fragment = (*fragments)[frag_id];
const auto& rows_in_frag = fragment.getNumTuples();
Contributor

A reference here looks inappropriate.

total_row_count += rows_in_frag;
}

const auto& type_width = col_info->type->size();
Contributor

Why reference?

valid_fragments.push_back(frag_id);
}

if (write_ptrs.empty()) {
Contributor

Maybe make an empty table check right after the row count computation?

@Devjiu
Copy link
Contributor Author

Devjiu commented Sep 19, 2023

Could you please explain what redundant copies you actually remove? To me, it looks like some inlining + parallelization using TBB.

The varlen part of the new code looks completely dysfunctional (the size to copy in write_ptrs would be some negative int cast to size_t and should cause a SEGFAULT on memcpy). Is it actually ever triggered in our tests? I feel like we don't actually support whole-column fetch for varlen data.

Okay, maybe the current changes are a simple parallelization case. Originally we had three copies:

  1. in the ColumnarResults constructor
  2. in fetchBuffer (from getBuffer)
  3. in merge

Currently the ColumnarResults c-tor is already fixed, fetchBuffer should be zero-copy, and merge is inlined and parallelized.

@Devjiu Devjiu force-pushed the dmitriim/remove_copies branch from 3e26fa2 to e342fae Compare September 27, 2023 14:08
@Devjiu Devjiu changed the title [Join] Remove redundant copies. [Join] Inline and parallelize tbb in getAllTableColumnFragments. Sep 27, 2023
@Devjiu Devjiu force-pushed the dmitriim/remove_copies branch from ba5af91 to a501ccd Compare September 28, 2023 11:14
Contributor

@ienkovich ienkovich left a comment

This version is good overall! I suggest a few clean-ups though.

@@ -239,6 +240,11 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
int db_id = col_info->db_id;
int table_id = col_info->table_id;
int col_id = col_info->column_id;
if (col_info->type->isString() || col_info->type->isArray()) {
Contributor

This should probably be a CHECK instead. It is an internal error, not some input error useful for the user.


size_t total_row_count = 0;
for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
if (executor_->getConfig().exec.interrupt.enable_non_kernel_time_query_interrupt &&
Contributor

Interruption check in this loop doesn't make much sense because we don't do any actual data fetch here. I suggest moving it out of the loop or removing it completely.

raw_write_ptrs.emplace_back(write_ptrs[i].first);
}

std::unique_ptr<ColumnarResults> merged_results(new ColumnarResults(
Contributor

The vector ColumnarResults::ColumnarResults gets is not a pointer per fragment, it is a pointer per column (ColumnarResults can store multiple columns, but not multiple fragments). So actually you are supposed to pass a vector with a single pointer here. Your version works because the first vector element has a correct pointer and other elements are simply not used. But this code is still confusing, so let's fix it.

Contributor Author

I don't understand. Do you mean that we are using only the data from the first fragment? If so, why are we fetching the others?

Contributor Author

Ah, sorry, now I get it.

@Devjiu Devjiu force-pushed the dmitriim/remove_copies branch from a501ccd to 05b98f3 Compare October 2, 2023 14:25
This commit refactors and simplifies the method `getAllTableColumnFragments` and adds some parallelization.

Partially resolves: #574

Signed-off-by: Dmitrii Makarenko <[email protected]>
@Devjiu Devjiu force-pushed the dmitriim/remove_copies branch from 05b98f3 to 8c4a252 Compare October 6, 2023 15:01
@Devjiu Devjiu merged commit f363014 into main Oct 6, 2023
@Devjiu Devjiu deleted the dmitriim/remove_copies branch October 6, 2023 20:15
Successfully merging this pull request may close these issues:

[Perf][Bench] Join is slow on big tables.