[Refactor] Remove charge_mode from LRUCache #55911

Merged · 4 commits · Feb 17, 2025
3 changes: 2 additions & 1 deletion be/src/exec/query_cache/cache_manager.cpp
@@ -25,7 +25,8 @@ static void delete_cache_entry(const CacheKey& key, void* value) {

void CacheManager::populate(const std::string& key, const CacheValue& value) {
auto* cache_value = new CacheValue(value);
auto* handle = _cache.insert(key, cache_value, cache_value->size(), &delete_cache_entry, CachePriority::NORMAL);
size_t value_size = cache_value->size();
auto* handle = _cache.insert(key, cache_value, value_size, value_size, &delete_cache_entry, CachePriority::NORMAL);
_cache.release(handle);
}

13 changes: 7 additions & 6 deletions be/src/exprs/jit/jit_engine.cpp
@@ -98,12 +98,13 @@ Status JitObjectCache::register_func(JITScalarFunction func) {
// put into LRU cache
auto* cache = new JitCacheEntry(_obj_code, _func);
GlobalEnv::GetInstance()->jit_cache_mem_tracker()->consume(cache_func_size);
auto* handle = _lru_cache->insert(_cache_key, (void*)cache, cache_func_size, [](const CacheKey& key, void* value) {
auto* entry = ((JitCacheEntry*)value);
// maybe release earlier as the std::shared_ptr<llvm::MemoryBuffer> is hold by caller
GlobalEnv::GetInstance()->jit_cache_mem_tracker()->release(entry->obj_buff->getBufferSize());
delete entry;
});
auto* handle = _lru_cache->insert(
_cache_key, (void*)cache, cache_func_size, cache_func_size, [](const CacheKey& key, void* value) {
auto* entry = ((JitCacheEntry*)value);
// maybe released earlier, as the std::shared_ptr<llvm::MemoryBuffer> is held by the caller
GlobalEnv::GetInstance()->jit_cache_mem_tracker()->release(entry->obj_buff->getBufferSize());
delete entry;
});
if (handle == nullptr) {
delete cache;
LOG(WARNING) << "JIT register func failed, func = " << _cache_key << ", ir size = " << cache_func_size;
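The hunk above pairs a mem-tracker consume before the insert with a release inside the deleter, so the tracked bytes follow the lifetime of the cache entry, and it undoes the work when the insert returns nullptr. A minimal sketch of that pairing against the refactored insert signature; MyEntry and the my_tracker_* hooks are hypothetical stand-ins, not StarRocks APIs, and the sketch is not part of the PR:

#include <string>

#include "util/lru_cache.h"  // assumed include path for Cache, CacheKey

// Hypothetical tracker hooks standing in for jit_cache_mem_tracker()->consume()/release().
void my_tracker_consume(size_t /*bytes*/) {}
void my_tracker_release(size_t /*bytes*/) {}

struct MyEntry {
    size_t bytes;
};

void insert_tracked(starrocks::Cache* cache, const std::string& key, size_t bytes) {
    auto* entry = new MyEntry{bytes};
    my_tracker_consume(bytes);  // account for the entry before handing it to the cache
    auto* handle = cache->insert(
            key, entry, bytes, bytes, [](const starrocks::CacheKey& /*key*/, void* value) {
                auto* e = static_cast<MyEntry*>(value);
                my_tracker_release(e->bytes);  // give the bytes back when the cache evicts the entry
                delete e;
            });
    if (handle == nullptr) {
        // Mirror the JIT code: the cache refused the entry, so undo the accounting ourselves.
        my_tracker_release(bytes);
        delete entry;
        return;
    }
    cache->release(handle);
}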
2 changes: 1 addition & 1 deletion be/src/fs/fd_cache.cpp
@@ -33,7 +33,7 @@ FdCache::~FdCache() {

FdCache::Handle* FdCache::insert(std::string_view path, int fd) {
void* value = reinterpret_cast<void*>(static_cast<uintptr_t>(fd)); // NOLINT
Cache::Handle* h = _cache->insert(CacheKey(path.data(), path.size()), value, 1, fd_deleter);
Cache::Handle* h = _cache->insert(CacheKey(path.data(), path.size()), value, 1, 1, fd_deleter);
return reinterpret_cast<FdCache::Handle*>(h);
}

2 changes: 1 addition & 1 deletion be/src/service/staros_worker.cpp
@@ -345,7 +345,7 @@ absl::StatusOr<std::shared_ptr<StarOSWorker::FileSystem>> StarOSWorker::new_shar

// Put the FileSystem into the LRU cache
auto value = new CacheValue(fs);
handle = _fs_cache->insert(key, value, 1, cache_value_deleter);
handle = _fs_cache->insert(key, value, 1, 1, cache_value_deleter);
if (handle == nullptr) {
delete value;
} else {
2 changes: 1 addition & 1 deletion be/src/storage/lake/metacache.cpp
@@ -89,7 +89,7 @@ Metacache::Metacache(int64_t cache_capacity) : _cache(new_lru_cache(cache_capaci
Metacache::~Metacache() = default;

void Metacache::insert(std::string_view key, CacheValue* ptr, size_t size) {
Cache::Handle* handle = _cache->insert(CacheKey(key), ptr, size, cache_value_deleter);
Cache::Handle* handle = _cache->insert(CacheKey(key), ptr, size, size, cache_value_deleter);
_cache->release(handle);
}

14 changes: 5 additions & 9 deletions be/src/storage/page_cache.cpp
@@ -87,14 +87,12 @@ void StoragePageCache::prune() {
}

StoragePageCache::StoragePageCache(MemTracker* mem_tracker, size_t capacity)
: _mem_tracker(mem_tracker), _cache(new_lru_cache(capacity, ChargeMode::MEMSIZE)) {}
: _mem_tracker(mem_tracker), _cache(new_lru_cache(capacity)) {}

StoragePageCache::~StoragePageCache() = default;

void StoragePageCache::set_capacity(size_t capacity) {
#ifndef BE_TEST
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(_mem_tracker);
#endif
_cache->set_capacity(capacity);
}

@@ -111,9 +109,7 @@ uint64_t StoragePageCache::get_hit_count() {
}

bool StoragePageCache::adjust_capacity(int64_t delta, size_t min_capacity) {
#ifndef BE_TEST
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(_mem_tracker);
#endif
return _cache->adjust_capacity(delta, min_capacity);
}

@@ -127,13 +123,13 @@ bool StoragePageCache::lookup(const CacheKey& key, PageCacheHandle* handle) {
}

void StoragePageCache::insert(const CacheKey& key, const Slice& data, PageCacheHandle* handle, bool in_memory) {
// mem size should equals to data size when running UT
int64_t mem_size = data.size;
#ifndef BE_TEST
mem_size = malloc_usable_size(data.data);
int64_t mem_size = malloc_usable_size(data.data);
tls_thread_status.mem_release(mem_size);
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(_mem_tracker);
tls_thread_status.mem_consume(mem_size);
#else
int64_t mem_size = data.size;
#endif

auto deleter = [](const starrocks::CacheKey& key, void* value) { delete[](uint8_t*) value; };
@@ -144,7 +140,7 @@ void StoragePageCache::insert(const CacheKey& key, const Slice& data, PageCacheH
}
// Use mem size managed by memory allocator as this record charge size. At the same time, we should record this record size
// for data fetching when lookup.
auto* lru_handle = _cache->insert(key.encode(), data.data, mem_size, deleter, priority, data.size);
auto* lru_handle = _cache->insert(key.encode(), data.data, data.size, mem_size, deleter, priority);
*handle = PageCacheHandle(_cache.get(), lru_handle);
}

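The comment in the hunk above explains the split this refactor depends on: the charge passed to the cache is the allocator-reported size of the page buffer, while the logical data size is recorded separately so a lookup can hand back a correctly sized slice. A minimal sketch of that call pattern, assuming the util/lru_cache.h include path and glibc's malloc_usable_size; the helper is illustrative and not part of the PR:

#include <malloc.h>  // malloc_usable_size (glibc)

#include <cstdint>
#include <string>

#include "util/lru_cache.h"  // assumed include path for Cache, CacheKey

// Illustrative helper: charge the allocator-usable size, record the logical size.
void insert_page(starrocks::Cache* cache, const std::string& encoded_key,
                 uint8_t* page, size_t page_size) {
    // The logical size: what value_slice() should report back to readers.
    size_t value_size = page_size;
    // The charge: what the LRU deducts from capacity. The allocator may have
    // rounded the allocation up, so this can exceed page_size.
    size_t charge = malloc_usable_size(page);
    auto deleter = [](const starrocks::CacheKey& /*key*/, void* value) {
        delete[] static_cast<uint8_t*>(value);
    };
    auto* handle = cache->insert(encoded_key, page, value_size, charge, deleter);
    cache->release(handle);
}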
6 changes: 1 addition & 5 deletions be/src/storage/page_cache.h
@@ -134,11 +134,7 @@ class PageCacheHandle {
PageCacheHandle(Cache* cache, Cache::Handle* handle) : _cache(cache), _handle(handle) {}
~PageCacheHandle() {
if (_handle != nullptr) {
#ifndef BE_TEST
MemTracker* prev_tracker =
tls_thread_status.set_mem_tracker(GlobalEnv::GetInstance()->page_cache_mem_tracker());
DeferOp op([&] { tls_thread_status.set_mem_tracker(prev_tracker); });
#endif
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(GlobalEnv::GetInstance()->page_cache_mem_tracker());
_cache->release(_handle);
}
}
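The removed lines above spelled out the thread-local tracker swap by hand: save the current tracker, install the page-cache tracker, and restore the old one with a DeferOp. The replacement macro presumably packages the same save-and-restore into one RAII scope. A sketch of that idiom, written against the MemTracker and tls_thread_status names visible in the removed lines; the real SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER may be implemented differently:

// Illustrative RAII equivalent of the save/restore the removed lines performed
// explicitly; not the actual macro definition.
class ScopedMemTrackerSetter {
public:
    explicit ScopedMemTrackerSetter(MemTracker* tracker)
            : _prev(tls_thread_status.set_mem_tracker(tracker)) {}
    ~ScopedMemTrackerSetter() { tls_thread_status.set_mem_tracker(_prev); }

    // Copying would restore the saved tracker twice, so forbid it.
    ScopedMemTrackerSetter(const ScopedMemTrackerSetter&) = delete;
    ScopedMemTrackerSetter& operator=(const ScopedMemTrackerSetter&) = delete;

private:
    MemTracker* _prev;
};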
2 changes: 1 addition & 1 deletion be/src/storage/rowset/metadata_cache.cpp
@@ -53,7 +53,7 @@ void MetadataCache::set_capacity(size_t capacity) {
}

void MetadataCache::_insert(const std::string& key, std::weak_ptr<Rowset>* ptr, size_t size) {
Cache::Handle* handle = _cache->insert(CacheKey(key), ptr, size, _cache_value_deleter);
Cache::Handle* handle = _cache->insert(CacheKey(key), ptr, size, size, _cache_value_deleter);
_cache->release(handle);
}

3 changes: 2 additions & 1 deletion be/src/storage/sstable/table.cpp
@@ -194,7 +194,8 @@ Iterator* Table::BlockReader(void* arg, const ReadOptions& options, const Slice&
if (s.ok()) {
block = new Block(contents);
if (contents.cachable && options.fill_cache) {
cache_handle = block_cache->insert(key, block, block->size(), &DeleteCachedBlock);
size_t block_size = block->size();
cache_handle = block_cache->insert(key, block, block_size, block_size, &DeleteCachedBlock);
}
}
if (options.stat != nullptr) {
22 changes: 9 additions & 13 deletions be/src/util/lru_cache.cpp
@@ -294,9 +294,8 @@ void LRUCache::_evict_one_entry(LRUHandle* e) {
_usage -= e->charge;
}

Cache::Handle* LRUCache::insert(const CacheKey& key, uint32_t hash, void* value, size_t charge,
void (*deleter)(const CacheKey& key, void* value), CachePriority priority,
size_t value_size) {
Cache::Handle* LRUCache::insert(const CacheKey& key, uint32_t hash, void* value, size_t value_size, size_t charge,
void (*deleter)(const CacheKey& key, void* value), CachePriority priority) {
Contributor:
If the value is self-descriptive, how about removing value_size from the LRU cache, because it is really unnecessary for an underlying cache? It can reduce memory, since value_size == charge in many cases. If value_size != charge, the caller can get the value size from the pointer.

Contributor Author:
Good suggestion. Removing it can save metadata memory. However, we won't change it in this PR. The purpose of this PR is to enable PageCache to use the StarCache interface.

auto* e = reinterpret_cast<LRUHandle*>(malloc(sizeof(LRUHandle) - 1 + key.size()));
e->value = value;
e->deleter = deleter;
@@ -397,8 +396,7 @@ uint32_t ShardedLRUCache::_shard(uint32_t hash) {
return hash >> (32 - kNumShardBits);
}

ShardedLRUCache::ShardedLRUCache(size_t capacity, ChargeMode charge_mode)
: _last_id(0), _capacity(capacity), _charge_mode(charge_mode) {
ShardedLRUCache::ShardedLRUCache(size_t capacity) : _last_id(0), _capacity(capacity) {
const size_t per_shard = (_capacity + (kNumShards - 1)) / kNumShards;
for (auto& _shard : _shards) {
_shard.set_capacity(per_shard);
@@ -429,11 +427,10 @@ bool ShardedLRUCache::adjust_capacity(int64_t delta, size_t min_capacity) {
return true;
}

Cache::Handle* ShardedLRUCache::insert(const CacheKey& key, void* value, size_t charge,
void (*deleter)(const CacheKey& key, void* value), CachePriority priority,
size_t value_size) {
Cache::Handle* ShardedLRUCache::insert(const CacheKey& key, void* value, size_t value_size, size_t charge,
void (*deleter)(const CacheKey& key, void* value), CachePriority priority) {
const uint32_t hash = _hash_slice(key);
return _shards[_shard(hash)].insert(key, hash, value, charge, deleter, priority, value_size);
return _shards[_shard(hash)].insert(key, hash, value, value_size, charge, deleter, priority);
}

Cache::Handle* ShardedLRUCache::lookup(const CacheKey& key) {
@@ -457,8 +454,7 @@ void* ShardedLRUCache::value(Handle* handle) {

Slice ShardedLRUCache::value_slice(Handle* handle) {
auto lru_handle = reinterpret_cast<LRUHandle*>(handle);
size_t record_size = _charge_mode == ChargeMode::VALUESIZE ? lru_handle->charge : lru_handle->value_size;
return {(char*)lru_handle->value, record_size};
return {(char*)lru_handle->value, lru_handle->value_size};
}

uint64_t ShardedLRUCache::new_id() {
@@ -531,8 +527,8 @@ void ShardedLRUCache::get_cache_status(rapidjson::Document* document) {
}
}

Cache* new_lru_cache(size_t capacity, ChargeMode charge_mode) {
return new ShardedLRUCache(capacity, charge_mode);
Cache* new_lru_cache(size_t capacity) {
return new ShardedLRUCache(capacity);
}

} // namespace starrocks
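The review thread above asks whether value_size could be dropped altogether. As the code stands after this change, charge alone drives usage accounting and eviction (_evict_one_entry subtracts e->charge), while value_size is only read back through value_slice(). A small sketch of a caller observing that behaviour; the helper and its name are illustrative, not part of the PR:

#include "util/lru_cache.h"  // assumed include path for Cache, CacheKey, Slice

// Illustrative only: with charge_mode gone, value_slice() always reports the
// logical value size, even when the entry was charged more against capacity.
size_t cached_value_size(starrocks::Cache* cache, const starrocks::CacheKey& key) {
    starrocks::Cache::Handle* handle = cache->lookup(key);
    if (handle == nullptr) {
        return 0;  // not cached
    }
    // Equals the value_size passed to insert(), not the (possibly larger) charge.
    size_t size = cache->value_slice(handle).size;
    cache->release(handle);  // release only after we are done reading the entry
    return size;
}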
25 changes: 9 additions & 16 deletions be/src/util/lru_cache.h
@@ -20,16 +20,9 @@ namespace starrocks {
class Cache;
class CacheKey;

enum class ChargeMode {
// use value size as charge
VALUESIZE = 0,
// use allocator tracking size as charge
MEMSIZE = 1
};

// Create a new cache with a fixed size capacity. This implementation
// of Cache uses a least-recently-used eviction policy.
extern Cache* new_lru_cache(size_t capacity, ChargeMode charge_mode = ChargeMode::VALUESIZE);
extern Cache* new_lru_cache(size_t capacity);

class CacheKey {
public:
@@ -139,9 +132,9 @@ class Cache {
//
// When the inserted entry is no longer needed, the key and
// value will be passed to "deleter".
virtual Handle* insert(const CacheKey& key, void* value, size_t charge,
virtual Handle* insert(const CacheKey& key, void* value, size_t value_size, size_t charge,
void (*deleter)(const CacheKey& key, void* value),
CachePriority priority = CachePriority::NORMAL, size_t value_size = 0) = 0;
CachePriority priority = CachePriority::NORMAL) = 0;

// If the cache has no mapping for "key", returns NULL.
//
@@ -275,9 +268,9 @@ class LRUCache {
void set_capacity(size_t capacity);

// Like Cache methods, but with an extra "hash" parameter.
Cache::Handle* insert(const CacheKey& key, uint32_t hash, void* value, size_t charge,
Cache::Handle* insert(const CacheKey& key, uint32_t hash, void* value, size_t value_size, size_t charge,
void (*deleter)(const CacheKey& key, void* value),
CachePriority priority = CachePriority::NORMAL, size_t value_size = 0);
CachePriority priority = CachePriority::NORMAL);
Cache::Handle* lookup(const CacheKey& key, uint32_t hash);
void release(Cache::Handle* handle);
void erase(const CacheKey& key, uint32_t hash);
@@ -318,10 +311,11 @@ static const int kNumShards = 1 << kNumShardBits;

class ShardedLRUCache : public Cache {
public:
explicit ShardedLRUCache(size_t capacity, ChargeMode charge_mode = ChargeMode::VALUESIZE);
explicit ShardedLRUCache(size_t capacity);
~ShardedLRUCache() override = default;
Handle* insert(const CacheKey& key, void* value, size_t charge, void (*deleter)(const CacheKey& key, void* value),
CachePriority priority = CachePriority::NORMAL, size_t value_size = 0) override;
Handle* insert(const CacheKey& key, void* value, size_t value_size, size_t charge,
void (*deleter)(const CacheKey& key, void* value),
CachePriority priority = CachePriority::NORMAL) override;
Handle* lookup(const CacheKey& key) override;
void release(Handle* handle) override;
void erase(const CacheKey& key) override;
@@ -347,7 +341,6 @@ class ShardedLRUCache : public Cache {
std::mutex _mutex;
uint64_t _last_id;
size_t _capacity;
ChargeMode _charge_mode;
};

} // namespace starrocks
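With the ChargeMode enum gone, every caller passes both sizes explicitly; most call sites in this PR have no separate logical size and simply pass the same number twice. A standalone sketch of the refactored interface as declared above, assuming the header path and that CacheKey converts implicitly from std::string, as the call sites in this diff suggest:

#include <memory>
#include <string>

#include "util/lru_cache.h"  // assumed include path

using namespace starrocks;

static void delete_string(const CacheKey& /*key*/, void* value) {
    delete static_cast<std::string*>(value);
}

int main() {
    // The capacity is compared against the running sum of charges.
    std::unique_ptr<Cache> cache(new_lru_cache(16 * 1024 * 1024));

    auto* value = new std::string("hello");
    size_t size = value->size();
    // The common case in this PR: value_size and charge are the same number.
    Cache::Handle* handle = cache->insert(std::string("greeting"), value, size, size,
                                          &delete_string, CachePriority::NORMAL);
    cache->release(handle);

    if (Cache::Handle* found = cache->lookup(std::string("greeting"))) {
        auto* cached = static_cast<std::string*>(cache->value(found));
        (void)cached;  // use *cached while the handle is still held
        cache->release(found);
    }
    return 0;
}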
4 changes: 2 additions & 2 deletions be/test/exprs/jit_func_cache_test.cpp
@@ -39,8 +39,8 @@ class JITFunctionCacheTest : public ::testing::Test {
}

void mock_insert(string expr_name) {
auto* handle =
engine->get_func_cache()->insert(expr_name, nullptr, 1000, [](const CacheKey& key, void* value) {});
auto* handle = engine->get_func_cache()->insert(expr_name, nullptr, 1000, 1000,
[](const CacheKey& key, void* value) {});
if (handle != nullptr) {
engine->get_func_cache()->release(handle);
}
11 changes: 6 additions & 5 deletions be/test/util/lru_cache_test.cpp
@@ -105,12 +105,13 @@ class CacheTest : public testing::Test {

void Insert(int key, int value, int charge) {
std::string result;
_cache->release(_cache->insert(EncodeKey(&result, key), EncodeValue(value), charge, &CacheTest::Deleter));
_cache->release(
_cache->insert(EncodeKey(&result, key), EncodeValue(value), charge, charge, &CacheTest::Deleter));
}

void InsertDurable(int key, int value, int charge) {
std::string result;
_cache->release(_cache->insert(EncodeKey(&result, key), EncodeValue(value), charge, &CacheTest::Deleter,
_cache->release(_cache->insert(EncodeKey(&result, key), EncodeValue(value), charge, charge, &CacheTest::Deleter,
CachePriority::DURABLE));
}

@@ -236,7 +237,7 @@ static void deleter(const CacheKey& key, void* v) {

static void insert_LRUCache(LRUCache& cache, const CacheKey& key, int value, CachePriority priority) {
uint32_t hash = key.hash(key.data(), key.size(), 0);
cache.release(cache.insert(key, hash, EncodeValue(value), value, &deleter, priority));
cache.release(cache.insert(key, hash, EncodeValue(value), value, value, &deleter, priority));
}

TEST_F(CacheTest, Usage) {
Expand Down Expand Up @@ -317,7 +318,7 @@ TEST_F(CacheTest, SetCapacity) {
// Insert kCacheSize entries, but not releasing.
for (int i = 0; i < 32; i++) {
std::string result;
handles[i] = _cache->insert(EncodeKey(&result, i), EncodeValue(1000 + kCacheSize), 1, &CacheTest::Deleter);
handles[i] = _cache->insert(EncodeKey(&result, i), EncodeValue(1000 + kCacheSize), 1, 1, &CacheTest::Deleter);
}
ASSERT_EQ(kCacheSize, _cache->get_capacity());
ASSERT_EQ(32, _cache->get_memory_usage());
@@ -331,7 +332,7 @@
// then release 32, usage should be 32.
for (int i = 32; i < 64; i++) {
std::string result;
handles[i] = _cache->insert(EncodeKey(&result, i), EncodeValue(1000 + kCacheSize), 1, &CacheTest::Deleter);
handles[i] = _cache->insert(EncodeKey(&result, i), EncodeValue(1000 + kCacheSize), 1, 1, &CacheTest::Deleter);
}
ASSERT_EQ(kCacheSize * 2, _cache->get_capacity());
ASSERT_EQ(64, _cache->get_memory_usage());