Commit: eCR datastore refresh notebook updates (#349)
DanPaseltiner authored Nov 29, 2023
1 parent 6c74856 commit 82a020a
Showing 1 changed file with 12 additions and 5 deletions.
17 changes: 12 additions & 5 deletions scripts/Synapse/updateECRDataStore.ipynb
@@ -89,11 +89,11 @@
 " \"core\":{\n",
 " \"patient_id\": [\"string\", False],\n",
 " \"person_id\": [\"string\", False],\n",
-" \"person_id_date_added\": [\"timestamp\", True],\n",
+" \"person_id_date_added\": [\"datetime\", True],\n",
 " \"iris_id\": [\"string\", True],\n",
-" \"iris_id_date_added\": [\"timestamp\", True],\n",
+" \"iris_id_date_added\": [\"datetime\", True],\n",
 " \"incident_id\": [\"string\", True],\n",
-" \"incident_id_date_added\": [\"timestamp\", True]}\n",
+" \"incident_id_date_added\": [\"datetime\", True]}\n",
 " }\n",
 "\n",
 " row = ecr_schema.collect()[0].asDict()\n",
@@ -177,7 +177,12 @@
 " \"org.apache.spark.sql.delta.catalog.DeltaCatalog\",\n",
 " )\n",
 " .getOrCreate()\n",
-") "
+")\n",
+"spark.conf.set(\"spark.databricks.delta.schema.autoMerge.enabled\", \"true\")\n",
+"spark.conf.set(\"spark.sql.legacy.parquet.int96RebaseModeInRead\", \"LEGACY\")\n",
+"spark.conf.set(\"spark.sql.legacy.parquet.int96RebaseModeInWrite\", \"LEGACY\")\n",
+"spark.conf.set(\"spark.sql.legacy.parquet.datetimeRebaseModeInRead\", \"LEGACY\")\n",
+"spark.conf.set(\"spark.sql.legacy.parquet.datetimeRebaseModeInWrite\", \"LEGACY\") "
 ]
 },
 {
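The new spark.conf.set() calls enable Delta schema auto-merge and pin the four legacy Parquet rebase options to LEGACY, which tells Spark 3.x to keep reading and writing INT96 and date/timestamp values with the hybrid Julian/Gregorian calendar used by Spark 2.x and Hive, instead of raising SparkUpgradeException on old values. These are runtime-settable SQL options, so an equivalent sketch bakes them into the builder at session creation (the app name is illustrative):

from pyspark.sql import SparkSession

# Sketch: the same options applied via .config() instead of after creation.
spark = (
    SparkSession.builder.appName("ecr-datastore")
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
    .config("spark.sql.legacy.parquet.int96RebaseModeInRead", "LEGACY")
    .config("spark.sql.legacy.parquet.int96RebaseModeInWrite", "LEGACY")
    .config("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY")
    .config("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
    .getOrCreate()
)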
@@ -223,12 +228,13 @@
 " # Then we can stop overwriting in favor of a merge and ultimately only read in new or changed records as opposed to all of them as we are currently doing. \n",
 " print(\"writing new records...\")\n",
 " ECR_DATASTORE_PATH = ECR_DATASTORE_PATH + f\"-{table_name}\"\n",
+" spark.sql(\"SET spark.databricks.delta.schema.autoMerge.enabled = true\") \n",
 " if DeltaTable.isDeltaTable(spark, ECR_DATASTORE_PATH) and table_name == \"core\":\n",
 " ecr_datastore = DeltaTable.forPath(spark, ECR_DATASTORE_PATH)\n",
 "\n",
 " ecr_datastore.alias(\"old\").merge(\n",
 " new_ecr_records.alias(\"new\"), \"old.eicr_id = new.eicr_id\"\n",
-" ).whenNotMatchedInsert(values=merge_schema).execute()\n",
+" ).whenNotMatchedInsertAll().execute()\n",
 " else:\n",
 " # If Delta table doesn't exist, create it.\n",
 " new_ecr_records.write.format(\"delta\").mode(\"overwrite\").save(ECR_DATASTORE_PATH) \n",
@@ -271,6 +277,7 @@
 "outputs": [],
 "source": [
 "for table_name, schemas in spark_schemas.items():\n",
+" print(f\"table: {table_name}\")\n",
 " update_ecr_datastore(schemas,table_name,table_schemas, ECR_DATASTORE_PATH, DELTA_TABLES_FILESYSTEM)\n"
 ]
 }