Skip to content

Commit

Permalink
Modify ReadSourceData to handle re-running eCRs (#353)
Browse files Browse the repository at this point in the history
  • Loading branch information
m-goggins authored Dec 4, 2023
1 parent 82a020a commit 05272c9
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 6 deletions.
46 changes: 43 additions & 3 deletions serverless-functions/ReadSourceData/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
convert_hl7_batch_messages_to_list,
)
import requests
import uuid

from phdi.fhir.harmonization.standardization import (
standardize_names,
Expand All @@ -25,6 +26,7 @@
"ecr": "EICR",
"elr": "ORU_R01",
"vxu": "VXU_V04",
"ecr-rerun": "",
}


Expand Down Expand Up @@ -185,6 +187,44 @@ def main(message: func.QueueMessage) -> None:
post_data_to_building_block(record_linkage_url, record_linkage_body)
return

# Handle re-run eCR messages
elif message_type == "ecr-rerun":
fhir_bundle, external_person_id = get_external_person_id(blob_contents)

record_linkage_url = os.environ["RECORD_LINKAGE_URL"] + "/link-record"
record_linkage_body = {
"bundle": fhir_bundle,
"external_person_id": external_person_id,
}
record_linkage_response = post_data_to_building_block(
record_linkage_url, record_linkage_body
)

message_parsing_url = os.environ["MESSAGE_PARSER_URL"] + "/parse_message"
message_parser_body = {
"message_format": "fhir",
"message": record_linkage_response.get("updated_bundle"),
"parsing_schema_name": "ecr.json",
}
message_parser_response = post_data_to_building_block(
message_parsing_url, message_parser_body
)

# Write blob data to storage
container_name = "delta-tables"
filename = f"raw_data/{str(uuid.uuid4())}.json"
parsed_message = message_parser_response.get("parsed_values")
logging.info(parsed_message)

cred_manager = AzureCredentialManager(resource_location=storage_account_url)
cloud_container_connection = AzureCloudContainerConnection(
storage_account_url=storage_account_url, cred_manager=cred_manager
)
cloud_container_connection.upload_object(
message=parsed_message, container_name=container_name, filename=filename
)
return

subscription_id = os.environ["AZURE_SUBSCRIPTION_ID"]
resource_group_name = os.environ["RESOURCE_GROUP_NAME"]
factory_name = os.environ["FACTORY_NAME"]
Expand Down Expand Up @@ -421,10 +461,10 @@ def post_data_to_building_block(url: str, body: dict) -> dict:
f"{url.upper()} STATUS CODE: {response.status_code}"
)
failed_request_reason = f"{url.upper()} REASON: {response.reason}"
failed_request_message = f"{url.upper()} MESSAGE: {response.json()['message']}"
failed_response_text = f"{url.upper()} TEXT: {response.text}"
logging.error(failed_request_status_code)
logging.error(failed_request_reason)
logging.error(failed_request_message)
raise Exception(failed_request_message)
logging.error(failed_response_text)
raise Exception(failed_response_text)

return response.json()
13 changes: 10 additions & 3 deletions serverless-functions/tests/ReadSourceData/test_read_source_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ def test_pipeline_trigger_success(
"RESOURCE_GROUP_NAME": "some-resource-group",
"FACTORY_NAME": "some-adf",
"PIPELINE_NAME": "some-pipeline",
"RECORD_LINKAGE_URL": "some_record_linkage_url",
"MESSAGE_PARSER_URL": "some_message_parser_url",
}

good_response = mock.Mock()
Expand All @@ -103,7 +105,10 @@ def test_pipeline_trigger_success(
patched_adf_management_client.return_value = adf_client

for source_data_subdirectory in MESSAGE_TO_TEMPLATE_MAP.keys():
if source_data_subdirectory != "fhir":
if (
source_data_subdirectory != "fhir"
and source_data_subdirectory != "ecr-rerun"
):
message_type = source_data_subdirectory
root_template = MESSAGE_TO_TEMPLATE_MAP[source_data_subdirectory]

Expand Down Expand Up @@ -442,7 +447,9 @@ def test_post_data_to_building_block(mocked_post, patched_azure_cred_manager):
)

# Test for failure
mocked_post.return_value = mock.Mock(status_code=400, json=(lambda: fhir_bundle))
mocked_post.return_value = mock.Mock(
status_code=400, json=(lambda: fhir_bundle), text="Oops! Error"
)
with pytest.raises(Exception) as e:
post_data_to_building_block(url="https://some_url", body=fhir_bundle)
assert "message" in str(e.value)
assert "Oops!" in str(e.value)
1 change: 1 addition & 0 deletions terraform/implementation/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ module "read_source_data" {
sleep_time = 1
ingestion_container_url = module.shared.ingestion_container_url
record_linkage_container_url = module.shared.record_linkage_container_url
message_parser_url = module.shared.message_parser_url
phi_storage_account_connection_string = module.shared.phi_storage_account_connection_string
staging_queue_url = module.shared.staging_queue_url
}
Expand Down
1 change: 1 addition & 0 deletions terraform/modules/read_source_data/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ resource "azurerm_linux_function_app" "read_source_data" {
SLEEP_TIME = var.sleep_time
INGESTION_URL = var.ingestion_container_url
RECORD_LINKAGE_URL = var.record_linkage_container_url
MESSAGE_PARSER_URL = var.message_parser_url
AzureStorageQueuesConnectionString = var.phi_storage_account_connection_string
STAGING_QUEUE_URL = var.staging_queue_url
}
Expand Down
5 changes: 5 additions & 0 deletions terraform/modules/read_source_data/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,9 @@ variable "phi_storage_account_connection_string" {
variable "staging_queue_url" {
type = string
description = "The URL of the staging queue."
}

variable "message_parser_url" {
type = string
description = "URL of the message parser container"
}
6 changes: 6 additions & 0 deletions terraform/modules/shared/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,12 @@ resource "azurerm_key_vault_secret" "ingestion-url" {
key_vault_id = azurerm_key_vault.phdi_key_vault.id
}

resource "azurerm_key_vault_secret" "message-parser-url" {
name = "message-parser-url"
value = "https://phdi-${terraform.workspace}-parse_message.${azurerm_container_app_environment.phdi.default_domain}"
key_vault_id = azurerm_key_vault.phdi_key_vault.id
}

##### Container registry #####

resource "azurerm_container_registry" "phdi_registry" {
Expand Down

0 comments on commit 05272c9

Please sign in to comment.