diff --git a/README.md b/README.md
index 5d4d4a51..c9c83c01 100644
--- a/README.md
+++ b/README.md
@@ -18,6 +18,14 @@ pip install -r requirements-test.txt
 pytest -vv tests
 ```
+NOTE:
+if you are running tests on macOS,
+install coreutils first, so that tests using the `du` command work properly:
+
+```shell
+brew install coreutils
+```
+
 ### Running integration tests locally
 
 Make sure you have docker installed.
 To build docker image where test run, you should set the following ENV variables:
diff --git a/charts/platform-storage/templates/_helpers.tpl b/charts/platform-storage/templates/_helpers.tpl
index b904fdf9..e3377931 100644
--- a/charts/platform-storage/templates/_helpers.tpl
+++ b/charts/platform-storage/templates/_helpers.tpl
@@ -51,6 +51,20 @@ release: {{ .Release.Name | quote }}
   value: {{ .Values.permissionForgettingInterval | quote }}
 - name: NP_STORAGE_API_KEEP_ALIVE_TIMEOUT
   value: {{ .Values.keepAliveTimeout | quote }}
+- name: NP_STORAGE_API_K8S_API_URL
+  value: https://kubernetes.default:443
+- name: NP_STORAGE_API_K8S_AUTH_TYPE
+  value: token
+- name: NP_STORAGE_API_K8S_CA_PATH
+  value: {{ include "platformStorage.kubeAuthMountRoot" . }}/ca.crt
+- name: NP_STORAGE_API_K8S_TOKEN_PATH
+  value: {{ include "platformStorage.kubeAuthMountRoot" . }}/token
+- name: NP_STORAGE_API_K8S_NS
+  value: {{ .Values.kube.namespace | default "default" | quote }}
+- name: NP_STORAGE_ADMISSION_CONTROLLER_TLS_KEY
+  value: {{ .Values.admissionController.tlsKey }}
+- name: NP_STORAGE_ADMISSION_CONTROLLER_TLS_CERT
+  value: {{ .Values.admissionController.tlsCert }}
 {{ include "platformStorage.env.s3" . }}
 {{- if .Values.sentry }}
 - name: SENTRY_DSN
@@ -126,3 +140,7 @@ app: {{ include "platformStorage.name" . }}
 release: {{ .Release.Name }}
 service: platform-storage-metrics
 {{- end -}}
+
+{{- define "platformStorage.kubeAuthMountRoot" -}}
+{{- printf "/var/run/secrets/kubernetes.io/serviceaccount" -}}
+{{- end -}}
diff --git a/charts/platform-storage/templates/admission-controller-deployment.yaml b/charts/platform-storage/templates/admission-controller-deployment.yaml
new file mode 100644
index 00000000..fe2d2762
--- /dev/null
+++ b/charts/platform-storage/templates/admission-controller-deployment.yaml
@@ -0,0 +1,37 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: {{ .Values.admissionController.app_name }}
+  namespace: platform
+  labels:
+    {{- include "platformStorage.labels.standard" . | nindent 4 }}
+    service: {{ .Values.admissionController.app_name }}
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: {{ .Values.admissionController.app_name }}
+  template:
+    metadata:
+      labels:
+        app: {{ .Values.admissionController.app_name }}
+    spec:
+      serviceAccountName: {{ .Values.admissionController.app_name }}
+      containers:
+        - name: admission-controller
+          image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
+          imagePullPolicy: {{ .Values.image.pullPolicy }}
+          command:
+            - platform-storage-admission-controller
+          ports:
+            - containerPort: {{ .Values.admissionController.service.port }}
+              name: http
+              protocol: TCP
+          env:
+            - name: SERVER_PORT
+              value: {{ .Values.admissionController.service.port | quote }}
+          {{- include "platformStorage.env" . | nindent 10 }}
+      {{- with .Values.imagePullSecrets }}
+      imagePullSecrets:
+        {{- toYaml . | nindent 8 }}
+      {{- end }}
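For reference, with the chart defaults the new Kubernetes-related entries of `platformStorage.env` render roughly as follows (an illustrative sketch, not captured chart output):

```yaml
- name: NP_STORAGE_API_K8S_API_URL
  value: https://kubernetes.default:443
- name: NP_STORAGE_API_K8S_AUTH_TYPE
  value: token
- name: NP_STORAGE_API_K8S_CA_PATH
  value: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
- name: NP_STORAGE_API_K8S_TOKEN_PATH
  value: /var/run/secrets/kubernetes.io/serviceaccount/token
- name: NP_STORAGE_API_K8S_NS
  value: "default"
```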
diff --git a/charts/platform-storage/templates/admission-controller-rbac.yaml b/charts/platform-storage/templates/admission-controller-rbac.yaml
new file mode 100644
index 00000000..e529f758
--- /dev/null
+++ b/charts/platform-storage/templates/admission-controller-rbac.yaml
@@ -0,0 +1,69 @@
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: {{ .Values.admissionController.app_name }}
+  labels:
+    {{- include "platformStorage.labels.standard" . | nindent 4 }}
+
+---
+
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+  name: {{ .Values.admissionController.app_name }}
+  labels:
+    {{- include "platformStorage.labels.standard" . | nindent 4 }}
+rules:
+  - apiGroups: [""]
+    resources: ["pods"]
+    verbs: ["get", "list", "watch"]
+
+---
+
+apiVersion: rbac.authorization.k8s.io/v1
+kind: RoleBinding
+metadata:
+  name: {{ .Values.admissionController.app_name }}
+  labels:
+    {{- include "platformStorage.labels.standard" . | nindent 4 }}
+subjects:
+  - kind: ServiceAccount
+    name: {{ .Values.admissionController.app_name }}
+    namespace: platform
+roleRef:
+  kind: Role
+  apiGroup: rbac.authorization.k8s.io
+  name: {{ .Values.admissionController.app_name }}
+
+---
+
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+  name: {{ .Values.admissionController.app_name }}
+  labels:
+    {{- include "platformStorage.labels.standard" . | nindent 4 }}
+rules:
+  - apiGroups: [""]
+    resources: ["persistentvolumeclaims"]
+    verbs: ["get", "list", "watch"]
+  - apiGroups: [""]
+    resources: ["persistentvolumes"]
+    verbs: ["get", "list", "watch"]
+
+---
+
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  name: {{ .Values.admissionController.app_name }}
+  labels:
+    {{- include "platformStorage.labels.standard" . | nindent 4 }}
+subjects:
+  - kind: ServiceAccount
+    name: {{ .Values.admissionController.app_name }}
+    namespace: platform
+roleRef:
+  kind: ClusterRole
+  name: {{ .Values.admissionController.app_name }}
+  apiGroup: rbac.authorization.k8s.io
diff --git a/charts/platform-storage/templates/admission-controller-service.yaml b/charts/platform-storage/templates/admission-controller-service.yaml
new file mode 100644
index 00000000..22ee8c57
--- /dev/null
+++ b/charts/platform-storage/templates/admission-controller-service.yaml
@@ -0,0 +1,15 @@
+apiVersion: v1
+kind: Service
+metadata:
+  name: {{ .Values.admissionController.app_name }}-svc
+  namespace: platform
+  labels:
+    {{- include "platformStorage.labels.standard" . | nindent 4 }}
+spec:
+  selector:
+    app: {{ .Values.admissionController.app_name }}
+  ports:
+    - name: https
+      port: 443
+      targetPort: {{ .Values.admissionController.service.port }}
+      protocol: TCP
diff --git a/charts/platform-storage/templates/admission-controller-webhook.yaml b/charts/platform-storage/templates/admission-controller-webhook.yaml
new file mode 100644
index 00000000..6d6a0928
--- /dev/null
+++ b/charts/platform-storage/templates/admission-controller-webhook.yaml
@@ -0,0 +1,27 @@
+apiVersion: admissionregistration.k8s.io/v1
+kind: MutatingWebhookConfiguration
+metadata:
+  name: platform-storage-mutating-webhook
+  annotations:
+    "helm.sh/hook": post-install,post-upgrade
+  labels:
+    {{- include "platformStorage.labels.standard" . | nindent 4 }}
+webhooks:
+  - name: pod-volume-injector.apolo.us
+    admissionReviewVersions: ["v1", "v1beta1"]
+    sideEffects: None
+    clientConfig:
+      caBundle: {{ .Values.admissionController.caBundle }}
+      service:
+        namespace: platform
+        name: {{ .Values.admissionController.app_name }}-svc
+        path: /admission-controller/mutate
+    rules:
+      - operations: ["CREATE"]
+        apiGroups: ["*"]
+        apiVersions: ["*"]
+        resources: ["pods"]
+    failurePolicy: Fail
+    objectSelector:
+      matchLabels:
+        platform.apolo.us/storage-injection-webhook: "enabled"
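Taken together, the webhook above only fires for pods that opt in via the `platform.apolo.us/storage-injection-webhook: "enabled"` label and describe the desired mounts in the `platform.apolo.us/injectStorage` annotation — a JSON object mapping a mount path to a storage path (see `api.py` below). A minimal sketch of an opting-in pod; the names, image, and storage URI are illustrative only:

```yaml
apiVersion: v1
kind: Pod
metadata:
  name: my-workload  # hypothetical
  labels:
    platform.apolo.us/storage-injection-webhook: "enabled"
  annotations:
    platform.apolo.us/injectStorage: '{"/mnt/storage": "storage://my-cluster/my-org/my-project"}'
spec:
  containers:
    - name: main
      image: busybox  # hypothetical
```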
diff --git a/charts/platform-storage/templates/deployment.yaml b/charts/platform-storage/templates/deployment.yaml
index 7820de89..6893e60d 100644
--- a/charts/platform-storage/templates/deployment.yaml
+++ b/charts/platform-storage/templates/deployment.yaml
@@ -23,6 +23,7 @@ spec:
         app: {{ include "platformStorage.name" . }}
         release: {{ .Release.Name }}
         service: platform-storage
+        platform.apolo.us/storage-injection-webhook: "enabled"
       {{- if .Values.secrets }}
       annotations:
         checksum/secret: {{ include (print $.Template.BasePath "/secrets.yaml") . | sha256sum }}
diff --git a/charts/platform-storage/values.yaml b/charts/platform-storage/values.yaml
index 4fb48ea9..5015bdaa 100644
--- a/charts/platform-storage/values.yaml
+++ b/charts/platform-storage/values.yaml
@@ -38,6 +38,9 @@ service:
 
 storages: []
 
+kube:
+  namespace: "default"
+
 sentry:
   appName: platform-storage
   sampleRate: 0.002
@@ -82,3 +85,24 @@ metrics:
 
   serviceMonitor:
     enabled: true
+
+
+admissionController:
+  app_name: "storage-admission-controller"
+  tlsKey:
+    valueFrom:
+      secretKeyRef:
+        name: platform-storage-ac-certs
+        key: tls.key
+  tlsCert:
+    valueFrom:
+      secretKeyRef:
+        name: platform-storage-ac-certs
+        key: tls.crt
+  caBundle:
+    valueFrom:
+      secretKeyRef:
+        name: platform-storage-ac-certs
+        key: ca.bundle
+  service:
+    port: 8080
diff --git a/pyproject.toml b/pyproject.toml
index 45c94e2a..c0ce7db8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -19,7 +19,6 @@ select = [
 ignore = [
   "A003",  # Class attribute "..." is shadowing a Python builtin
   "N818",
-  "PT005"
 ]
 
 [tool.ruff.lint.isort]
diff --git a/setup.cfg b/setup.cfg
index feb8cca2..eddd4565 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -18,8 +18,10 @@ install_requires =
     aiodns==3.2.0
     aiofiles==24.1.0
     aiohttp==3.10.10
+    aiorwlock==1.5.0
+    apolo-kube-client==25.2.6
     cbor==1.0.0
-    cchardet==2.1.7
+    cchardet==2.2.0a2
     fastapi==0.115.8
     neuro-admin-client==25.1.1
     neuro-auth-client==22.6.1
@@ -43,6 +45,7 @@ console_scripts =
     platform-storage-api = platform_storage_api.api:main
     platform-storage-metrics = platform_storage_api.metrics:main
     platform-storage-worker = platform_storage_api.worker:main
+    platform-storage-admission-controller = platform_storage_api.admission_controller.__main__:main
 
 [options.extras_require]
 dev =
@@ -122,3 +125,6 @@ ignore_missing_imports = true
 
 [mypy-botocore.*]
 ignore_missing_imports = true
+
+[mypy-apolo_kube_client.*]
+ignore_missing_imports = true
diff --git a/src/platform_storage_api/admission_controller/__init__.py b/src/platform_storage_api/admission_controller/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/platform_storage_api/admission_controller/__main__.py b/src/platform_storage_api/admission_controller/__main__.py
new file mode 100644
index 00000000..a98290ea
--- /dev/null
+++ b/src/platform_storage_api/admission_controller/__main__.py
@@ -0,0 +1,69 @@
+import asyncio
+import logging
+import os
+import ssl
+import tempfile
+from base64 import b64decode
+from typing import cast
+
+from aiohttp import web
+from neuro_logging import init_logging, setup_sentry
+
+from platform_storage_api.admission_controller.app import create_app
+from platform_storage_api.config import AdmissionControllerTlsConfig, Config
+
+
+logger = logging.getLogger(__name__)
+
+
+def main() -> None:
+    init_logging()
+    config = Config.from_environ()
+    logger.info("Loaded config: %r", config)
+
+    setup_sentry(
+        health_check_url_path="/api/v1/ping",
+        ignore_errors=[web.HTTPNotFound],
+    )
+
+    loop = asyncio.get_event_loop()
+    app = loop.run_until_complete(create_app(config))
+
+    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
+
+    crt_file = tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".crt")
+    key_file = tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".key")
+
+    tls_config = cast(
+        AdmissionControllerTlsConfig,
+        config.admission_controller_tls_config,
+    )
+    try:
+        # decode the base64 certificates from the env and store them in temp files
+        crt_file.write(b64decode(tls_config.tls_cert).decode())
+        key_file.write(b64decode(tls_config.tls_key).decode())
+        crt_file.close()
+        key_file.close()
+
+        context.load_cert_chain(
+            certfile=crt_file.name,
+            keyfile=key_file.name,
+        )
+
+        web.run_app(
+            app,
+            host=config.server.host,
+            port=config.server.port,
+            ssl_context=context,
+        )
+
+    except Exception:
+        logger.exception("Unhandled error")
+        raise
+    finally:
+        os.unlink(crt_file.name)
+        os.unlink(key_file.name)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/platform_storage_api/admission_controller/api.py b/src/platform_storage_api/admission_controller/api.py
new file mode 100644
index 00000000..4614c5c8
--- /dev/null
+++ b/src/platform_storage_api/admission_controller/api.py
@@ -0,0 +1,219 @@
+import base64
+import dataclasses
+import json
+import logging
+from enum import Enum
+from typing import Any, Optional
+from uuid import uuid4
+
+from aiohttp import web
+
+from platform_storage_api.admission_controller.app_keys import VOLUME_RESOLVER_KEY
+from platform_storage_api.admission_controller.volume_resolver import (
+    KubeVolumeResolver,
+    VolumeResolverError,
+)
+from platform_storage_api.config import Config
+
+
+logger = logging.getLogger(__name__)
+
+
+ANNOTATION_APOLO_INJECT_STORAGE = "platform.apolo.us/injectStorage"
+
+LABEL_APOLO_ORG_NAME = "platform.apolo.us/org"
+LABEL_APOLO_PROJECT_NAME = "platform.apolo.us/project"
+LABEL_PLATFORM_STORAGE_POD = "platform-storage"
+
+INJECTED_VOLUME_NAME_PREFIX = "storage-auto-injected-volume"
+
+
+def create_injection_volume_name() -> str:
+    """Create a random, unique volume name."""
+    return f"{INJECTED_VOLUME_NAME_PREFIX}-{uuid4().hex}"
+
+
+class AdmissionReviewPatchType(str, Enum):
+    JSON = "JSONPatch"
+
+
+@dataclasses.dataclass
+class AdmissionReviewResponse:
+    uid: str
+    allowed: bool = True
+    patch: Optional[list[dict[str, Any]]] = None
+    patch_type: AdmissionReviewPatchType = AdmissionReviewPatchType.JSON
+
+    def add_patch(self, path: str, value: Any) -> None:
+        if self.patch is None:
+            self.patch = []
+
+        self.patch.append({
+            "op": "add",
+            "path": path,
+            "value": value,
+        })
+
+    def to_dict(self) -> dict[str, Any]:
+        patch: Optional[str] = None
+
+        if self.patch is not None:
+            # the AdmissionReview API expects the JSON patch as base64
+            dumped = json.dumps(self.patch).encode()
+            patch = base64.b64encode(dumped).decode()
+
+        return {
+            "apiVersion": "admission.k8s.io/v1",
+            "kind": "AdmissionReview",
+            "response": {
+                "uid": self.uid,
+                "allowed": self.allowed,
+                "patch": patch,
+                "patchType": self.patch_type.value,
+            },
+        }
+
+    def to_response(self) -> web.Response:
+        return web.json_response(self.to_dict())
+
+
+class AdmissionControllerApi:
+
+    def __init__(
+        self,
+        app: web.Application,
+        config: Config,
+    ) -> None:
+
+        self._app = app
+        self._config = config
+
+    @property
+    def _volume_resolver(self) -> KubeVolumeResolver:
+        return self._app[VOLUME_RESOLVER_KEY]
+
+    def register(self, app: web.Application) -> None:
+        app.add_routes([
+            web.post("/mutate", self.handle_post_mutate),
+        ])
+
+    async def handle_post_mutate(self, request: web.Request) -> web.Response:
+        logger.info("mutate call")
+        payload: dict[str, Any] = await request.json()
+
+        uid = payload["request"]["uid"]
+        admission_review = AdmissionReviewResponse(uid=uid)
+
+        pod = payload["request"]["object"]
+        kind = pod.get("kind")
+
+        if kind != "Pod":
+            # not a pod creation request: exit early
+            return admission_review.to_response()
+
+        metadata = pod.get("metadata", {})
+        labels = metadata.get("labels", {})
+
+        # check whether this is a new platform-storage pod
+        if (
+            labels.get("app") == LABEL_PLATFORM_STORAGE_POD and
+            labels.get("service") == LABEL_PLATFORM_STORAGE_POD
+        ):
+            return await self._handle_new_platform_storage_pod(
+                pod=pod,
+                admission_review=admission_review,
+            )
+
+        annotations = metadata.get("annotations", {})
+        raw_injection_spec = annotations.get(ANNOTATION_APOLO_INJECT_STORAGE)
+
+        if not raw_injection_spec:
+            # the pod does not request any storage: exit early
+            logger.info("Pod won't be mutated: no injection annotation defined")
+            return admission_review.to_response()
+
+        return await self._handle_injection(
+            pod=pod,
+            raw_injection_spec=raw_injection_spec,
+            admission_review=admission_review,
+        )
+
+    async def _handle_new_platform_storage_pod(
+        self,
+        pod: dict[str, Any],
+        admission_review: AdmissionReviewResponse,
+    ) -> web.Response:
+        await self._volume_resolver.refresh_internal_state(pod=pod)
+        return admission_review.to_response()
+
+    async def _handle_injection(
+        self,
+        pod: dict[str, Any],
+        raw_injection_spec: str,
+        admission_review: AdmissionReviewResponse,
+    ) -> web.Response:
+        pod_spec = pod["spec"]
+        containers = pod_spec.get("containers") or []
+
+        if not containers:
+            # the pod does not define any containers: exit early
+            logger.info("Pod won't be mutated: it doesn't define containers")
+            return admission_review.to_response()
+
+        logger.info("Going to inject volumes")
+        try:
+            injection_spec = json.loads(raw_injection_spec)
+        except ValueError:
+            logger.info("Invalid injection spec. Denying the request")
+            admission_review.allowed = False
+            return admission_review.to_response()
+
+        # ensure the pod spec has a volumes list
+        if "volumes" not in pod_spec:
+            admission_review.add_patch(
+                path="/spec/volumes",
+                value=[],
+            )
+
+        # and ensure that each container has a volumeMounts list
+        for idx, container in enumerate(containers):
+            if "volumeMounts" not in container:
+                admission_review.add_patch(
+                    path=f"/spec/containers/{idx}/volumeMounts",
+                    value=[],
+                )
+
+        for mount_path, storage_path in injection_spec.items():
+            # try to resolve the path which the pod wants to mount
+            try:
+                volume_spec = await self._volume_resolver.resolve_to_mount_volume(
+                    path=storage_path
+                )
+            except VolumeResolverError:
+                # report an error and disallow spawning the pod
+                logger.exception("Unable to resolve a volume for the provided path")
+                admission_review.allowed = False
+                return admission_review.to_response()
+
+            future_volume_name = create_injection_volume_name()
+
+            # add the resolved volume to the pod spec
+            admission_review.add_patch(
+                path="/spec/volumes/-",
+                value={
+                    "name": future_volume_name,
+                    **volume_spec.to_kube(),
+                },
+            )
+
+            # add a volumeMount with the mount path to every pod container
+            for container_idx in range(len(containers)):
+                admission_review.add_patch(
+                    path=f"/spec/containers/{container_idx}/volumeMounts/-",
+                    value={
+                        "name": future_volume_name,
+                        "mountPath": mount_path,
+                    },
+                )
+
+        return admission_review.to_response()
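For a single requested mount, the handler above produces a response of roughly the following shape. The `uid` echoes the request, the volume-name suffix is random, and the patch — base64-encoded JSON in the real response — is shown decoded for readability (sketch only):

```yaml
apiVersion: admission.k8s.io/v1
kind: AdmissionReview
response:
  uid: "<uid of the AdmissionReview request>"
  allowed: true
  patchType: JSONPatch
  # `patch` is base64-encoded JSON; decoded, it looks like:
  # - op: add
  #   path: /spec/volumes/-
  #   value:
  #     name: storage-auto-injected-volume-<random hex>
  #     nfs: {server: <nfs server>, path: <resolved path>}
  # - op: add
  #   path: /spec/containers/0/volumeMounts/-
  #   value:
  #     name: storage-auto-injected-volume-<random hex>
  #     mountPath: /mnt/storage
```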
diff --git a/src/platform_storage_api/admission_controller/app.py b/src/platform_storage_api/admission_controller/app.py
new file mode 100644
index 00000000..3302da86
--- /dev/null
+++ b/src/platform_storage_api/admission_controller/app.py
@@ -0,0 +1,60 @@
+import logging
+from collections.abc import AsyncIterator
+from contextlib import AsyncExitStack
+from typing import cast
+
+from aiohttp import web
+from apolo_kube_client.client import kube_client_from_config
+
+from platform_storage_api.admission_controller.api import AdmissionControllerApi
+from platform_storage_api.admission_controller.app_keys import (
+    VOLUME_RESOLVER_KEY,
+)
+from platform_storage_api.admission_controller.volume_resolver import (
+    KubeVolumeResolver,
+)
+from platform_storage_api.config import Config, KubeConfig
+from platform_storage_api.fs.local import LocalFileSystem
+from platform_storage_api.storage import create_path_resolver
+
+
+logger = logging.getLogger(__name__)
+
+
+async def create_app(config: Config) -> web.Application:
+    app = web.Application(
+        handler_args={"keepalive_timeout": config.server.keep_alive_timeout_s},
+    )
+
+    async def _init_app(app: web.Application) -> AsyncIterator[None]:
+        async with AsyncExitStack() as exit_stack:
+            fs = await exit_stack.enter_async_context(
+                LocalFileSystem(
+                    executor_max_workers=config.storage.fs_local_thread_pool_size
+                )
+            )
+            path_resolver = create_path_resolver(config, fs)
+
+            kube_config = cast(KubeConfig, config.kube)
+            kube_client = await exit_stack.enter_async_context(
+                kube_client_from_config(kube_config)
+            )
+            volume_resolver = await exit_stack.enter_async_context(
+                KubeVolumeResolver(
+                    kube_client=kube_client,
+                    path_resolver=path_resolver,
+                )
+            )
+            app[VOLUME_RESOLVER_KEY] = volume_resolver
+
+            yield
+
+    app.cleanup_ctx.append(_init_app)
+
+    admission_controller_app = web.Application()
+    admission_controller_api = AdmissionControllerApi(app, config)
+    admission_controller_api.register(admission_controller_app)
+
+    app.add_subapp("/admission-controller", admission_controller_app)
+
+    return app
diff --git a/src/platform_storage_api/admission_controller/app_keys.py b/src/platform_storage_api/admission_controller/app_keys.py
new file mode 100644
index 00000000..28f0b422
--- /dev/null
+++ b/src/platform_storage_api/admission_controller/app_keys.py
@@ -0,0 +1,6 @@
+from aiohttp import web
+
+from platform_storage_api.admission_controller.volume_resolver import KubeVolumeResolver
+
+
+VOLUME_RESOLVER_KEY = web.AppKey("volume_resolver", KubeVolumeResolver)
diff --git a/src/platform_storage_api/admission_controller/volume_resolver.py b/src/platform_storage_api/admission_controller/volume_resolver.py
new file mode 100644
index 00000000..c82a754a
--- /dev/null
+++ b/src/platform_storage_api/admission_controller/volume_resolver.py
@@ -0,0 +1,245 @@
+import asyncio
+import dataclasses
+import logging
+from asyncio import Task
+from enum import Enum
+from pathlib import PurePath
+from types import TracebackType
+from typing import Any, Optional
+from uuid import uuid4
+
+import aiorwlock
+from apolo_kube_client.client import KubeClient
+
+from platform_storage_api.storage import StoragePathResolver
+
+
+logger = logging.getLogger(__name__)
+
+KUBE_PLATFORM_STORAGE_APP_NAME = "platform-storage"
+KUBE_PLATFORM_NAMESPACE = "platform"
+
+
+class VolumeResolverError(Exception):
+    ...
+
+
+class VolumeBackend(str, Enum):
+    """Supported volume backends."""
+    NFS = "nfs"
+
+
+@dataclasses.dataclass
+class BaseVolumeSpec:
+
+    def to_kube(self) -> dict[str, Any]:
+        return dataclasses.asdict(self)
+
+
+@dataclasses.dataclass
+class NfsVolumeSpec(BaseVolumeSpec):
+    server: str
+    path: str
+
+    @classmethod
+    def from_pv(cls, pv: dict[str, Any]) -> "NfsVolumeSpec":
+        """
+        Construct a volume spec from a PV object.
+        """
+        return cls(
+            server=pv["spec"][VolumeBackend.NFS]["server"],
+            path=pv["spec"][VolumeBackend.NFS]["path"],
+        )
+
+
+T_VolumeSpec = NfsVolumeSpec  # more volume backends may be added later
+
+
+@dataclasses.dataclass
+class KubeVolume:
+    backend: VolumeBackend
+    spec: T_VolumeSpec
+
+    def to_kube(self) -> dict[str, Any]:
+        return {
+            self.backend.value: self.spec.to_kube()
+        }
+
+
+class KubeVolumeResolver:
+
+    def __init__(
+        self,
+        kube_client: KubeClient,
+        path_resolver: StoragePathResolver,
+    ):
+        self._kube = kube_client
+        self._path_resolver = path_resolver
+
+        self._local_fs_prefix_to_kube_volume: dict[str, KubeVolume] = {}
+        """A mapping where a key is a locally mounted path
+        and a value is the corresponding kube volume definition.
+        """
+
+        self._refresh_lock = aiorwlock.RWLock()
+        # keep references to the refresh tasks,
+        # so they won't be garbage-collected
+        self._refresh_tasks: dict[str, Task[None]] = {}
+
+    async def __aenter__(self) -> "KubeVolumeResolver":
+        """
+        Initialize the kube volume resolver.
+        Fetches the most recently created platform-storage pod and passes it
+        to the internal state updater, so the real mount paths can be resolved.
+        """
+        logger.info("initializing volume resolver")
+
+        namespace_url = self._kube.generate_namespace_url(KUBE_PLATFORM_NAMESPACE)
+
+        # get all platform-storage pods sorted by creation timestamp,
+        # so the most recent one can be picked
+        pods_response = await self._kube.get(
+            f"{namespace_url}/pods",
+            params={
+                "labelSelector": f"service={KUBE_PLATFORM_STORAGE_APP_NAME}",
+            }
+        )
+        try:
+            most_recent_pod = next(
+                iter(
+                    sorted(
+                        pods_response["items"],
+                        key=lambda pod: pod["metadata"]["creationTimestamp"],
+                        reverse=True,
+                    )
+                )
+            )
+        except StopIteration:
+            raise VolumeResolverError("no platform-storage pods found") from None
+
+        await self._refresh_internal_state(pod=most_recent_pod)
+        return self
+
+    async def __aexit__(
+        self,
+        exc_type: Optional[type[BaseException]],
+        exc_val: Optional[BaseException],
+        exc_tb: Optional[TracebackType],
+    ) -> bool:
+        return exc_type is None
+
+    def refresh_internal_state(
+        self,
+        pod: dict[str, Any],
+    ) -> Task[None]:
+        """
+        Spawn a background task that refreshes the internal state.
+        """
+        task_id = uuid4().hex
+        task = asyncio.create_task(self._refresh_internal_state(pod))
+        self._refresh_tasks[task_id] = task
+        task.add_done_callback(lambda _: self._refresh_tasks.pop(task_id))  # cleanup
+        return task
+
+    async def _refresh_internal_state(
+        self,
+        pod: dict[str, Any],
+    ) -> None:
+        """
+        Refreshes the internal mapping of volumes based on the provided pod.
+        The pod is expected to carry the most up-to-date mapping of volumes.
+        """
+        async with self._refresh_lock.writer_lock:  # type: ignore[attr-defined]
+            logger.info("refreshing internal state")
+            namespace_url = self._kube.generate_namespace_url(KUBE_PLATFORM_NAMESPACE)
+
+            # internal storage name to PVC / PV names
+            storage_name_to_pvc: dict[str, str] = {}
+            storage_name_to_pv: dict[str, str] = {}
+
+            # storage name to a locally mounted path
+            storage_name_to_local_path: dict[str, str] = {}
+
+            # go over volumes to identify linked PVCs
+            for volume in pod["spec"]["volumes"]:
+                pvc = volume.get("persistentVolumeClaim")
+                if not pvc:
+                    continue
+                storage_name, pvc_name = volume["name"], pvc["claimName"]
+                storage_name_to_pvc[storage_name] = pvc_name
+
+            # now go over containers and figure out the mount paths of volumes
+            for container in pod["spec"]["containers"]:
+                for volume_mount in container["volumeMounts"]:
+                    volume_name = volume_mount["name"]
+                    if volume_name not in storage_name_to_pvc:
+                        continue
+                    mount_path = volume_mount["mountPath"]
+                    storage_name_to_local_path[volume_name] = mount_path
+
+            # get PVs by claim names
+            for storage_name, claim_name in storage_name_to_pvc.items():
+                claim = await self._kube.get(
+                    f"{namespace_url}/persistentvolumeclaims/{claim_name}")
+                storage_name_to_pv[storage_name] = claim["spec"]["volumeName"]
+
+            # finally, get the real underlying storage paths
+            for storage_name, pv_name in storage_name_to_pv.items():
+                pv = await self._kube.get(
+                    f"{self._kube.api_v1_url}/persistentvolumes/{pv_name}")
+
+                # find a supported volume backend for this storage
+                try:
+                    volume_backend = next(
+                        iter(
+                            vb for vb in VolumeBackend if vb in pv["spec"]
+                        )
+                    )
+                except StopIteration:
+                    logger.info(
+                        "storage `%s` doesn't use a supported volume backend",
+                        storage_name
+                    )
+                    continue
+
+                local_path = storage_name_to_local_path[storage_name]
+                volume_definition = KubeVolume(
+                    backend=volume_backend,
+                    spec=NfsVolumeSpec.from_pv(pv=pv),
+                )
+                self._local_fs_prefix_to_kube_volume[local_path] = volume_definition
+
+    async def resolve_to_mount_volume(
+        self,
+        path: str,
+    ) -> KubeVolume:
+        """
+        Resolve a storage path to a proper mount volume, so that it can later
+        be used in the kube spec of a pod.
+        """
+        async with self._refresh_lock.reader_lock:  # type: ignore[attr-defined]
+            normalized_path = PurePath(path.replace("storage://", "/"))
+            local_path = await self._path_resolver.resolve_path(normalized_path)
+            str_local_path = str(local_path)
+
+            for fs_path_prefix, kube_volume \
+                    in self._local_fs_prefix_to_kube_volume.items():
+                if not str_local_path.startswith(fs_path_prefix):
+                    continue
+
+                # prefix match: create a new volume with the adjusted path
+                new_mount_path = str_local_path.replace(
+                    fs_path_prefix,
+                    kube_volume.spec.path,
+                    1,  # replace only once, at the beginning of the string
+                )
+                return KubeVolume(
+                    backend=kube_volume.backend,
+                    spec=NfsVolumeSpec(
+                        server=kube_volume.spec.server,
+                        path=new_mount_path,
+                    )
+                )
+
+        raise VolumeResolverError(f"no volume found for path {path!r}")
diff --git a/src/platform_storage_api/api.py b/src/platform_storage_api/api.py
index 3ac76879..d2c89b76 100644
--- a/src/platform_storage_api/api.py
+++ b/src/platform_storage_api/api.py
@@ -26,13 +26,12 @@
 from neuro_logging import init_logging, setup_sentry
 
 from .cache import PermissionsCache
-from .config import Config, StorageMode
+from .config import Config
 from .fs.local import (
     DiskUsage,
     FileStatus,
     FileStatusPermission,
     FileStatusType,
-    FileSystem,
     LocalFileSystem,
 )
 from .security import (
@@ -42,10 +41,8 @@
     PermissionChecker,
 )
 from .storage import (
-    MultipleStoragePathResolver,
-    SingleStoragePathResolver,
     Storage,
-    StoragePathResolver,
+    create_path_resolver,
 )
 
 
@@ -859,16 +856,6 @@ async def add_version_to_header(request: Request, response: web.StreamResponse)
     response.headers["X-Service-Version"] = f"platform-storage-api/{package_version}"
 
 
-def create_path_resolver(config: Config, fs: FileSystem) -> StoragePathResolver:
-    if config.storage.mode == StorageMode.SINGLE:
-        return SingleStoragePathResolver(config.storage.fs_local_base_path)
-    return MultipleStoragePathResolver(
-        fs,
-        config.storage.fs_local_base_path,
-        config.storage.fs_local_base_path / config.platform.cluster_name,
-    )
-
-
 async def create_app(config: Config) -> web.Application:
     app = web.Application(
         middlewares=[handle_exceptions],
@@ -903,7 +890,6 @@ async def _init_app(app: web.Application) -> AsyncIterator[None]:
         path_resolver = create_path_resolver(config, fs)
         storage = Storage(path_resolver, fs)
         app[API_V1_KEY][STORAGE_KEY] = storage
-
         # TODO (Rafa Zubairov): configured service shall ensure that
         # pre-requisites are up and running
         # TODO here we shall test whether AuthClient properly
diff --git a/src/platform_storage_api/config.py b/src/platform_storage_api/config.py
index ffa4b35f..df34cbb3 100644
--- a/src/platform_storage_api/config.py
+++ b/src/platform_storage_api/config.py
@@ -1,12 +1,17 @@
 import enum
+import logging
 import os
 from dataclasses import dataclass, field
-from pathlib import PurePath
-from typing import Optional
+from pathlib import Path, PurePath
+from typing import Optional, Union
 
+from apolo_kube_client.client import KubeClientAuthType, KubeConfig
 from yarl import URL
 
 
+logger = logging.getLogger(__name__)
+
+
 @dataclass(frozen=True)
 class ServerConfig:
     host: str = "0.0.0.0"
@@ -60,12 +65,20 @@ class S3Config:
     endpoint_url: Optional[str] = None
 
 
+@dataclass(frozen=True)
+class AdmissionControllerTlsConfig:
+    tls_cert: str = field(repr=False)
+    tls_key: str = field(repr=False)
+
+
 @dataclass(frozen=True)
 class Config:
     server: StorageServerConfig
     storage: StorageConfig
     platform: PlatformConfig
     s3: S3Config
+    kube: Optional[KubeConfig] = None
+    admission_controller_tls_config: Optional[AdmissionControllerTlsConfig] = None
 
     permission_expiration_interval_s: float = 0
     permission_forgetting_interval_s: float = 0
@@ -84,6 +97,33 @@ class EnvironConfigFactory:
     def __init__(self, environ: Optional[dict[str, str]] = None) -> None:
         self._environ = environ or os.environ
 
+    def create(self) -> Config:
+        server_config = self.create_storage_server()
+        storage_config = self.create_storage()
+        permission_expiration_interval_s: float = float(
+            self._environ.get(
+                "NP_PERMISSION_EXPIRATION_INTERVAL",
+                Config.permission_expiration_interval_s,
+            )
+        )
+        permission_forgetting_interval_s: float = float(
+            self._environ.get(
+                "NP_PERMISSION_FORGETTING_INTERVAL",
+                Config.permission_forgetting_interval_s,
+            )
+        )
+        return Config(
+            server=server_config,
+            storage=storage_config,
+            platform=self.create_platform(),
+            s3=self.create_s3(),
+            kube=self.create_kube(),
+            admission_controller_tls_config=(
+                self.create_admission_controller_tls_config()
+            ),
+            permission_expiration_interval_s=permission_expiration_interval_s,
+            permission_forgetting_interval_s=permission_forgetting_interval_s,
+        )
+
     def _get_url(self, name: str) -> Optional[URL]:
         value = self._environ[name]
         return None if value == "-" else URL(value)
@@ -139,32 +179,64 @@ def create_s3(self) -> S3Config:
             key_prefix=self._environ.get("S3_KEY_PREFIX", S3Config.key_prefix),
         )
 
-    def create(self) -> Config:
-        server_config = self.create_storage_server()
-        storage_config = self.create_storage()
-        permission_expiration_interval_s: float = float(
-            self._environ.get(
-                "NP_PERMISSION_EXPIRATION_INTERVAL",
-                Config.permission_expiration_interval_s,
-            )
+    def create_metrics(self) -> MetricsConfig:
+        return MetricsConfig(
+            server=self.create_server(),
+            s3=self.create_s3(),
         )
-        permission_forgetting_interval_s: float = float(
+
+    def create_kube(self) -> Union[KubeConfig, None]:
+        endpoint_url = self._environ.get("NP_STORAGE_API_K8S_API_URL")
+        if not endpoint_url:
+            logger.info("kube client won't be initialized due to a missing url")
+            return None
+        auth_type = KubeClientAuthType(
             self._environ.get(
-                "NP_PERMISSION_FORGETTING_INTERVAL",
-                Config.permission_forgetting_interval_s,
+                "NP_STORAGE_API_K8S_AUTH_TYPE",
+                KubeConfig.auth_type.value,
             )
         )
-        return Config(
-            server=server_config,
-            storage=storage_config,
-            platform=self.create_platform(),
-            s3=self.create_s3(),
-            permission_expiration_interval_s=permission_expiration_interval_s,
-            permission_forgetting_interval_s=permission_forgetting_interval_s,
+        ca_path = self._environ.get("NP_STORAGE_API_K8S_CA_PATH")
+        ca_data = Path(ca_path).read_text() if ca_path else None
+
+        token_path = self._environ.get("NP_STORAGE_API_K8S_TOKEN_PATH")
+
+        return KubeConfig(
+            endpoint_url=endpoint_url,
+            cert_authority_data_pem=ca_data,
+            auth_type=auth_type,
+            auth_cert_path=self._environ.get("NP_STORAGE_API_K8S_AUTH_CERT_PATH"),
+            auth_cert_key_path=self._environ.get(
+                "NP_STORAGE_API_K8S_AUTH_CERT_KEY_PATH"),
+            token=None,
+            token_path=token_path,
+            namespace=self._environ.get("NP_STORAGE_API_K8S_NS", KubeConfig.namespace),
+            client_conn_timeout_s=int(
+                self._environ.get("NP_STORAGE_API_K8S_CLIENT_CONN_TIMEOUT")
+                or KubeConfig.client_conn_timeout_s
+            ),
+            client_read_timeout_s=int(
+                self._environ.get("NP_STORAGE_API_K8S_CLIENT_READ_TIMEOUT")
+                or KubeConfig.client_read_timeout_s
+            ),
+            client_watch_timeout_s=int(
+                self._environ.get("NP_STORAGE_API_K8S_CLIENT_WATCH_TIMEOUT")
+                or KubeConfig.client_watch_timeout_s
+            ),
+            client_conn_pool_size=int(
+                self._environ.get("NP_STORAGE_API_K8S_CLIENT_CONN_POOL_SIZE")
+                or KubeConfig.client_conn_pool_size
+            ),
         )
 
-    def create_metrics(self) -> MetricsConfig:
-        return MetricsConfig(
-            server=self.create_server(),
-            s3=self.create_s3(),
+    def create_admission_controller_tls_config(
+        self,
+    ) -> Optional[AdmissionControllerTlsConfig]:
+        tls_key = self._environ.get("NP_STORAGE_ADMISSION_CONTROLLER_TLS_KEY")
+        tls_cert = self._environ.get("NP_STORAGE_ADMISSION_CONTROLLER_TLS_CERT")
+        if not (tls_key and tls_cert):
+            return None
+        return AdmissionControllerTlsConfig(
+            tls_key=tls_key,
+            tls_cert=tls_cert,
        )
diff --git a/src/platform_storage_api/fs/local.py b/src/platform_storage_api/fs/local.py
index c7f18119..1ed416c8 100644
--- a/src/platform_storage_api/fs/local.py
+++ b/src/platform_storage_api/fs/local.py
@@ -7,6 +7,7 @@
 import os
 import shutil
 import stat as statmodule
+import sys
 from collections.abc import AsyncIterator, Iterable, Iterator
 from concurrent.futures import ThreadPoolExecutor
 from contextlib import AbstractAsyncContextManager, asynccontextmanager
@@ -609,8 +610,15 @@ async def disk_usage_by_file(self, *paths: PurePath) -> list[FileUsage]:
         async with aiofiles.tempfile.NamedTemporaryFile("w") as temp_file:
             await temp_file.write("\0".join(str(p) for p in paths))
             await temp_file.flush()
+            if sys.platform == "darwin":
+                # `du` on macOS doesn't support the `--files0-from` option.
+                # macOS users should install coreutils, which provides the
+                # GNU version of `du` as `gdu`.
+                cmd = "gdu"
+            else:
+                cmd = "du"
             process = await asyncio.subprocess.create_subprocess_exec(
-                "du",
+                cmd,
                 "-sh",
                 f"--files0-from={temp_file.name}",
                 stdout=asyncio.subprocess.PIPE,
diff --git a/src/platform_storage_api/storage.py b/src/platform_storage_api/storage.py
index 2a3c66da..cff0215b 100644
--- a/src/platform_storage_api/storage.py
+++ b/src/platform_storage_api/storage.py
@@ -8,6 +8,7 @@
 
 from neuro_logging import trace, trace_cm
 
+from .config import Config, StorageMode
 from .fs.local import (
     DiskUsage,
     FileStatus,
@@ -197,3 +198,13 @@ async def rename(
     async def disk_usage(self, path: Union[PurePath, str, None] = None) -> DiskUsage:
         real_path = await self._path_resolver.resolve_path(PurePath(path or "/"))
         return await self._fs.disk_usage(real_path)
+
+
+def create_path_resolver(config: Config, fs: FileSystem) -> StoragePathResolver:
+    if config.storage.mode == StorageMode.SINGLE:
+        return SingleStoragePathResolver(config.storage.fs_local_base_path)
+    return MultipleStoragePathResolver(
+        fs,
+        config.storage.fs_local_base_path,
+        config.storage.fs_local_base_path / config.platform.cluster_name,
+    )
collection") -def create_path_resolver(config: Config, fs: FileSystem) -> StoragePathResolver: - if config.storage.mode == StorageMode.SINGLE: - return SingleStoragePathResolver(config.storage.fs_local_base_path) - return MultipleStoragePathResolver( - fs, - config.storage.fs_local_base_path, - config.storage.fs_local_base_path / config.platform.cluster_name, - ) - - @asynccontextmanager async def create_app(config: Config) -> AsyncIterator[App]: async with AsyncExitStack() as exit_stack: diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 91ec34ca..4f97c1d7 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -18,6 +18,7 @@ from platform_storage_api.api import create_app from platform_storage_api.config import ( Config, + KubeConfig, MetricsConfig, PlatformConfig, S3Config, @@ -101,6 +102,7 @@ def config( storage=storage_config, platform=platform_config, s3=s3_config, + kube=KubeConfig(endpoint_url="https://localhost:8443"), ) diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index ba20549e..fb425faf 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -1,4 +1,4 @@ -from pathlib import PurePath +from pathlib import Path, PurePath import pytest from yarl import URL @@ -11,6 +11,24 @@ ) +CA_DATA_PEM = "this-is-certificate-authority-public-key" +TOKEN = "this-is-token" + + +@pytest.fixture +def cert_authority_path(tmp_path: Path) -> str: + ca_path = tmp_path / "ca.crt" + ca_path.write_text(CA_DATA_PEM) + return str(ca_path) + + +@pytest.fixture +def token_path(tmp_path: Path) -> str: + token_path = tmp_path / "token" + token_path.write_text(TOKEN) + return str(token_path) + + class TestServerConfig: def test_from_environ(self) -> None: environ = {"NP_STORAGE_API_PORT": "1234"} @@ -45,6 +63,7 @@ def test_from_environ_defaults(self) -> None: "NP_PLATFORM_CLUSTER_NAME": "test-cluster", "S3_REGION": "test-region", "S3_BUCKET_NAME": "test-bucket", + "NP_STORAGE_API_K8S_API_URL": "https://localhost:8443" } config = Config.from_environ(environ) assert config.server.port == 8080 @@ -59,7 +78,11 @@ def test_from_environ_defaults(self) -> None: assert config.s3.region == "test-region" assert config.s3.bucket_name == "test-bucket" - def test_from_environ_custom(self) -> None: + def test_from_environ_custom( + self, + cert_authority_path: str, + token_path: str + ) -> None: environ = { "NP_STORAGE_MODE": "multiple", "NP_STORAGE_LOCAL_BASE_PATH": "/path/to/dir", @@ -72,6 +95,18 @@ def test_from_environ_custom(self) -> None: "S3_REGION": "test-region", "S3_BUCKET_NAME": "test-bucket", "S3_KEY_PREFIX": "test-key-prefix", + "NP_STORAGE_API_K8S_API_URL": "https://localhost:8443", + "NP_STORAGE_API_K8S_AUTH_TYPE": "token", + "NP_STORAGE_API_K8S_CA_PATH": cert_authority_path, + "NP_STORAGE_API_K8S_TOKEN_PATH": token_path, + "NP_STORAGE_API_K8S_AUTH_CERT_PATH": "/cert_path", + "NP_STORAGE_API_K8S_AUTH_CERT_KEY_PATH": "/cert_key_path", + "NP_STORAGE_API_K8S_NS": "other-namespace", + "NP_STORAGE_API_K8S_CLIENT_CONN_TIMEOUT": "111", + "NP_STORAGE_API_K8S_CLIENT_READ_TIMEOUT": "222", + "NP_STORAGE_API_K8S_CLIENT_WATCH_TIMEOUT": "555", + "NP_STORAGE_API_K8S_CLIENT_CONN_POOL_SIZE": "333", + "NP_STORAGE_API_K8S_STORAGE_CLASS": "some-class", } config = Config.from_environ(environ) assert config.server.port == 8080 diff --git a/tests/unit/test_storage_usage.py b/tests/unit/test_storage_usage.py index 48150052..59f1cc33 100644 --- a/tests/unit/test_storage_usage.py +++ b/tests/unit/test_storage_usage.py @@ -10,6 +10,7 @@ from 
platform_storage_api.config import ( Config, + KubeConfig, PlatformConfig, S3Config, StorageConfig, @@ -37,6 +38,9 @@ def config() -> Config: region="test-region", bucket_name="test-bucket", ), + kube=KubeConfig( + endpoint_url="https://localhost:8443", + ) )
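For deployment, the chart expects a `platform-storage-ac-certs` secret in the `platform` namespace holding the webhook's TLS material (see `values.yaml` above). Since `__main__.py` base64-decodes the values it receives through the environment, the stored strings apparently need to be base64-encoded PEM themselves. A hypothetical sketch of such a secret:

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: platform-storage-ac-certs
  namespace: platform
type: Opaque
stringData:
  tls.key: <base64-encoded PEM private key>
  tls.crt: <base64-encoded PEM certificate>
  ca.bundle: <base64-encoded CA bundle>
```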