diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f77d2f..9b9f977 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ### Enhancements - Improve exception unwrapping flexibility for SdkClientUtils ([#67](https://github.com/opensearch-project/opensearch-remote-metadata-sdk/pull/67)) - Add util methods to handle ActionListeners in whenComplete ([#75](https://github.com/opensearch-project/opensearch-remote-metadata-sdk/pull/75)) +- Make DynamoDBClient fully async ([#79](https://github.com/opensearch-project/opensearch-remote-metadata-sdk/pull/79)) ### Bug Fixes ### Infrastructure diff --git a/ddb-client/src/main/java/org/opensearch/remote/metadata/client/impl/DDBOpenSearchClient.java b/ddb-client/src/main/java/org/opensearch/remote/metadata/client/impl/DDBOpenSearchClient.java index f04089d..7b912ad 100644 --- a/ddb-client/src/main/java/org/opensearch/remote/metadata/client/impl/DDBOpenSearchClient.java +++ b/ddb-client/src/main/java/org/opensearch/remote/metadata/client/impl/DDBOpenSearchClient.java @@ -18,17 +18,14 @@ import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider; import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider; import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; -import software.amazon.awssdk.services.dynamodb.model.DeleteItemResponse; import software.amazon.awssdk.services.dynamodb.model.GetItemRequest; import software.amazon.awssdk.services.dynamodb.model.GetItemResponse; import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; -import software.amazon.awssdk.services.dynamodb.model.PutItemRequest.Builder; import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; -import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -80,6 +77,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; @@ -113,7 +111,7 @@ public class DDBOpenSearchClient extends AbstractSdkClient { // TENANT_ID hash key requires non-null value private static final String DEFAULT_TENANT = "DEFAULT_TENANT"; - private DynamoDbClient dynamoDbClient; + private DynamoDbAsyncClient dynamoDbAsyncClient; private AOSOpenSearchClient aosOpenSearchClient; @Override @@ -126,7 +124,7 @@ public void initialize(Map metadataSettings) { super.initialize(metadataSettings); validateAwsParams(remoteMetadataType, remoteMetadataEndpoint, region, serviceName); - this.dynamoDbClient = createDynamoDbClient(region); + this.dynamoDbAsyncClient = createDynamoDbAsyncClient(region); this.aosOpenSearchClient = new AOSOpenSearchClient(); this.aosOpenSearchClient.initialize(metadataSettings); } @@ -139,13 +137,13 @@ public DDBOpenSearchClient() {} /** * Package private constructor for testing * - * @param dynamoDbClient AWS DDB client to perform CRUD operations on a DDB table. + * @param dynamoDbAsyncClient AWS DDB async client to perform CRUD operations on a DDB table. * @param aosOpenSearchClient Remote opensearch client to perform search operations. Documents written to DDB * needs to be synced offline with remote opensearch. * @param tenantIdField the field name for the tenant id */ - DDBOpenSearchClient(DynamoDbClient dynamoDbClient, AOSOpenSearchClient aosOpenSearchClient, String tenantIdField) { - this.dynamoDbClient = dynamoDbClient; + DDBOpenSearchClient(DynamoDbAsyncClient dynamoDbAsyncClient, AOSOpenSearchClient aosOpenSearchClient, String tenantIdField) { + this.dynamoDbAsyncClient = dynamoDbAsyncClient; this.aosOpenSearchClient = aosOpenSearchClient; this.tenantIdField = tenantIdField; } @@ -167,9 +165,9 @@ public CompletionStage putDataObjectAsync( final String tenantId = request.tenantId() != null ? request.tenantId() : DEFAULT_TENANT; final String tableName = request.index(); final GetItemRequest getItemRequest = buildGetItemRequest(tenantId, id, request.index()); - return executePrivilegedAsync(() -> { + + return doPrivileged(() -> dynamoDbAsyncClient.getItem(getItemRequest).thenCompose(getItemResponse -> { try { - GetItemResponse getItemResponse = dynamoDbClient.getItem(getItemRequest); Long sequenceNumber = initOrIncrementSeqNo(getItemResponse); String source = Strings.toString(MediaTypeRegistry.JSON, request.dataObject()); JsonNode jsonNode = OBJECT_MAPPER.readTree(source); @@ -182,29 +180,35 @@ public CompletionStage putDataObjectAsync( item.put(RANGE_KEY, AttributeValue.builder().s(id).build()); item.put(SOURCE, AttributeValue.builder().m(sourceMap).build()); item.put(SEQ_NO_KEY, AttributeValue.builder().n(sequenceNumber.toString()).build()); - Builder builder = PutItemRequest.builder().tableName(tableName).item(item); + PutItemRequest.Builder builder = PutItemRequest.builder().tableName(tableName).item(item); + if (!request.overwriteIfExists() && getItemResponse != null && getItemResponse.item() != null && !getItemResponse.item().isEmpty()) { throw new OpenSearchStatusException("Existing data object for ID: " + request.id(), RestStatus.CONFLICT); } + final PutItemRequest putItemRequest = builder.build(); - dynamoDbClient.putItem(putItemRequest); - String simulatedIndexResponse = simulateOpenSearchResponse( - request.index(), - id, - source, - sequenceNumber, - Map.of("result", "created") - ); - return PutDataObjectResponse.builder().id(id).parser(createParser(simulatedIndexResponse)).build(); + return dynamoDbAsyncClient.putItem(putItemRequest).thenApply(putItemResponse -> { + String simulatedIndexResponse = simulateOpenSearchResponse( + request.index(), + id, + source, + sequenceNumber, + Map.of("result", "created") + ); + try { + return PutDataObjectResponse.builder().id(id).parser(createParser(simulatedIndexResponse)).build(); + } catch (IOException e) { + throw new OpenSearchStatusException("Failed to create parser for response", RestStatus.INTERNAL_SERVER_ERROR, e); + } + }); } catch (IOException e) { - // Rethrow unchecked exception on XContent parsing error - throw new OpenSearchStatusException("Failed to parse data object " + request.id(), RestStatus.BAD_REQUEST); + throw new OpenSearchStatusException("Failed to parse data object " + request.id(), RestStatus.BAD_REQUEST, e); } - }, executor); + })); } /** @@ -219,9 +223,8 @@ public CompletionStage getDataObjectAsync( Boolean isMultiTenancyEnabled ) { final GetItemRequest getItemRequest = buildGetItemRequest(request.tenantId(), request.id(), request.index()); - return executePrivilegedAsync(() -> { + return doPrivileged(() -> dynamoDbAsyncClient.getItem(getItemRequest)).thenApply(getItemResponse -> { try { - final GetItemResponse getItemResponse = dynamoDbClient.getItem(getItemRequest); ObjectNode sourceObject; boolean found; String sequenceNumberString = null; @@ -264,7 +267,7 @@ public CompletionStage getDataObjectAsync( // Rethrow unchecked exception on XContent parsing error throw new OpenSearchStatusException("Failed to parse response", RestStatus.INTERNAL_SERVER_ERROR); } - }, executor); + }); } /** @@ -279,20 +282,25 @@ public CompletionStage updateDataObjectAsync( Boolean isMultiTenancyEnabled ) { final String tenantId = request.tenantId() != null ? request.tenantId() : DEFAULT_TENANT; - return executePrivilegedAsync(() -> { + return doPrivileged(() -> { try { String source = Strings.toString(MediaTypeRegistry.JSON, request.dataObject()); JsonNode jsonNode = OBJECT_MAPPER.readTree(source); - Long sequenceNumber = updateItemWithRetryOnConflict(tenantId, jsonNode, request); - String simulatedUpdateResponse = simulateOpenSearchResponse( - request.index(), - request.id(), - source, - sequenceNumber, - Map.of("result", "updated") - ); - return UpdateDataObjectResponse.builder().id(request.id()).parser(createParser(simulatedUpdateResponse)).build(); + return updateItemWithRetryOnConflict(tenantId, jsonNode, request).thenApply(sequenceNumber -> { + try { + String simulatedUpdateResponse = simulateOpenSearchResponse( + request.index(), + request.id(), + source, + sequenceNumber, + Map.of("result", "updated") + ); + return UpdateDataObjectResponse.builder().id(request.id()).parser(createParser(simulatedUpdateResponse)).build(); + } catch (IOException e) { + throw new OpenSearchStatusException("Parsing error creating update response", RestStatus.INTERNAL_SERVER_ERROR, e); + } + }); } catch (IOException e) { log.error("Error updating {} in {}: {}", request.id(), request.index(), e.getMessage(), e); // Rethrow unchecked exception on update IOException @@ -301,10 +309,10 @@ public CompletionStage updateDataObjectAsync( RestStatus.BAD_REQUEST ); } - }, executor); + }); } - private Long updateItemWithRetryOnConflict(String tenantId, JsonNode jsonNode, UpdateDataObjectRequest request) { + private CompletionStage updateItemWithRetryOnConflict(String tenantId, JsonNode jsonNode, UpdateDataObjectRequest request) { Map updateItem = DDBJsonTransformer.convertJsonObjectToDDBAttributeMap(jsonNode); updateItem.remove(this.tenantIdField); updateItem.remove(RANGE_KEY); @@ -316,14 +324,22 @@ private Long updateItemWithRetryOnConflict(String tenantId, JsonNode jsonNode, U expressionAttributeNames.put("#source", SOURCE); Map expressionAttributeValues = new HashMap<>(); expressionAttributeValues.put(":incr", AttributeValue.builder().n("1").build()); - int retriesRemaining = request.retryOnConflict(); - do { - try { + + return retryUpdate(request, updateKey, updateItem, expressionAttributeNames, expressionAttributeValues, request.retryOnConflict()); + } + + private CompletionStage retryUpdate( + UpdateDataObjectRequest request, + Map updateKey, + Map updateItem, + Map expressionAttributeNames, + Map expressionAttributeValues, + int retriesRemaining + ) { + return dynamoDbAsyncClient.getItem(GetItemRequest.builder().tableName(request.index()).key(updateKey).build()) + .thenCompose(currentItem -> { // Fetch current item and extract data object - Map currentItem = dynamoDbClient.getItem( - GetItemRequest.builder().tableName(request.index()).key(updateKey).build() - ).item(); - Map dataObject = new HashMap<>(currentItem.get(SOURCE).m()); + Map dataObject = new HashMap<>(currentItem.item().get(SOURCE).m()); // Update existing with changes dataObject.putAll(updateItem); expressionAttributeValues.put(":source", AttributeValue.builder().m(dataObject).build()); @@ -331,7 +347,7 @@ private Long updateItemWithRetryOnConflict(String tenantId, JsonNode jsonNode, U if (request.ifSeqNo() != null) { expressionAttributeValues.put(":currentSeqNo", AttributeValue.builder().n(Long.toString(request.ifSeqNo())).build()); } else { - expressionAttributeValues.put(":currentSeqNo", currentItem.get(SEQ_NO_KEY)); + expressionAttributeValues.put(":currentSeqNo", currentItem.item().get(SEQ_NO_KEY)); } UpdateItemRequest.Builder updateItemRequestBuilder = UpdateItemRequest.builder().tableName(request.index()).key(updateKey); updateItemRequestBuilder.updateExpression("SET #seqNo = #seqNo + :incr, #source = :source "); @@ -339,22 +355,34 @@ private Long updateItemWithRetryOnConflict(String tenantId, JsonNode jsonNode, U updateItemRequestBuilder.expressionAttributeNames(expressionAttributeNames) .expressionAttributeValues(expressionAttributeValues); UpdateItemRequest updateItemRequest = updateItemRequestBuilder.build(); - UpdateItemResponse updateItemResponse = dynamoDbClient.updateItem(updateItemRequest); - if (updateItemResponse != null - && updateItemResponse.attributes() != null - && updateItemResponse.attributes().containsKey(SEQ_NO_KEY)) { - return Long.parseLong(updateItemResponse.attributes().get(SEQ_NO_KEY).n()); - } - } catch (ConditionalCheckFailedException ccfe) { - if (retriesRemaining < 1) { - // Throw exception if retries exhausted - String message = "Document version conflict updating " + request.id() + " in index " + request.index(); - log.error(message + ": {}", ccfe.getMessage(), ccfe); - throw new OpenSearchStatusException(message, RestStatus.CONFLICT); - } - } - } while (retriesRemaining-- > 0); - return null; // Should never get here + + return dynamoDbAsyncClient.updateItem(updateItemRequest).thenApply(updateItemResponse -> { + if (updateItemResponse != null + && updateItemResponse.attributes() != null + && updateItemResponse.attributes().containsKey(SEQ_NO_KEY)) { + return Long.parseLong(updateItemResponse.attributes().get(SEQ_NO_KEY).n()); + } + return null; + }).exceptionally(e -> { + if (e.getCause() instanceof ConditionalCheckFailedException) { + if (retriesRemaining > 0) { + return retryUpdate( + request, + updateKey, + updateItem, + expressionAttributeNames, + expressionAttributeValues, + retriesRemaining - 1 + ).toCompletableFuture().join(); + } else { + String message = "Document version conflict updating " + request.id() + " in index " + request.index(); + log.error(message + ": {}", e.getMessage(), e); + throw new OpenSearchStatusException(message, RestStatus.CONFLICT); + } + } + throw new CompletionException(e); + }); + }); } /** @@ -378,9 +406,8 @@ public CompletionStage deleteDataObjectAsync( ) ) .build(); - return executePrivilegedAsync(() -> { + return doPrivileged(() -> dynamoDbAsyncClient.deleteItem(deleteItemRequest).thenApply(deleteItemResponse -> { try { - DeleteItemResponse deleteItemResponse = dynamoDbClient.deleteItem(deleteItemRequest); Long sequenceNumber = null; if (deleteItemResponse.attributes() != null && deleteItemResponse.attributes().containsKey(SEQ_NO_KEY)) { sequenceNumber = Long.parseLong(deleteItemResponse.attributes().get(SEQ_NO_KEY).n()) + 1; @@ -397,7 +424,7 @@ public CompletionStage deleteDataObjectAsync( // Rethrow unchecked exception on XContent parsing error throw new OpenSearchStatusException("Failed to parse response", RestStatus.INTERNAL_SERVER_ERROR); } - }, executor); + })); } @Override @@ -406,96 +433,103 @@ public CompletionStage bulkDataObjectAsync( Executor executor, Boolean isMultiTenancyEnabled ) { - return executePrivilegedAsync(() -> { + return doPrivileged(() -> { log.info("Performing {} bulk actions on table {}", request.requests().size(), request.getIndices()); + long startNanos = System.nanoTime(); + return processBulkRequestsAsync(request.requests(), 0, new ArrayList<>(), executor, isMultiTenancyEnabled).thenCompose( + responses -> { + long endNanos = System.nanoTime(); + long tookMillis = TimeUnit.NANOSECONDS.toMillis(endNanos - startNanos); + log.info("Bulk action complete for {} items, took {} ms", responses.size(), tookMillis); + return buildBulkDataObjectResponse(responses, tookMillis); + } + ); + }); + } - List responses = new ArrayList<>(); + private CompletionStage> processBulkRequestsAsync( + List requests, + int index, + List responses, + Executor executor, + Boolean isMultiTenancyEnabled + ) { + if (index >= requests.size()) { + return CompletableFuture.completedFuture(responses); + } - // TODO: Ideally if we only have put and delete requests we can use DynamoDB BatchWriteRequest. - long startNanos = System.nanoTime(); - for (DataObjectRequest dataObjectRequest : request.requests()) { - try { - if (dataObjectRequest instanceof PutDataObjectRequest) { - responses.add( - putDataObjectAsync((PutDataObjectRequest) dataObjectRequest, executor, isMultiTenancyEnabled) - .toCompletableFuture() - .join() - ); - } else if (dataObjectRequest instanceof UpdateDataObjectRequest) { - responses.add( - updateDataObjectAsync((UpdateDataObjectRequest) dataObjectRequest, executor, isMultiTenancyEnabled) - .toCompletableFuture() - .join() - ); - } else if (dataObjectRequest instanceof DeleteDataObjectRequest) { - responses.add( - deleteDataObjectAsync((DeleteDataObjectRequest) dataObjectRequest, executor, isMultiTenancyEnabled) - .toCompletableFuture() - .join() - ); - } - } catch (CompletionException e) { - Exception cause = SdkClientUtils.unwrapAndConvertToException(e); - RestStatus status = ExceptionsHelper.status(cause); - if (dataObjectRequest instanceof PutDataObjectRequest) { - responses.add( - new PutDataObjectResponse.Builder().index(dataObjectRequest.index()) - .id(dataObjectRequest.id()) - .failed(true) - .cause(cause) - .status(status) - .build() - ); - } else if (dataObjectRequest instanceof UpdateDataObjectRequest) { - responses.add( - new UpdateDataObjectResponse.Builder().index(dataObjectRequest.index()) - .id(dataObjectRequest.id()) - .failed(true) - .cause(cause) - .status(status) - .build() - ); - } else if (dataObjectRequest instanceof DeleteDataObjectRequest) { - responses.add( - new DeleteDataObjectResponse.Builder().index(dataObjectRequest.index()) - .id(dataObjectRequest.id()) - .failed(true) - .cause(cause) - .status(status) - .build() - ); - } - log.error("Error in bulk operation for id {}: {}", dataObjectRequest.id(), e.getCause().getMessage(), e.getCause()); + DataObjectRequest dataObjectRequest = requests.get(index); + CompletionStage futureResponse; + + if (dataObjectRequest instanceof PutDataObjectRequest) { + futureResponse = putDataObjectAsync((PutDataObjectRequest) dataObjectRequest, executor, isMultiTenancyEnabled); + } else if (dataObjectRequest instanceof UpdateDataObjectRequest) { + futureResponse = updateDataObjectAsync((UpdateDataObjectRequest) dataObjectRequest, executor, isMultiTenancyEnabled); + } else if (dataObjectRequest instanceof DeleteDataObjectRequest) { + futureResponse = deleteDataObjectAsync((DeleteDataObjectRequest) dataObjectRequest, executor, isMultiTenancyEnabled); + } else { + futureResponse = CompletableFuture.failedFuture( + new IllegalArgumentException("Unsupported request type: " + dataObjectRequest.getClass().getSimpleName()) + ); + } + + return futureResponse.handle((response, throwable) -> { + if (throwable != null) { + Exception cause = SdkClientUtils.unwrapAndConvertToException(throwable); + RestStatus status = ExceptionsHelper.status(cause); + if (dataObjectRequest instanceof PutDataObjectRequest) { + return new PutDataObjectResponse.Builder().index(dataObjectRequest.index()) + .id(dataObjectRequest.id()) + .failed(true) + .cause(cause) + .status(status) + .build(); + } else if (dataObjectRequest instanceof UpdateDataObjectRequest) { + return new UpdateDataObjectResponse.Builder().index(dataObjectRequest.index()) + .id(dataObjectRequest.id()) + .failed(true) + .cause(cause) + .status(status) + .build(); + } else if (dataObjectRequest instanceof DeleteDataObjectRequest) { + return new DeleteDataObjectResponse.Builder().index(dataObjectRequest.index()) + .id(dataObjectRequest.id()) + .failed(true) + .cause(cause) + .status(status) + .build(); } + log.error("Error in bulk operation for id {}: {}", dataObjectRequest.id(), throwable.getMessage(), throwable); } - long endNanos = System.nanoTime(); - long tookMillis = TimeUnit.NANOSECONDS.toMillis(endNanos - startNanos); - - log.info("Bulk action complete for {} items, took {} ms", responses.size(), tookMillis); - return buildBulkDataObjectResponse(responses, tookMillis); - }, executor); + return response; + }).thenCompose(response -> { + responses.add(response); + return processBulkRequestsAsync(requests, index + 1, responses, executor, isMultiTenancyEnabled); + }); } - private BulkDataObjectResponse buildBulkDataObjectResponse(List responses, long tookMillis) { + private CompletionStage buildBulkDataObjectResponse(List responses, long tookMillis) { // Reconstruct BulkResponse to leverage its parser and hasFailed methods BulkItemResponse[] responseArray = new BulkItemResponse[responses.size()]; - try { + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { for (int id = 0; id < responses.size(); id++) { responseArray[id] = buildBulkItemResponse(responses, id); } BulkResponse br = new BulkResponse(responseArray, tookMillis); - try (XContentBuilder builder = XContentFactory.jsonBuilder()) { - br.toXContent(builder, ToXContent.EMPTY_PARAMS); - return new BulkDataObjectResponse( + br.toXContent(builder, ToXContent.EMPTY_PARAMS); + return CompletableFuture.completedFuture( + new BulkDataObjectResponse( responses.toArray(new DataObjectResponse[0]), tookMillis, br.hasFailures(), createParser(builder.toString()) - ); - } + ) + ); } catch (IOException e) { // Rethrow unchecked exception on XContent parsing error - throw new OpenSearchStatusException("Failed to parse bulk response", RestStatus.INTERNAL_SERVER_ERROR); + return CompletableFuture.failedFuture( + new OpenSearchStatusException("Failed to parse bulk response", RestStatus.INTERNAL_SERVER_ERROR) + ); } } @@ -624,12 +658,12 @@ private static void validateAwsParams(String clientType, String remoteMetadataEn } } - private static DynamoDbClient createDynamoDbClient(String region) { + private static DynamoDbAsyncClient createDynamoDbAsyncClient(String region) { if (region == null) { throw new IllegalStateException("REGION environment variable needs to be set!"); } return doPrivileged( - () -> DynamoDbClient.builder().region(Region.of(region)).credentialsProvider(createCredentialsProvider()).build() + () -> DynamoDbAsyncClient.builder().region(Region.of(region)).credentialsProvider(createCredentialsProvider()).build() ); } @@ -643,8 +677,8 @@ private static AwsCredentialsProvider createCredentialsProvider() { @Override public void close() throws Exception { - if (dynamoDbClient != null) { - dynamoDbClient.close(); + if (dynamoDbAsyncClient != null) { + dynamoDbAsyncClient.close(); } if (aosOpenSearchClient != null) { aosOpenSearchClient.close(); diff --git a/ddb-client/src/test/java/org/opensearch/remote/metadata/client/impl/DDBOpenSearchClientTests.java b/ddb-client/src/test/java/org/opensearch/remote/metadata/client/impl/DDBOpenSearchClientTests.java index 6c181aa..239b2b1 100644 --- a/ddb-client/src/test/java/org/opensearch/remote/metadata/client/impl/DDBOpenSearchClientTests.java +++ b/ddb-client/src/test/java/org/opensearch/remote/metadata/client/impl/DDBOpenSearchClientTests.java @@ -8,7 +8,7 @@ */ package org.opensearch.remote.metadata.client.impl; -import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException; import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; @@ -105,7 +105,7 @@ public class DDBOpenSearchClientTests { private SdkClient sdkClient; @Mock - private DynamoDbClient dynamoDbClient; + private DynamoDbAsyncClient dynamoDbAsyncClient; @Mock private AOSOpenSearchClient aosOpenSearchClient; @Captor @@ -141,10 +141,13 @@ public void setup() { MockitoAnnotations.openMocks(this); sdkClient = SdkClientFactory.wrapSdkClientDelegate( - new DDBOpenSearchClient(dynamoDbClient, aosOpenSearchClient, TENANT_ID_FIELD), + new DDBOpenSearchClient(dynamoDbAsyncClient, aosOpenSearchClient, TENANT_ID_FIELD), true ); testDataObject = new TestDataObject("foo"); + when(dynamoDbAsyncClient.getItem(any(GetItemRequest.class))).thenReturn( + CompletableFuture.completedFuture(GetItemResponse.builder().build()) + ); } @Test @@ -157,11 +160,13 @@ public void testPutDataObject_HappyCase() throws IOException { .tenantId(TENANT_ID) .dataObject(testDataObject) .build(); - when(dynamoDbClient.putItem(any(PutItemRequest.class))).thenReturn(PutItemResponse.builder().build()); + when(dynamoDbAsyncClient.putItem(any(PutItemRequest.class))).thenReturn( + CompletableFuture.completedFuture(PutItemResponse.builder().build()) + ); PutDataObjectResponse response = sdkClient.putDataObjectAsync(putRequest, testThreadPool.executor(TEST_THREAD_POOL)) .toCompletableFuture() .join(); - verify(dynamoDbClient).putItem(putItemRequestArgumentCaptor.capture()); + verify(dynamoDbAsyncClient).putItem(putItemRequestArgumentCaptor.capture()); assertEquals(TEST_ID, response.id()); IndexResponse indexActionResponse = IndexResponse.fromXContent(response.parser()); @@ -185,14 +190,18 @@ public void testPutDataObject_ExistingDocument_UpdatesSequenceNumber() throws IO .tenantId(TENANT_ID) .dataObject(testDataObject) .build(); - when(dynamoDbClient.getItem(any(GetItemRequest.class))).thenReturn( - GetItemResponse.builder().item(Map.of(SEQ_NUM, AttributeValue.builder().n("5").build())).build() + when(dynamoDbAsyncClient.getItem(any(GetItemRequest.class))).thenReturn( + CompletableFuture.completedFuture( + GetItemResponse.builder().item(Map.of(SEQ_NUM, AttributeValue.builder().n("5").build())).build() + ) + ); + when(dynamoDbAsyncClient.putItem(any(PutItemRequest.class))).thenReturn( + CompletableFuture.completedFuture(PutItemResponse.builder().build()) ); - when(dynamoDbClient.putItem(any(PutItemRequest.class))).thenReturn(PutItemResponse.builder().build()); PutDataObjectResponse response = sdkClient.putDataObjectAsync(putRequest, testThreadPool.executor(TEST_THREAD_POOL)) .toCompletableFuture() .join(); - verify(dynamoDbClient).putItem(putItemRequestArgumentCaptor.capture()); + verify(dynamoDbAsyncClient).putItem(putItemRequestArgumentCaptor.capture()); PutItemRequest putItemRequest = putItemRequestArgumentCaptor.getValue(); IndexResponse indexActionResponse = IndexResponse.fromXContent(response.parser()); assertEquals(6, indexActionResponse.getSeqNo()); @@ -208,10 +217,14 @@ public void testPutDataObject_ExistingDocument_DisableOverwrite() { .overwriteIfExists(false) .dataObject(testDataObject) .build(); - when(dynamoDbClient.getItem(any(GetItemRequest.class))).thenReturn( - GetItemResponse.builder().item(Map.of(SEQ_NUM, AttributeValue.builder().n("5").build())).build() + when(dynamoDbAsyncClient.getItem(any(GetItemRequest.class))).thenReturn( + CompletableFuture.completedFuture( + GetItemResponse.builder().item(Map.of(SEQ_NUM, AttributeValue.builder().n("5").build())).build() + ) + ); + when(dynamoDbAsyncClient.putItem(any(PutItemRequest.class))).thenReturn( + CompletableFuture.completedFuture(PutItemResponse.builder().build()) ); - when(dynamoDbClient.putItem(any(PutItemRequest.class))).thenReturn(PutItemResponse.builder().build()); CompletableFuture response = sdkClient.putDataObjectAsync( putRequest, testThreadPool.executor(TEST_THREAD_POOL) @@ -235,9 +248,11 @@ public void testPutDataObject_WithComplexData() { .tenantId(TENANT_ID) .dataObject(complexDataObject) .build(); - when(dynamoDbClient.putItem(any(PutItemRequest.class))).thenReturn(PutItemResponse.builder().build()); + when(dynamoDbAsyncClient.putItem(any(PutItemRequest.class))).thenReturn( + CompletableFuture.completedFuture(PutItemResponse.builder().build()) + ); sdkClient.putDataObjectAsync(putRequest, testThreadPool.executor(TEST_THREAD_POOL)).toCompletableFuture().join(); - verify(dynamoDbClient).putItem(putItemRequestArgumentCaptor.capture()); + verify(dynamoDbAsyncClient).putItem(putItemRequestArgumentCaptor.capture()); PutItemRequest putItemRequest = putItemRequestArgumentCaptor.getValue(); assertEquals("testString", putItemRequest.item().get(SOURCE).m().get("testString").s()); assertEquals("123", putItemRequest.item().get(SOURCE).m().get("testNumber").n()); @@ -256,11 +271,13 @@ public void testPutDataObject_NullId_SetsDefaultTenantId() { .tenantId(TENANT_ID) .dataObject(testDataObject) .build(); - when(dynamoDbClient.putItem(any(PutItemRequest.class))).thenReturn(PutItemResponse.builder().build()); + when(dynamoDbAsyncClient.putItem(any(PutItemRequest.class))).thenReturn( + CompletableFuture.completedFuture(PutItemResponse.builder().build()) + ); PutDataObjectResponse response = sdkClient.putDataObjectAsync(putRequest, testThreadPool.executor(TEST_THREAD_POOL)) .toCompletableFuture() .join(); - verify(dynamoDbClient).putItem(putItemRequestArgumentCaptor.capture()); + verify(dynamoDbAsyncClient).putItem(putItemRequestArgumentCaptor.capture()); PutItemRequest putItemRequest = putItemRequestArgumentCaptor.getValue(); assertNotNull(putItemRequest.item().get(RANGE_KEY).s()); @@ -275,7 +292,9 @@ public void testPutDataObject_DDBException_ThrowsException() { .tenantId(TENANT_ID) .dataObject(testDataObject) .build(); - when(dynamoDbClient.putItem(any(PutItemRequest.class))).thenThrow(new RuntimeException("Test exception")); + when(dynamoDbAsyncClient.putItem(any(PutItemRequest.class))).thenReturn( + CompletableFuture.failedFuture(new RuntimeException("Test exception")) + ); CompletableFuture future = sdkClient.putDataObjectAsync( putRequest, testThreadPool.executor(TEST_THREAD_POOL) @@ -299,11 +318,11 @@ public void testGetDataObject_HappyCase() throws IOException { ) ) .build(); - when(dynamoDbClient.getItem(any(GetItemRequest.class))).thenReturn(getItemResponse); + when(dynamoDbAsyncClient.getItem(any(GetItemRequest.class))).thenReturn(CompletableFuture.completedFuture(getItemResponse)); GetDataObjectResponse response = sdkClient.getDataObjectAsync(getRequest, testThreadPool.executor(TEST_THREAD_POOL)) .toCompletableFuture() .join(); - verify(dynamoDbClient).getItem(getItemRequestArgumentCaptor.capture()); + verify(dynamoDbAsyncClient).getItem(getItemRequestArgumentCaptor.capture()); GetItemRequest getItemRequest = getItemRequestArgumentCaptor.getValue(); assertEquals(TEST_INDEX, getItemRequest.tableName()); assertEquals(TENANT_ID, getItemRequest.key().get(HASH_KEY).s()); @@ -334,11 +353,11 @@ public void testGetDataObject_ComplexDataObject() throws IOException { ) ) .build(); - when(dynamoDbClient.getItem(any(GetItemRequest.class))).thenReturn(getItemResponse); + when(dynamoDbAsyncClient.getItem(any(GetItemRequest.class))).thenReturn(CompletableFuture.completedFuture(getItemResponse)); GetDataObjectResponse response = sdkClient.getDataObjectAsync(getRequest, testThreadPool.executor(TEST_THREAD_POOL)) .toCompletableFuture() .join(); - verify(dynamoDbClient).getItem(getItemRequestArgumentCaptor.capture()); + verify(dynamoDbAsyncClient).getItem(getItemRequestArgumentCaptor.capture()); GetResponse getResponse = GetResponse.fromXContent(response.parser()); XContentParser parser = JsonXContent.jsonXContent.createParser( @@ -359,7 +378,7 @@ public void testGetDataObject_ComplexDataObject() throws IOException { public void testGetDataObject_NoExistingDoc() throws IOException { GetDataObjectRequest getRequest = GetDataObjectRequest.builder().index(TEST_INDEX).id(TEST_ID).tenantId(TENANT_ID).build(); GetItemResponse getItemResponse = GetItemResponse.builder().build(); - when(dynamoDbClient.getItem(any(GetItemRequest.class))).thenReturn(getItemResponse); + when(dynamoDbAsyncClient.getItem(any(GetItemRequest.class))).thenReturn(CompletableFuture.completedFuture(getItemResponse)); GetDataObjectResponse response = sdkClient.getDataObjectAsync(getRequest, testThreadPool.executor(TEST_THREAD_POOL)) .toCompletableFuture() .join(); @@ -372,9 +391,9 @@ public void testGetDataObject_NoExistingDoc() throws IOException { public void testGetDataObject_UseDefaultTenantIdIfNull() { GetDataObjectRequest getRequest = GetDataObjectRequest.builder().index(TEST_INDEX).id(TEST_ID).build(); GetItemResponse getItemResponse = GetItemResponse.builder().build(); - when(dynamoDbClient.getItem(any(GetItemRequest.class))).thenReturn(getItemResponse); + when(dynamoDbAsyncClient.getItem(any(GetItemRequest.class))).thenReturn(CompletableFuture.completedFuture(getItemResponse)); sdkClient.getDataObjectAsync(getRequest, testThreadPool.executor(TEST_THREAD_POOL)).toCompletableFuture().join(); - verify(dynamoDbClient).getItem(getItemRequestArgumentCaptor.capture()); + verify(dynamoDbAsyncClient).getItem(getItemRequestArgumentCaptor.capture()); GetItemRequest getItemRequest = getItemRequestArgumentCaptor.getValue(); assertEquals("DEFAULT_TENANT", getItemRequest.key().get(HASH_KEY).s()); } @@ -382,7 +401,9 @@ public void testGetDataObject_UseDefaultTenantIdIfNull() { @Test public void testGetDataObject_DDBException_ThrowsOSException() { GetDataObjectRequest getRequest = GetDataObjectRequest.builder().index(TEST_INDEX).id(TEST_ID).tenantId(TENANT_ID).build(); - when(dynamoDbClient.getItem(any(GetItemRequest.class))).thenThrow(new RuntimeException("Test exception")); + when(dynamoDbAsyncClient.getItem(any(GetItemRequest.class))).thenReturn( + CompletableFuture.failedFuture(new RuntimeException("Test exception")) + ); CompletableFuture future = sdkClient.getDataObjectAsync( getRequest, testThreadPool.executor(TEST_THREAD_POOL) @@ -394,8 +415,10 @@ public void testGetDataObject_DDBException_ThrowsOSException() { @Test public void testDeleteDataObject_HappyCase() throws IOException { DeleteDataObjectRequest deleteRequest = DeleteDataObjectRequest.builder().id(TEST_ID).index(TEST_INDEX).tenantId(TENANT_ID).build(); - when(dynamoDbClient.deleteItem(deleteItemRequestArgumentCaptor.capture())).thenReturn( - DeleteItemResponse.builder().attributes(Map.of(SEQ_NUM, AttributeValue.builder().n("5").build())).build() + when(dynamoDbAsyncClient.deleteItem(deleteItemRequestArgumentCaptor.capture())).thenReturn( + CompletableFuture.completedFuture( + DeleteItemResponse.builder().attributes(Map.of(SEQ_NUM, AttributeValue.builder().n("5").build())).build() + ) ); DeleteDataObjectResponse deleteResponse = sdkClient.deleteDataObjectAsync(deleteRequest, testThreadPool.executor(TEST_THREAD_POOL)) .toCompletableFuture() @@ -413,7 +436,6 @@ public void testDeleteDataObject_HappyCase() throws IOException { assertEquals(0, deleteActionResponse.getShardInfo().getFailed()); assertEquals(0, deleteActionResponse.getShardInfo().getSuccessful()); assertEquals(0, deleteActionResponse.getShardInfo().getTotal()); - } @Test @@ -436,8 +458,10 @@ public void testUpdateDataObjectAsync_HappyCase() { ) ) .build(); - when(dynamoDbClient.getItem(any(GetItemRequest.class))).thenReturn(getItemResponse); - when(dynamoDbClient.updateItem(updateItemRequestArgumentCaptor.capture())).thenReturn(UpdateItemResponse.builder().build()); + when(dynamoDbAsyncClient.getItem(any(GetItemRequest.class))).thenReturn(CompletableFuture.completedFuture(getItemResponse)); + when(dynamoDbAsyncClient.updateItem(updateItemRequestArgumentCaptor.capture())).thenReturn( + CompletableFuture.completedFuture(UpdateItemResponse.builder().build()) + ); UpdateDataObjectResponse updateResponse = sdkClient.updateDataObjectAsync(updateRequest, testThreadPool.executor(TEST_THREAD_POOL)) .toCompletableFuture() .join(); @@ -472,9 +496,11 @@ public void testUpdateDataObjectAsync_HappyCaseWithMap() throws Exception { ) ) .build(); - when(dynamoDbClient.getItem(any(GetItemRequest.class))).thenReturn(getItemResponse); - when(dynamoDbClient.updateItem(updateItemRequestArgumentCaptor.capture())).thenReturn( - UpdateItemResponse.builder().attributes(Map.of(SEQ_NUM, AttributeValue.builder().n("5").build())).build() + when(dynamoDbAsyncClient.getItem(any(GetItemRequest.class))).thenReturn(CompletableFuture.completedFuture(getItemResponse)); + when(dynamoDbAsyncClient.updateItem(updateItemRequestArgumentCaptor.capture())).thenReturn( + CompletableFuture.completedFuture( + UpdateItemResponse.builder().attributes(Map.of(SEQ_NUM, AttributeValue.builder().n("5").build())).build() + ) ); UpdateDataObjectResponse updateResponse = sdkClient.updateDataObjectAsync(updateRequest, testThreadPool.executor(TEST_THREAD_POOL)) .toCompletableFuture() @@ -513,8 +539,10 @@ public void testUpdateDataObjectAsync_NullTenantId_UsesDefaultTenantId() { ) ) .build(); - when(dynamoDbClient.getItem(any(GetItemRequest.class))).thenReturn(getItemResponse); - when(dynamoDbClient.updateItem(updateItemRequestArgumentCaptor.capture())).thenReturn(UpdateItemResponse.builder().build()); + when(dynamoDbAsyncClient.getItem(any(GetItemRequest.class))).thenReturn(CompletableFuture.completedFuture(getItemResponse)); + when(dynamoDbAsyncClient.updateItem(updateItemRequestArgumentCaptor.capture())).thenReturn( + CompletableFuture.completedFuture(UpdateItemResponse.builder().build()) + ); sdkClient.updateDataObjectAsync(updateRequest, testThreadPool.executor(TEST_THREAD_POOL)).toCompletableFuture().join(); UpdateItemRequest updateItemRequest = updateItemRequestArgumentCaptor.getValue(); assertEquals(TENANT_ID, updateItemRequest.key().get(HASH_KEY).s()); @@ -528,7 +556,9 @@ public void testUpdateDataObject_DDBException_ThrowsException() { .tenantId(TENANT_ID) .dataObject(testDataObject) .build(); - when(dynamoDbClient.getItem(any(GetItemRequest.class))).thenThrow(new RuntimeException("Test exception")); + when(dynamoDbAsyncClient.getItem(any(GetItemRequest.class))).thenReturn( + CompletableFuture.failedFuture(new RuntimeException("Test exception")) + ); CompletableFuture future = sdkClient.updateDataObjectAsync( updateRequest, testThreadPool.executor(TEST_THREAD_POOL) @@ -557,9 +587,11 @@ public void testUpdateDataObject_VersionCheck() { ) ) .build(); - when(dynamoDbClient.getItem(any(GetItemRequest.class))).thenReturn(getItemResponse); + when(dynamoDbAsyncClient.getItem(any(GetItemRequest.class))).thenReturn(CompletableFuture.completedFuture(getItemResponse)); ConditionalCheckFailedException conflictException = ConditionalCheckFailedException.builder().build(); - when(dynamoDbClient.updateItem(updateItemRequestArgumentCaptor.capture())).thenThrow(conflictException); + when(dynamoDbAsyncClient.updateItem(updateItemRequestArgumentCaptor.capture())).thenReturn( + CompletableFuture.failedFuture(conflictException) + ); CompletableFuture future = sdkClient.updateDataObjectAsync( updateRequest, @@ -589,12 +621,14 @@ public void updateDataObjectAsync_VersionCheckRetrySuccess() { ) ) .build(); - when(dynamoDbClient.getItem(any(GetItemRequest.class))).thenReturn(getItemResponse); + when(dynamoDbAsyncClient.getItem(any(GetItemRequest.class))).thenReturn(CompletableFuture.completedFuture(getItemResponse)); ConditionalCheckFailedException conflictException = ConditionalCheckFailedException.builder().build(); // throw conflict exception on first time, return on second time, throw on third time (never get here) - when(dynamoDbClient.updateItem(updateItemRequestArgumentCaptor.capture())).thenThrow(conflictException) - .thenReturn(UpdateItemResponse.builder().build()) - .thenThrow(conflictException); + when(dynamoDbAsyncClient.updateItem(updateItemRequestArgumentCaptor.capture())).thenReturn( + CompletableFuture.failedFuture(conflictException) + ) + .thenReturn(CompletableFuture.completedFuture(UpdateItemResponse.builder().build())) + .thenReturn(CompletableFuture.failedFuture(conflictException)); UpdateDataObjectResponse updateResponse = sdkClient.updateDataObjectAsync(updateRequest, testThreadPool.executor(TEST_THREAD_POOL)) .toCompletableFuture() .join(); @@ -624,12 +658,14 @@ public void updateDataObjectAsync_VersionCheckRetryFailure() { ) ) .build(); - when(dynamoDbClient.getItem(any(GetItemRequest.class))).thenReturn(getItemResponse); + when(dynamoDbAsyncClient.getItem(any(GetItemRequest.class))).thenReturn(CompletableFuture.completedFuture(getItemResponse)); ConditionalCheckFailedException conflictException = ConditionalCheckFailedException.builder().build(); // throw conflict exception on first two times, return on third time (that never executes) - when(dynamoDbClient.updateItem(updateItemRequestArgumentCaptor.capture())).thenThrow(conflictException) - .thenThrow(conflictException) - .thenReturn(UpdateItemResponse.builder().build()); + when(dynamoDbAsyncClient.updateItem(updateItemRequestArgumentCaptor.capture())).thenReturn( + CompletableFuture.failedFuture(conflictException) + ) + .thenReturn(CompletableFuture.failedFuture(conflictException)) + .thenReturn(CompletableFuture.completedFuture(UpdateItemResponse.builder().build())); CompletableFuture future = sdkClient.updateDataObjectAsync( updateRequest, @@ -662,7 +698,9 @@ public void testBulkDataObject_HappyCase() { .add(updateRequest) .add(deleteRequest); - when(dynamoDbClient.putItem(any(PutItemRequest.class))).thenReturn(PutItemResponse.builder().build()); + when(dynamoDbAsyncClient.putItem(any(PutItemRequest.class))).thenReturn( + CompletableFuture.completedFuture(PutItemResponse.builder().build()) + ); GetItemResponse getItemResponse = GetItemResponse.builder() .item( Map.ofEntries( @@ -671,9 +709,13 @@ public void testBulkDataObject_HappyCase() { ) ) .build(); - when(dynamoDbClient.getItem(any(GetItemRequest.class))).thenReturn(getItemResponse); - when(dynamoDbClient.updateItem(any(UpdateItemRequest.class))).thenReturn(UpdateItemResponse.builder().build()); - when(dynamoDbClient.deleteItem(any(DeleteItemRequest.class))).thenReturn(DeleteItemResponse.builder().build()); + when(dynamoDbAsyncClient.getItem(any(GetItemRequest.class))).thenReturn(CompletableFuture.completedFuture(getItemResponse)); + when(dynamoDbAsyncClient.updateItem(any(UpdateItemRequest.class))).thenReturn( + CompletableFuture.completedFuture(UpdateItemResponse.builder().build()) + ); + when(dynamoDbAsyncClient.deleteItem(any(DeleteItemRequest.class))).thenReturn( + CompletableFuture.completedFuture(DeleteItemResponse.builder().build()) + ); BulkDataObjectResponse response = sdkClient.bulkDataObjectAsync(bulkRequest, testThreadPool.executor(TEST_THREAD_POOL)) .toCompletableFuture() @@ -711,7 +753,9 @@ public void testBulkDataObject_WithFailures() { .add(updateRequest) .add(deleteRequest); - when(dynamoDbClient.putItem(any(PutItemRequest.class))).thenReturn(PutItemResponse.builder().build()); + when(dynamoDbAsyncClient.putItem(any(PutItemRequest.class))).thenReturn( + CompletableFuture.completedFuture(PutItemResponse.builder().build()) + ); GetItemResponse getItemResponse = GetItemResponse.builder() .item( Map.ofEntries( @@ -720,10 +764,12 @@ public void testBulkDataObject_WithFailures() { ) ) .build(); - when(dynamoDbClient.getItem(any(GetItemRequest.class))).thenReturn(getItemResponse); + when(dynamoDbAsyncClient.getItem(any(GetItemRequest.class))).thenReturn(CompletableFuture.completedFuture(getItemResponse)); Exception cause = new OpenSearchStatusException("Update failed with conflict", RestStatus.CONFLICT); - when(dynamoDbClient.updateItem(any(UpdateItemRequest.class))).thenThrow(cause); - when(dynamoDbClient.deleteItem(any(DeleteItemRequest.class))).thenReturn(DeleteItemResponse.builder().build()); + when(dynamoDbAsyncClient.updateItem(any(UpdateItemRequest.class))).thenReturn(CompletableFuture.failedFuture(cause)); + when(dynamoDbAsyncClient.deleteItem(any(DeleteItemRequest.class))).thenReturn( + CompletableFuture.completedFuture(DeleteItemResponse.builder().build()) + ); BulkDataObjectResponse response = sdkClient.bulkDataObjectAsync(bulkRequest, testThreadPool.executor(TEST_THREAD_POOL)) .toCompletableFuture()