Skip to content

Commit

Permalink
extract path resolver. small refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
parikls committed Feb 12, 2025
1 parent 8a84f8d commit 2767261
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 69 deletions.
42 changes: 19 additions & 23 deletions src/platform_storage_api/admission_controller/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
KubeVolumeResolver,
VolumeResolverError,
)
from platform_storage_api.cache import PermissionsCache
from platform_storage_api.config import Config
from platform_storage_api.security import AbstractPermissionChecker, PermissionChecker


logger = logging.getLogger(__name__)
Expand All @@ -25,7 +23,7 @@
LABEL_APOLO_STORAGE_MOUNT_PATH = "platform.apolo.us/storage/mountPath"
LABEL_APOLO_STORAGE_HOST_PATH = "platform.apolo.us/storage/hostPath"

POD_INJECTED_VOLUME_NAME = "auto-injected-storage"
POD_INJECTED_VOLUME_NAME = "storage-auto-injected-volume"


class AdmissionReviewPatchType(str, Enum):
Expand All @@ -36,22 +34,13 @@ class AdmissionControllerApi:
def __init__(self, app: web.Application, config: Config) -> None:
self._app = app
self._config = config
self._permission_checker: AbstractPermissionChecker = PermissionChecker(
app, config
)
if config.permission_expiration_interval_s > 0:
self._permission_checker = PermissionsCache(
self._permission_checker,
expiration_interval_s=config.permission_expiration_interval_s,
forgetting_interval_s=config.permission_forgetting_interval_s,
)

@property
def _volume_resolver(self) -> KubeVolumeResolver:
return self._app[VOLUME_RESOLVER_KEY]

def register(self, app: web.Application) -> None:
app.add_routes([web.post("", self.handle_post_mutate)])
app.add_routes([web.post("/mutate", self.handle_post_mutate)])

async def handle_post_mutate(self, request: web.Request) -> Any:
payload: dict[str, Any] = await request.json()
Expand All @@ -63,6 +52,7 @@ async def handle_post_mutate(self, request: web.Request) -> Any:
kind = obj.get("kind")

if kind != "Pod":
# not a pod creation request. early-exit
return web.json_response(response.to_dict())

metadata = obj.get("metadata", {})
Expand All @@ -72,10 +62,17 @@ async def handle_post_mutate(self, request: web.Request) -> Any:
host_path_value = annotations.get(LABEL_APOLO_STORAGE_HOST_PATH)

if not (mount_path_value and host_path_value):
# storage is not requested
# a pod does not request storage. we can do early-exit here
return web.json_response(response.to_dict())

pod_spec = obj["spec"]
containers = pod_spec.get("containers") or []

if not containers:
# pod does not define any containers. we can exit
return web.json_response(response.to_dict())

# let's first try to resolve a path which POD wants to mount
# now let's try to resolve a path which POD wants to mount
try:
volume_spec = await self._volume_resolver.resolve_to_mount_volume(
path=host_path_value
Expand All @@ -86,8 +83,8 @@ async def handle_post_mutate(self, request: web.Request) -> Any:
response.allowed = False
return web.json_response(response.to_dict())

# ensure spec.volumes exists
if "volumes" not in obj["spec"]:
# ensure volumes
if "volumes" not in pod_spec:
response.add_patch(
path="/spec/volumes",
value=[]
Expand All @@ -102,23 +99,22 @@ async def handle_post_mutate(self, request: web.Request) -> Any:
}
)

# 3) Add a volumeMount with subPath for the first container
if "containers" in obj["spec"] and len(obj["spec"]["containers"]) > 0:
if "volumeMounts" not in obj["spec"]["containers"][0]:
# add a volumeMount with mount path for all the POD containers
for idx, container in enumerate(containers):
if "volumeMounts" not in container:
response.add_patch(
path="/spec/containers/0/volumeMounts",
path=f"/spec/containers/{idx}/volumeMounts",
value=[]
)

response.add_patch(
path="/spec/containers/0/volumeMounts/-",
path=f"/spec/containers/{idx}/volumeMounts/-",
value={
"name": POD_INJECTED_VOLUME_NAME,
"mountPath": mount_path_value,
}
)

# Return as JSON
return web.json_response(response.to_dict())


Expand Down
27 changes: 13 additions & 14 deletions src/platform_storage_api/admission_controller/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,20 @@
from typing import cast

from aiohttp import web
from neuro_auth_client import AuthClient

from platform_storage_api.admission_controller.api import AdmissionControllerApi
from platform_storage_api.admission_controller.app_keys import (
API_V1_KEY,
VOLUME_RESOLVER_KEY,
)
from platform_storage_api.admission_controller.volume_resolver import (
KubeVolumeResolver,
create_kube_client,
)
from platform_storage_api.api import ApiHandler, handle_exceptions
from platform_storage_api.config import Config, KubeConfig

