Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PHDO-149 - Fix issues with dynamo db queries #255

Draft
wants to merge 7 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,5 @@ class DynamoCollection<R>(
// Elements "sometimes" need to be surrounded by quotes for dynamodb, so do it always.
override val collectionElementForQuery = { name: String -> "\"$name\"" }

override val timeConversionForQuery: (Long) -> String = { epochMillis ->
val instant = Instant
.ofEpochMilli(epochMillis)
.atZone(ZoneId.of("GMT"))

// Format the Instant to ISO 8601 string
DateTimeFormatter.ISO_INSTANT.format(instant)
}
override val isArrayNotEmptyOrNull = "SIZE"
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import gov.cdc.ocio.database.persistence.Collection
import gov.cdc.ocio.database.models.Report
import gov.cdc.ocio.database.models.ReportDeadLetter
import gov.cdc.ocio.database.persistence.ProcessingStatusRepository
import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
import software.amazon.awssdk.protocols.jsoncore.JsonNode
Expand Down Expand Up @@ -62,6 +61,10 @@ class DynamoRepository(tablePrefix: String): ProcessingStatusRepository() {
Any::class.java // TODO(This needs to be replaced!)
) as Collection

override val supportsGroupBy = false
override val supportsDistinct = false
override val supportsCount = false

/**
* Dynamodb implementation of converting the content map to a JsonNode.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,7 @@ interface Collection {

val timeConversionForQuery: (Long) -> String
get() = { timeEpoch: Long -> timeEpoch.toString() }

val isArrayNotEmptyOrNull
get() = "ARRAY_LENGTH"
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ abstract class ProcessingStatusRepository {
.setObjectToNumberStrategy(ToNumberPolicy.LONG_OR_DOUBLE)
.create()

open val supportsGroupBy = true
open val supportsDistinct = true
open val supportsCount = true

/**
* Interface and default implementation for doing report content transformations from the map to whatever type
* the database is expecting.
Expand Down
2 changes: 2 additions & 0 deletions pstatus-graphql-ktor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ dependencies {
implementation project(':libs:commons-database')
implementation project(':libs:commons-types')

implementation 'software.amazon.awssdk:dynamodb-enhanced:2.27.16'

testImplementation "io.ktor:ktor-server-tests-jvm:$ktor_version"
testImplementation "org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version"

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import gov.cdc.ocio.processingstatusapi.exceptions.ContentException
import gov.cdc.ocio.processingstatusapi.models.query.UploadStatus
import gov.cdc.ocio.processingstatusapi.models.query.UploadsStatus
import gov.cdc.ocio.database.models.dao.ReportDao
import gov.cdc.ocio.processingstatusapi.models.query.UploadCounts
import gov.cdc.ocio.processingstatusapi.models.dao.UploadCountsDao
import gov.cdc.ocio.processingstatusapi.utils.DateUtils
import gov.cdc.ocio.processingstatusapi.utils.PageUtils
import gov.cdc.ocio.processingstatusapi.utils.SortUtils
Expand All @@ -16,6 +16,18 @@ import org.koin.core.component.KoinComponent
import org.koin.core.component.inject


/**
* Upload status loader.
*
* @property logger KLogger
* @property repository ProcessingStatusRepository
* @property reportsCollection Collection
* @property cName String
* @property cVar String
* @property cPrefix String
* @property cElFunc Function1<String, String>
* @property timeFunc Function1<Long, String>
*/
class UploadStatusLoader: KoinComponent {

private val logger = KotlinLogging.logger {}
Expand Down Expand Up @@ -89,7 +101,9 @@ class UploadStatusLoader: KoinComponent {
sqlQuery.append(" and ${cPrefix}dexIngestDateTime < $dateEndEpochMillis")
}

sqlQuery.append(" group by ${cPrefix}uploadId, ${cPrefix}jurisdiction, ${cPrefix}senderId")
if (repository.supportsGroupBy) {
sqlQuery.append(" group by ${cPrefix}uploadId, ${cPrefix}jurisdiction, ${cPrefix}senderId")
}

// Check the sort field as well to add them to the group by clause
val sortByQueryStr = StringBuilder()
Expand All @@ -110,7 +124,7 @@ class UploadStatusLoader: KoinComponent {
}
}
//Add the sort by fields to grouping
sqlQuery.append(" , ${cPrefix}$sortField")
// sqlQuery.append(" , ${cPrefix}$sortField")

var sortOrderVal = DEFAULT_SORT_ORDER

Expand All @@ -126,21 +140,30 @@ class UploadStatusLoader: KoinComponent {
}

}
//Sort By/ Order By the given sort field
sortByQueryStr.append(" order by ${cPrefix}$sortField $sortOrderVal")
// Sort By/ Order By the given sort field
// sortByQueryStr.append(" order by ${cPrefix}$sortField $sortOrderVal")
}
}
}

