Proof-of-concept implementation for supporting multiple pack repositories #126

Closed
wants to merge 6 commits into from
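A minimal usage sketch of the API this proof of concept adds, inferred from the diff and the new test below (the repository path and object contents are illustrative, and the underscore-prefixed helpers are internal):

import os
import shutil

from disk_objectstore import Container

container = Container("/path/to/container")
container.init_container()

# Add some objects and pack them; they end up in pack file "0"
hashkeys = [container.add_object(f"content-{i}".encode()) for i in range(100)]
container.pack_all_loose()

# Register a second pack repository: this creates <repo>/packs plus a
# config.json recording this container's container_id, and appends the
# path to "additional_pack_repos" in the main config.json
container.add_additional_pack_repo("/path/to/extra-repo")

# Pack files can then be moved into the additional repository; the container
# locates them again through its additional-pack location cache
shutil.move(
    os.path.join(container._get_pack_folder(), "0"),
    "/path/to/extra-repo/packs",
)

# Existing objects stay readable, and new data goes to the next pack (pack 1)
assert container.get_object_content(hashkeys[0]) == b"content-0"
assert container._get_pack_id_to_write_to() == 1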
173 changes: 137 additions & 36 deletions disk_objectstore/container.py
@@ -132,6 +132,7 @@ def __init__(self, folder: Union[str, Path]) -> None:
# IMPORTANT! IF YOU ADD MORE, REMEMBER TO CLEAR THEM IN `init_container()`!
self._current_pack_id: Optional[int] = None
self._config: Optional[dict] = None
self._additional_pack_locations: Optional[dict] = None

def get_folder(self) -> str:
"""Return the path to the folder that will host the object-store container."""
@@ -247,10 +248,66 @@ def _get_pack_path_from_pack_id(
"""
pack_id = str(pack_id)
assert self._is_valid_pack_id(
pack_id, allow_repack_pack=allow_repack_pack
pack_id, allow_repack_pack=False
), f"Invalid pack ID {pack_id}"
assert (
allow_repack_pack is False
), "Please use _get_repack_path_from_pack_id to get a repack path valid for a pack_id"

# Not in the main repository folder - try to locate it from the additional storages
if not os.path.isfile(os.path.join(self._get_pack_folder(), pack_id)):

# locate the file path from the cache
loc_cache = self._get_additional_pack_locations(refresh=False)
if pack_id in loc_cache and os.path.isfile(loc_cache[pack_id]):
return loc_cache[pack_id]

# File moved? Refresh the cache again
loc_cache = self._get_additional_pack_locations(refresh=True)
# Try again
if pack_id in loc_cache:
return loc_cache[pack_id]

# Use the default path in the main repo folder
return os.path.join(self._get_pack_folder(), pack_id)

def _get_additional_pack_locations(self, refresh=False) -> Dict[str, str]:
"""Get cached pack location and refresh the cache if needed"""
# Build a dictionary mapping pack name to pack's absolute path
if self._additional_pack_locations is None or refresh:
self._additional_pack_locations = {}
for path in self.additional_pack_repos:
packs_folder = Path(path) / "packs"
for filepath in packs_folder.glob("*"):
self._additional_pack_locations[filepath.name] = str(
filepath.resolve()
)

return self._additional_pack_locations

def _is_pack_in_additional_storage(self, pack_id: Union[str, int]) -> bool:
"""Return wether a pack is in additional storage"""
pack_id = str(pack_id)
if os.path.isfile(os.path.join(self._get_pack_folder(), str(pack_id))):
return False
if pack_id in self._get_additional_pack_locations():
return True
if pack_id in self._get_additional_pack_locations(refresh=True):
return True
raise RuntimeError(f"Pack {pack_id} is missing!")

def _get_repack_path_from_pack_id(self, pack_id: Union[int, str]) -> str:
"""
Return the path of the repack file for a given pack.

This ensures that the path is in the same repository folder as the pack file
of the given pack_id.
"""
pack_id = str(pack_id)
pack_path = Path(self._get_pack_path_from_pack_id(pack_id))
repack_file = pack_path.parent / str(self._REPACK_PACK_ID)
return str(repack_file)

def _get_pack_index_path(self) -> str:
"""Return the path to the SQLite file containing the index of packed objects."""
return os.path.join(self._folder, f"packs{self._PACK_INDEX_SUFFIX}")
@@ -267,12 +324,19 @@ def _get_pack_id_to_write_to(self) -> int:
"""
# Default to zero if not set (e.g. if it's None)
pack_id = self._current_pack_id or 0
additional_packs = list(
map(int, self._get_additional_pack_locations(refresh=True).keys())
)
max_addon_packid = max(additional_packs) if additional_packs else -10
while True:
pack_path = self._get_pack_path_from_pack_id(pack_id)
if not os.path.exists(pack_path):
if not os.path.exists(pack_path) and pack_id > max_addon_packid:
# Use this ID - the pack file does not exist yet
break
if os.path.getsize(pack_path) < self.pack_size_target:
if (
os.path.getsize(pack_path) < self.pack_size_target
and pack_id not in additional_packs
):
# Use this ID - the pack file is not "full" yet
break
# Try the next pack
@@ -345,6 +409,7 @@ def init_container(
# (at least the container_id, possibly the rest), and the other caches
self._config = None
self._current_pack_id = None
self._additional_pack_locations = None

if self.is_initialised:
raise FileExistsError(
@@ -385,6 +450,7 @@ def init_container(
"hash_type": hash_type,
"container_id": container_id,
"compression_algorithm": compression_algorithm,
"additional_pack_repos": [],
},
fhandle,
)
@@ -399,7 +465,7 @@

self._get_session(create=True)

def _get_repository_config(self) -> Dict[str, Union[int, str]]:
def _get_repository_config(self) -> Dict[str, Union[int, str, List]]:
"""Return the repository config."""
if self._config is None:
if not self.is_initialised:
@@ -410,6 +476,33 @@ def _get_repository_config(self) -> Dict[str, Union[int, str, List]]:
self._config = json.load(fhandle)
return self._config

def add_additional_pack_repo(self, repo_path):
"""Add additional pack repository to the container"""
repo_path = Path(repo_path).resolve()
repo_path.mkdir(exist_ok=True)
pack_path = repo_path / "packs"
pack_path.mkdir(exist_ok=True)

if (repo_path / "config.json").is_file():
# This repository already exists
with open(repo_path / "config.json") as fhandle:
repo_config = json.load(fhandle)
if repo_config["container_id"] != self.container_id:
raise ValueError(
"Trying to add a pack repository not belonging to this container!"
)
else:
# A new repository - store the container_id in the config.json file
with open(repo_path / "config.json", mode="w") as fhandle:
json.dump({"container_id": self.container_id}, fhandle)
# Add the folder to the list of additional packs
repo_list: List[str] = self._get_repository_config()["additional_pack_repos"] # type: ignore[assignment]
repo_list.append(str(repo_path))

# Save the configuration file
with open(self._get_config_file(), "w") as fhandle:
json.dump(self._config, fhandle)

@property
def loose_prefix_len(self) -> int:
"""Return the length of the prefix of loose objects, when sharding.
@@ -434,6 +527,12 @@ def hash_type(self) -> str:
"""
return self._get_repository_config()["hash_type"] # type: ignore[return-value]

@property
def additional_pack_repos(self) -> List[Path]:
"""Additional pack repository paths"""
repo_list: List[str] = self._get_repository_config()["additional_pack_repos"] # type: ignore[assignment]
return [Path(repo) for repo in repo_list]

@property
def container_id(self) -> str:
"""Return the repository unique ID.
@@ -1134,7 +1233,7 @@ def get_total_size(self) -> Dict[str, int]:

@contextmanager
def lock_pack(
self, pack_id: str, allow_repack_pack: bool = False
self, pack_id: str, lock_repack: bool = False
) -> Iterator[StreamWriteBytesType]:
"""Lock the given pack id. Use as a context manager.

@@ -1143,15 +1242,19 @@ def lock_pack(

Important to use for avoiding concurrent access/append to the same file.
:param pack_id: a string with a valid pack name.
:param allow_pack_repack: if True, allow to open the pack file used for repacking
:param lock_repack: if True, the corresponding repack file is locked and returned instead.
"""
assert self._is_valid_pack_id(pack_id, allow_repack_pack=allow_repack_pack)
assert self._is_valid_pack_id(pack_id, allow_repack_pack=False)

if lock_repack:
pack_file = self._get_repack_path_from_pack_id(pack_id)
else:
pack_file = self._get_pack_path_from_pack_id(
pack_id, allow_repack_pack=False
)

# Open file in exclusive mode
lock_file = os.path.join(self._get_pack_folder(), f"{pack_id}.lock")
pack_file = self._get_pack_path_from_pack_id(
pack_id, allow_repack_pack=allow_repack_pack
)
lock_file = Path(pack_file).with_suffix(".lock")
try:
with open(lock_file, "x"):
with open(pack_file, "ab") as pack_handle:
@@ -1185,12 +1288,16 @@ def _list_loose(self) -> Iterator[str]:
continue
yield first_level

def _list_packs(self) -> Iterator[str]:
def _list_packs(self, only_main_repo=False) -> Iterator[str]:
"""Iterate over packs.

.. note:: this returns a generator of the pack IDs.
"""
for fname in os.listdir(self._get_pack_folder()):

all_path = os.listdir(self._get_pack_folder())
if not only_main_repo:
all_path += self._get_additional_pack_locations(refresh=True)
for fname in all_path:
## I actually check for pack index files
# if not fname.endswith(self._PACK_INDEX_SUFFIX):
# continue
@@ -2514,7 +2621,7 @@ def repack(self, compress_mode: CompressMode = CompressMode.KEEP) -> None:

:param compress_mode: see docstring of ``repack_pack``.
"""
for pack_id in self._list_packs():
for pack_id in self._list_packs(only_main_repo=True):
self.repack_pack(pack_id, compress_mode=compress_mode)
self._vacuum()

@@ -2540,11 +2647,13 @@ def repack_pack(
pack_id
)

# Check that it does not exist
# Get the path of the pack and repack file - they are not expected to change throughout this method
repack_path = self._get_repack_path_from_pack_id(pack_id)
pack_path = self._get_pack_path_from_pack_id(pack_id)

# The repack file should not exist yet
assert not os.path.exists(
self._get_pack_path_from_pack_id(
self._REPACK_PACK_ID, allow_repack_pack=True
)
repack_path
), "The repack pack '{}' already exists, probably a previous repacking aborted?".format(
self._REPACK_PACK_ID
)
@@ -2556,17 +2665,15 @@
).all()
if not one_object_in_pack:
# No objects. Clean up the pack file, if it exists.
if os.path.exists(self._get_pack_path_from_pack_id(pack_id)):
os.remove(self._get_pack_path_from_pack_id(pack_id))
if os.path.exists(pack_path):
os.remove(pack_path)
return

obj_dicts = []
# At least one object. Let's repack. We have checked before that the
# REPACK_PACK_ID did not exist.
with self.lock_pack(
str(self._REPACK_PACK_ID), allow_repack_pack=True
) as write_pack_handle:
with open(self._get_pack_path_from_pack_id(pack_id), "rb") as read_pack:
with self.lock_pack(pack_id, lock_repack=True) as write_pack_handle:
with open(pack_path, "rb") as read_pack:
stmt = (
select(
Obj.id,
@@ -2612,9 +2719,7 @@ def repack_pack(
obj_dicts.append(obj_dict)
safe_flush_to_disk(
write_pack_handle,
self._get_pack_path_from_pack_id(
self._REPACK_PACK_ID, allow_repack_pack=True
),
repack_path,
)

# We are done with data transfer.
@@ -2638,17 +2743,15 @@
pack_id=pack_id, repack_id=self._REPACK_PACK_ID
)
)
os.remove(self._get_pack_path_from_pack_id(pack_id))
os.remove(pack_path)

# I need now to move the file back. I need to be careful, to avoid conditions in which
# I remain with inconsistent data.
# Since hard links seem to be supported on all three platforms, I do a hard link
# of -1 back to the correct pack ID.
os.link(
self._get_pack_path_from_pack_id(
self._REPACK_PACK_ID, allow_repack_pack=True
),
self._get_pack_path_from_pack_id(pack_id),
repack_path,
pack_path,
)

# Before deleting the source (pack -1) I need now to update again all
@@ -2664,10 +2767,8 @@
# I am not doing this for now
# I now can unlink/delete the original source
os.unlink(
self._get_pack_path_from_pack_id(
self._REPACK_PACK_ID, allow_repack_pack=True
)
repack_path,
)

# We are now done. The temporary pack is gone, and the old `pack_id`
# has now been replaced with an udpated, repacked pack.
# has now been replaced with an updated, repacked pack.
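
For reference, the configuration layout that add_additional_pack_repo() sets up, as implied by the changes above (the paths and container id are illustrative, and unrelated existing keys of the main config are omitted):

# Main container: <container_folder>/config.json (existing keys omitted)
{
    "container_id": "3a6f...",
    "additional_pack_repos": ["/path/to/extra-repo"]
}

# Additional pack repository: /path/to/extra-repo/config.json
{
    "container_id": "3a6f..."
}

# Pack files moved into /path/to/extra-repo/packs/ keep their numeric names
# (e.g. "0") and are picked up by the _get_additional_pack_locations() cache.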
43 changes: 43 additions & 0 deletions tests/test_container.py
@@ -3336,6 +3336,49 @@ def test_container_id(temp_container):
assert old_container_id != temp_container.container_id


def test_additional_pack_repo(temp_container):
"""Test adding additional pack locations"""

temp_dir = tempfile.mkdtemp()

temp_container: Container = temp_container

# Insert some random data
expected = {}
for idx in range(100):
content = f"{idx}".encode()
expected[temp_container.add_object(content)] = content

temp_container.pack_all_loose()
temp_container.add_additional_pack_repo(temp_dir)

# Move the pack 0 to the additional repository
shutil.move(
os.path.join(temp_container._get_pack_folder(), "0"),
os.path.join(temp_dir, "packs"),
)
new_pack = pathlib.Path(temp_container._get_pack_path_from_pack_id(0)).resolve()
assert new_pack.relative_to(pathlib.Path(temp_dir).resolve())

# Now the pack to write to is pack 1
assert temp_container._get_pack_id_to_write_to() == 1
retrieved = temp_container.get_objects_content(expected.keys())
assert retrieved == expected

# Add data to pack 1 and validate
for idx in range(100, 200):
content = f"{idx}".encode()
expected[temp_container.add_object(content)] = content
temp_container.pack_all_loose()
retrieved = temp_container.get_objects_content(expected.keys())
assert retrieved == expected

# The first part should be in pack 0 and the second part in pack 1
assert pathlib.Path(temp_container._get_pack_path_from_pack_id(1)).is_file()
assert temp_container.get_object_meta(list(expected.keys())[0])["pack_id"] == 0
assert temp_container.get_object_meta(list(expected.keys())[-1])["pack_id"] == 1


@pytest.mark.parametrize(
"compression_algorithm",
[