Skip to content
This repository has been archived by the owner on May 9, 2024. It is now read-only.

Commit

Permalink
[Fetch Data] Linearized buffer memory management
Browse files Browse the repository at this point in the history
This commit removes freeLinearizedBuf() so that chunks can no longer hold
references to already-deleted buffers. Linearized buffers are now marked for
deletion and freed once they are unpinned. All buffers should now be removed
via the buffer managers' free(...) method.

Signed-off-by: Dmitrii Makarenko <[email protected]>
  • Loading branch information
Devjiu committed Sep 28, 2023
1 parent dca67ff commit 4a0de96
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 55 deletions.
20 changes: 20 additions & 0 deletions omniscidb/DataMgr/BufferMgr/Buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,4 +166,24 @@ int8_t* Buffer::getMemoryPtr() {
void Buffer::setMemoryPtr(int8_t* new_ptr) {
mem_ = new_ptr;
}

// Decrements the pin count and returns the new value.
// When the count drops to zero and deletion was requested via
// deleteWhenUnpinned(), the buffer is handed to its buffer manager to be freed.
int Buffer::unPin() {
  std::unique_lock<std::mutex> guard(pin_mutex_);
  const int remaining = --pin_count_;
  if (remaining != 0 || !delete_on_unpin_) {
    return remaining;
  }
  // Drop the pin mutex before asking the manager to free this buffer.
  guard.unlock();
  bm_->free(this);
  return remaining;
}

// Requests deletion of this buffer.
// If the buffer is still pinned, only the delete-on-unpin flag is set;
// otherwise the buffer is handed to its buffer manager right away.
void Buffer::deleteWhenUnpinned() {
  std::unique_lock<std::mutex> guard(pin_mutex_);
  if (pin_count_ != 0) {
    delete_on_unpin_ = true;
    return;
  }
  // Not pinned: release the pin mutex, then let the manager free the buffer.
  guard.unlock();
  bm_->free(this);
}
} // namespace Buffer_Namespace
20 changes: 3 additions & 17 deletions omniscidb/DataMgr/BufferMgr/Buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,28 +134,14 @@ class Buffer : public AbstractBuffer {
return (++pin_count_);
}

// Decrements the pin count and returns the new value.
// When the count drops to zero and delete_on_unpin_ is set, the buffer deletes
// itself.
inline int unPin() override {
  std::unique_lock<std::mutex> pin_lock(pin_mutex_);
  const int res = (--pin_count_);
  if (!res && delete_on_unpin_) {
    // Release pin_mutex_ before destroying the object that owns it. Otherwise the
    // mutex would be destroyed while locked and the lock's destructor would then
    // unlock a mutex that no longer exists.
    pin_lock.unlock();
    delete this;
  }
  return res;
}
int unPin() override;

// Returns the current pin count, read while holding pin_mutex_.
inline int getPinCount() override {
  std::lock_guard<std::mutex> guard(pin_mutex_);
  const int current_pins = pin_count_;
  return current_pins;
}

// Requests deletion of this buffer.
// A pinned buffer is only flagged (delete_on_unpin_); an unpinned buffer deletes
// itself immediately, after the pin mutex has been released.
inline void deleteWhenUnpinned() override {
  std::unique_lock<std::mutex> guard(pin_mutex_);
  if (pin_count_ != 0) {
    delete_on_unpin_ = true;
    return;
  }
  guard.unlock();
  delete this;
}
void deleteWhenUnpinned() override;

