From d95e2362336af494f6bcb0162be16c158820a714 Mon Sep 17 00:00:00 2001 From: Safoora Yousefi Date: Fri, 18 Oct 2024 08:36:10 +0000 Subject: [PATCH 01/16] reorg +azure o1 model --- eureka_ml_insights/configs/model_configs.py | 65 +++++--- eureka_ml_insights/models/__init__.py | 54 +++---- eureka_ml_insights/models/models.py | 168 +++++++++++--------- 3 files changed, 161 insertions(+), 126 deletions(-) diff --git a/eureka_ml_insights/configs/model_configs.py b/eureka_ml_insights/configs/model_configs.py index 7b72988..f3588d8 100644 --- a/eureka_ml_insights/configs/model_configs.py +++ b/eureka_ml_insights/configs/model_configs.py @@ -3,14 +3,16 @@ You can also add your custom models here by following the same pattern as the existing configs. """ from eureka_ml_insights.models import ( - ClaudeModels, - GeminiModels, - LlamaServerlessAzureRestEndpointModels, - LLaVA, - LLaVAHuggingFaceMM, - MistralServerlessAzureRestEndpointModels, - OpenAIModelsOAI, - RestEndpointModels, + ClaudeModel, + GeminiModel, + LlamaServerlessAzureRestEndpointModel, + LLaVAModel, + LLaVAHuggingFaceModel, + MistralServerlessAzureRestEndpointModel, + DirectOpenAIModel, + AzureOpenAIO1Model, + DirectOpenAIO1Model, + RestEndpointModel, ) from .config import ModelConfig @@ -22,8 +24,25 @@ "key_vault_url": None, } +OAI_O1_PREVIEW_CONFIG = ModelConfig( + DirectOpenAIO1Model, + { + "model_name": "o1-preview", + "secret_key_params": OPENAI_SECRET_KEY_PARAMS, + }, +) + +OAI_O1_PREVIEW_AUZRE_CONFIG = ModelConfig( + AzureOpenAIO1Model, + { + "model_name": "o1-preview", + "url": "your/endpoint/url", + "api_version": "2024-08-01-preview", + } +) + OAI_GPT4_1106_PREVIEW_CONFIG = ModelConfig( - OpenAIModelsOAI, + DirectOpenAIModel, { "model_name": "gpt-4-1106-preview", "secret_key_params": OPENAI_SECRET_KEY_PARAMS, @@ -31,7 +50,7 @@ ) OAI_GPT4V_1106_VISION_PREVIEW_CONFIG = ModelConfig( - OpenAIModelsOAI, + DirectOpenAIModel, { "model_name": "gpt-4-1106-vision-preview", "secret_key_params": OPENAI_SECRET_KEY_PARAMS, @@ -39,7 +58,7 @@ ) OAI_GPT4V_TURBO_2024_04_09_CONFIG = ModelConfig( - OpenAIModelsOAI, + DirectOpenAIModel, { "model_name": "gpt-4-turbo-2024-04-09", "secret_key_params": OPENAI_SECRET_KEY_PARAMS, @@ -47,7 +66,7 @@ ) OAI_GPT4O_2024_05_13_CONFIG = ModelConfig( - OpenAIModelsOAI, + DirectOpenAIModel, { "model_name": "gpt-4o-2024-05-13", "secret_key_params": OPENAI_SECRET_KEY_PARAMS, @@ -63,7 +82,7 @@ } GEMINI_V15_PRO_CONFIG = ModelConfig( - GeminiModels, + GeminiModel, { "model_name": "gemini-1.5-pro", "secret_key_params": GEMINI_SECRET_KEY_PARAMS, @@ -71,7 +90,7 @@ ) GEMINI_V1_PRO_CONFIG = ModelConfig( - GeminiModels, + GeminiModel, { "model_name": "gemini-1.0-pro", "secret_key_params": GEMINI_SECRET_KEY_PARAMS, @@ -86,7 +105,7 @@ } CLAUDE_3_OPUS_CONFIG = ModelConfig( - ClaudeModels, + ClaudeModel, { "model_name": "claude-3-opus-20240229", "secret_key_params": CLAUDE_SECRET_KEY_PARAMS, @@ -94,7 +113,7 @@ ) CLAUDE_3_5_SONNET_CONFIG = ModelConfig( - ClaudeModels, + ClaudeModel, { "secret_key_params": CLAUDE_SECRET_KEY_PARAMS, "model_name": "claude-3-5-sonnet-20240620", @@ -103,29 +122,29 @@ # LLAVA models LLAVAHF_V16_34B_CONFIG = ModelConfig( - LLaVAHuggingFaceMM, + LLaVAHuggingFaceModel, {"model_name": "llava-hf/llava-v1.6-34b-hf", "use_flash_attn": True}, ) LLAVAHF_V15_7B_CONFIG = ModelConfig( - LLaVAHuggingFaceMM, + LLaVAHuggingFaceModel, {"model_name": "llava-hf/llava-1.5-7b-hf", "use_flash_attn": True}, ) LLAVA_V16_34B_CONFIG = ModelConfig( - LLaVA, + LLaVAModel, {"model_name": "liuhaotian/llava-v1.6-34b", 
"use_flash_attn": True}, ) LLAVA_V15_7B_CONFIG = ModelConfig( - LLaVA, + LLaVAModel, {"model_name": "liuhaotian/llava-v1.5-7b", "use_flash_attn": True}, ) # Llama models LLAMA3_1_70B_INSTRUCT_CONFIG = ModelConfig( - RestEndpointModels, + RestEndpointModel, { "url": "your/endpoint/url", "secret_key_params": { @@ -138,7 +157,7 @@ ) LLAMA3_1_405B_INSTRUCT_CONFIG = ModelConfig( - LlamaServerlessAzureRestEndpointModels, + LlamaServerlessAzureRestEndpointModel, { "url": "your/endpoint/url", "secret_key_params": { @@ -152,7 +171,7 @@ # Mistral Endpoints AIF_NT_MISTRAL_LARGE_2_2407_CONFIG = ModelConfig( - MistralServerlessAzureRestEndpointModels, + MistralServerlessAzureRestEndpointModel, { "url": "your/endpoint/url", "secret_key_params": { diff --git a/eureka_ml_insights/models/__init__.py b/eureka_ml_insights/models/__init__.py index c7a6926..a945901 100644 --- a/eureka_ml_insights/models/__init__.py +++ b/eureka_ml_insights/models/__init__.py @@ -1,32 +1,32 @@ from .models import ( - ClaudeModels, - GeminiModels, - HuggingFaceLM, - LlamaServerlessAzureRestEndpointModels, - LLaVA, - LLaVAHuggingFaceMM, - MistralServerlessAzureRestEndpointModels, - OpenAIModelsMixIn, - OpenAIModelsAzure, - OpenAIModelsOAI, - OpenAIO1Direct, - Phi3HF, - KeyBasedAuthentication, - EndpointModels, - RestEndpointModels + ClaudeModel, + GeminiModel, + HuggingFaceModel, + LlamaServerlessAzureRestEndpointModel, + LLaVAModel, + LLaVAHuggingFaceModel, + MistralServerlessAzureRestEndpointModel, + AzureOpenAIModel, + DirectOpenAIModel, + AzureOpenAIO1Model, + DirectOpenAIO1Model, + Phi3HFModel, + KeyBasedAuthMixIn, + EndpointModel, + RestEndpointModel ) __all__ = [ - OpenAIModelsMixIn, - OpenAIO1Direct, - HuggingFaceLM, - LLaVAHuggingFaceMM, - Phi3HF, - OpenAIModelsOAI, - OpenAIModelsAzure, - GeminiModels, - ClaudeModels, - MistralServerlessAzureRestEndpointModels, - LlamaServerlessAzureRestEndpointModels, - LLaVA, + AzureOpenAIO1Model, + DirectOpenAIO1Model, + HuggingFaceModel, + LLaVAHuggingFaceModel, + Phi3HFModel, + DirectOpenAIModel, + AzureOpenAIModel, + GeminiModel, + ClaudeModel, + MistralServerlessAzureRestEndpointModel, + LlamaServerlessAzureRestEndpointModel, + LLaVAModel, ] diff --git a/eureka_ml_insights/models/models.py b/eureka_ml_insights/models/models.py index c5ab5a6..ab6e187 100644 --- a/eureka_ml_insights/models/models.py +++ b/eureka_ml_insights/models/models.py @@ -62,7 +62,7 @@ def base64encode(self, query_images): @dataclass -class KeyBasedAuthentication: +class KeyBasedAuthMixIn: """This class is used to handle key-based authentication for models.""" api_key: str = None @@ -73,6 +73,7 @@ def __post_init__(self): raise ValueError("Either api_key or secret_key_params must be provided.") self.api_key = self.get_api_key() + def get_api_key(self): """ This method is used to get the api_key for the models that require key-based authentication. @@ -80,12 +81,12 @@ def get_api_key(self): if api_key is not directly provided, secret_key_params must be provided to get the api_key using GetKey method. 
""" if self.api_key is None: - self.api_key = GetKey(**self.secret_key_params) + self.api_key = GetKey(**self.secret_key_params) return self.api_key @dataclass -class EndpointModels(Model): +class EndpointModel(Model): """This class is used to interact with API-based models.""" num_retries: int = 3 @@ -149,7 +150,7 @@ def handle_request_error(self, e): @dataclass -class RestEndpointModels(EndpointModels, KeyBasedAuthentication): +class RestEndpointModel(EndpointModel, KeyBasedAuthMixIn): url: str = None model_name: str = None temperature: float = 0 @@ -206,14 +207,14 @@ def get_response(self, request): def handle_request_error(self, e): if isinstance(e, urllib.error.HTTPError): logging.info("The request failed with status code: " + str(e.code)) - # Print the headers - they include the requert ID and the timestamp, which are useful for debugging. + # Print the headers - they include the request ID and the timestamp, which are useful for debugging. logging.info(e.info()) logging.info(e.read().decode("utf8", "ignore")) return False @dataclass -class ServerlessAzureRestEndpointModels(EndpointModels, KeyBasedAuthentication): +class ServerlessAzureRestEndpointModel(EndpointModel, KeyBasedAuthMixIn): """This class can be used for serverless Azure model deployments.""" """https://learn.microsoft.com/en-us/azure/ai-studio/how-to/deploy-models-serverless?tabs=azure-ai-studio""" @@ -222,11 +223,19 @@ class ServerlessAzureRestEndpointModels(EndpointModels, KeyBasedAuthentication): stream: str = "false" def __post_init__(self): - super().__post_init__() - self.headers = { - "Content-Type": "application/json", - "Authorization": ("Bearer " + self.api_key), - } + try: + super().__post_init__() + self.headers = { + "Content-Type": "application/json", + "Authorization": ("Bearer " + self.api_key), + } + except ValueError: + self.bearer_token_provider = get_bearer_token_provider(AzureCliCredential(), "https://cognitiveservices.azure.com/.default") + headers = { + "Content-Type": "application/json", + "Authorization": ("Bearer " + self.bearer_token_provider()), + } + @abstractmethod def create_request(self, text_prompt, query_images=None, system_message=None): @@ -252,7 +261,7 @@ def handle_request_error(self, e): @dataclass -class LlamaServerlessAzureRestEndpointModels(ServerlessAzureRestEndpointModels, KeyBasedAuthentication): +class LlamaServerlessAzureRestEndpointModel(ServerlessAzureRestEndpointModel): """Tested for Llama 3.1 405B Instruct deployments.""" """See https://learn.microsoft.com/en-us/azure/ai-studio/how-to/deploy-models-llama?tabs=llama-three for the api reference.""" @@ -285,7 +294,7 @@ def create_request(self, text_prompt, *args): @dataclass -class MistralServerlessAzureRestEndpointModels(ServerlessAzureRestEndpointModels, KeyBasedAuthentication): +class MistralServerlessAzureRestEndpointModel(ServerlessAzureRestEndpointModel): """Tested for Mistral Large 2 2407 deployments.""" """See https://learn.microsoft.com/en-us/azure/ai-studio/how-to/deploy-models-mistral?tabs=mistral-large#mistral-chat-api for the api reference.""" @@ -317,26 +326,11 @@ def create_request(self, text_prompt, *args): @dataclass -class OpenAIModelsMixIn(EndpointModels): +class OpenAICommonRequestResponseMixIn(): """ - This class defines the request and response handling for OpenAI models. - This is an abstract class and should not be used directly. Child classes should implement the get_client - method and handle_request_error method. + This mixin class defines the request and response handling for most OpenAI models. 
""" - model_name: str = None - temperature: float = 0 - max_tokens: int = 2000 - top_p: float = 0.95 - frequency_penalty: float = 0 - presence_penalty: float = 0 - seed: int = 0 - api_version: str = "2023-06-01-preview" - - @abstractmethod - def get_client(self): - raise NotImplementedError - def create_request(self, prompt, query_images=None, system_message=None): messages = [] if system_message: @@ -373,20 +367,9 @@ def get_response(self, request): self.model_output = openai_response["choices"][0]["message"]["content"] self.response_time = end_time - start_time - @abstractmethod - def handle_request_error(self, e): - raise NotImplementedError - - -@dataclass -class OpenAIModelsAzure(OpenAIModelsMixIn): - """This class is used to interact with Azure OpenAI models.""" - - url: str = None - - def __post_init__(self): - self.client = self.get_client() +class AzureOpenAIClientMixIn(): + """This mixin provides some methods to interact with Azure OpenAI models.""" def get_client(self): from openai import AzureOpenAI @@ -406,14 +389,8 @@ def handle_request_error(self, e): return False -@dataclass -class OpenAIModelsOAI(OpenAIModelsMixIn, KeyBasedAuthentication): - """This class is used to interact with OpenAI models dirctly (not through Azure)""" - - def __post_init__(self): - super().__post_init__() - self.client = self.get_client() - +class DirectOpenAIClientMixIn(KeyBasedAuthMixIn): + """This mixin class provides some methods for using OpenAI models dirctly (not through Azure)""" def get_client(self): from openai import OpenAI @@ -425,32 +402,40 @@ def handle_request_error(self, e): logging.warning(e) return False - @dataclass -class OpenAIO1Direct(EndpointModels, KeyBasedAuthentication): +class AzureOpenAIModel(OpenAICommonRequestResponseMixIn, AzureOpenAIClientMixIn, EndpointModel): + """This class is used to interact with Azure OpenAI models.""" + url: str = None model_name: str = None - temperature: float = 1 - # Not used currently, because the API throws: - # "Completions.create() got an unexpected keyword argument 'max_completion_tokens'" - # although this argument is documented in the OpenAI API documentation. 
- max_completion_tokens: int = 2000 - top_p: float = 1 - seed: int = 0 + temperature: float = 0 + max_tokens: int = 2000 + top_p: float = 0.95 frequency_penalty: float = 0 presence_penalty: float = 0 + seed: int = 0 + api_version: str = "2023-06-01-preview" def __post_init__(self): - super().__post_init__() self.client = self.get_client() - def get_client(self): - from openai import OpenAI +@dataclass +class DirectOpenAIModel(OpenAICommonRequestResponseMixIn, DirectOpenAIClientMixIn, EndpointModel): + """This class is used to interact with OpenAI models dirctly (not through Azure)""" + model_name: str = None + temperature: float = 0 + max_tokens: int = 2000 + top_p: float = 0.95 + frequency_penalty: float = 0 + presence_penalty: float = 0 + seed: int = 0 + api_version: str = "2023-06-01-preview" - return OpenAI( - api_key=self.api_key, - ) + def __post_init__(self): + super().__post_init__() + self.client = self.get_client() - def create_request(self, prompt, *kwargs): +class OpenAIO1RequestResponseMixIn(): + def create_request(self, prompt, *args, **kwargs): messages = [{"role": "user", "content": prompt}] return {"messages": messages} @@ -470,13 +455,44 @@ def get_response(self, request): self.model_output = openai_response["choices"][0]["message"]["content"] self.response_time = end_time - start_time - def handle_request_error(self, e): - logging.warning(e) - return False +@dataclass +class DirectOpenAIO1Model(OpenAIO1RequestResponseMixIn, DirectOpenAIClientMixIn, EndpointModel): + model_name: str = None + temperature: float = 1 + # Not used currently, because the API throws: + # "Completions.create() got an unexpected keyword argument 'max_completion_tokens'" + # although this argument is documented in the OpenAI API documentation. + max_completion_tokens: int = 2000 + top_p: float = 1 + seed: int = 0 + frequency_penalty: float = 0 + presence_penalty: float = 0 + + def __post_init__(self): + self.client = super().get_client() + +@dataclass +class AzureOpenAIO1Model(OpenAIO1RequestResponseMixIn, AzureOpenAIClientMixIn, EndpointModel): + url: str = None + model_name: str = None + temperature: float = 1 + # Not used currently, because the API throws: + # "Completions.create() got an unexpected keyword argument 'max_completion_tokens'" + # although this argument is documented in the OpenAI API documentation. 
+ max_completion_tokens: int = 2000 + top_p: float = 1 + seed: int = 0 + frequency_penalty: float = 0 + presence_penalty: float = 0 + api_version: str = "2023-06-01-preview" + + + def __post_init__(self): + self.client = super().get_client() @dataclass -class GeminiModels(EndpointModels, KeyBasedAuthentication): +class GeminiModel(EndpointModel, KeyBasedAuthMixIn): """This class is used to interact with Gemini models through the python api.""" timeout: int = 60 @@ -549,7 +565,7 @@ def handle_request_error(self, e): @dataclass -class HuggingFaceLM(Model): +class HuggingFaceModel(Model): """This class is used to run a self-hosted language model via HuggingFace apis.""" model_name: str = None @@ -647,7 +663,7 @@ def model_template_fn(self, text_prompt, system_message=None): @dataclass -class Phi3HF(HuggingFaceLM): +class Phi3HFModel(HuggingFaceModel): """This class is used to run a self-hosted PHI3 model via HuggingFace apis.""" def __post_init__(self): @@ -664,7 +680,7 @@ def model_template_fn(self, text_prompt, system_message=None): @dataclass -class LLaVAHuggingFaceMM(HuggingFaceLM): +class LLaVAHuggingFaceModel(HuggingFaceModel): """This class is used to run a self-hosted LLaVA model via HuggingFace apis.""" quantize: bool = False @@ -786,7 +802,7 @@ def model_template_fn(self, text_prompt, system_message=None): @dataclass -class LLaVA(LLaVAHuggingFaceMM): +class LLaVAModel(LLaVAHuggingFaceModel): """This class is used to run a self-hosted LLaVA model via the LLaVA package.""" model_base: str = None @@ -857,7 +873,7 @@ def _generate(self, text_prompt, query_images=None, system_message=None): @dataclass -class ClaudeModels(EndpointModels, KeyBasedAuthentication): +class ClaudeModel(EndpointModel, KeyBasedAuthMixIn): """This class is used to interact with Claude models through the python api.""" model_name: str = None From ac1d40c193ed6a89e803fb3e412fd5724798f68a Mon Sep 17 00:00:00 2001 From: Safoora Yousefi Date: Fri, 18 Oct 2024 17:53:33 +0000 Subject: [PATCH 02/16] bug fix --- eureka_ml_insights/models/models.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/eureka_ml_insights/models/models.py b/eureka_ml_insights/models/models.py index ab6e187..2c3a32b 100644 --- a/eureka_ml_insights/models/models.py +++ b/eureka_ml_insights/models/models.py @@ -431,7 +431,7 @@ class DirectOpenAIModel(OpenAICommonRequestResponseMixIn, DirectOpenAIClientMixI api_version: str = "2023-06-01-preview" def __post_init__(self): - super().__post_init__() + self.api_key = self.get_api_key() self.client = self.get_client() class OpenAIO1RequestResponseMixIn(): @@ -469,7 +469,8 @@ class DirectOpenAIO1Model(OpenAIO1RequestResponseMixIn, DirectOpenAIClientMixIn, presence_penalty: float = 0 def __post_init__(self): - self.client = super().get_client() + self.api_key = self.get_api_key() + self.client = self.get_client() @dataclass class AzureOpenAIO1Model(OpenAIO1RequestResponseMixIn, AzureOpenAIClientMixIn, EndpointModel): @@ -488,7 +489,7 @@ class AzureOpenAIO1Model(OpenAIO1RequestResponseMixIn, AzureOpenAIClientMixIn, E def __post_init__(self): - self.client = super().get_client() + self.client = self.get_client() @dataclass From e4be6a755306942e66b535986bb83ca0b58aefe5 Mon Sep 17 00:00:00 2001 From: Safoora Yousefi Date: Fri, 18 Oct 2024 18:09:12 +0000 Subject: [PATCH 03/16] formatting and tests --- eureka_ml_insights/configs/model_configs.py | 14 +++++++----- eureka_ml_insights/models/__init__.py | 15 ++++++------- eureka_ml_insights/models/models.py | 24 ++++++++++++++------- 3 files 
changed, 32 insertions(+), 21 deletions(-) diff --git a/eureka_ml_insights/configs/model_configs.py b/eureka_ml_insights/configs/model_configs.py index f3588d8..17ac457 100644 --- a/eureka_ml_insights/configs/model_configs.py +++ b/eureka_ml_insights/configs/model_configs.py @@ -3,20 +3,24 @@ You can also add your custom models here by following the same pattern as the existing configs. """ from eureka_ml_insights.models import ( + AzureOpenAIO1Model, ClaudeModel, + DirectOpenAIModel, + DirectOpenAIO1Model, GeminiModel, LlamaServerlessAzureRestEndpointModel, - LLaVAModel, LLaVAHuggingFaceModel, + LLaVAModel, MistralServerlessAzureRestEndpointModel, - DirectOpenAIModel, - AzureOpenAIO1Model, - DirectOpenAIO1Model, RestEndpointModel, ) from .config import ModelConfig +# For models that require secret keys, you can store the keys in a json file and provide the path to the file +# in the secret_key_params dictionary. OR you can provide the key name and key vault URL to fetch the key from Azure Key Vault. +# You don't need to provide both the key_vault_url and local_keys_path. You can provide one of them based on your setup. + # OpenAI models OPENAI_SECRET_KEY_PARAMS = { "key_name": "your_openai_secret_key_name", @@ -38,7 +42,7 @@ "model_name": "o1-preview", "url": "your/endpoint/url", "api_version": "2024-08-01-preview", - } + }, ) OAI_GPT4_1106_PREVIEW_CONFIG = ModelConfig( diff --git a/eureka_ml_insights/models/__init__.py b/eureka_ml_insights/models/__init__.py index a945901..fc68cc6 100644 --- a/eureka_ml_insights/models/__init__.py +++ b/eureka_ml_insights/models/__init__.py @@ -1,19 +1,17 @@ from .models import ( + AzureOpenAIModel, + AzureOpenAIO1Model, ClaudeModel, + DirectOpenAIModel, + DirectOpenAIO1Model, GeminiModel, HuggingFaceModel, LlamaServerlessAzureRestEndpointModel, - LLaVAModel, LLaVAHuggingFaceModel, + LLaVAModel, MistralServerlessAzureRestEndpointModel, - AzureOpenAIModel, - DirectOpenAIModel, - AzureOpenAIO1Model, - DirectOpenAIO1Model, Phi3HFModel, - KeyBasedAuthMixIn, - EndpointModel, - RestEndpointModel + RestEndpointModel, ) __all__ = [ @@ -29,4 +27,5 @@ MistralServerlessAzureRestEndpointModel, LlamaServerlessAzureRestEndpointModel, LLaVAModel, + RestEndpointModel, ] diff --git a/eureka_ml_insights/models/models.py b/eureka_ml_insights/models/models.py index 2c3a32b..5ecfd6d 100644 --- a/eureka_ml_insights/models/models.py +++ b/eureka_ml_insights/models/models.py @@ -73,7 +73,6 @@ def __post_init__(self): raise ValueError("Either api_key or secret_key_params must be provided.") self.api_key = self.get_api_key() - def get_api_key(self): """ This method is used to get the api_key for the models that require key-based authentication. @@ -81,7 +80,7 @@ def get_api_key(self): if api_key is not directly provided, secret_key_params must be provided to get the api_key using GetKey method. 
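        For example (illustrative values), secret_key_params can be {"key_name": "your_openai_secret_key_name", "local_keys_path": "keys/keys.json", "key_vault_url": None} to read the key from a local json file, or a key_vault_url can be provided instead to fetch the key from Azure Key Vault.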
""" if self.api_key is None: - self.api_key = GetKey(**self.secret_key_params) + self.api_key = GetKey(**self.secret_key_params) return self.api_key @@ -230,12 +229,13 @@ def __post_init__(self): "Authorization": ("Bearer " + self.api_key), } except ValueError: - self.bearer_token_provider = get_bearer_token_provider(AzureCliCredential(), "https://cognitiveservices.azure.com/.default") + self.bearer_token_provider = get_bearer_token_provider( + AzureCliCredential(), "https://cognitiveservices.azure.com/.default" + ) headers = { "Content-Type": "application/json", "Authorization": ("Bearer " + self.bearer_token_provider()), } - @abstractmethod def create_request(self, text_prompt, query_images=None, system_message=None): @@ -326,7 +326,7 @@ def create_request(self, text_prompt, *args): @dataclass -class OpenAICommonRequestResponseMixIn(): +class OpenAICommonRequestResponseMixIn: """ This mixin class defines the request and response handling for most OpenAI models. """ @@ -368,8 +368,9 @@ def get_response(self, request): self.response_time = end_time - start_time -class AzureOpenAIClientMixIn(): +class AzureOpenAIClientMixIn: """This mixin provides some methods to interact with Azure OpenAI models.""" + def get_client(self): from openai import AzureOpenAI @@ -391,6 +392,7 @@ def handle_request_error(self, e): class DirectOpenAIClientMixIn(KeyBasedAuthMixIn): """This mixin class provides some methods for using OpenAI models dirctly (not through Azure)""" + def get_client(self): from openai import OpenAI @@ -402,9 +404,11 @@ def handle_request_error(self, e): logging.warning(e) return False + @dataclass class AzureOpenAIModel(OpenAICommonRequestResponseMixIn, AzureOpenAIClientMixIn, EndpointModel): """This class is used to interact with Azure OpenAI models.""" + url: str = None model_name: str = None temperature: float = 0 @@ -418,9 +422,11 @@ class AzureOpenAIModel(OpenAICommonRequestResponseMixIn, AzureOpenAIClientMixIn, def __post_init__(self): self.client = self.get_client() + @dataclass class DirectOpenAIModel(OpenAICommonRequestResponseMixIn, DirectOpenAIClientMixIn, EndpointModel): """This class is used to interact with OpenAI models dirctly (not through Azure)""" + model_name: str = None temperature: float = 0 max_tokens: int = 2000 @@ -434,7 +440,8 @@ def __post_init__(self): self.api_key = self.get_api_key() self.client = self.get_client() -class OpenAIO1RequestResponseMixIn(): + +class OpenAIO1RequestResponseMixIn: def create_request(self, prompt, *args, **kwargs): messages = [{"role": "user", "content": prompt}] return {"messages": messages} @@ -455,6 +462,7 @@ def get_response(self, request): self.model_output = openai_response["choices"][0]["message"]["content"] self.response_time = end_time - start_time + @dataclass class DirectOpenAIO1Model(OpenAIO1RequestResponseMixIn, DirectOpenAIClientMixIn, EndpointModel): model_name: str = None @@ -472,6 +480,7 @@ def __post_init__(self): self.api_key = self.get_api_key() self.client = self.get_client() + @dataclass class AzureOpenAIO1Model(OpenAIO1RequestResponseMixIn, AzureOpenAIClientMixIn, EndpointModel): url: str = None @@ -487,7 +496,6 @@ class AzureOpenAIO1Model(OpenAIO1RequestResponseMixIn, AzureOpenAIClientMixIn, E presence_penalty: float = 0 api_version: str = "2023-06-01-preview" - def __post_init__(self): self.client = self.get_client() From 9f5c03313595c7b9ff98347f0508157768c0e610 Mon Sep 17 00:00:00 2001 From: Safoora Yousefi Date: Thu, 24 Oct 2024 00:46:36 +0000 Subject: [PATCH 04/16] dumping usage data in inference results 
--- eureka_ml_insights/models/__init__.py | 2 ++ eureka_ml_insights/models/models.py | 17 +++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/eureka_ml_insights/models/__init__.py b/eureka_ml_insights/models/__init__.py index fc68cc6..a4bbc88 100644 --- a/eureka_ml_insights/models/__init__.py +++ b/eureka_ml_insights/models/__init__.py @@ -6,6 +6,7 @@ DirectOpenAIO1Model, GeminiModel, HuggingFaceModel, + KeyBasedAuthMixIn, LlamaServerlessAzureRestEndpointModel, LLaVAHuggingFaceModel, LLaVAModel, @@ -18,6 +19,7 @@ AzureOpenAIO1Model, DirectOpenAIO1Model, HuggingFaceModel, + KeyBasedAuthMixIn, LLaVAHuggingFaceModel, Phi3HFModel, DirectOpenAIModel, diff --git a/eureka_ml_insights/models/models.py b/eureka_ml_insights/models/models.py index 5ecfd6d..c1f5158 100644 --- a/eureka_ml_insights/models/models.py +++ b/eureka_ml_insights/models/models.py @@ -250,6 +250,8 @@ def get_response(self, request): res = json.loads(response.read()) self.model_output = res["choices"][0]["message"]["content"] self.response_time = end_time - start_time + if "usage" in res: + return {"usage": res["usage"]} def handle_request_error(self, e): if isinstance(e, urllib.error.HTTPError): @@ -366,6 +368,8 @@ def get_response(self, request): openai_response = completion.model_dump() self.model_output = openai_response["choices"][0]["message"]["content"] self.response_time = end_time - start_time + if "usage" in openai_response: + return {"usage": openai_response["usage"]} class AzureOpenAIClientMixIn: @@ -547,6 +551,17 @@ def get_response(self, request): end_time = time.time() self.model_output = self.gemini_response.parts[0].text self.response_time = end_time - start_time + if hasattr(self.gemini_response, "usage_metadata"): + try: + return { + "usage": { + "prompt_token_count": self.gemini_response.usage_metadata.prompt_token_count, + "candidates_token_count": self.gemini_response.usage_metadata.candidates_token_count, + "total_token_count": self.gemini_response.usage_metadata.total_token_count, + } + } + except AttributeError: + logging.warning("Usage metadata not found in the response.") def handle_request_error(self, e): """Handles exceptions originating from making requests to Gemini through the python api. @@ -935,6 +950,8 @@ def get_response(self, request): end_time = time.time() self.model_output = completion.content[0].text self.response_time = end_time - start_time + if hasattr(completion, "usage"): + return {"usage": completion.usage.to_dict()} def handle_request_error(self, e): return False From 6e7715648c05f2cf0cccab28d404efd114499f97 Mon Sep 17 00:00:00 2001 From: Safoora Yousefi Date: Thu, 24 Oct 2024 17:45:49 +0000 Subject: [PATCH 05/16] clean up --- eureka_ml_insights/data_utils/aime_utils.py | 25 +++++---------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/eureka_ml_insights/data_utils/aime_utils.py b/eureka_ml_insights/data_utils/aime_utils.py index 3576897..62860f4 100644 --- a/eureka_ml_insights/data_utils/aime_utils.py +++ b/eureka_ml_insights/data_utils/aime_utils.py @@ -1,7 +1,3 @@ -# Part of this file was authored by authors of the Kitab dataset (https://huggingface.co/datasets/microsoft/kitab) -# Some code in this file is copied from the original source repository and then adapted to fit this repository. 
-# The original license for this code is Community Data License Agreement - Permissive - Version 2.0 - import re from dataclasses import dataclass @@ -24,11 +20,12 @@ def parse_output_answer(response): """ Parse the input string to extract answer of a given AIME question. Parameters: - s (str): Input string containing answer X in the form of "Final Answer: X". - Returns: - dict: A numeric value representing the model's output. - Example: 255 + response (str): Input string containing answer X in the form of "Final Answer: X". + Returns: + numerical_value (float): A numeric value representing the model's answer. """ + numerical_value = None + # Try to find an answer in the "Final Answer: X" format match = re.search(r"Final Answer:\s*([\$]?-?[\d,]+(?:\.\d+)?%?)", response) if match: @@ -37,12 +34,7 @@ def parse_output_answer(response): # If that fails, look for any number with optional $ or % in the response match = re.search(r"([\$]?-?[\d,]+(?:\.\d+)?%?)", response) answer_str = match.group(1) if match else None - # return float(answer_str) - numerical_value = None - # Store the original format - original_format = answer_str - print("----answer string", answer_str) if answer_str: # Remove $ and commas, handle percentages for numerical comparison answer_str = answer_str.replace("$", "").replace(",", "") @@ -52,13 +44,6 @@ def parse_output_answer(response): try: numerical_value = float(answer_str) except ValueError as e: - print(f"Error converting answer '{answer_str}' to numerical value: {e}") numerical_value = None - if numerical_value is not None: - print(f"Extracted Answer: {original_format}") - print(f"Numerical Value for Comparison: {numerical_value}") - else: - print("No valid answer extracted.") - return numerical_value From 6d2722bc078695b6b05489f22235e186591f78aa Mon Sep 17 00:00:00 2001 From: Safoora Yousefi Date: Thu, 24 Oct 2024 21:35:13 +0000 Subject: [PATCH 06/16] rm else statement --- eureka_ml_insights/data_utils/aime_utils.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/eureka_ml_insights/data_utils/aime_utils.py b/eureka_ml_insights/data_utils/aime_utils.py index 62860f4..41d7b92 100644 --- a/eureka_ml_insights/data_utils/aime_utils.py +++ b/eureka_ml_insights/data_utils/aime_utils.py @@ -24,17 +24,13 @@ def parse_output_answer(response): Returns: numerical_value (float): A numeric value representing the model's answer. 
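+        Example: parse_output_answer("Final Answer: $2,500") returns 2500.0, and parse_output_answer("Final Answer: 6%") returns 0.06.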
""" + answer_str = None numerical_value = None # Try to find an answer in the "Final Answer: X" format match = re.search(r"Final Answer:\s*([\$]?-?[\d,]+(?:\.\d+)?%?)", response) if match: answer_str = match.group(1) - else: - # If that fails, look for any number with optional $ or % in the response - match = re.search(r"([\$]?-?[\d,]+(?:\.\d+)?%?)", response) - answer_str = match.group(1) if match else None - if answer_str: # Remove $ and commas, handle percentages for numerical comparison answer_str = answer_str.replace("$", "").replace(",", "") From 42cd6a4b874c369cd62a6fd85ca2caef1d60e0ef Mon Sep 17 00:00:00 2001 From: Safoora Yousefi Date: Thu, 24 Oct 2024 21:51:50 +0000 Subject: [PATCH 07/16] fixed the aime test --- eureka_ml_insights/data_utils/aime_utils.py | 4 +--- tests/data_utils_tests/aime_utils_tests.py | 14 +++++--------- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/eureka_ml_insights/data_utils/aime_utils.py b/eureka_ml_insights/data_utils/aime_utils.py index 41d7b92..780b706 100644 --- a/eureka_ml_insights/data_utils/aime_utils.py +++ b/eureka_ml_insights/data_utils/aime_utils.py @@ -24,14 +24,12 @@ def parse_output_answer(response): Returns: numerical_value (float): A numeric value representing the model's answer. """ - answer_str = None numerical_value = None # Try to find an answer in the "Final Answer: X" format match = re.search(r"Final Answer:\s*([\$]?-?[\d,]+(?:\.\d+)?%?)", response) if match: answer_str = match.group(1) - if answer_str: # Remove $ and commas, handle percentages for numerical comparison answer_str = answer_str.replace("$", "").replace(",", "") if answer_str.endswith("%"): @@ -42,4 +40,4 @@ def parse_output_answer(response): except ValueError as e: numerical_value = None - return numerical_value + return numerical_value \ No newline at end of file diff --git a/tests/data_utils_tests/aime_utils_tests.py b/tests/data_utils_tests/aime_utils_tests.py index e359daa..6d45177 100644 --- a/tests/data_utils_tests/aime_utils_tests.py +++ b/tests/data_utils_tests/aime_utils_tests.py @@ -10,26 +10,22 @@ log = logging.getLogger("AIME_ExtractAnswer_tests") - class TestAIMEAnswerExtract(unittest.TestCase): def setUp(self): - testcases = ["1", "3", "0", "6", "5", "-1", "t", "10", ""] + testcases = ["1", "3", "0$", "6%", "5", "-1", "t", "10", ""] self.df = pd.DataFrame( { - "C": ["Final Answer:{0}".format(s) for s in testcases], + "C": ["Final Answer: {0} ".format(s) for s in testcases], "D": ["0", "0", "0", "0", "0", "0", "0", "0", "0"], } ) def test_answerextraction(self): - log.info("Testing AIME answer extraction") transform = AIMEExtractAnswer("C", "D") - result = transform.transform(self.df) - self.assertListEqual(list(result.columns), ["C", "D"]) - + transform.transform(self.df) # Check values, accounting for NaN - expected_values = [1.0, 3.0, 0.0, 6.0, 5.0, -1.0, float("nan"), 10.0, float("nan")] - np.testing.assert_array_equal(result["D"], expected_values) + expected_values = [1.0, 3.0, 0.0, 0.06, 5.0, -1.0, float("nan"), 10.0, float("nan")] + np.testing.assert_array_equal(self.df["D"].values, expected_values) if __name__ == "__main__": From 7eff14afb57953274365e551e7c5c0102893df39 Mon Sep 17 00:00:00 2001 From: Safoora Yousefi Date: Sat, 26 Oct 2024 00:03:41 +0000 Subject: [PATCH 08/16] rate limiting + parallelinf --- .github/workflows/eureka-tests.yml | 1 + eureka_ml_insights/configs/config.py | 30 ++++--- eureka_ml_insights/core/inference.py | 118 +++++++++++++++++++++++---- tests/component_tests.py | 83 ++++++++++++++++++- 
tests/test_utils.py | 10 +++ 5 files changed, 215 insertions(+), 27 deletions(-) diff --git a/.github/workflows/eureka-tests.yml b/.github/workflows/eureka-tests.yml index 769c6fc..eb25c17 100644 --- a/.github/workflows/eureka-tests.yml +++ b/.github/workflows/eureka-tests.yml @@ -39,5 +39,6 @@ jobs: run: | pip show eureka_ml_insights export skip_tests_with_missing_ds=1 + export skip_slow_tests=1 pwd make test \ No newline at end of file diff --git a/eureka_ml_insights/configs/config.py b/eureka_ml_insights/configs/config.py index ec49fd7..1b0098f 100644 --- a/eureka_ml_insights/configs/config.py +++ b/eureka_ml_insights/configs/config.py @@ -1,6 +1,6 @@ import copy from dataclasses import dataclass, field -from typing import Any, Type, TypeVar, List +from typing import Any, List, Type, TypeVar UtilityClassConfigType = TypeVar("UtilityClassConfigType", bound=Type["UtilityClassConfig"]) ComponentConfigType = TypeVar("ComponentConfigType", bound=Type["ComponentConfig"]) @@ -10,11 +10,12 @@ @dataclass class UtilityClassConfig: - """ Base class for all utility class configs + """Base class for all utility class configs Args: class_name (Any): The utility class to be used with this config init_args (dict): The arguments to be passed to the utility class constructor """ + class_name: Any = None init_args: dict = field(default_factory=dict) @@ -52,70 +53,78 @@ class AggregatorConfig(UtilityClassConfig): @dataclass class ComponentConfig: - """ Base class for all component configs + """Base class for all component configs Args: component_type (Any): The component class to be used with this config output_dir (str): The directory to save the output files of this component """ + component_type: Any = None output_dir: str = None @dataclass class DataProcessingConfig(ComponentConfig): - """ Config class for the data processing component + """Config class for the data processing component Args: data_reader_config (UtilityClassConfig): The data reader config to be used with this component output_data_columns (list): List of columns (subset of input columns) to keep in the transformed data output file """ + data_reader_config: UtilityClassConfigType = None output_data_columns: List[str] = None @dataclass class PromptProcessingConfig(DataProcessingConfig): - """ Config class for the prompt processing component + """Config class for the prompt processing component Args: prompt_template_path (str): The path to the prompt template jinja file ignore_failure (bool): Whether to ignore the failures in the prompt processing and move on """ + prompt_template_path: str = None ignore_failure: bool = False @dataclass class DataJoinConfig(DataProcessingConfig): - """ Config class for the data join component + """Config class for the data join component Args: other_data_reader_config (UtilityClassConfig): The data reader config for the dataset to be joined with the main dataset pandas_merge_args (dict): Arguments to be passed to pandas merge function """ + other_data_reader_config: UtilityClassConfigType = None pandas_merge_args: dict = None @dataclass class InferenceConfig(ComponentConfig): - """ Config class for the inference component + """Config class for the inference component Args: data_loader_config (UtilityClassConfig): The data loader config to be used with this component model_config (UtilityClassConfig): The model config to be used with this component resume_from (str): Optional. 
Path to the file where previous inference results are stored """ + data_loader_config: UtilityClassConfigType = None model_config: UtilityClassConfigType = None resume_from: str = None + n_calls_per_min: int = None + max_concurrent: int = 1 @dataclass class EvalReportingConfig(ComponentConfig): - """ Config class for the evaluation reporting component + """Config class for the evaluation reporting component Args: data_reader_config (UtilityClassConfig): The data reader config to configure the data reader for this component - metric_config (UtilityClassConfig): The metric config + metric_config (UtilityClassConfig): The metric config aggregator_configs (list): List of aggregator configs visualizer_configs (list): List of visualizer configs """ + data_reader_config: UtilityClassConfigType = None metric_config: UtilityClassConfigType = None aggregator_configs: List[UtilityClassConfigType] = field(default_factory=list) @@ -127,11 +136,12 @@ class EvalReportingConfig(ComponentConfig): @dataclass class PipelineConfig: - """ Config class for the pipeline class + """Config class for the pipeline class Args: component_configs (list): List of ComponentConfigs log_dir (str): The directory to save the logs of the pipeline """ + component_configs: list[ComponentConfigType] = field(default_factory=list) log_dir: str = None diff --git a/eureka_ml_insights/core/inference.py b/eureka_ml_insights/core/inference.py index e68ddb9..67dcfa2 100644 --- a/eureka_ml_insights/core/inference.py +++ b/eureka_ml_insights/core/inference.py @@ -1,5 +1,9 @@ +import asyncio import logging import os +import time +from collections import deque +from concurrent.futures import ThreadPoolExecutor from tqdm import tqdm @@ -7,9 +11,11 @@ from .pipeline import Component +MINUTE = 60 + class Inference(Component): - def __init__(self, model_config, data_config, output_dir, resume_from=None): + def __init__(self, model_config, data_config, output_dir, resume_from=None, n_calls_per_min=None, max_concurrent=1): """ Initialize the Inference component. args: @@ -17,6 +23,8 @@ def __init__(self, model_config, data_config, output_dir, resume_from=None): data_config (dict): DataSetConfig object. output_dir (str): Directory to save the inference results. resume_from (str): optional. Path to the file where previous inference results are stored. + n_calls_per_min (int): optional. Number of calls to be made per minute, used for rate limiting. If not provided, rate limiting will not be applied. + max_concurrent (int): optional. Maximum number of concurrent inferences to run. Default is 1. 
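+            For example, n_calls_per_min=20 allows at most 20 calls per minute in sequential mode, while max_concurrent=5 switches to parallel inference with up to 5 concurrent calls.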
""" super().__init__(output_dir) self.model = model_config.class_name(**model_config.init_args) @@ -26,9 +34,21 @@ def __init__(self, model_config, data_config, output_dir, resume_from=None): if resume_from and not os.path.exists(resume_from): raise FileNotFoundError(f"File {resume_from} not found.") + self.n_calls_per_min = n_calls_per_min + self.max_concurrent = max_concurrent + self.call_times = deque() + self.period = MINUTE + @classmethod def from_config(cls, config): - return cls(config.model_config, config.data_loader_config, config.output_dir, config.resume_from) + return cls( + config.model_config, + config.data_loader_config, + config.output_dir, + resume_from=config.resume_from, + n_calls_per_min=config.n_calls_per_min, + max_concurrent=config.max_concurrent, + ) def fetch_previous_inference_results(self): # fetch previous results from the provided resume_from file @@ -58,28 +78,96 @@ def fetch_previous_inference_results(self): logging.info(f"Last uid inferenced: {last_uid}") return pre_inf_results_df, last_uid + def validate_response_dict(self, response_dict): + # "model_output" and "is_valid" are mandatory fields by any inference component + if "model_output" not in response_dict or "is_valid" not in response_dict: + raise ValueError("Response dictionary must contain 'model_output' and 'is_valid' keys.") + + def retrieve_exisiting_result(self, data, pre_inf_results_df): + # if resume_from file is provided and valid inference results + # for the current data point are present in it, use them. + prev_results = pre_inf_results_df[pre_inf_results_df.uid == data["uid"]] + prev_result_is_valid = bool(prev_results["is_valid"].values[0]) + prev_model_output = prev_results["model_output"].values[0] + if prev_result_is_valid: + logging.info(f"Skipping inference for uid: {data['uid']}. Using previous results.") + data["model_output"], data["is_valid"] = prev_model_output, prev_result_is_valid + return data + def run(self): + if self.max_concurrent > 1: + asyncio.run(self._run_par()) + else: + self._run() + + def _run(self): if self.resume_from: pre_inf_results_df, last_uid = self.fetch_previous_inference_results() with self.data_loader as loader: with self.writer as writer: for data, model_inputs in tqdm(loader, desc="Inference Progress:"): - # if resume_from file is provided and valid inference results - # for the current data point are present in it, use them. + if self.resume_from and (data["uid"] <= last_uid): - prev_results = pre_inf_results_df[pre_inf_results_df.uid == data["uid"]] - prev_result_is_valid = bool(prev_results["is_valid"].values[0]) - prev_model_output = prev_results["model_output"].values[0] - if prev_result_is_valid: - logging.info(f"Skipping inference for uid: {data['uid']}. 
Using previous results.") - data["model_output"], data["is_valid"] = prev_model_output, prev_result_is_valid - writer.write(data) + prev_result = self.retrieve_exisiting_result(data, pre_inf_results_df) + if prev_result: + writer.write(prev_result) continue - # generate text from model + + # generate text from model (optionally at a limited rate) + if self.n_calls_per_min: + while len(self.call_times) >= self.n_calls_per_min: + # remove the oldest call time if it is older than the rate limit period + if time.time() - self.call_times[0] > self.period: + self.call_times.popleft() + else: + # rate limit is reached, wait for a second + time.sleep(1) + self.call_times.append(time.time()) response_dict = self.model.generate(*model_inputs) - # "model_output" and "is_valid" are mandatory fields by any inference component - if "model_output" not in response_dict or "is_valid" not in response_dict: - raise ValueError("Response dictionary must contain 'model_output' and 'is_valid' keys.") + self.validate_response_dict(response_dict) # write results data.update(response_dict) writer.write(data) + + async def run_in_excutor(self, model_inputs, executor): + loop = asyncio.get_event_loop() + return await loop.run_in_executor(executor, self.model.generate, *model_inputs) + + async def _run_par(self): + concurrent_inputs = [] + concurrent_metadata = [] + if self.resume_from: + pre_inf_results_df, last_uid = self.fetch_previous_inference_results() + with self.data_loader as loader: + with self.writer as writer: + for data, model_inputs in tqdm(loader, desc="Inference Progress:"): + if self.resume_from and (data["uid"] <= last_uid): + prev_result = self.retrieve_exisiting_result(data, pre_inf_results_df) + if prev_result: + writer.write(prev_result) + continue + + # if batch is ready for concurrent inference + elif len(concurrent_inputs) >= self.max_concurrent: + with ThreadPoolExecutor() as executor: + await self.run_batch(concurrent_inputs, concurrent_metadata, writer, executor) + concurrent_inputs = [] + concurrent_metadata = [] + else: + # add data to batch for concurrent inference + concurrent_inputs.append(model_inputs) + concurrent_metadata.append(data) + # if data loader is exhausted but there are remaining data points that did not form a full batch + if concurrent_inputs: + with ThreadPoolExecutor() as executor: + await self.run_batch(concurrent_inputs, concurrent_metadata, writer, executor) + + async def run_batch(self, concurrent_inputs, concurrent_metadata, writer, executor): + tasks = [asyncio.create_task(self.run_in_excutor(input_data, executor)) for input_data in concurrent_inputs] + results = await asyncio.gather(*tasks) + for i in range(len(concurrent_inputs)): + data, response_dict = concurrent_metadata[i], results[i] + self.validate_response_dict(response_dict) + # prepare results for writing + data.update(response_dict) + writer.write(data) diff --git a/tests/component_tests.py b/tests/component_tests.py index 31dce66..a124aa7 100644 --- a/tests/component_tests.py +++ b/tests/component_tests.py @@ -1,4 +1,5 @@ import os +import time import unittest import pandas as pd @@ -25,7 +26,7 @@ RunPythonTransform, SequenceTransform, ) -from tests.test_utils import HoloAssistTestModel, TestDataLoader +from tests.test_utils import TestDataLoader, TestModel class TestPromptProcessing(unittest.TestCase): @@ -152,7 +153,7 @@ def setUp(self) -> None: "n_iter": 40, }, ), - model_config=ModelConfig(HoloAssistTestModel, {}), + model_config=ModelConfig(TestModel, {}), output_dir=os.path.join(self.log_dir, 
"model_output"), resume_from="./tests/test_assets/resume_from.jsonl", ) @@ -182,5 +183,83 @@ def test_inference(self): self.assertGreaterEqual(len(df), len(resume_from_df)) +class TestParallelInference(unittest.TestCase): + def setUp(self) -> None: + self.log_dir = create_logdir("TestInference") + self.config = InferenceConfig( + component_type=Inference, + data_loader_config=DataSetConfig( + TestDataLoader, + { + "path": "./tests/test_assets/transformed_data.jsonl", + "n_iter": 40, + }, + ), + model_config=ModelConfig(TestModel, {}), + output_dir=os.path.join(self.log_dir, "model_output"), + resume_from="./tests/test_assets/resume_from.jsonl", + max_concurrent=5, + ) + component = Inference.from_config(self.config) + component.run() + + def test_inference(self): + # load inference results + df = pd.read_json(os.path.join(self.config.output_dir, "inference_result.jsonl"), lines=True) + df["is_valid"] = df["is_valid"].astype(bool) + # load resume_from file + resume_from_df = pd.read_json(self.config.resume_from, lines=True) + resume_from_df["is_valid"] = resume_from_df["is_valid"].astype(bool) + # join on uid + merged_df = df.merge(resume_from_df, on="uid", suffixes=("_new", "_old"), how="left") + merged_df["is_valid_old"] = merged_df[merged_df["is_valid_old"].isna()]["is_valid_old"] = False + merged_df["is_valid_new"] = merged_df[merged_df["is_valid_new"].isna()]["is_valid_new"] = False + # assert that when is_valid is true in resume_from, it's still true in the inference results + merged_df["validity_match"] = merged_df.apply(lambda x: x["is_valid_old"] and x["is_valid_new"], axis=1) + self.assertTrue(merged_df[merged_df["is_valid_old"]]["validity_match"].all()) + + # assert that model_output is the same in resume_from and inference results when is_valid is true + merged_df["model_output_match"] = merged_df["model_output_new"] == merged_df["model_output_old"] + self.assertTrue(merged_df[merged_df["is_valid_old"]]["model_output_match"].all()) + + # assert that number of rows in inference results is greater than or equal to resume_from + self.assertGreaterEqual(len(df), len(resume_from_df)) + + +class TestRateLimitedInference(unittest.TestCase): + def setUp(self) -> None: + self.log_dir = create_logdir("TestInference") + self.config = InferenceConfig( + component_type=Inference, + data_loader_config=DataSetConfig( + TestDataLoader, + { + "path": "./tests/test_assets/transformed_data.jsonl", + "n_iter": 40, + }, + ), + model_config=ModelConfig(TestModel, {}), + output_dir=os.path.join(self.log_dir, "model_output"), + n_calls_per_min=20, + ) + component = Inference.from_config(self.config) + start = time.time() + component.run() + end = time.time() + self.duration = end - start + + # skip this test if environment variable skip_slow_tests is set + @unittest.skipIf(os.environ.get("skip_slow_tests"), "Skipping slow test") + def test_inference(self): + # load inference results + df = pd.read_json(os.path.join(self.config.output_dir, "inference_result.jsonl"), lines=True) + # assert that all inferences are valid + self.assertTrue(df["is_valid"].all()) + # assert that there are 40 inference results + self.assertEqual(len(df), 40) + # assert that the duration is greater than 40/20 - 1 minutes + self.assertGreaterEqual(self.duration, (40 / 20 - 1) * 60) + + if __name__ == "__main__": unittest.main() diff --git a/tests/test_utils.py b/tests/test_utils.py index ddf8065..1b0dd17 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,4 +1,5 @@ import random +import time from 
eureka_ml_insights.data_utils import ( AzureMMDataLoader, @@ -9,6 +10,15 @@ from eureka_ml_insights.metrics import ClassicMetric, CompositeMetric +class TestModel: + def __init__(self, model_name="generic_test_model"): + self.name = model_name + + def generate(self, text_prompt, query_images=None): + time.sleep(0.1) + return {"model_output": "model output", "is_valid": True} + + class TestHFDataReader(HFDataReader): def __init__(self, path, **kwargs): super().__init__(path, **kwargs) From b933ec1f32ea2c2c3e7e4bebd1cad3c7fbf81a16 Mon Sep 17 00:00:00 2001 From: Safoora Yousefi Date: Sat, 26 Oct 2024 00:13:06 +0000 Subject: [PATCH 09/16] docs + formatting --- eureka_ml_insights/core/inference.py | 29 ++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/eureka_ml_insights/core/inference.py b/eureka_ml_insights/core/inference.py index 67dcfa2..cedc859 100644 --- a/eureka_ml_insights/core/inference.py +++ b/eureka_ml_insights/core/inference.py @@ -30,15 +30,19 @@ def __init__(self, model_config, data_config, output_dir, resume_from=None, n_ca self.model = model_config.class_name(**model_config.init_args) self.data_loader = data_config.class_name(**data_config.init_args) self.writer = JsonLinesWriter(os.path.join(output_dir, "inference_result.jsonl")) + self.resume_from = resume_from if resume_from and not os.path.exists(resume_from): raise FileNotFoundError(f"File {resume_from} not found.") + # rate limiting parameters self.n_calls_per_min = n_calls_per_min - self.max_concurrent = max_concurrent self.call_times = deque() self.period = MINUTE + # parallel inference parameters + self.max_concurrent = max_concurrent + @classmethod def from_config(cls, config): return cls( @@ -79,13 +83,16 @@ def fetch_previous_inference_results(self): return pre_inf_results_df, last_uid def validate_response_dict(self, response_dict): - # "model_output" and "is_valid" are mandatory fields by any inference component + # Validate that the response dictionary contains the required fields + # "model_output" and "is_valid" are mandatory fields to be returned by any model if "model_output" not in response_dict or "is_valid" not in response_dict: raise ValueError("Response dictionary must contain 'model_output' and 'is_valid' keys.") def retrieve_exisiting_result(self, data, pre_inf_results_df): - # if resume_from file is provided and valid inference results - # for the current data point are present in it, use them. + """Finds the previous result for the given data point from the pre_inf_results_df and returns it if it is valid + data: dict, data point to be inferenced + pre_inf_results_df: pd.DataFrame, previous inference results + """ prev_results = pre_inf_results_df[pre_inf_results_df.uid == data["uid"]] prev_result_is_valid = bool(prev_results["is_valid"].values[0]) prev_model_output = prev_results["model_output"].values[0] @@ -101,6 +108,7 @@ def run(self): self._run() def _run(self): + """sequential inference""" if self.resume_from: pre_inf_results_df, last_uid = self.fetch_previous_inference_results() with self.data_loader as loader: @@ -130,10 +138,16 @@ def _run(self): writer.write(data) async def run_in_excutor(self, model_inputs, executor): + """Run model.generate in a ThreadPoolExecutor. + args: + model_inputs (tuple): inputs to the model.generate function. + executor (ThreadPoolExecutor): ThreadPoolExecutor instance. 
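+            Returns the response dictionary produced by model.generate for the given inputs.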
+ """ loop = asyncio.get_event_loop() return await loop.run_in_executor(executor, self.model.generate, *model_inputs) async def _run_par(self): + """parallel inference""" concurrent_inputs = [] concurrent_metadata = [] if self.resume_from: @@ -163,6 +177,13 @@ async def _run_par(self): await self.run_batch(concurrent_inputs, concurrent_metadata, writer, executor) async def run_batch(self, concurrent_inputs, concurrent_metadata, writer, executor): + """Run a batch of inferences concurrently using ThreadPoolExecutor. + args: + concurrent_inputs (list): list of inputs to the model.generate function. + concurrent_metadata (list): list of metadata corresponding to the inputs. + writer (JsonLinesWriter): JsonLinesWriter instance to write the results. + executor (ThreadPoolExecutor): ThreadPoolExecutor instance. + """ tasks = [asyncio.create_task(self.run_in_excutor(input_data, executor)) for input_data in concurrent_inputs] results = await asyncio.gather(*tasks) for i in range(len(concurrent_inputs)): From 61cbfd6758ebf62e728ea3650e9b1774e47b893c Mon Sep 17 00:00:00 2001 From: Safoora Yousefi Date: Tue, 29 Oct 2024 02:52:38 +0000 Subject: [PATCH 10/16] fix resume from issue --- eureka_ml_insights/core/data_processing.py | 7 +++-- eureka_ml_insights/core/inference.py | 32 +++++++++++++++----- eureka_ml_insights/core/prompt_processing.py | 12 +++++--- tests/component_tests.py | 4 ++- 4 files changed, 39 insertions(+), 16 deletions(-) diff --git a/eureka_ml_insights/core/data_processing.py b/eureka_ml_insights/core/data_processing.py index 92c43f6..94e0284 100644 --- a/eureka_ml_insights/core/data_processing.py +++ b/eureka_ml_insights/core/data_processing.py @@ -8,7 +8,7 @@ import numpy as np from .pipeline import Component - +from .reserved_names import INFERENCE_RESERVED_NAMES, PROMPT_PROC_RESERVED_NAMES def compute_hash(val: str) -> str: """ @@ -67,7 +67,8 @@ def __init__( data_reader_config: DataReaderConfig output_dir: str directory to save the output files of this component. output_data_columns: Optional[List[str]] list of columns (subset of input columns) - to keep in the transformed data output file. + to keep in the transformed data output file. The columns reserved for the Eureka framework + will automatically be added to the output_data_columns if not provided. 
""" super().__init__(output_dir) self.data_reader = data_reader_config.class_name(**data_reader_config.init_args) @@ -88,7 +89,7 @@ def get_desired_columns(self, df): self.output_data_columns = list(self.output_data_columns) # if the data was multiplied, keep the columns that are needed to identify datapoint and replicates # (just in case the user forgot to specify these columns in output_data_columns) - cols_to_keep = ["data_point_id", "data_repeat_id"] + cols_to_keep = set(INFERENCE_RESERVED_NAMES + PROMPT_PROC_RESERVED_NAMES) self.output_data_columns.extend([col for col in cols_to_keep if col in df.columns]) self.output_data_columns = list(set(self.output_data_columns)) return df[self.output_data_columns] diff --git a/eureka_ml_insights/core/inference.py b/eureka_ml_insights/core/inference.py index cedc859..67cd557 100644 --- a/eureka_ml_insights/core/inference.py +++ b/eureka_ml_insights/core/inference.py @@ -10,6 +10,7 @@ from eureka_ml_insights.data_utils.data import DataReader, JsonLinesWriter from .pipeline import Component +from .reserved_names import INFERENCE_RESERVED_NAMES MINUTE = 60 @@ -68,15 +69,15 @@ def fetch_previous_inference_results(self): if "model_output" not in pre_inf_results_df.columns or "is_valid" not in pre_inf_results_df.columns: raise ValueError("Columns 'model_output' and 'is_valid' are required in the resume_from file.") - # check if remaining columns match those in current data loader - pre_inf_results_keys = pre_inf_results_df.columns.drop(["model_output", "is_valid"]) + # check if after removing reserved columns, remaining columns match those in current data loader + pre_inf_results_keys = pre_inf_results_df.columns.drop(INFERENCE_RESERVED_NAMES, errors="ignore") + if set(sample_data_keys) != set(pre_inf_results_keys): raise ValueError( - f"Columns in resume_from do not match the columns in the current data loader." - f"Current data loader columns: {sample_data_keys}. " - f"Provided inference results columns: {pre_inf_results_keys}." + f"Columns in the resume_from file do not match the columns in the current data loader. " + f"Expected columns: {sample_data_keys}. " + f"Columns in resume_from file: {pre_inf_results_keys}." ) - # find the last uid that was inferenced last_uid = pre_inf_results_df["uid"].astype(int).max() logging.info(f"Last uid inferenced: {last_uid}") @@ -96,9 +97,26 @@ def retrieve_exisiting_result(self, data, pre_inf_results_df): prev_results = pre_inf_results_df[pre_inf_results_df.uid == data["uid"]] prev_result_is_valid = bool(prev_results["is_valid"].values[0]) prev_model_output = prev_results["model_output"].values[0] + if prev_result_is_valid: logging.info(f"Skipping inference for uid: {data['uid']}. 
Using previous results.") - data["model_output"], data["is_valid"] = prev_model_output, prev_result_is_valid + try: + prev_model_tokens = prev_results["n_output_tokens"].values[0] + except KeyError: + logging.warn("Previous results do not contain 'n_output_tokens' column, setting to None for this data point.") + prev_model_tokens = None + try: + prev_model_time = prev_results["response_time"].values[0] + except KeyError: + logging.warn("Previous results do not contain 'response_time' column, setting to None for this data point.") + prev_model_time = None + + data["model_output"], data["is_valid"], data["n_output_tokens"], data["response_time"] = ( + prev_model_output, + prev_result_is_valid, + prev_model_tokens, + prev_model_time, + ) return data def run(self): diff --git a/eureka_ml_insights/core/prompt_processing.py b/eureka_ml_insights/core/prompt_processing.py index ed39e65..483299c 100644 --- a/eureka_ml_insights/core/prompt_processing.py +++ b/eureka_ml_insights/core/prompt_processing.py @@ -9,7 +9,7 @@ from eureka_ml_insights.data_utils import JinjaPromptTemplate from .data_processing import DataProcessing - +from .reserved_names import INFERENCE_RESERVED_NAMES def compute_hash(val: str) -> str: """ @@ -93,10 +93,12 @@ def run(self) -> None: logging.info(f"Average prompt num tokens: {statistics.fmean(prompt_num_tokens)}.") input_df = self.get_desired_columns(input_df) - # Remove `model_output` column if it exists in the data because this name is reserved for the model output. - if "model_output" in self.output_data_columns: - self.output_data_columns.remove("model_output") - logging.warning("Removed 'model_output' column from transformed data columns.") + # Remove `model_output`, `is_valid`, `response_time`, `n_output_tokens` columns if they exists + # in the data because these names are reserved for the inference component's use. 
+ for col in INFERENCE_RESERVED_NAMES: + if col in self.output_data_columns: + self.output_data_columns.remove(col) + logging.warning(f"Removed '{col}' column from transformed data columns because it is reserved for the inference component.") input_df = input_df[self.output_data_columns] input_df["prompt_hash"] = prompt_hashes diff --git a/tests/component_tests.py b/tests/component_tests.py index a124aa7..4602993 100644 --- a/tests/component_tests.py +++ b/tests/component_tests.py @@ -118,7 +118,9 @@ def setUp(self) -> None: { "path": "./sample_data/sample_data2.jsonl", "format": ".jsonl", - "transform": RunPythonTransform("df['images'] = df['images'].apply(lambda x: x[0])"), + "transform": + SequenceTransform([RunPythonTransform("df['images'] = df['images'].apply(lambda x: x[0])"), + ColumnRename(name_mapping={"prompt": "prompt_2"})]), }, ), output_dir=os.path.join(self.log_dir, "data_join_output"), From 1d97f8c72727de92adc7cd1597bab1a28d5255e3 Mon Sep 17 00:00:00 2001 From: Safoora Yousefi Date: Tue, 29 Oct 2024 02:58:29 +0000 Subject: [PATCH 11/16] added reserved names module --- eureka_ml_insights/core/data_processing.py | 8 ++++++-- eureka_ml_insights/core/reserved_names.py | 2 ++ 2 files changed, 8 insertions(+), 2 deletions(-) create mode 100644 eureka_ml_insights/core/reserved_names.py diff --git a/eureka_ml_insights/core/data_processing.py b/eureka_ml_insights/core/data_processing.py index 94e0284..8fc45e3 100644 --- a/eureka_ml_insights/core/data_processing.py +++ b/eureka_ml_insights/core/data_processing.py @@ -8,7 +8,11 @@ import numpy as np from .pipeline import Component -from .reserved_names import INFERENCE_RESERVED_NAMES, PROMPT_PROC_RESERVED_NAMES +from .reserved_names import ( + INFERENCE_RESERVED_NAMES, + PROMPT_PROC_RESERVED_NAMES, +) + def compute_hash(val: str) -> str: """ @@ -67,7 +71,7 @@ def __init__( data_reader_config: DataReaderConfig output_dir: str directory to save the output files of this component. output_data_columns: Optional[List[str]] list of columns (subset of input columns) - to keep in the transformed data output file. The columns reserved for the Eureka framework + to keep in the transformed data output file. The columns reserved for the Eureka framework will automatically be added to the output_data_columns if not provided. 
""" super().__init__(output_dir) diff --git a/eureka_ml_insights/core/reserved_names.py b/eureka_ml_insights/core/reserved_names.py new file mode 100644 index 0000000..f9ca1cd --- /dev/null +++ b/eureka_ml_insights/core/reserved_names.py @@ -0,0 +1,2 @@ +INFERENCE_RESERVED_NAMES = ["model_output", "is_valid", "response_time", "n_output_tokens"] +PROMPT_PROC_RESERVED_NAMES = ["prompt_hash", "prompt", "uid", "data_point_id", "data_repeat_id"] From f0582d9bd04eb59e3fd080d6ae8553ce3e42dba2 Mon Sep 17 00:00:00 2001 From: Safoora Yousefi Date: Tue, 29 Oct 2024 02:59:34 +0000 Subject: [PATCH 12/16] formatting --- eureka_ml_insights/core/inference.py | 8 ++++++-- eureka_ml_insights/core/prompt_processing.py | 5 ++++- tests/component_tests.py | 9 ++++++--- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/eureka_ml_insights/core/inference.py b/eureka_ml_insights/core/inference.py index 67cd557..fcff940 100644 --- a/eureka_ml_insights/core/inference.py +++ b/eureka_ml_insights/core/inference.py @@ -103,12 +103,16 @@ def retrieve_exisiting_result(self, data, pre_inf_results_df): try: prev_model_tokens = prev_results["n_output_tokens"].values[0] except KeyError: - logging.warn("Previous results do not contain 'n_output_tokens' column, setting to None for this data point.") + logging.warn( + "Previous results do not contain 'n_output_tokens' column, setting to None for this data point." + ) prev_model_tokens = None try: prev_model_time = prev_results["response_time"].values[0] except KeyError: - logging.warn("Previous results do not contain 'response_time' column, setting to None for this data point.") + logging.warn( + "Previous results do not contain 'response_time' column, setting to None for this data point." + ) prev_model_time = None data["model_output"], data["is_valid"], data["n_output_tokens"], data["response_time"] = ( diff --git a/eureka_ml_insights/core/prompt_processing.py b/eureka_ml_insights/core/prompt_processing.py index 483299c..fe0f959 100644 --- a/eureka_ml_insights/core/prompt_processing.py +++ b/eureka_ml_insights/core/prompt_processing.py @@ -11,6 +11,7 @@ from .data_processing import DataProcessing from .reserved_names import INFERENCE_RESERVED_NAMES + def compute_hash(val: str) -> str: """ Hashes the provided value using MD5. @@ -98,7 +99,9 @@ def run(self) -> None: for col in INFERENCE_RESERVED_NAMES: if col in self.output_data_columns: self.output_data_columns.remove(col) - logging.warning(f"Removed '{col}' column from transformed data columns because it is reserved for the inference component.") + logging.warning( + f"Removed '{col}' column from transformed data columns because it is reserved for the inference component." 
+ ) input_df = input_df[self.output_data_columns] input_df["prompt_hash"] = prompt_hashes diff --git a/tests/component_tests.py b/tests/component_tests.py index 4602993..2b63f98 100644 --- a/tests/component_tests.py +++ b/tests/component_tests.py @@ -118,9 +118,12 @@ def setUp(self) -> None: { "path": "./sample_data/sample_data2.jsonl", "format": ".jsonl", - "transform": - SequenceTransform([RunPythonTransform("df['images'] = df['images'].apply(lambda x: x[0])"), - ColumnRename(name_mapping={"prompt": "prompt_2"})]), + "transform": SequenceTransform( + [ + RunPythonTransform("df['images'] = df['images'].apply(lambda x: x[0])"), + ColumnRename(name_mapping={"prompt": "prompt_2"}), + ] + ), }, ), output_dir=os.path.join(self.log_dir, "data_join_output"), From ad88d7c437eaab7ec97f78a1430b5be46c5e1a94 Mon Sep 17 00:00:00 2001 From: Safoora Yousefi Date: Tue, 29 Oct 2024 03:59:36 +0000 Subject: [PATCH 13/16] fix encoding issue --- eureka_ml_insights/core/inference.py | 9 +++++---- eureka_ml_insights/data_utils/data.py | 9 ++++++--- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/eureka_ml_insights/core/inference.py b/eureka_ml_insights/core/inference.py index fcff940..387b731 100644 --- a/eureka_ml_insights/core/inference.py +++ b/eureka_ml_insights/core/inference.py @@ -73,10 +73,11 @@ def fetch_previous_inference_results(self): pre_inf_results_keys = pre_inf_results_df.columns.drop(INFERENCE_RESERVED_NAMES, errors="ignore") if set(sample_data_keys) != set(pre_inf_results_keys): - raise ValueError( - f"Columns in the resume_from file do not match the columns in the current data loader. " - f"Expected columns: {sample_data_keys}. " - f"Columns in resume_from file: {pre_inf_results_keys}." + logging.warn( + f"Columns in resume_from file do not match the current data loader. " + f"Current data loader columns: {sample_data_keys}. " + f"Resume_from file columns: {pre_inf_results_keys}." + "If the missing columns are going to be added by the inference process, this warning can be ignored." ) # find the last uid that was inferenced last_uid = pre_inf_results_df["uid"].astype(int).max() diff --git a/eureka_ml_insights/data_utils/data.py b/eureka_ml_insights/data_utils/data.py index 6150e82..40ac0df 100644 --- a/eureka_ml_insights/data_utils/data.py +++ b/eureka_ml_insights/data_utils/data.py @@ -14,6 +14,8 @@ from PIL import Image from tqdm import tqdm +from eureka_ml_insights.core import NumpyEncoder + from .secret_key_utils import GetKey from .transform import DFTransformBase @@ -272,7 +274,7 @@ def __init__(self, out_path): self.writer = None def __enter__(self): - self.writer = jsonlines.open(self.out_path, mode="w") + self.writer = jsonlines.open(self.out_path, mode="w", dumps=NumpyEncoder().encode) return self.writer def __exit__(self, exc_type, exc_value, traceback): @@ -355,6 +357,7 @@ class HFJsonReader(JsonReader): """ This is a DataReader that loads a json or jsonl data file from HuggingFace. """ + def __init__(self, repo_id, repo_type, filename): """ Initializes an HFJsonReader. 
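
Note: the NumpyEncoder passed to jsonlines.open above comes from eureka_ml_insights.core, and its definition is not included in this patch series. A minimal sketch of what such an encoder can look like, assuming it only needs to map NumPy scalars and arrays to built-in Python types (the project's actual implementation may differ):

import json

import numpy as np


class NumpyEncoder(json.JSONEncoder):
    """Illustrative JSON encoder that converts NumPy types to plain Python.

    Assumed behaviour; the real eureka_ml_insights.core.NumpyEncoder may differ.
    """

    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super().default(obj)

Passing dumps=NumpyEncoder().encode to jsonlines.open should let rows that contain NumPy scalars (for example token counts produced via pandas) serialize without a TypeError, which is presumably the encoding issue this commit addresses.
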
@@ -523,8 +526,8 @@ def _save_base64_to_image_file(self, image_base64: dict, cache_path: str) -> str if image_base64: # create path to save image - file_path = os.path.join(cache_path, image_base64["path"]) - + file_path = os.path.join(cache_path, image_base64["path"]) + # only do this if the image doesn't already exist if not os.path.exists(file_path): # base64 string to binary image data From a99065117206e0c1fd13c3435b59f97956ab0b50 Mon Sep 17 00:00:00 2001 From: Safoora Yousefi Date: Tue, 29 Oct 2024 21:32:07 +0000 Subject: [PATCH 14/16] inference resume from validation --- eureka_ml_insights/core/inference.py | 20 +++++----- eureka_ml_insights/data_utils/data.py | 56 +++++++++++++++------------ 2 files changed, 40 insertions(+), 36 deletions(-) diff --git a/eureka_ml_insights/core/inference.py b/eureka_ml_insights/core/inference.py index 387b731..644cff9 100644 --- a/eureka_ml_insights/core/inference.py +++ b/eureka_ml_insights/core/inference.py @@ -10,7 +10,6 @@ from eureka_ml_insights.data_utils.data import DataReader, JsonLinesWriter from .pipeline import Component -from .reserved_names import INFERENCE_RESERVED_NAMES MINUTE = 60 @@ -62,23 +61,22 @@ def fetch_previous_inference_results(self): # validate the resume_from contents with self.data_loader as loader: - sample_data = loader.reader.read() - sample_data_keys = sample_data.keys() + _, sample_model_input = self.data_loader.get_sample_model_input() # verify that "model_output" and "is_valid" columns are present if "model_output" not in pre_inf_results_df.columns or "is_valid" not in pre_inf_results_df.columns: raise ValueError("Columns 'model_output' and 'is_valid' are required in the resume_from file.") - # check if after removing reserved columns, remaining columns match those in current data loader - pre_inf_results_keys = pre_inf_results_df.columns.drop(INFERENCE_RESERVED_NAMES, errors="ignore") - - if set(sample_data_keys) != set(pre_inf_results_keys): + # perform a sample inference call to get the model output keys and validate the resume_from contents + sample_response_dict = self.model.generate(*sample_model_input) + # check if the inference response dictionary contains the same keys as the resume_from file + if set(sample_response_dict.keys()) != set(pre_inf_results_df.columns): logging.warn( - f"Columns in resume_from file do not match the current data loader. " - f"Current data loader columns: {sample_data_keys}. " - f"Resume_from file columns: {pre_inf_results_keys}." - "If the missing columns are going to be added by the inference process, this warning can be ignored." + f"Columns in resume_from file do not match the current inference response. " + f"Current inference response keys: {sample_response_dict.keys()}. " + f"Resume_from file columns: {pre_inf_results_df.columns}." 
) + # find the last uid that was inferenced last_uid = pre_inf_results_df["uid"].astype(int).max() logging.info(f"Last uid inferenced: {last_uid}") diff --git a/eureka_ml_insights/data_utils/data.py b/eureka_ml_insights/data_utils/data.py index 40ac0df..948670f 100644 --- a/eureka_ml_insights/data_utils/data.py +++ b/eureka_ml_insights/data_utils/data.py @@ -58,9 +58,17 @@ def __len__(self): def __iter__(self): for data in self.reader.iter(skip_empty=True, skip_invalid=True): - query_text = data["prompt"] - model_inputs = (query_text,) - yield data, model_inputs + yield self.prepare_model_input(data) + + def prepare_model_input(self, row): + query_text = row["prompt"] + model_inputs = (query_text,) + return row, model_inputs + + def get_sample_model_input(self): + """Get a sample data row and model_inputs from the jsonlines reader.""" + row = next(self.reader.iter(skip_empty=True, skip_invalid=True)) + return self.prepare_model_input(row) class MMDataLoader(DataLoader): @@ -91,28 +99,26 @@ def __init__( image_column_search_regex: optional Regex str, to search for which columns have images in them. """ - def __iter__(self): - for row in self.reader.iter(skip_empty=True, skip_invalid=True): - # get query text - query_text = row["prompt"] - model_inputs = (query_text,) - - # if images are present load them - if self.load_images: - # if the user passed in a list of image column names when creating the class, use it - if self.image_column_names: - image_column_names = self.image_column_names - # otherwise search for the image columns - else: - image_column_names = self._search_for_image_columns(row) - - # if found load them from disk and add to model inputs - if image_column_names: - images = self._gather_image_file_names(row, image_column_names) - query_images = self._load_images(images) - model_inputs = (query_text, query_images) - - yield row, model_inputs + def prepare_model_input(self, row): + # Given a row from the jsonl file, prepare the data for the model. 
+ query_text = row["prompt"] + model_inputs = (query_text,) + + # if images are present load them + if self.load_images: + # if the user passed in a list of image column names when creating the class, use it + if self.image_column_names: + image_column_names = self.image_column_names + # otherwise search for the image columns + else: + image_column_names = self._search_for_image_columns(row) + + # if found load them from disk and add to model inputs + if image_column_names: + images = self._gather_image_file_names(row, image_column_names) + query_images = self._load_images(images) + model_inputs = (query_text, query_images) + return row, model_inputs def _search_for_image_columns(self, data_row) -> list: """ From b7973e9fbb77606efb1c16ecf4c5c9179e85c3f6 Mon Sep 17 00:00:00 2001 From: Safoora Yousefi Date: Tue, 29 Oct 2024 23:17:12 +0000 Subject: [PATCH 15/16] fix off-by-one --- eureka_ml_insights/core/inference.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/eureka_ml_insights/core/inference.py b/eureka_ml_insights/core/inference.py index 644cff9..e815618 100644 --- a/eureka_ml_insights/core/inference.py +++ b/eureka_ml_insights/core/inference.py @@ -94,6 +94,8 @@ def retrieve_exisiting_result(self, data, pre_inf_results_df): pre_inf_results_df: pd.DataFrame, previous inference results """ prev_results = pre_inf_results_df[pre_inf_results_df.uid == data["uid"]] + if prev_results.empty: + return None prev_result_is_valid = bool(prev_results["is_valid"].values[0]) prev_model_output = prev_results["model_output"].values[0] @@ -181,17 +183,16 @@ async def _run_par(self): if prev_result: writer.write(prev_result) continue - + # if batch is ready for concurrent inference elif len(concurrent_inputs) >= self.max_concurrent: with ThreadPoolExecutor() as executor: await self.run_batch(concurrent_inputs, concurrent_metadata, writer, executor) concurrent_inputs = [] concurrent_metadata = [] - else: - # add data to batch for concurrent inference - concurrent_inputs.append(model_inputs) - concurrent_metadata.append(data) + # add data to batch for concurrent inference + concurrent_inputs.append(model_inputs) + concurrent_metadata.append(data) # if data loader is exhausted but there are remaining data points that did not form a full batch if concurrent_inputs: with ThreadPoolExecutor() as executor: From 83a8bc47ca740a61fa47694fc483638bebafff6d Mon Sep 17 00:00:00 2001 From: Safoora Yousefi Date: Wed, 30 Oct 2024 01:02:18 +0000 Subject: [PATCH 16/16] renamed variable --- eureka_ml_insights/configs/config.py | 2 +- eureka_ml_insights/core/inference.py | 22 +++++++++++----------- tests/component_tests.py | 2 +- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/eureka_ml_insights/configs/config.py b/eureka_ml_insights/configs/config.py index 1b0098f..d5d9298 100644 --- a/eureka_ml_insights/configs/config.py +++ b/eureka_ml_insights/configs/config.py @@ -111,7 +111,7 @@ class InferenceConfig(ComponentConfig): data_loader_config: UtilityClassConfigType = None model_config: UtilityClassConfigType = None resume_from: str = None - n_calls_per_min: int = None + requests_per_minute: int = None max_concurrent: int = 1 diff --git a/eureka_ml_insights/core/inference.py b/eureka_ml_insights/core/inference.py index e815618..24c56c9 100644 --- a/eureka_ml_insights/core/inference.py +++ b/eureka_ml_insights/core/inference.py @@ -15,7 +15,7 @@ class Inference(Component): - def __init__(self, model_config, data_config, output_dir, resume_from=None, n_calls_per_min=None, 
max_concurrent=1): + def __init__(self, model_config, data_config, output_dir, resume_from=None, requests_per_minute=None, max_concurrent=1): """ Initialize the Inference component. args: @@ -23,7 +23,7 @@ def __init__(self, model_config, data_config, output_dir, resume_from=None, n_ca data_config (dict): DataSetConfig object. output_dir (str): Directory to save the inference results. resume_from (str): optional. Path to the file where previous inference results are stored. - n_calls_per_min (int): optional. Number of calls to be made per minute, used for rate limiting. If not provided, rate limiting will not be applied. + requests_per_minute (int): optional. Number of inference requests to be made per minute, used for rate limiting. If not provided, rate limiting will not be applied. max_concurrent (int): optional. Maximum number of concurrent inferences to run. Default is 1. """ super().__init__(output_dir) @@ -36,8 +36,8 @@ def __init__(self, model_config, data_config, output_dir, resume_from=None, n_ca raise FileNotFoundError(f"File {resume_from} not found.") # rate limiting parameters - self.n_calls_per_min = n_calls_per_min - self.call_times = deque() + self.requests_per_minute = requests_per_minute + self.request_times = deque() self.period = MINUTE # parallel inference parameters @@ -50,7 +50,7 @@ def from_config(cls, config): config.data_loader_config, config.output_dir, resume_from=config.resume_from, - n_calls_per_min=config.n_calls_per_min, + requests_per_minute=config.requests_per_minute, max_concurrent=config.max_concurrent, ) @@ -145,15 +145,15 @@ def _run(self): continue # generate text from model (optionally at a limited rate) - if self.n_calls_per_min: - while len(self.call_times) >= self.n_calls_per_min: - # remove the oldest call time if it is older than the rate limit period - if time.time() - self.call_times[0] > self.period: - self.call_times.popleft() + if self.requests_per_minute: + while len(self.request_times) >= self.requests_per_minute: + # remove the oldest request time if it is older than the rate limit period + if time.time() - self.request_times[0] > self.period: + self.request_times.popleft() else: # rate limit is reached, wait for a second time.sleep(1) - self.call_times.append(time.time()) + self.request_times.append(time.time()) response_dict = self.model.generate(*model_inputs) self.validate_response_dict(response_dict) # write results diff --git a/tests/component_tests.py b/tests/component_tests.py index 2b63f98..0a16fe6 100644 --- a/tests/component_tests.py +++ b/tests/component_tests.py @@ -245,7 +245,7 @@ def setUp(self) -> None: ), model_config=ModelConfig(TestModel, {}), output_dir=os.path.join(self.log_dir, "model_output"), - n_calls_per_min=20, + requests_per_minute=20, ) component = Inference.from_config(self.config) start = time.time()
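
The requests_per_minute rename above does not change the rate-limiting behaviour itself: _run keeps a deque of request timestamps and waits whenever the last 60 seconds already contain requests_per_minute entries. A standalone sketch of that sliding-window pattern follows, with an illustrative class and method name that are not part of the codebase:

import time
from collections import deque

MINUTE = 60


class SlidingWindowRateLimiter:
    """Illustrative sketch of the rate limiting used in Inference._run."""

    def __init__(self, requests_per_minute: int):
        self.requests_per_minute = requests_per_minute
        self.request_times = deque()
        self.period = MINUTE

    def wait_for_slot(self) -> None:
        # Block until fewer than requests_per_minute requests were issued
        # within the last `period` seconds.
        while len(self.request_times) >= self.requests_per_minute:
            if time.time() - self.request_times[0] > self.period:
                # Oldest request fell outside the window; forget it.
                self.request_times.popleft()
            else:
                # Window is full; wait a second and re-check.
                time.sleep(1)
        self.request_times.append(time.time())

In the test above, requests_per_minute=20 caps the component at 20 generate calls in any 60-second window; the start = time.time() line suggests the test then compares the elapsed wall-clock time against that budget.
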