Skip to content

Commit

Permalink
Insert non-null MRN & SSN identifiers and non-null phone numbers (#343)
Browse files Browse the repository at this point in the history
  • Loading branch information
m-goggins authored Nov 27, 2023
1 parent 8a3f14a commit a211ffa
Showing 1 changed file with 160 additions and 3 deletions.
163 changes: 160 additions & 3 deletions scripts/Synapse/convertParquetMPI.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"metadata": {},
"outputs": [],
"source": [
"pip install --upgrade pip"
"# pip install --upgrade pip"
]
},
{
Expand All @@ -25,7 +25,7 @@
"metadata": {},
"outputs": [],
"source": [
"pip install git+https://github.com/CDCgov/phdi@main"
"# pip install git+https://github.com/CDCgov/phdi@main"
]
},
{
Expand All @@ -50,6 +50,163 @@
"filename=\"\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# This script converts patient data from parquet to patient FHIR resources.\n",
"from typing import Dict, Tuple\n",
"import uuid\n",
"from datetime import datetime\n",
"\n",
"\n",
"def extract_given_name(data: Dict):\n",
" first_name = data.get(\"first_name\", None)\n",
" middle_name = data.get(\"middle_name\", None)\n",
"\n",
" given_names = []\n",
"\n",
" for name in [first_name, middle_name]:\n",
" if name is not None:\n",
" for n in name.split():\n",
" given_names.append(n)\n",
"\n",
" if len(given_names) > 0:\n",
" return given_names\n",
" else:\n",
" return None\n",
"\n",
"\n",
"def adjust_birthdate(data: Dict):\n",
" # TODO: remove this function and pass in the `format` parameter to dob\n",
" # standardization in ReadSourceData for LAC\n",
" format = \"%d%b%Y:00:00:00.000\"\n",
" dob = data.get(\"birthdate\", None)\n",
" if dob is not None and \":\" in dob:\n",
" datetime_str = datetime.strptime(dob, format)\n",
" dob = datetime_str.strftime(\"%Y-%m-%d\")\n",
" return dob\n",
"\n",
"def convert_to_patient_fhir_resources(data: Dict) -> Tuple:\n",
" \"\"\"\n",
" Converts and returns a row of patient data into patient resource in a FHIR-formatted\n",
" patient resouce with a newly generated patient id as well as the\n",
" `external_person_id`.\n",
"\n",
" :param data: Dictionary of patient data that optionionally includes the following\n",
" fields: mrn, ssn, first_name, middle_name, last_name, home_phone, cell-phone, sex,\n",
" birthdate, address, city, state, zip.\n",
" :return: Tuple of the `external_person_id` and FHIR-formatted patient resource.\n",
" \"\"\"\n",
"\n",
" patient_id = str(uuid.uuid4())\n",
"\n",
" optional_data = {\n",
" \"mrn\": data.get(\"mrn\", None),\n",
" \"ssn\": data.get(\"ssn\", None),\n",
" \"home_phone\": data.get(\"home_phone\", None),\n",
" \"cell_phone\": data.get(\"cell_phone\", None),\n",
" \"email\": data.get(\"email\", None),\n",
" }\n",
" identifiers = []\n",
" telecom = []\n",
"\n",
" # Iterate through each patient and convert patient data to FHIR resource\n",
" patient_resource = {\n",
" \"resourceType\": \"Patient\",\n",
" \"id\": f\"{patient_id}\",\n",
" \"name\": [\n",
" {\n",
" \"family\": f\"{data.get('last_name',None)}\",\n",
" \"given\": extract_given_name(data),\n",
" }\n",
" ],\n",
" \"gender\": f\"{data.get('sex',None)}\",\n",
" \"birthDate\": adjust_birthdate(data),\n",
" \"address\": [\n",
" {\n",
" \"use\": \"home\",\n",
" \"line\": [f\"{data.get('address',None)}\"],\n",
" \"city\": f\"{data.get('city',None)}\",\n",
" \"state\": f\"{data.get('state',None)}\",\n",
" \"postalCode\": f\"{data.get('zip',None)}\",\n",
" }\n",
" ],\n",
" }\n",
"\n",
" for col, value in optional_data.items():\n",
" if value is not None:\n",
" if col == \"mrn\":\n",
" mrn = {\n",
" \"type\": {\n",
" \"coding\": [\n",
" {\n",
" \"system\": \"http://terminology.hl7.org/CodeSystem/v2-0203\",\n",
" \"code\": \"MR\",\n",
" }\n",
" ]\n",
" },\n",
" \"value\": value,\n",
" }\n",
" identifiers.append(mrn)\n",
" elif col == \"ssn\":\n",
" ssn = {\n",
" \"type\": {\n",
" \"coding\": [\n",
" {\n",
" \"system\": \"http://terminology.hl7.org/CodeSystem/v2-0203\",\n",
" \"code\": \"SS\",\n",
" }\n",
" ]\n",
" },\n",
" \"value\": value,\n",
" }\n",
" identifiers.append(ssn)\n",
" elif col == \"home_phone\":\n",
" home_phone = (\n",
" {\n",
" \"system\": \"phone\",\n",
" \"value\": value,\n",
" \"use\": \"home\",\n",
" },\n",
" )\n",
" telecom.append(home_phone)\n",
" elif col == \"cell_phone\":\n",
" cell_phone = {\n",
" \"system\": \"phone\",\n",
" \"value\": value,\n",
" \"use\": \"cell\",\n",
" }\n",
" telecom.append(cell_phone)\n",
"\n",
" elif col == \"email\":\n",
" email = {\"value\": value, \"system\": \"email\"}\n",
" telecom.append(email)\n",
"\n",
" if len(identifiers) > 0:\n",
" patient_resource[\"identifier\"] = identifiers\n",
" if len(telecom) > 0:\n",
" patient_resource[\"telecom\"] = telecom\n",
"\n",
" fhir_bundle = {\n",
" \"resourceType\": \"Bundle\",\n",
" \"type\": \"batch\",\n",
" \"id\": str(uuid.uuid4()),\n",
" \"entry\": [\n",
" {\n",
" \"fullUrl\": f\"urn:uuid:{patient_id}\",\n",
" \"resource\": patient_resource,\n",
" \"request\": {\"method\": \"PUT\", \"url\": f\"Patient/{patient_id}\"},\n",
" },\n",
" ],\n",
" }\n",
"\n",
" external_person_id = data.get(\"person_id\", None)\n",
" return (external_person_id, fhir_bundle)"
]
},
{
"cell_type": "code",
"execution_count": null,
Expand All @@ -67,7 +224,7 @@
"outputs": [],
"source": [
"from notebookutils import mssparkutils\n",
"from phdi.linkage.seed import convert_to_patient_fhir_resources\n",
"# from phdi.linkage.seed import convert_to_patient_fhir_resources\n",
"from datetime import date\n",
"import json\n",
"from pyspark.sql import SparkSession\n",
Expand Down

0 comments on commit a211ffa

Please sign in to comment.