Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Cancel Import Request #4702

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// -------------------------------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -57,7 +58,6 @@ public CancelImportRequestHandlerTests()

[Theory]
[InlineData(JobStatus.Completed)]
[InlineData(JobStatus.Cancelled)]
[InlineData(JobStatus.Failed)]
public async Task GivenAFhirMediator_WhenCancelingExistingBulkImportJobThatHasAlreadyCompleted_ThenConflictStatusCodeShouldBeReturned(JobStatus taskStatus)
{
Expand All @@ -66,14 +66,26 @@ public async Task GivenAFhirMediator_WhenCancelingExistingBulkImportJobThatHasAl
Assert.Equal(HttpStatusCode.Conflict, operationFailedException.ResponseStatusCode);
}

[Theory]
[InlineData(JobStatus.Cancelled)]
public async Task GivenAFhirMediator_WhenCancelingExistingBulkImportJobThatHasAlreadyBeenCanceled_ThenAcceptedStatusCodeShouldBeReturned(JobStatus taskStatus)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is new behavior, because if a tastk has been already canceled, then we will return accepted because we will continue trying to cancel.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A change in behavior may require giving customers 1 month notice before deploying to prod. We should check w./ PM team

{
SetupBulkImportJob(taskStatus, true);
CancelImportResponse response = await _mediator.CancelImportAsync(JobId, _cancellationToken);

Assert.NotNull(response);
Assert.Equal(HttpStatusCode.Accepted, response.StatusCode);
}

[Theory]
[InlineData(JobStatus.Created)]
[InlineData(JobStatus.Running)]
public async Task GivenAFhirMediator_WhenCancelingExistingBulkImportJobThatHasNotCompleted_ThenAcceptedStatusCodeShouldBeReturned(JobStatus jobStatus)
{
JobInfo jobInfo = await SetupAndExecuteCancelImportAsync(jobStatus, HttpStatusCode.Accepted);
List<JobInfo> jobs = await SetupAndExecuteCancelImportAsync(jobStatus, HttpStatusCode.Accepted);

await _queueClient.Received(1).CancelJobByGroupIdAsync((byte)Core.Features.Operations.QueueType.Import, jobInfo.GroupId, _cancellationToken);
var groupJobId = jobs[0].GroupId;
await _queueClient.Received(1).CancelJobByGroupIdAsync((byte)Core.Features.Operations.QueueType.Import, groupJobId, _cancellationToken);
}

[Fact]
Expand All @@ -83,32 +95,35 @@ public async Task GivenAFhirMediator_WhenCancelingWithNotExistJob_ThenNotFoundSh
await Assert.ThrowsAsync<ResourceNotFoundException>(async () => await _mediator.CancelImportAsync(JobId, _cancellationToken));
}

private async Task<JobInfo> SetupAndExecuteCancelImportAsync(JobStatus jobStatus, HttpStatusCode expectedStatusCode, bool isCanceled = false)
private async Task<List<JobInfo>> SetupAndExecuteCancelImportAsync(JobStatus jobStatus, HttpStatusCode expectedStatusCode, bool isCanceled = false)
{
JobInfo jobInfo = SetupBulkImportJob(jobStatus, isCanceled);
List<JobInfo> jobs = SetupBulkImportJob(jobStatus, isCanceled);

CancelImportResponse response = await _mediator.CancelImportAsync(JobId, _cancellationToken);

Assert.NotNull(response);
Assert.Equal(expectedStatusCode, response.StatusCode);

return jobInfo;
return jobs;
}

private JobInfo SetupBulkImportJob(JobStatus jobStatus, bool isCanceled)
private List<JobInfo> SetupBulkImportJob(JobStatus jobStatus, bool isCanceled)
{
var jobInfo = new JobInfo
var jobs = new List<JobInfo>()
{
Id = JobId,
GroupId = JobId,
Status = jobStatus,
Definition = string.Empty,
CancelRequested = isCanceled,
new JobInfo
{
Id = JobId,
GroupId = JobId,
Status = jobStatus,
Definition = string.Empty,
CancelRequested = isCanceled,
},
};

_queueClient.GetJobByIdAsync(Arg.Any<byte>(), JobId, Arg.Any<bool>(), _cancellationToken).Returns(jobInfo);
_queueClient.GetJobByGroupIdAsync(Arg.Any<byte>(), JobId, Arg.Any<bool>(), _cancellationToken).Returns(jobs);

return jobInfo;
return jobs;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -43,19 +45,43 @@ public async Task<CancelImportResponse> Handle(CancelImportRequest request, Canc
throw new UnauthorizedFhirActionException();
}

JobInfo jobInfo = await _queueClient.GetJobByIdAsync(QueueType.Import, request.JobId, false, cancellationToken);
// We need to check the status of all jobs
IReadOnlyList<JobInfo> jobs = await _queueClient.GetJobByGroupIdAsync(QueueType.Import, request.JobId, false, cancellationToken);

if (jobInfo == null)
if (jobs.Count == 0)
{
throw new ResourceNotFoundException(string.Format(Core.Resources.ImportJobNotFound, request.JobId));
}

if (jobInfo.Status == JobManagement.JobStatus.Completed || jobInfo.Status == JobManagement.JobStatus.Cancelled || jobInfo.Status == JobManagement.JobStatus.Failed)
Copy link
Contributor Author

@abiisnn abiisnn Oct 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check that now we are not returning CONFLICT if the job has been already cancelled.

Now, if the job has been already cancelled, we will retry cancel, so we will return ACCEPTED, as Sergey suggested.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just need to check w/ PM on behavior change

var anyFailed = false;
var allComplete = true;

// Check each job status
foreach (var job in jobs)
{
if (job.Status == JobStatus.Failed)
{
anyFailed = true;
break;
}

if (job.Status != JobStatus.Completed)
{
allComplete = false;
break;
}
}

// If the job is already completed or failed, return conflict status.
if (anyFailed || allComplete)
{
throw new OperationFailedException(Core.Resources.ImportOperationCompleted, HttpStatusCode.Conflict);
abiisnn marked this conversation as resolved.
Show resolved Hide resolved
}

await _queueClient.CancelJobByGroupIdAsync(QueueType.Import, jobInfo.GroupId, cancellationToken);
// Try to cancel the job
_logger.LogInformation("Attempting to cancel import job {JobId}", request.JobId);
await _queueClient.CancelJobByGroupIdAsync(QueueType.Import, request.JobId, cancellationToken);

return new CancelImportResponse(HttpStatusCode.Accepted);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public CancelImportRequest(long jobId)
}

