diff --git a/inference/core/version.py b/inference/core/version.py
index 80e60327a..a691cb603 100644
--- a/inference/core/version.py
+++ b/inference/core/version.py
@@ -1,4 +1,4 @@
-__version__ = "0.16.4"
+__version__ = "0.17.0"
 
 
 if __name__ == "__main__":
diff --git a/inference/core/workflows/core_steps/loader.py b/inference/core/workflows/core_steps/loader.py
index e98e8d7a7..ef50499f0 100644
--- a/inference/core/workflows/core_steps/loader.py
+++ b/inference/core/workflows/core_steps/loader.py
@@ -105,6 +105,9 @@
 from inference.core.workflows.core_steps.sinks.roboflow.dataset_upload.v1 import (
     RoboflowDatasetUploadBlockV1,
 )
+from inference.core.workflows.core_steps.sinks.roboflow.dataset_upload.v2 import (
+    RoboflowDatasetUploadBlockV2,
+)
 from inference.core.workflows.core_steps.transformations.absolute_static_crop.v1 import (
     AbsoluteStaticCropBlockV1,
 )
@@ -291,6 +294,7 @@ def load_blocks() -> List[Type[WorkflowBlock]]:
         ImageContoursDetectionBlockV1,
         ClipComparisonBlockV2,
         CameraFocusBlockV1,
+        RoboflowDatasetUploadBlockV2,
     ]
 
 
diff --git a/inference/core/workflows/core_steps/sinks/roboflow/dataset_upload/v1.py b/inference/core/workflows/core_steps/sinks/roboflow/dataset_upload/v1.py
index 488b6952b..ba57666ef 100644
--- a/inference/core/workflows/core_steps/sinks/roboflow/dataset_upload/v1.py
+++ b/inference/core/workflows/core_steps/sinks/roboflow/dataset_upload/v1.py
@@ -1,3 +1,22 @@
+"""
+*****************************************************************
+*                            WARNING!                           *
+*****************************************************************
+This module contains the utility functions used by
+RoboflowDatasetUploadBlockV2.
+
+We do not recommend making multiple blocks dependent on the same code,
+but the only change between v1 and v2 was the default value of
+some parameters - hence we decided not to replicate the code.
+
+If you need to modify this module, beware that you may introduce
+a change to RoboflowDatasetUploadBlockV2! If that happens, it is
+probably time to disentangle those blocks and copy the code.
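+
+In particular, RoboflowDatasetUploadBlockV2 imports and reuses
+`register_datapoint_at_roboflow()` defined in this module.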
+""" + import hashlib import json import logging diff --git a/inference/core/workflows/core_steps/sinks/roboflow/dataset_upload/v2.py b/inference/core/workflows/core_steps/sinks/roboflow/dataset_upload/v2.py new file mode 100644 index 000000000..0d03667ab --- /dev/null +++ b/inference/core/workflows/core_steps/sinks/roboflow/dataset_upload/v2.py @@ -0,0 +1,322 @@ +import random +from concurrent.futures import ThreadPoolExecutor +from typing import List, Literal, Optional, Tuple, Type, Union + +import supervision as sv +from fastapi import BackgroundTasks +from pydantic import ConfigDict, Field +from typing_extensions import Annotated + +from inference.core.cache.base import BaseCache +from inference.core.workflows.core_steps.sinks.roboflow.dataset_upload.v1 import ( + register_datapoint_at_roboflow, +) +from inference.core.workflows.execution_engine.entities.base import ( + Batch, + OutputDefinition, + WorkflowImageData, +) +from inference.core.workflows.execution_engine.entities.types import ( + BATCH_OF_BOOLEAN_KIND, + BATCH_OF_CLASSIFICATION_PREDICTION_KIND, + BATCH_OF_INSTANCE_SEGMENTATION_PREDICTION_KIND, + BATCH_OF_KEYPOINT_DETECTION_PREDICTION_KIND, + BATCH_OF_OBJECT_DETECTION_PREDICTION_KIND, + BATCH_OF_STRING_KIND, + BOOLEAN_KIND, + FLOAT_KIND, + ROBOFLOW_PROJECT_KIND, + STRING_KIND, + ImageInputField, + StepOutputImageSelector, + StepOutputSelector, + WorkflowImageSelector, + WorkflowParameterSelector, +) +from inference.core.workflows.prototypes.block import ( + BlockResult, + WorkflowBlock, + WorkflowBlockManifest, +) + +FloatZeroToHundred = Annotated[float, Field(ge=0.0, le=100.0)] + +SHORT_DESCRIPTION = "Save images and predictions in your Roboflow Dataset" + +LONG_DESCRIPTION = """ +Block let users save their images and predictions into Roboflow Dataset. Persisting data from +production environments helps iteratively building more robust models. + +Block provides configuration options to decide how data should be stored and what are the limits +to be applied. We advice using this block in combination with rate limiter blocks to effectively +collect data that the model struggle with. 
+""" + +WORKSPACE_NAME_CACHE_EXPIRE = 900 # 15 min +TIMESTAMP_FORMAT = "%Y_%m_%d" +DUPLICATED_STATUS = "Duplicated image" +BatchCreationFrequency = Literal["never", "daily", "weekly", "monthly"] + + +class BlockManifest(WorkflowBlockManifest): + model_config = ConfigDict( + json_schema_extra={ + "name": "Roboflow Dataset Upload", + "version": "v2", + "short_description": SHORT_DESCRIPTION, + "long_description": LONG_DESCRIPTION, + "license": "Apache-2.0", + "block_type": "sink", + } + ) + type: Literal["roboflow_core/roboflow_dataset_upload@v2"] + images: Union[WorkflowImageSelector, StepOutputImageSelector] = ImageInputField + predictions: Optional[ + StepOutputSelector( + kind=[ + BATCH_OF_OBJECT_DETECTION_PREDICTION_KIND, + BATCH_OF_INSTANCE_SEGMENTATION_PREDICTION_KIND, + BATCH_OF_KEYPOINT_DETECTION_PREDICTION_KIND, + BATCH_OF_CLASSIFICATION_PREDICTION_KIND, + ] + ) + ] = Field( + default=None, + description="Reference q detection-like predictions", + examples=["$steps.object_detection_model.predictions"], + ) + target_project: Union[ + WorkflowParameterSelector(kind=[ROBOFLOW_PROJECT_KIND]), str + ] = Field( + description="name of Roboflow dataset / project to be used as target for collected data", + examples=["my_dataset", "$inputs.target_al_dataset"], + ) + usage_quota_name: str = Field( + description="Unique name for Roboflow project pointed by `target_project` parameter, that identifies " + "usage quota applied for this block.", + examples=["quota-for-data-sampling-1"], + ) + data_percentage: Union[ + FloatZeroToHundred, WorkflowParameterSelector(kind=[FLOAT_KIND]) + ] = Field( + description="Percent of data that will be saved (in range [0.0, 100.0])", + examples=[True, False, "$inputs.persist_predictions"], + ) + persist_predictions: Union[bool, WorkflowParameterSelector(kind=[BOOLEAN_KIND])] = ( + Field( + default=True, + description="Boolean flag to decide if predictions should be registered along with images", + examples=[True, False, "$inputs.persist_predictions"], + ) + ) + minutely_usage_limit: int = Field( + default=10, + description="Maximum number of data registration requests per minute accounted in scope of " + "single server or whole Roboflow platform, depending on context of usage.", + examples=[10, 60], + ) + hourly_usage_limit: int = Field( + default=100, + description="Maximum number of data registration requests per hour accounted in scope of " + "single server or whole Roboflow platform, depending on context of usage.", + examples=[10, 60], + ) + daily_usage_limit: int = Field( + default=1000, + description="Maximum number of data registration requests per day accounted in scope of " + "single server or whole Roboflow platform, depending on context of usage.", + examples=[10, 60], + ) + max_image_size: Tuple[int, int] = Field( + default=(1920, 1080), + description="Maximum size of the image to be registered - bigger images will be " + "downsized preserving aspect ratio. 
+        "downsized preserving aspect ratio. Format of data: `(width, height)`",
+        examples=[(1920, 1080), (512, 512)],
+    )
+    compression_level: int = Field(
+        default=95,
+        gt=0,
+        le=100,
+        description="Compression level for registered images",
+        examples=[95, 75],
+    )
+    registration_tags: List[
+        Union[WorkflowParameterSelector(kind=[STRING_KIND]), str]
+    ] = Field(
+        default_factory=list,
+        description="Tags to be attached to registered datapoints",
+        examples=[["location-florida", "factory-name", "$inputs.dynamic_tag"]],
+    )
+    disable_sink: Union[bool, WorkflowParameterSelector(kind=[BOOLEAN_KIND])] = Field(
+        default=False,
+        description="Boolean flag (which may also be a reference to an input) to arbitrarily disable "
+        "data collection for a specific request",
+        examples=[True, "$inputs.disable_active_learning"],
+    )
+    fire_and_forget: Union[bool, WorkflowParameterSelector(kind=[BOOLEAN_KIND])] = (
+        Field(
+            default=True,
+            description="Boolean flag dictating if the sink is supposed to be executed in the background, "
+            "without waiting for the registration status before the workflow run ends. Use `True` if best-effort "
+            "registration is needed, use `False` while debugging and if error handling is needed",
+        )
+    )
+    labeling_batch_prefix: Union[str, WorkflowParameterSelector(kind=[STRING_KIND])] = (
+        Field(
+            default="workflows_data_collector",
+            description="Prefix of the name for labeling batches that will be registered in the Roboflow app",
+            examples=["my_labeling_batch_name"],
+        )
+    )
+    labeling_batches_recreation_frequency: BatchCreationFrequency = Field(
+        default="never",
+        description="Frequency at which new labeling batches are created in the Roboflow app. New batches "
+        "are created with the name prefix provided in `labeling_batch_prefix` at given time intervals. "
+        "Useful in organising the labeling flow.",
+        examples=["never", "daily"],
+    )
+
+    @classmethod
+    def accepts_batch_input(cls) -> bool:
+        return True
+
+    @classmethod
+    def describe_outputs(cls) -> List[OutputDefinition]:
+        return [
+            OutputDefinition(name="error_status", kind=[BATCH_OF_BOOLEAN_KIND]),
+            OutputDefinition(name="message", kind=[BATCH_OF_STRING_KIND]),
+        ]
+
+    @classmethod
+    def get_execution_engine_compatibility(cls) -> Optional[str]:
+        return ">=1.0.0,<2.0.0"
+
+
+class RoboflowDatasetUploadBlockV2(WorkflowBlock):
+
+    def __init__(
+        self,
+        cache: BaseCache,
+        api_key: Optional[str],
+        background_tasks: Optional[BackgroundTasks],
+        thread_pool_executor: Optional[ThreadPoolExecutor],
+    ):
+        self._cache = cache
+        self._api_key = api_key
+        self._background_tasks = background_tasks
+        self._thread_pool_executor = thread_pool_executor
+
+    @classmethod
+    def get_init_parameters(cls) -> List[str]:
+        return ["cache", "api_key", "background_tasks", "thread_pool_executor"]
+
+    @classmethod
+    def get_manifest(cls) -> Type[WorkflowBlockManifest]:
+        return BlockManifest
+
+    def run(
+        self,
+        images: Batch[WorkflowImageData],
+        predictions: Optional[Batch[Union[sv.Detections, dict]]],
+        target_project: str,
+        usage_quota_name: str,
+        data_percentage: float,
+        minutely_usage_limit: int,
+        persist_predictions: bool,
+        hourly_usage_limit: int,
+        daily_usage_limit: int,
+        max_image_size: Tuple[int, int],
+        compression_level: int,
+        registration_tags: List[str],
+        disable_sink: bool,
+        fire_and_forget: bool,
+        labeling_batch_prefix: str,
+        labeling_batches_recreation_frequency: BatchCreationFrequency,
+    ) -> BlockResult:
+        if self._api_key is None:
+            raise ValueError(
+                "RoboflowDatasetUpload block cannot run without a Roboflow API key. "
" + "If you do not know how to get API key - visit " + "https://docs.roboflow.com/api-reference/authentication#retrieve-an-api-key to learn how to " + "retrieve one." + ) + if disable_sink: + return [ + { + "error_status": False, + "message": "Sink was disabled by parameter `disable_sink`", + } + for _ in range(len(images)) + ] + result = [] + predictions = [None] * len(images) if predictions is None else predictions + for image, prediction in zip(images, predictions): + error_status, message = maybe_register_datapoint_at_roboflow( + image=image, + prediction=prediction, + target_project=target_project, + usage_quota_name=usage_quota_name, + data_percentage=data_percentage, + persist_predictions=persist_predictions, + minutely_usage_limit=minutely_usage_limit, + hourly_usage_limit=hourly_usage_limit, + daily_usage_limit=daily_usage_limit, + max_image_size=max_image_size, + compression_level=compression_level, + registration_tags=registration_tags, + fire_and_forget=fire_and_forget, + labeling_batch_prefix=labeling_batch_prefix, + new_labeling_batch_frequency=labeling_batches_recreation_frequency, + cache=self._cache, + background_tasks=self._background_tasks, + thread_pool_executor=self._thread_pool_executor, + api_key=self._api_key, + ) + result.append({"error_status": error_status, "message": message}) + return result + + +def maybe_register_datapoint_at_roboflow( + image: WorkflowImageData, + prediction: Optional[Union[sv.Detections, dict]], + target_project: str, + usage_quota_name: str, + data_percentage: float, + persist_predictions: bool, + minutely_usage_limit: int, + hourly_usage_limit: int, + daily_usage_limit: int, + max_image_size: Tuple[int, int], + compression_level: int, + registration_tags: List[str], + fire_and_forget: bool, + labeling_batch_prefix: str, + new_labeling_batch_frequency: BatchCreationFrequency, + cache: BaseCache, + background_tasks: Optional[BackgroundTasks], + thread_pool_executor: Optional[ThreadPoolExecutor], + api_key: str, +) -> Tuple[bool, str]: + normalised_probability = data_percentage / 100 + if random.random() < normalised_probability: + return register_datapoint_at_roboflow( + image=image, + prediction=prediction, + target_project=target_project, + usage_quota_name=usage_quota_name, + persist_predictions=persist_predictions, + minutely_usage_limit=minutely_usage_limit, + hourly_usage_limit=hourly_usage_limit, + daily_usage_limit=daily_usage_limit, + max_image_size=max_image_size, + compression_level=compression_level, + registration_tags=registration_tags, + fire_and_forget=fire_and_forget, + labeling_batch_prefix=labeling_batch_prefix, + new_labeling_batch_frequency=new_labeling_batch_frequency, + cache=cache, + background_tasks=background_tasks, + thread_pool_executor=thread_pool_executor, + api_key=api_key, + ) + return False, "Registration skipped due to sampling settings" diff --git a/tests/workflows/integration_tests/execution/test_workflow_with_active_learning_sink.py b/tests/workflows/integration_tests/execution/test_workflow_with_active_learning_sink.py new file mode 100644 index 000000000..1d6658723 --- /dev/null +++ b/tests/workflows/integration_tests/execution/test_workflow_with_active_learning_sink.py @@ -0,0 +1,177 @@ +import numpy as np +import pytest + +from inference.core.env import WORKFLOWS_MAX_CONCURRENT_STEPS +from inference.core.managers.base import ModelManager +from inference.core.workflows.core_steps.common.entities import StepExecutionMode +from inference.core.workflows.execution_engine.core import ExecutionEngine +from 
+from tests.workflows.integration_tests.execution.workflows_gallery_collector.decorators import (
+    add_to_workflows_gallery,
+)
+
+ACTIVE_LEARNING_WORKFLOW = {
+    "version": "1.0",
+    "inputs": [
+        {"type": "WorkflowImage", "name": "image"},
+        {"type": "WorkflowParameter", "name": "data_percentage", "default_value": 50.0},
+        {
+            "type": "WorkflowParameter",
+            "name": "persist_predictions",
+            "default_value": True,
+        },
+        {"type": "WorkflowParameter", "name": "tag", "default_value": "my_tag"},
+        {"type": "WorkflowParameter", "name": "disable_sink", "default_value": False},
+        {"type": "WorkflowParameter", "name": "fire_and_forget", "default_value": True},
+        {
+            "type": "WorkflowParameter",
+            "name": "labeling_batch_prefix",
+            "default_value": "some",
+        },
+    ],
+    "steps": [
+        {
+            "type": "roboflow_core/roboflow_object_detection_model@v1",
+            "name": "general_detection",
+            "image": "$inputs.image",
+            "model_id": "yolov8n-640",
+            "class_filter": ["dog"],
+        },
+        {
+            "type": "roboflow_core/dynamic_crop@v1",
+            "name": "cropping",
+            "image": "$inputs.image",
+            "predictions": "$steps.general_detection.predictions",
+        },
+        {
+            "type": "roboflow_core/roboflow_classification_model@v1",
+            "name": "breds_classification",
+            "image": "$steps.cropping.crops",
+            "model_id": "dog-breed-xpaq6/1",
+        },
+        {
+            "type": "roboflow_core/roboflow_dataset_upload@v2",
+            "name": "data_collection",
+            "images": "$steps.cropping.crops",
+            "predictions": "$steps.breds_classification.predictions",
+            "target_project": "my_project",
+            "usage_quota_name": "my_quota",
+            "data_percentage": "$inputs.data_percentage",
+            "persist_predictions": "$inputs.persist_predictions",
+            "minutely_usage_limit": 10,
+            "hourly_usage_limit": 100,
+            "daily_usage_limit": 1000,
+            "max_image_size": (100, 200),
+            "compression_level": 85,
+            "registration_tags": ["a", "b", "$inputs.tag"],
+            "disable_sink": "$inputs.disable_sink",
+            "fire_and_forget": "$inputs.fire_and_forget",
+            "labeling_batch_prefix": "$inputs.labeling_batch_prefix",
+            "labeling_batches_recreation_frequency": "never",
+        },
+    ],
+    "outputs": [
+        {
+            "type": "JsonField",
+            "name": "predictions",
+            "selector": "$steps.breds_classification.predictions",
+        },
+        {
+            "type": "JsonField",
+            "name": "registration_message",
+            "selector": "$steps.data_collection.message",
+        },
+    ],
+}
+
+
+@add_to_workflows_gallery(
+    category="Workflows enhanced by Roboflow Platform",
+    use_case_title="Data Collection for Active Learning",
+    use_case_description="""
+This example showcases how to stack models on top of each other - in this particular
+case, we detect objects using an object detection model, requesting only "dog" bounding boxes
+in the prediction output. Additionally, we register the cropped images in a Roboflow dataset.
+
+Thanks to this setup, we are able to collect production data and continuously train better models over time.
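+
+Registration is sampled: the `data_percentage` input of the upload block controls the
+percentage of crops that get persisted (with `0.0`, registration is always skipped).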
+""", + workflow_definition=ACTIVE_LEARNING_WORKFLOW, +) +def test_detection_plus_classification_workflow_when_minimal_valid_input_provided( + model_manager: ModelManager, + dogs_image: np.ndarray, + roboflow_api_key: str, +) -> None: + # given + workflow_init_parameters = { + "workflows_core.model_manager": model_manager, + "workflows_core.api_key": roboflow_api_key, + "workflows_core.step_execution_mode": StepExecutionMode.LOCAL, + } + execution_engine = ExecutionEngine.init( + workflow_definition=ACTIVE_LEARNING_WORKFLOW, + init_parameters=workflow_init_parameters, + max_concurrent_steps=WORKFLOWS_MAX_CONCURRENT_STEPS, + ) + # when + result = execution_engine.run( + runtime_parameters={ + "image": dogs_image, + "data_percentage": 0.0, + } + ) + + assert isinstance(result, list), "Expected list to be delivered" + assert len(result) == 1, "Expected 1 element in the output for one input image" + assert set(result[0].keys()) == { + "predictions", + "registration_message", + }, "Expected all declared outputs to be delivered" + assert ( + len(result[0]["predictions"]) == 2 + ), "Expected 2 dogs crops on input image, hence 2 nested classification results" + assert [result[0]["predictions"][0]["top"], result[0]["predictions"][1]["top"]] == [ + "116.Parson_russell_terrier", + "131.Wirehaired_pointing_griffon", + ], "Expected predictions to be as measured in reference run" + assert ( + result[0]["registration_message"] + == ["Registration skipped due to sampling settings"] * 2 + ), "Expected data not registered due to sampling" + + +def test_detection_plus_classification_workflow_when_nothing_to_be_registered( + model_manager: ModelManager, + crowd_image: np.ndarray, + roboflow_api_key: str, +) -> None: + # given + workflow_init_parameters = { + "workflows_core.model_manager": model_manager, + "workflows_core.api_key": roboflow_api_key, + "workflows_core.step_execution_mode": StepExecutionMode.LOCAL, + } + execution_engine = ExecutionEngine.init( + workflow_definition=ACTIVE_LEARNING_WORKFLOW, + init_parameters=workflow_init_parameters, + max_concurrent_steps=WORKFLOWS_MAX_CONCURRENT_STEPS, + ) + # when + result = execution_engine.run( + runtime_parameters={ + "image": crowd_image, + "data_percentage": 0.0, + } + ) + + assert isinstance(result, list), "Expected list to be delivered" + assert len(result) == 1, "Expected 1 element in the output for one input image" + assert set(result[0].keys()) == { + "predictions", + "registration_message", + }, "Expected all declared outputs to be delivered" + assert ( + len(result[0]["predictions"]) == 0 + ), "Expected 0 dogs crops on input image, hence 0 nested classification results" + assert ( + len(result[0]["registration_message"]) == 0 + ), "Expected 0 dogs crops on input image, hence 0 nested statuses of registration" diff --git a/tests/workflows/unit_tests/core_steps/sinks/roboflow/roboflow_dataset_upload/__init__.py b/tests/workflows/unit_tests/core_steps/sinks/roboflow/roboflow_dataset_upload/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/workflows/unit_tests/core_steps/sinks/roboflow/test_roboflow_dataset_upload.py b/tests/workflows/unit_tests/core_steps/sinks/roboflow/roboflow_dataset_upload/test_v1.py similarity index 100% rename from tests/workflows/unit_tests/core_steps/sinks/roboflow/test_roboflow_dataset_upload.py rename to tests/workflows/unit_tests/core_steps/sinks/roboflow/roboflow_dataset_upload/test_v1.py diff --git a/tests/workflows/unit_tests/core_steps/sinks/roboflow/roboflow_dataset_upload/test_v2.py 
diff --git a/tests/workflows/unit_tests/core_steps/sinks/roboflow/roboflow_dataset_upload/test_v2.py b/tests/workflows/unit_tests/core_steps/sinks/roboflow/roboflow_dataset_upload/test_v2.py
new file mode 100644
index 000000000..6a9134881
--- /dev/null
+++ b/tests/workflows/unit_tests/core_steps/sinks/roboflow/roboflow_dataset_upload/test_v2.py
@@ -0,0 +1,374 @@
+from typing import Optional, Union
+from unittest import mock
+from unittest.mock import MagicMock
+
+import numpy as np
+import pytest
+from fastapi import BackgroundTasks
+from pydantic import ValidationError
+
+from inference.core.cache import MemoryCache
+from inference.core.workflows.core_steps.sinks.roboflow.dataset_upload import v2
+from inference.core.workflows.core_steps.sinks.roboflow.dataset_upload.v2 import (
+    BlockManifest,
+    RoboflowDatasetUploadBlockV2,
+    maybe_register_datapoint_at_roboflow,
+)
+from inference.core.workflows.execution_engine.entities.base import (
+    Batch,
+    ImageParentMetadata,
+    WorkflowImageData,
+)
+
+
+@mock.patch.object(v2, "register_datapoint_at_roboflow")
+@mock.patch.object(v2, "random")
+def test_maybe_register_datapoint_at_roboflow_when_data_sampled_off(
+    random_mock: MagicMock,
+    register_datapoint_at_roboflow_mock: MagicMock,
+) -> None:
+    # given
+    random_mock.random.return_value = 0.38
+
+    # when
+    result = maybe_register_datapoint_at_roboflow(
+        image=MagicMock(),
+        prediction=MagicMock(),
+        target_project="some",
+        usage_quota_name="some",
+        data_percentage=36.0,
+        persist_predictions=True,
+        minutely_usage_limit=10,
+        hourly_usage_limit=100,
+        daily_usage_limit=1000,
+        max_image_size=(128, 128),
+        compression_level=75,
+        registration_tags=[],
+        fire_and_forget=False,
+        labeling_batch_prefix="some",
+        new_labeling_batch_frequency="never",
+        cache=MagicMock(),
+        background_tasks=MagicMock(),
+        thread_pool_executor=MagicMock(),
+        api_key="XXX",
+    )
+
+    # then
+    register_datapoint_at_roboflow_mock.assert_not_called()
+    assert result == (False, "Registration skipped due to sampling settings")
+
+
+@mock.patch.object(v2, "register_datapoint_at_roboflow")
+@mock.patch.object(v2, "random")
+def test_maybe_register_datapoint_at_roboflow_when_data_sample_accepted(
+    random_mock: MagicMock,
+    register_datapoint_at_roboflow_mock: MagicMock,
+) -> None:
+    # given
+    random_mock.random.return_value = 0.38
+    register_datapoint_at_roboflow_mock.return_value = (False, "ok")
+
+    # when
+    result = maybe_register_datapoint_at_roboflow(
+        image=MagicMock(),
+        prediction=MagicMock(),
+        target_project="some",
+        usage_quota_name="some",
+        data_percentage=40.0,
+        persist_predictions=True,
+        minutely_usage_limit=10,
+        hourly_usage_limit=100,
+        daily_usage_limit=1000,
+        max_image_size=(128, 128),
+        compression_level=75,
+        registration_tags=[],
+        fire_and_forget=False,
+        labeling_batch_prefix="some",
+        new_labeling_batch_frequency="never",
+        cache=MagicMock(),
+        background_tasks=MagicMock(),
+        thread_pool_executor=MagicMock(),
+        api_key="XXX",
+    )
+
+    # then
+    register_datapoint_at_roboflow_mock.assert_called_once()  # expected to be called when data is sampled
+    assert result == (False, "ok"), "Expected to see mock return value"
+
+
+@pytest.mark.parametrize("image_field_name", ["image", "images"])
+@pytest.mark.parametrize("image_selector", ["$inputs.image", "$steps.some.image"])
+@pytest.mark.parametrize("predictions", ["$steps.some.predictions", None])
+def test_manifest_parsing_when_valid_input_provided_and_fields_not_linked(
+    image_field_name: str,
+    image_selector: str,
+    predictions: Optional[str],
+) -> None:
+    # given
+    raw_manifest = {
+        "type": "roboflow_core/roboflow_dataset_upload@v2",
"name": "some", + image_field_name: image_selector, + "predictions": predictions, + "target_project": "some1", + "usage_quota_name": "my_quota", + "data_percentage": 37.5, + "persist_predictions": False, + "minutely_usage_limit": 10, + "hourly_usage_limit": 100, + "daily_usage_limit": 1000, + "max_image_size": (100, 200), + "compression_level": 100, + "registration_tags": ["a", "b"], + "disable_sink": True, + "fire_and_forget": False, + "labeling_batch_prefix": "my_batch", + "labeling_batches_recreation_frequency": "never", + } + + # when + result = BlockManifest.model_validate(raw_manifest) + + # then + assert result == BlockManifest( + type="roboflow_core/roboflow_dataset_upload@v2", + name="some", + images=image_selector, + predictions=predictions, + target_project="some1", + usage_quota_name="my_quota", + data_percentage=37.5, + persist_predictions=False, + minutely_usage_limit=10, + hourly_usage_limit=100, + daily_usage_limit=1000, + max_image_size=(100, 200), + compression_level=100, + registration_tags=["a", "b"], + disable_sink=True, + fire_and_forget=False, + labeling_batch_prefix="my_batch", + labeling_batches_recreation_frequency="never", + ) + + +@pytest.mark.parametrize("image_field_name", ["image", "images"]) +@pytest.mark.parametrize("image_selector", ["$inputs.image", "$steps.some.image"]) +@pytest.mark.parametrize("predictions", ["$steps.some.predictions", None]) +def test_manifest_parsing_when_valid_input_provided_and_fields_linked( + image_field_name: str, + image_selector: str, + predictions: Optional[str], +) -> None: + # given + raw_manifest = { + "type": "roboflow_core/roboflow_dataset_upload@v2", + "name": "some", + image_field_name: image_selector, + "predictions": predictions, + "target_project": "some1", + "usage_quota_name": "my_quota", + "data_percentage": "$inputs.data_percentage", + "persist_predictions": "$inputs.persist_predictions", + "minutely_usage_limit": 10, + "hourly_usage_limit": 100, + "daily_usage_limit": 1000, + "max_image_size": (100, 200), + "compression_level": 100, + "registration_tags": ["a", "b", "$inputs.tag"], + "disable_sink": "$inputs.disable_sink", + "fire_and_forget": "$inputs.fire_and_forget", + "labeling_batch_prefix": "$inputs.labeling_batch_prefix", + "labeling_batches_recreation_frequency": "never", + } + + # when + result = BlockManifest.model_validate(raw_manifest) + + # then + assert result == BlockManifest( + type="roboflow_core/roboflow_dataset_upload@v2", + name="some", + images=image_selector, + predictions=predictions, + target_project="some1", + usage_quota_name="my_quota", + data_percentage="$inputs.data_percentage", + persist_predictions="$inputs.persist_predictions", + minutely_usage_limit=10, + hourly_usage_limit=100, + daily_usage_limit=1000, + max_image_size=(100, 200), + compression_level=100, + registration_tags=["a", "b", "$inputs.tag"], + disable_sink="$inputs.disable_sink", + fire_and_forget="$inputs.fire_and_forget", + labeling_batch_prefix="$inputs.labeling_batch_prefix", + labeling_batches_recreation_frequency="never", + ) + + +@pytest.mark.parametrize("compression_level", [0, -1, 101]) +def test_manifest_parsing_when_compression_level_invalid( + compression_level: float, +) -> None: + raw_manifest = { + "type": "roboflow_core/roboflow_dataset_upload@v2", + "name": "some", + "images": "$inputs.image", + "predictions": None, + "target_project": "some1", + "usage_quota_name": "my_quota", + "data_percentage": "$inputs.data_percentage", + "persist_predictions": "$inputs.persist_predictions", + 
"minutely_usage_limit": 10, + "hourly_usage_limit": 100, + "daily_usage_limit": 1000, + "max_image_size": (100, 200), + "compression_level": compression_level, + "registration_tags": ["a", "b", "$inputs.tag"], + "disable_sink": "$inputs.disable_sink", + "fire_and_forget": "$inputs.fire_and_forget", + "labeling_batch_prefix": "$inputs.labeling_batch_prefix", + "labeling_batches_recreation_frequency": "never", + } + + # when + with pytest.raises(ValidationError): + _ = BlockManifest.model_validate(raw_manifest) + + +@pytest.mark.parametrize("data_percentage", [-1.0, 101.0]) +def test_manifest_parsing_when_data_percentage_invalid(data_percentage: float) -> None: + raw_manifest = { + "type": "roboflow_core/roboflow_dataset_upload@v2", + "name": "some", + "images": "$inputs.image", + "predictions": None, + "target_project": "some1", + "usage_quota_name": "my_quota", + "data_percentage": data_percentage, + "persist_predictions": "$inputs.persist_predictions", + "minutely_usage_limit": 10, + "hourly_usage_limit": 100, + "daily_usage_limit": 1000, + "max_image_size": (100, 200), + "compression_level": 85, + "registration_tags": ["a", "b", "$inputs.tag"], + "disable_sink": "$inputs.disable_sink", + "fire_and_forget": "$inputs.fire_and_forget", + "labeling_batch_prefix": "$inputs.labeling_batch_prefix", + "labeling_batches_recreation_frequency": "never", + } + + # when + with pytest.raises(ValidationError): + _ = BlockManifest.model_validate(raw_manifest) + + +@mock.patch.object(v2, "register_datapoint_at_roboflow") +def test_run_sink_when_data_sampled_off( + register_datapoint_at_roboflow_mock: MagicMock, +) -> None: + # given + background_tasks = BackgroundTasks() + cache = MemoryCache() + data_collector_block = RoboflowDatasetUploadBlockV2( + cache=cache, + api_key="my_api_key", + background_tasks=background_tasks, + thread_pool_executor=None, + ) + image = WorkflowImageData( + parent_metadata=ImageParentMetadata(parent_id="parent"), + numpy_image=np.zeros((512, 256, 3), dtype=np.uint8), + ) + register_datapoint_at_roboflow_mock.return_value = False, "OK" + indices = [(0,), (1,), (2,)] + + # when + result = data_collector_block.run( + images=Batch(content=[image, image, image], indices=indices), + predictions=None, + target_project="my_project", + usage_quota_name="my_quota", + data_percentage=0.0, + persist_predictions=True, + minutely_usage_limit=10, + hourly_usage_limit=100, + daily_usage_limit=1000, + max_image_size=(128, 128), + compression_level=75, + registration_tags=["some"], + disable_sink=False, + fire_and_forget=False, + labeling_batch_prefix="my_batch", + labeling_batches_recreation_frequency="never", + ) + + # then + assert ( + result + == [ + { + "error_status": False, + "message": "Registration skipped due to sampling settings", + } + ] + * 3 + ), "Expected nothing registered" + register_datapoint_at_roboflow_mock.assert_not_called() + + +@mock.patch.object(v2, "register_datapoint_at_roboflow") +def test_run_sink_when_data_sampled( + register_datapoint_at_roboflow_mock: MagicMock, +) -> None: + # given + background_tasks = BackgroundTasks() + cache = MemoryCache() + data_collector_block = RoboflowDatasetUploadBlockV2( + cache=cache, + api_key="my_api_key", + background_tasks=background_tasks, + thread_pool_executor=None, + ) + image = WorkflowImageData( + parent_metadata=ImageParentMetadata(parent_id="parent"), + numpy_image=np.zeros((512, 256, 3), dtype=np.uint8), + ) + register_datapoint_at_roboflow_mock.return_value = False, "OK" + indices = [(0,), (1,), (2,)] + + # when + 
+    result = data_collector_block.run(
+        images=Batch(content=[image, image, image], indices=indices),
+        predictions=None,
+        target_project="my_project",
+        usage_quota_name="my_quota",
+        data_percentage=100.1,  # over 100% so the normalised probability exceeds 1.0 and sampling always accepts
+        persist_predictions=True,
+        minutely_usage_limit=10,
+        hourly_usage_limit=100,
+        daily_usage_limit=1000,
+        max_image_size=(128, 128),
+        compression_level=75,
+        registration_tags=["some"],
+        disable_sink=False,
+        fire_and_forget=False,
+        labeling_batch_prefix="my_batch",
+        labeling_batches_recreation_frequency="never",
+    )
+
+    # then
+    assert (
+        result
+        == [
+            {
+                "error_status": False,
+                "message": "OK",
+            }
+        ]
+        * 3
+    ), "Expected data registered"
+    assert register_datapoint_at_roboflow_mock.call_count == 3