from ..api import ApiHandler, create_path_resolver, handle_exceptions
from ..fs.local import LocalFileSystem
from ..security import AUTH_CLIENT_KEY
from .api import AdmissionControllerApi
from .app_keys import API_V1_KEY, VOLUME_RESOLVER_KEY
from .volume_resolver import KubeVolumeResolver, create_kube_client
from platform_storage_api.fs.local import LocalFileSystem
from platform_storage_api.storage import create_path_resolver


logger = logging.getLogger(__name__)
Expand All @@ -27,11 +31,6 @@ async def create_app(config: Config) -> web.Application:

async def _init_app(app: web.Application) -> AsyncIterator[None]:
async with AsyncExitStack() as exit_stack:
auth_client = await exit_stack.enter_async_context(
AuthClient(config.platform.auth_url, config.platform.token)
)
app[API_V1_KEY][AUTH_CLIENT_KEY] = auth_client

fs = await exit_stack.enter_async_context(
LocalFileSystem(
executor_max_workers=config.storage.fs_local_thread_pool_size
Expand Down Expand Up @@ -64,7 +63,7 @@ async def _init_app(app: web.Application) -> AsyncIterator[None]:
admission_controller_api = AdmissionControllerApi(api_v1_app, config)
admission_controller_api.register(admission_controller_app)

api_v1_app.add_subapp("/mutate", admission_controller_app)
api_v1_app.add_subapp("/admission-controller", admission_controller_app)
app.add_subapp("/api/v1", api_v1_app)

return app
17 changes: 2 additions & 15 deletions src/platform_storage_api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,12 @@
from neuro_logging import init_logging, setup_sentry

from .cache import PermissionsCache
from .config import Config, StorageMode
from .config import Config
from .fs.local import (
DiskUsage,
FileStatus,
FileStatusPermission,
FileStatusType,
FileSystem,
LocalFileSystem,
)
from .security import (
Expand All @@ -42,10 +41,8 @@
PermissionChecker,
)
from .storage import (
MultipleStoragePathResolver,
SingleStoragePathResolver,
Storage,
StoragePathResolver,
create_path_resolver,
)


Expand Down Expand Up @@ -859,16 +856,6 @@ async def add_version_to_header(request: Request, response: web.StreamResponse)
response.headers["X-Service-Version"] = f"platform-storage-api/{package_version}"


def create_path_resolver(config: Config, fs: FileSystem) -> StoragePathResolver:
if config.storage.mode == StorageMode.SINGLE:
return SingleStoragePathResolver(config.storage.fs_local_base_path)
return MultipleStoragePathResolver(
fs,
config.storage.fs_local_base_path,
config.storage.fs_local_base_path / config.platform.cluster_name,
)


async def create_app(config: Config) -> web.Application:
app = web.Application(
middlewares=[handle_exceptions],
Expand Down
11 changes: 11 additions & 0 deletions src/platform_storage_api/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from neuro_logging import trace, trace_cm

from .config import Config, StorageMode
from .fs.local import (
DiskUsage,
FileStatus,
Expand Down Expand Up @@ -197,3 +198,13 @@ async def rename(
async def disk_usage(self, path: Union[PurePath, str, None] = None) -> DiskUsage:
real_path = await self._path_resolver.resolve_path(PurePath(path or "/"))
return await self._fs.disk_usage(real_path)


def create_path_resolver(config: Config, fs: FileSystem) -> StoragePathResolver:
if config.storage.mode == StorageMode.SINGLE:
return SingleStoragePathResolver(config.storage.fs_local_base_path)
return MultipleStoragePathResolver(
fs,
config.storage.fs_local_base_path,
config.storage.fs_local_base_path / config.platform.cluster_name,
)
20 changes: 3 additions & 17 deletions src/platform_storage_api/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,11 @@
from neuro_admin_client import AdminClient
from neuro_logging import init_logging, new_trace, setup_sentry

from .config import Config, EnvironConfigFactory, StorageMode
from .fs.local import FileSystem, LocalFileSystem
from .config import Config, EnvironConfigFactory
from .fs.local import LocalFileSystem
from .s3 import create_async_s3_client
from .s3_storage import StorageMetricsAsyncS3Storage
from .storage import (
MultipleStoragePathResolver,
SingleStoragePathResolver,
StoragePathResolver,
)
from .storage import create_path_resolver
from .storage_usage import StorageUsageService


Expand All @@ -39,16 +35,6 @@ async def upload_storage_usage(self) -> None:
LOGGER.info("Finished storage usage collection")


def create_path_resolver(config: Config, fs: FileSystem) -> StoragePathResolver:
if config.storage.mode == StorageMode.SINGLE:
return SingleStoragePathResolver(config.storage.fs_local_base_path)
return MultipleStoragePathResolver(
fs,
config.storage.fs_local_base_path,
config.storage.fs_local_base_path / config.platform.cluster_name,
)


@asynccontextmanager
async def create_app(config: Config) -> AsyncIterator[App]:
async with AsyncExitStack() as exit_stack:
Expand Down

0 comments on commit 2767261

Please sign in to comment.