Skip to content
This repository has been archived by the owner on May 9, 2024. It is now read-only.

Commit

Permalink
[Fetch Data] Linearized buffer memory management
Browse files Browse the repository at this point in the history
This commit removes freeLinearizedBuf() to avoid chunks that hold
references to deleted buffers. The buffers are now marked for deletion and
destroyed once unpinned. All buffers should now be removed via BufferManagers::free.

Signed-off-by: Dmitrii Makarenko <[email protected]>
  • Loading branch information
Devjiu committed Oct 11, 2023
1 parent ba0b4a7 commit f28d59c
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 57 deletions.
33 changes: 33 additions & 0 deletions omniscidb/DataMgr/BufferMgr/Buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,4 +166,37 @@ int8_t* Buffer::getMemoryPtr() {
// Replaces the raw memory pointer tracked by this buffer with new_ptr.
// Only the pointer value is overwritten here; nothing is freed or copied.
void Buffer::setMemoryPtr(int8_t* new_ptr) {
mem_ = new_ptr;
}

/// Destroys this buffer, picking the path by whether it has a segment.
///
/// ZeroCopy buffers do not have correct segment iterators by design, so they
/// are recognised by a value-initialised `seg_it_` and deleted explicitly,
/// without the segment removal that deleteBuffer(...) performs. Any other
/// buffer is handed to the owning buffer manager by chunk key.
///
/// NOTE(review): on either path the object is presumably gone when this
/// returns, so callers must not touch members afterwards - confirm.
void Buffer::deleteSelf() {
  const bool has_segment = !(seg_it_ == BufferList::iterator());
  if (has_segment) {
    bm_->deleteBuffer(seg_it_->chunk_key);
    return;
  }

  // No segment to release: only the Buffer object itself has to go.
  delete this;
}

/// Drops one pin and returns the pin count after the decrement.
///
/// If the count reaches zero and the buffer was flagged by
/// deleteWhenUnpinned(), the buffer destroys itself via deleteSelf(). The
/// decision is taken under `pin_mutex_`, but the mutex is released before
/// deleteSelf() runs so that no lock on a member mutex outlives the object.
///
/// NOTE(review): another thread could pin the buffer between releasing the
/// mutex and deleteSelf() - confirm callers rule this out.
int Buffer::unPin() {
  int remaining = 0;
  bool destroy_now = false;
  {
    std::lock_guard<std::mutex> guard(pin_mutex_);
    remaining = --pin_count_;
    destroy_now = (remaining == 0) && delete_on_unpin_;
  }

  if (destroy_now) {
    // The mutex is already released; members must not be used past this call.
    deleteSelf();
  }
  return remaining;
}

/// Requests destruction of this buffer as soon as nobody has it pinned.
///
/// While the pin count is non-zero the request is only recorded in
/// `delete_on_unpin_`, and the final unPin() performs the deletion. If the
/// buffer is not pinned at all it is destroyed right away via deleteSelf(),
/// after `pin_mutex_` has been released.
void Buffer::deleteWhenUnpinned() {
  {
    std::lock_guard<std::mutex> guard(pin_mutex_);
    if (pin_count_ != 0) {
      delete_on_unpin_ = true;
      return;
    }
  }

  // Not pinned: safe to destroy immediately, with the mutex no longer held.
  deleteSelf();
}
} // namespace Buffer_Namespace
22 changes: 5 additions & 17 deletions omniscidb/DataMgr/BufferMgr/Buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,28 +134,14 @@ class Buffer : public AbstractBuffer {
return (++pin_count_);
}

/// Drops one pin and returns the pin count after the decrement.
///
/// When the count reaches zero and deletion was requested through
/// deleteWhenUnpinned(), the buffer deletes itself. The lock on `pin_mutex_`
/// is released explicitly first: the mutex is a member of this object, so a
/// guard that unlocks after `delete this` would touch freed memory.
inline int unPin() override {
  std::unique_lock<std::mutex> pin_lock(pin_mutex_);
  const int res = (--pin_count_);
  if (!res && delete_on_unpin_) {
    // Must happen before the object (and its mutex) is destroyed.
    pin_lock.unlock();
    delete this;
  }
  return res;
}
int unPin() override;

/// Returns the current pin count, read while holding `pin_mutex_`.
inline int getPinCount() override {
  std::lock_guard<std::mutex> guard(pin_mutex_);
  const int current_pins = pin_count_;
  return current_pins;
}

/// Requests deletion of this buffer once it is no longer pinned.
///
/// A pinned buffer is only flagged via `delete_on_unpin_`; an unpinned one is
/// deleted immediately, after the lock on `pin_mutex_` has been released.
inline void deleteWhenUnpinned() override {
  {
    std::lock_guard<std::mutex> guard(pin_mutex_);
    if (pin_count_ != 0) {
      delete_on_unpin_ = true;
      return;
    }
  }

  // The guard is gone here, so no lock refers to the mutex being destroyed.
  delete this;
}
void deleteWhenUnpinned() override;