// Added for testing.
// Returns the slab number of the segment that seg_it_ refers to.
int32_t getSlabNum() const {
  return (*seg_it_).slab_num;
}
Expand Down
16 changes: 16 additions & 0 deletions omniscidb/DataMgr/BufferMgr/BufferMgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ void BufferMgr::clear() {
// for removal to have them deleted when unpinned.
for (auto& buf : chunk_index_) {
if (buf.second->buffer) {
// WARNING:
// deleteWhenUnpinned(...) calls free(...), which calls deleteBuffer(...) when the
// segment iterator is valid. deleteBuffer(...) tries to acquire chunk_index_mutex_,
// which is already acquired here. To avoid that deadlock we reset the segment
// iterator first, so that only the buffer is deleted; the segments are cleaned up later.
buf.second->buffer->seg_it_ = BufferList::iterator();
buf.second->buffer->deleteWhenUnpinned();
buf.second->buffer = nullptr;
}
Expand Down Expand Up @@ -623,6 +629,7 @@ void BufferMgr::deleteBuffer(const ChunkKey& key, const bool) {
chunk_index_lock.unlock();
std::lock_guard<std::mutex> sized_segs_lock(sized_segs_mutex_);
if (seg_it->buffer) {
CHECK_EQ(seg_it->buffer->getPinCount(), 0);
delete seg_it->buffer; // Delete Buffer for segment
seg_it->buffer = 0;
}
Expand Down Expand Up @@ -826,11 +833,20 @@ AbstractBuffer* BufferMgr::alloc(const size_t num_bytes) {
return createBuffer(chunk_key, page_size_, num_bytes);
}

// All buffer deletions should go through free(...).
//
// `buffer` must point to a Buffer; any other type is reported via LOG(FATAL).
// A buffer whose segment iterator is default-constructed is deleted directly;
// every other buffer is removed through deleteBuffer(...) using the chunk key
// stored in its segment.
void BufferMgr::free(AbstractBuffer* buffer) {
  auto* const owned = dynamic_cast<Buffer*>(buffer);
  if (owned == nullptr) {
    LOG(FATAL) << "Wrong buffer type - expects base class pointer to Buffer type.";
  }

  // ZeroCopy buffers don't have valid segment iterators by design, so they are
  // detected here and deleted explicitly, without the segment removal that
  // deleteBuffer(...) performs.
  const bool has_segment = !(owned->seg_it_ == BufferList::iterator());
  if (has_segment) {
    deleteBuffer(owned->seg_it_->chunk_key);
    return;
  }

  delete buffer;
}

Expand Down
36 changes: 5 additions & 31 deletions omniscidb/QueryEngine/ColumnFetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -469,8 +469,13 @@ const int8_t* ColumnFetcher::linearizeColumnFragments(
}
}
CHECK(res.first); // check merged data buffer
// These buffers belong to a Chunk that was created by hand rather than through
// Chunk::getChunk(...), so they have to be removed explicitly. To do that we mark both
// buffers to be deleted on unpin; once unpinned, no chunk uses the buffer any more.
res.first->deleteWhenUnpinned();
if (!type->isFixedLenArray()) {
CHECK(res.second); // check merged index buffer
res.second->deleteWhenUnpinned();
}
auto merged_data_buffer = res.first;
auto merged_index_buffer = res.second;
Expand Down Expand Up @@ -1009,34 +1014,3 @@ ChunkIter ColumnFetcher::prepareChunkIter(AbstractBuffer* merged_data_buf,
merged_chunk_iter.elem_type_size = chunk_iter.elem_type_size;
return merged_chunk_iter;
}

void ColumnFetcher::freeLinearizedBuf() {
std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
auto buffer_provider = executor_->getBufferProvider();

if (!linearized_data_buf_cache_.empty()) {
for (auto& kv : linearized_data_buf_cache_) {
for (auto& kv2 : kv.second) {
buffer_provider->free(kv2.second);
}
}
}

if (!linearized_idx_buf_cache_.empty()) {
for (auto& kv : linearized_idx_buf_cache_) {
for (auto& kv2 : kv.second) {
buffer_provider->free(kv2.second);
}
}
}
}

void ColumnFetcher::freeTemporaryCpuLinearizedIdxBuf() {
std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
auto buffer_provider = executor_->getBufferProvider();
if (!linearlized_temporary_cpu_index_buf_cache_.empty()) {
for (auto& kv : linearlized_temporary_cpu_index_buf_cache_) {
buffer_provider->free(kv.second);
}
}
}
3 changes: 0 additions & 3 deletions omniscidb/QueryEngine/ColumnFetcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,6 @@ class ColumnFetcher {
DeviceAllocator* device_allocator,
const size_t thread_idx) const;

void freeTemporaryCpuLinearizedIdxBuf();
void freeLinearizedBuf();

// Returns the DataProvider pointer stored in this fetcher.
DataProvider* getDataProvider() const {
  return this->data_provider_;
}

private:
Expand Down
4 changes: 0 additions & 4 deletions omniscidb/QueryEngine/Execute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2186,10 +2186,6 @@ hdk::ResultSetTable Executor::executeWorkUnitImpl(
do {
SharedKernelContext shared_context(query_infos);
ColumnFetcher column_fetcher(this, data_provider, column_cache);
ScopeGuard scope_guard = [&column_fetcher] {
column_fetcher.freeLinearizedBuf();
column_fetcher.freeTemporaryCpuLinearizedIdxBuf();
};

if (ra_exe_unit.isShuffle()) {
allocateShuffleBuffers(query_infos, ra_exe_unit, row_set_mem_owner, shared_context);
Expand Down
79 changes: 79 additions & 0 deletions omniscidb/Tests/JoinHashTableTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,85 @@ TEST(Other, Regression) {
dropTable("table_b");
}

// Joins two tables that each carry a fixed-length array column on their scalar
// `si` column and expects the query to run on CPU without throwing.
TEST(Other, FixedLenArr) {
  // Appends one JSON line: {"si": <si>, "FixedLen": [<first>, <second>]}
  const auto append_row = [](std::ostringstream& out, int si, int first, int second) {
    out << "{\"si\": " << si << ", \"FixedLen\": [" << first << ", " << second << "]}"
        << '\n';
  };

  createTable("table_a",
              {{"si", ctx().int16()}, {"FixedLen", ctx().arrayFixed(2, ctx().int64())}},
              {2});

  // Each row is {si, FixedLen[0], FixedLen[1]}.
  const int rows_a[][3] = {
      {1, 7, 233}, {1, 5, 47}, {2, 6, 48}, {3, 7, 49}, {4, 8, 67}, {5, 9, 68}, {6, 10, 69}};
  std::ostringstream oss;
  for (const auto& row : rows_a) {
    append_row(oss, row[0], row[1], row[2]);
  }
  insertJsonValues("table_a", oss.str());

  createTable("table_b",
              {{"si", ctx().int8()}, {"FixedLen", ctx().arrayFixed(2, ctx().int32())}},
              {2});

  const int rows_b[][3] = {
      {1, 2, 33}, {2, 1, 17}, {2, 6, 48}, {3, 7, 49}, {4, 8, 67}, {5, 9, 68}, {6, 10, 69}};
  std::ostringstream oss_2;
  for (const auto& row : rows_b) {
    append_row(oss_2, row[0], row[1], row[2]);
  }
  insertJsonValues("table_b", oss_2.str());

  EXPECT_NO_THROW(run_multiple_agg(R"(
SELECT * FROM table_a, table_b WHERE table_a.si = table_b.si;
)",
                                   ExecutorDeviceType::CPU));

  dropTable("table_a");
  dropTable("table_b");
}

// Joins two tables on an element of their fixed-length array columns with lazy
// fetch disabled and expects the query to run on CPU without throwing.
TEST(Other, FixedLenArr2) {
  // Restores the previous enable_lazy_fetch value when the test body exits, so the
  // override below does not leak into tests that run afterwards.
  // NOTE(review): assumes enable_lazy_fetch is a bool - confirm against the config type.
  struct LazyFetchRestorer {
    const bool saved;
    ~LazyFetchRestorer() { config().rs.enable_lazy_fetch = saved; }
  } restore_lazy_fetch{config().rs.enable_lazy_fetch};
  config().rs.enable_lazy_fetch = false;

  // Appends one JSON line: {"si": <si>, "FixedLen": [<first>, <second>]}
  const auto append_row = [](std::ostringstream& out, int si, int first, int second) {
    out << "{\"si\": " << si << ", \"FixedLen\": [" << first << ", " << second << "]}"
        << '\n';
  };

  createTable("table_a",
              {{"si", ctx().int16()}, {"FixedLen", ctx().arrayFixed(2, ctx().int16())}},
              {1});

  // Each row is {si, FixedLen[0], FixedLen[1]}.
  const int rows_a[][3] = {
      {1, 7, 233}, {1, 5, 47}, {2, 6, 48}, {3, 7, 49}, {4, 8, 67}, {5, 9, 68}, {6, 10, 69}};
  std::ostringstream oss;
  for (const auto& row : rows_a) {
    append_row(oss, row[0], row[1], row[2]);
  }
  insertJsonValues("table_a", oss.str());

  createTable("table_b",
              {{"si", ctx().int8()}, {"FixedLen", ctx().arrayFixed(2, ctx().int32())}},
              {2});

  const int rows_b[][3] = {
      {1, 2, 33}, {2, 1, 17}, {2, 6, 48}, {3, 7, 49}, {4, 8, 67}, {5, 9, 68}, {6, 10, 69}};
  std::ostringstream oss_2;
  for (const auto& row : rows_b) {
    append_row(oss_2, row[0], row[1], row[2]);
  }
  insertJsonValues("table_b", oss_2.str());

  EXPECT_NO_THROW(run_multiple_agg(R"(
SELECT * FROM table_a INNER JOIN table_b ON table_a.FixedLen[1] =
table_b.FixedLen[1];
)",
                                   ExecutorDeviceType::CPU));

  dropTable("table_a");
  dropTable("table_b");
}

int main(int argc, char** argv) {
TestHelpers::init_logger_stderr_only(argc, argv);
testing::InitGoogleTest(&argc, argv);
Expand Down

0 comments on commit 4a0de96

Please sign in to comment.