/// <summary>
/// Import orchestrator job id
/// Import orchestrator/coordinator job id this is also known as Group Id
/// </summary>
public long JobId { get; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1494,17 +1494,81 @@ public async Task GivenImportWithCancel_ThenTaskShouldBeCanceled()
};

Uri checkLocation = await ImportTestHelper.CreateImportTaskAsync(_client, request);
var respone = await _client.CancelImport(checkLocation);

// wait task completed
while (respone.StatusCode != HttpStatusCode.Conflict)
{
respone = await _client.CancelImport(checkLocation);
await Task.Delay(TimeSpan.FromSeconds(0.2));
}
// Then we cancel import job
var response = await _client.CancelImport(checkLocation);

// The service should accept the cancel request
Assert.Equal(HttpStatusCode.Accepted, response.StatusCode);

// We try to cancel the same job again, it should return Accepted
response = await _client.CancelImport(checkLocation);
Assert.Equal(HttpStatusCode.Accepted, response.StatusCode);

// We get the Import status
FhirClientException fhirException = await Assert.ThrowsAsync<FhirClientException>(async () => await _client.CheckImportAsync(checkLocation));
Assert.Equal(HttpStatusCode.BadRequest, fhirException.StatusCode);
Assert.Contains("User requested cancellation", fhirException.Message);
}

[Fact]
public async Task GivenImportHasCompleted_WhenCancel_ThenTaskShouldReturnConflict()
{
_metricHandler?.ResetCount();
string patientNdJsonResource = Samples.GetNdJson("Import-Patient");
patientNdJsonResource = Regex.Replace(patientNdJsonResource, "##PatientID##", m => Guid.NewGuid().ToString("N"));
(Uri location, string etag) = await ImportTestHelper.UploadFileAsync(patientNdJsonResource, _fixture.StorageAccount);

var request = new ImportRequest()
{
InputFormat = "application/fhir+ndjson",
InputSource = new Uri("https://other-server.example.org"),
StorageDetail = new ImportRequestStorageDetail() { Type = "azure-blob" },
Input = new List<InputResource>()
{
new InputResource()
{
Url = location,
Etag = etag,
Type = "Patient",
},
},
Mode = ImportMode.InitialLoad.ToString(),
};

Uri checkLocation = await ImportTestHelper.CreateImportTaskAsync(_client, request);

// Wait for import job to complete
var importStatus = await _client.CheckImportAsync(checkLocation);

// To avoid an infinite loop, we will try 5 times to get the completed status
// Which we expect to finish because we are importing a single job
for (int i = 0; i < 5; i++)
{
if (importStatus.StatusCode == HttpStatusCode.OK)
{
break;
}

importStatus = await _client.CheckImportAsync(checkLocation);
await Task.Delay(TimeSpan.FromSeconds(5));
}

// Then we cancel import job
var response = await _client.CancelImport(checkLocation);

// The service should return conflict because Import has already completed
Assert.Equal(HttpStatusCode.Conflict, response.StatusCode);

// We try to cancel the same job again, it should return Conflict
// We add this retry, in case customer send multiple cancel requests
// We need to make sure the server returns Conflict
response = await _client.CancelImport(checkLocation);
Assert.Equal(HttpStatusCode.Conflict, response.StatusCode);

// We get the Import status and it should return OK because Import completed
importStatus = await _client.CheckImportAsync(checkLocation);
Assert.Equal(HttpStatusCode.OK, importStatus.StatusCode);
}

[Fact(Skip = "long running tests for invalid url")]
Expand Down
Loading