// Added for testing.
// Returns the slab number recorded in the segment this buffer points to.
// seg_it_ is dereferenced unconditionally.
// NOTE(review): presumably must not be called on buffers whose seg_it_ is a
// value-initialised iterator - confirm.
int32_t getSlabNum() const { return seg_it_->slab_num; }
Expand All @@ -177,6 +163,8 @@ class Buffer : public AbstractBuffer {
const MemoryLevel src_buffer_type = CPU_LEVEL,
const int src_device_id = -1) = 0;

void deleteSelf();

BufferMgr* bm_;
BufferList::iterator seg_it_;
size_t page_size_; /// the size of each page in the buffer
Expand Down
12 changes: 11 additions & 1 deletion omniscidb/DataMgr/BufferMgr/BufferMgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ void BufferMgr::clear() {
// for removal to have them deleted when unpinned.
for (auto& buf : chunk_index_) {
if (buf.second->buffer) {
// WARNING:
// deleteWhenUnpinned(...) may end up in deleteSelf(), which calls deleteBuffer(...)
// when the segment iterator is valid. deleteBuffer(...) tries to acquire
// chunk_index_mutex_, which is already held here. To avoid that deadlock the
// segment iterator is reset first; the segments themselves are cleaned up later.
buf.second->buffer->seg_it_ = BufferList::iterator();
buf.second->buffer->deleteWhenUnpinned();
buf.second->buffer = nullptr;
}
Expand Down Expand Up @@ -623,6 +629,7 @@ void BufferMgr::deleteBuffer(const ChunkKey& key, const bool) {
chunk_index_lock.unlock();
std::lock_guard<std::mutex> sized_segs_lock(sized_segs_mutex_);
if (seg_it->buffer) {
CHECK_EQ(seg_it->buffer->getPinCount(), 0);
delete seg_it->buffer; // Delete Buffer for segment
seg_it->buffer = 0;
}
Expand Down Expand Up @@ -826,12 +833,15 @@ AbstractBuffer* BufferMgr::alloc(const size_t num_bytes) {
return createBuffer(chunk_key, page_size_, num_bytes);
}

/// Releases a buffer owned by this manager. All buffer deletions should be
/// done via free(...).
///
/// The buffer must be a Buffer and is expected to hold exactly one pin when it
/// is freed. It is flagged for deletion and then unpinned, so the actual
/// destruction happens in Buffer::unPin(). deleteBuffer(...) must not be
/// called on it here: that would destroy the buffer before the pin-count check
/// and the unpin below, which would then use freed memory.
///
/// @param buffer Base-class pointer to the Buffer to release.
void BufferMgr::free(AbstractBuffer* buffer) {
  Buffer* casted_buffer = dynamic_cast<Buffer*>(buffer);
  if (casted_buffer == nullptr) {
    LOG(FATAL) << "Wrong buffer type - expects base class pointer to Buffer type.";
  }
  CHECK_EQ(casted_buffer->getPinCount(), 1);
  casted_buffer->deleteWhenUnpinned();
  casted_buffer->unPin();
}

size_t BufferMgr::getNumChunks() {
Expand Down
9 changes: 8 additions & 1 deletion omniscidb/DataMgr/Chunk/Chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,14 @@ class Chunk {
: buffer_(nullptr), index_buf_(nullptr), column_info_(col_info) {}

/// Wraps existing data and index buffers in a Chunk.
///
/// Every non-null buffer is pinned here so that it stays alive while this
/// Chunk refers to it; the destructor calls unpinBuffer().
///
/// @param b        Data buffer, may be null.
/// @param ib       Index buffer, may be null.
/// @param col_info Column description stored with the chunk.
Chunk(AbstractBuffer* b, AbstractBuffer* ib, ColumnInfoPtr col_info)
    : buffer_(b), index_buf_(ib), column_info_(col_info) {
  if (buffer_) {
    buffer_->pin();
  }
  if (index_buf_) {
    index_buf_->pin();
  }
}

// Delegates all cleanup to unpinBuffer().
// NOTE(review): unpinBuffer() is not visible here; presumably it releases the
// pins held on buffer_ and index_buf_ - confirm.
~Chunk() { unpinBuffer(); }

Expand Down
35 changes: 4 additions & 31 deletions omniscidb/QueryEngine/ColumnFetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,10 @@ const int8_t* ColumnFetcher::linearizeColumnFragments(
}
}
CHECK(res.first); // check merged data buffer
// These buffers are associated with a Chunk that is created by hand rather than
// through Chunk::getChunk(...), so they have to be removed explicitly. To do that,
// both buffers are marked for deletion on unpin in the ColumnFetcher destructor.
// The pin count tracks whether any chunk is still using the buffer.
if (!type->isFixedLenArray()) {
CHECK(res.second); // check merged index buffer
}
Expand Down Expand Up @@ -1061,34 +1065,3 @@ ChunkIter ColumnFetcher::prepareChunkIter(AbstractBuffer* merged_data_buf,
merged_chunk_iter.elem_type_size = chunk_iter.elem_type_size;
return merged_chunk_iter;
}

void ColumnFetcher::freeLinearizedBuf() {
std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
auto buffer_provider = executor_->getBufferProvider();

if (!linearized_data_buf_cache_.empty()) {
for (auto& kv : linearized_data_buf_cache_) {
for (auto& kv2 : kv.second) {
buffer_provider->free(kv2.second);
}
}
}

if (!linearized_idx_buf_cache_.empty()) {
for (auto& kv : linearized_idx_buf_cache_) {
for (auto& kv2 : kv.second) {
buffer_provider->free(kv2.second);
}
}
}
}

void ColumnFetcher::freeTemporaryCpuLinearizedIdxBuf() {
std::lock_guard<std::mutex> linearized_col_cache_guard(linearized_col_cache_mutex_);
auto buffer_provider = executor_->getBufferProvider();
if (!linearlized_temporary_cpu_index_buf_cache_.empty()) {
for (auto& kv : linearlized_temporary_cpu_index_buf_cache_) {
buffer_provider->free(kv.second);
}
}
}
29 changes: 26 additions & 3 deletions omniscidb/QueryEngine/ColumnFetcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,32 @@ class ColumnFetcher {
ColumnFetcher(Executor* executor,
DataProvider* data_provider,
const ColumnCacheMap& column_cache);
/// Releases every buffer held in the linearization caches.
///
/// Each cached buffer is first flagged with deleteWhenUnpinned() and then
/// unpinned once, so it is destroyed as soon as its pin count drops to zero.
/// The caches are visited in the order: data buffers, index buffers,
/// temporary CPU index buffers.
~ColumnFetcher() {
  for (auto& outer_entry : linearized_data_buf_cache_) {
    for (auto& inner_entry : outer_entry.second) {
      inner_entry.second->deleteWhenUnpinned();
      inner_entry.second->unPin();
    }
  }

  for (auto& outer_entry : linearized_idx_buf_cache_) {
    for (auto& inner_entry : outer_entry.second) {
      inner_entry.second->deleteWhenUnpinned();
      inner_entry.second->unPin();
    }
  }

  for (auto& cache_entry : linearlized_temporary_cpu_index_buf_cache_) {
    cache_entry.second->deleteWhenUnpinned();
    cache_entry.second->unPin();
  }
}

//! Gets one chunk's pointer and element count on either CPU or GPU.
static std::pair<const int8_t*, size_t> getOneColumnFragment(
Expand Down Expand Up @@ -93,9 +119,6 @@ class ColumnFetcher {
DeviceAllocator* device_allocator,
const size_t thread_idx) const;

void freeTemporaryCpuLinearizedIdxBuf();
void freeLinearizedBuf();

DataProvider* getDataProvider() const { return data_provider_; }

private:
Expand Down
4 changes: 0 additions & 4 deletions omniscidb/QueryEngine/Execute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2186,10 +2186,6 @@ hdk::ResultSetTable Executor::executeWorkUnitImpl(
do {
SharedKernelContext shared_context(query_infos);
ColumnFetcher column_fetcher(this, data_provider, column_cache);
ScopeGuard scope_guard = [&column_fetcher] {
column_fetcher.freeLinearizedBuf();
column_fetcher.freeTemporaryCpuLinearizedIdxBuf();
};

if (ra_exe_unit.isShuffle()) {
allocateShuffleBuffers(query_infos, ra_exe_unit, row_set_mem_owner, shared_context);
Expand Down
79 changes: 79 additions & 0 deletions omniscidb/Tests/JoinHashTableTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,85 @@ TEST(Other, Regression) {
dropTable("table_b");
}

// Joins two tables that both carry a fixed-length array column on their scalar
// `si` column and checks that running the query on CPU does not throw.
TEST(Other, FixedLenArr) {
  // One JSON object per line, each line newline-terminated.
  const std::string table_a_json =
      "{\"si\": 1, \"FixedLen\": [7, 233]}\n"
      "{\"si\": 1, \"FixedLen\": [5, 47]}\n"
      "{\"si\": 2, \"FixedLen\": [6, 48]}\n"
      "{\"si\": 3, \"FixedLen\": [7, 49]}\n"
      "{\"si\": 4, \"FixedLen\": [8, 67]}\n"
      "{\"si\": 5, \"FixedLen\": [9, 68]}\n"
      "{\"si\": 6, \"FixedLen\": [10, 69]}\n";
  const std::string table_b_json =
      "{\"si\": 1, \"FixedLen\": [2, 33]}\n"
      "{\"si\": 2, \"FixedLen\": [1, 17]}\n"
      "{\"si\": 2, \"FixedLen\": [6, 48]}\n"
      "{\"si\": 3, \"FixedLen\": [7, 49]}\n"
      "{\"si\": 4, \"FixedLen\": [8, 67]}\n"
      "{\"si\": 5, \"FixedLen\": [9, 68]}\n"
      "{\"si\": 6, \"FixedLen\": [10, 69]}\n";

  createTable("table_a",
              {{"si", ctx().int16()}, {"FixedLen", ctx().arrayFixed(2, ctx().int64())}},
              {2});
  insertJsonValues("table_a", table_a_json);

  createTable("table_b",
              {{"si", ctx().int8()}, {"FixedLen", ctx().arrayFixed(2, ctx().int32())}},
              {2});
  insertJsonValues("table_b", table_b_json);

  EXPECT_NO_THROW(run_multiple_agg(R"(
SELECT * FROM table_a, table_b WHERE table_a.si = table_b.si;
)",
                                   ExecutorDeviceType::CPU));

  dropTable("table_a");
  dropTable("table_b");
}

// Joins two tables on an element of their fixed-length array columns with lazy
// fetch disabled and checks that running the query on CPU does not throw.
TEST(Other, FixedLenArr2) {
  // enable_lazy_fetch is process-wide configuration. Restore the previous value
  // on every exit path so that disabling it here cannot leak into other tests.
  struct LazyFetchGuard {
    const bool saved_value;
    ~LazyFetchGuard() { config().rs.enable_lazy_fetch = saved_value; }
  } lazy_fetch_guard{config().rs.enable_lazy_fetch};
  config().rs.enable_lazy_fetch = false;

  createTable("table_a",
              {{"si", ctx().int16()}, {"FixedLen", ctx().arrayFixed(2, ctx().int16())}},
              {1});

  std::ostringstream oss;
  oss << "{\"si\": 1, \"FixedLen\": [" << 7 << ", " << 233 << "]}" << std::endl;
  oss << "{\"si\": 1, \"FixedLen\": [" << 5 << ", " << 47 << "]}" << std::endl;
  oss << "{\"si\": 2, \"FixedLen\": [" << 6 << ", " << 48 << "]}" << std::endl;
  oss << "{\"si\": 3, \"FixedLen\": [" << 7 << ", " << 49 << "]}" << std::endl;
  oss << "{\"si\": 4, \"FixedLen\": [" << 8 << ", " << 67 << "]}" << std::endl;
  oss << "{\"si\": 5, \"FixedLen\": [" << 9 << ", " << 68 << "]}" << std::endl;
  oss << "{\"si\": 6, \"FixedLen\": [" << 10 << ", " << 69 << "]}" << std::endl;
  insertJsonValues("table_a", oss.str());

  createTable("table_b",
              {{"si", ctx().int8()}, {"FixedLen", ctx().arrayFixed(2, ctx().int32())}},
              {2});

  std::ostringstream oss_2;
  oss_2 << "{\"si\": 1, \"FixedLen\": [" << 2 << ", " << 33 << "]}" << std::endl;
  oss_2 << "{\"si\": 2, \"FixedLen\": [" << 1 << ", " << 17 << "]}" << std::endl;
  oss_2 << "{\"si\": 2, \"FixedLen\": [" << 6 << ", " << 48 << "]}" << std::endl;
  oss_2 << "{\"si\": 3, \"FixedLen\": [" << 7 << ", " << 49 << "]}" << std::endl;
  oss_2 << "{\"si\": 4, \"FixedLen\": [" << 8 << ", " << 67 << "]}" << std::endl;
  oss_2 << "{\"si\": 5, \"FixedLen\": [" << 9 << ", " << 68 << "]}" << std::endl;
  oss_2 << "{\"si\": 6, \"FixedLen\": [" << 10 << ", " << 69 << "]}" << std::endl;
  insertJsonValues("table_b", oss_2.str());

  EXPECT_NO_THROW(run_multiple_agg(R"(
SELECT * FROM table_a INNER JOIN table_b ON table_a.FixedLen[1] =
table_b.FixedLen[1];
)",
                                   ExecutorDeviceType::CPU));

  dropTable("table_a");
  dropTable("table_b");
}

int main(int argc, char** argv) {
TestHelpers::init_logger_stderr_only(argc, argv);
testing::InitGoogleTest(&argc, argv);
Expand Down

0 comments on commit f28d59c

Please sign in to comment.