logger.info("Upload Status, sqlQuery = $sqlQuery")
// Query for getting counts in the structure of UploadCounts object. Note the MAX aggregate which is used to
// get the latest timestamp from t._ts.
val countQuery = (
"select count(1) as reportCounts, ${cPrefix}uploadId, "
+ "MAX(${cPrefix}dexIngestDateTime) as latestTimestamp, "
+ "${cPrefix}jurisdiction as jurisdiction, "
+ "${cPrefix}senderId as senderId $sqlQuery"
)
// get the latest timestamp from dexIngestDateTime.
val countQuery = if (repository.supportsCount) {
(
"select count(1) as reportCounts, ${cPrefix}uploadId, "
+ "MAX(${cPrefix}dexIngestDateTime) as latestTimestamp, "
+ "${cPrefix}jurisdiction as jurisdiction, "
+ "${cPrefix}senderId as senderId $sqlQuery"
)
} else {
(
"select ${cPrefix}uploadId, "
+ "${cPrefix}dexIngestDateTime, "
+ "${cPrefix}jurisdiction, "
+ "${cPrefix}senderId $sqlQuery"
)
}
logger.info("upload status count query = $countQuery")

var totalItems = 0
Expand All @@ -150,7 +173,7 @@ class UploadStatusLoader: KoinComponent {
try {
val count = reportsCollection.queryItems(
countQuery,
UploadCounts::class.java
UploadCountsDao::class.java
)
totalItems = count.count()
logger.info("Upload status matched count = $totalItems")
Expand All @@ -163,7 +186,6 @@ class UploadStatusLoader: KoinComponent {
logger.error(ex.localizedMessage)
}


val numberOfPages: Int
val pageNumberAsInt: Int
val reports = mutableMapOf<String, List<ReportDao>>()
Expand All @@ -176,16 +198,25 @@ class UploadStatusLoader: KoinComponent {
sqlQuery.append(sortByQueryStr)

val offset = (pageNumberAsInt - 1) * pageSize
val dataSqlQuery = (
"select count(1) as reportCounts, ${cPrefix}uploadId, "
+ "MAX(${cPrefix}dexIngestDateTime) as latestTimestamp, "
+ "${cPrefix}jurisdiction as jurisdiction, ${cPrefix}senderId as senderId "
+ "$sqlQuery offset $offset limit $pageSizeAsInt"
)
val dataSqlQuery = if (repository.supportsCount) {
(
"select count(1) as reportCounts, ${cPrefix}uploadId, "
+ "MAX(${cPrefix}dexIngestDateTime) as latestTimestamp, "
+ "${cPrefix}jurisdiction as jurisdiction, ${cPrefix}senderId as senderId "
+ "$sqlQuery offset $offset limit $pageSizeAsInt"
)
} else {
(
"select ${cPrefix}uploadId, "
+ "${cPrefix}dexIngestDateTime, "
+ "${cPrefix}jurisdiction, ${cPrefix}senderId "
+ "$sqlQuery offset $offset limit $pageSizeAsInt"
)
}
logger.info("upload status data query = $dataSqlQuery")
val results = reportsCollection.queryItems(
dataSqlQuery,
UploadCounts::class.java
UploadCountsDao::class.java
).toList()

results.forEach { report ->
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package gov.cdc.ocio.processingstatusapi.models.dao

import gov.cdc.ocio.processingstatusapi.models.query.DuplicateFilenameCounts


/**
* Data access object for duplicate filename counts.
*
* @property filename String?
* @property totalCount Long
* @constructor
*/
data class DuplicateFilenameCountsDao(

var filename: String? = null,

var totalCount: Long = 0
) {

/**
* Convert the data access object into a graphql one.
*
* @return DuplicateFilenameCounts
*/
fun toDuplicateFilenameCounts() = DuplicateFilenameCounts().apply {
this.filename = [email protected]
this.totalCount = [email protected]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package gov.cdc.ocio.processingstatusapi.models.dao

import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbAttribute
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean

/**
* Data access object model for retrieving a received filename from the collection.
*
* @property receivedFilename String?
* @constructor
*/
@DynamoDbBean
data class ReceivedFilenameDao(
@get:DynamoDbAttribute("received_filename")
var receivedFilename: String? = null
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package gov.cdc.ocio.processingstatusapi.models.dao

import gov.cdc.ocio.processingstatusapi.models.query.UndeliveredUpload
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean


/**
* Undelivered upload data access object.
*
* @property uploadId String?
* @property filename String?
* @constructor
*/
@DynamoDbBean
data class UndeliveredUploadDao(

var uploadId: String? = null,

var filename: String? = null
) {
fun toUndeliveredUpload() = UndeliveredUpload().apply {
this.uploadId = [email protected]
this.filename = [email protected]
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
package gov.cdc.ocio.processingstatusapi.models.query
package gov.cdc.ocio.processingstatusapi.models.dao

import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean
import java.time.Instant


/**
* DEX Upload Counts model, which is the structure returned from the uploads data projection cosmosdb query.
* Upload counts model, which is the structure returned from the uploads data projection query.
*
* @property reportCounts Int?
* @property uploadId String?
* @property latestTimestamp Instant?
* @property jurisdiction String?
* @property senderId String?
* @constructor
*/
data class UploadCounts(
@DynamoDbBean
data class UploadCountsDao(

var reportCounts: Int? = null,

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
package gov.cdc.ocio.processingstatusapi.models.query
package gov.cdc.ocio.processingstatusapi.models.dao

import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean

/**
* Represents the state of an upload operation in the system.
Expand All @@ -14,7 +16,8 @@ package gov.cdc.ocio.processingstatusapi.models.query
*
* Note: All properties are nullable to support partial database mappings
*/
data class Upload(
@DynamoDbBean
data class UploadDao(
var uploadId: String? = null,
var action: String? = null,
var status: String? = null
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package gov.cdc.ocio.processingstatusapi.models.dao

import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean

/**
* Data access object model for retrieving an upload id from the collection.
*
* @property uploadId String?
* @constructor
*/
@DynamoDbBean
data class UploadIdDao(var uploadId: String? = null)
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import gov.cdc.ocio.database.cosmos.CosmosCollection
import gov.cdc.ocio.database.cosmos.CosmosRepository
import gov.cdc.ocio.database.models.dao.ReportDao
import gov.cdc.ocio.processingstatusapi.exceptions.BadRequestException
import gov.cdc.ocio.processingstatusapi.models.query.UploadCounts
import gov.cdc.ocio.processingstatusapi.models.dao.UploadCountsDao
import io.mockk.*
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertNotNull
Expand Down Expand Up @@ -172,16 +172,16 @@ class UploadStatusLoaderTest : KoinTest {
// Setup mocks
//Return PageIterable of type UploadCounts
val uploadCounts = listOf(
UploadCounts(4, "uploadId1", Instant.now(), "jurisdiction1", "senderId1"),
UploadCounts(2, "uploadId2",Instant.now(), "jurisdiction2","senderId2")
UploadCountsDao(4, "uploadId1", Instant.now(), "jurisdiction1", "senderId1"),
UploadCountsDao(2, "uploadId2",Instant.now(), "jurisdiction2","senderId2")
)
val uploadCountsIterator = uploadCounts.iterator()


every {
mockReportsCollection.queryItems(
any<String>(),
UploadCounts::class.java
UploadCountsDao::class.java
)
} returns listOf()

Expand Down
2 changes: 1 addition & 1 deletion test/scripts/load-testing/reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,4 @@ def create_routing(upload_id, dex_ingest_datetime):
}
""" % (datetime.utcnow().replace(microsecond=0).isoformat() + 'Z')

return create_report_msg_from_content(upload_id, "UPLOAD API", "", dex_ingest_datetime, False, False, content)
return create_report_msg_from_content(upload_id, "UPLOAD API", "blob-file-copy", dex_ingest_datetime, False, False, content)