Skip to content

Commit

Permalink
Revert to old job status check for standard tasks and use task status…
Browse files Browse the repository at this point in the history
… only for array tasks

Signed-off-by: jorgee <[email protected]>
  • Loading branch information
jorgee committed Jan 29, 2025
1 parent e60d4b9 commit 959dd2b
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package nextflow.cloud.google.batch

import com.google.api.gax.rpc.NotFoundException
import com.google.cloud.batch.v1.JobStatus
import com.google.cloud.batch.v1.Task

import java.nio.file.Path
Expand Down Expand Up @@ -94,6 +95,11 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
*/
private String uid

/**
* Flag indicating whether the task belongs to a TaskArrayRun
*/
private boolean belongsToArray

/**
* Task state assigned by Google Batch service
*/
Expand Down Expand Up @@ -183,19 +189,20 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
log.debug "[GOOGLE BATCH] Process `${task.lazyName()}` submitted > job=$jobId; uid=$uid; work-dir=${task.getWorkDirStr()}"
}

protected void updateStatus(String jobId, String taskId, String uid) {
protected void updateStatus(String jobId, String taskId, String uid, boolean belongsToArray = false) {
if( task instanceof TaskArrayRun ) {
// update status for children
for( int i=0; i<task.children.size(); i++ ) {
final handler = task.children[i] as GoogleBatchTaskHandler
final arrayTaskId = executor.getArrayTaskId(jobId, i)
handler.updateStatus(jobId, arrayTaskId, uid)
handler.updateStatus(jobId, arrayTaskId, uid, true)
}
}
else {
this.jobId = jobId
this.taskId = taskId
this.uid = uid
this.belongsToArray = belongsToArray
this.status = TaskStatus.SUBMITTED
}
}
Expand Down Expand Up @@ -454,10 +461,17 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
* @return Retrieve the submitted task state
*/
protected String getTaskState() {
    // Array children are tracked through their individual task status;
    // standalone tasks are tracked through the status of the owning job.
    return belongsToArray ? getStateFromTaskStatus() : getStateFromJobStatus()
}

protected String getStateFromTaskStatus() {
final tasks = client.listTasks(jobId)
if( !tasks.iterator().hasNext() ) {
// if there are no tasks, check the job status
return checkJobStatus()
return getStateFromJobStatus()
}
final now = System.currentTimeMillis()
final delta = now - timestamp;
Expand All @@ -472,6 +486,16 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
return taskState
}

/**
 * Return the task state derived from the status of the job identified by {@code jobId}.
 *
 * The job status is fetched again only when no state has been recorded yet
 * or when at least one second has elapsed since {@code timestamp}.
 *
 * @return the current value of {@code taskState}
 */
protected String getStateFromJobStatus() {
    final elapsed = System.currentTimeMillis() - timestamp
    final boolean refreshNeeded = !taskState || elapsed >= 1_000
    if( refreshNeeded ) {
        // fetch the job status and let inspectJobStatus process it
        inspectJobStatus(client.getJobStatus(jobId))
    }
    return taskState
}

private void inspectTaskStatus(com.google.cloud.batch.v1.TaskStatus status) {
final newState = status?.state as String
if (newState) {
Expand All @@ -489,36 +513,39 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {

protected String manageNotFound( Iterable<Task> tasks) {
// If the task belongs to an array, check whether it is in the task list
if (tasks.size() > 1) {
for (Task t in tasks) {
if (t.name == client.generateTaskName(jobId, taskId)) {
inspectTaskStatus(t.status)
return taskState
}
for (Task t in tasks) {
if (t.name == client.generateTaskName(jobId, taskId)) {
inspectTaskStatus(t.status)
return taskState
}
}
// if it is not an array task, or the task is not in the list, check the job status.
checkJobStatus()
final status = client.getJobStatus(jobId)
inspectJobStatus(status)
return taskState
}

protected String checkJobStatus() {
final jobStatus = client.getJobStatus(jobId)
final newState = jobStatus?.state as String
protected String inspectJobStatus(JobStatus status) {
final newState = status?.state as String
if (newState) {
log.trace "[GOOGLE BATCH] Get job=$jobId state=$newState"
taskState = newState
timestamp = System.currentTimeMillis()
if (newState == "FAILED") {
noTaskJobfailure = true
}
return taskState
} else {
return "PENDING"
}
if (newState == 'SCHEDULED') {
final eventsCount = status.getStatusEventsCount()
final lastEvent = eventsCount > 0 ? status.getStatusEvents(eventsCount - 1) : null
if (lastEvent?.getDescription()?.contains('CODE_GCE_QUOTA_EXCEEDED'))
log.warn1 "Batch job cannot be run: ${lastEvent.getDescription()}"
}
}

static private final List<String> RUNNING_OR_COMPLETED = ['RUNNING', 'SUCCEEDED', 'FAILED']
static private final List<String> RUNNING_OR_COMPLETED = ['RUNNING', 'SUCCEEDED', 'FAILED', 'DELETION_IN_PROGRESS']

static private final List<String> COMPLETED = ['SUCCEEDED', 'FAILED']
static private final List<String> COMPLETED = ['SUCCEEDED', 'FAILED', 'DELETION_IN_PROGRESS']

@Override
boolean checkIfRunning() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ class GoogleBatchTaskHandlerTest extends Specification {
builder.build()
}

def 'should check job status when no tasks in job '() {
def 'should check job status when no tasks in task array '() {

given:
def jobId = 'job-id'
Expand All @@ -609,7 +609,7 @@ class GoogleBatchTaskHandlerTest extends Specification {
def task = Mock(TaskRun) {
lazyName() >> 'foo (1)'
}
def handler = Spy(new GoogleBatchTaskHandler(jobId: jobId, taskId: taskId, client: client, task: task))
def handler = Spy(new GoogleBatchTaskHandler(jobId: jobId, taskId: taskId, client: client, task: task, belongsToArray: true))
final message = 'Job failed when Batch tries to schedule it: Batch Error: code - CODE_MACHINE_TYPE_NOT_FOUND'
when:
client.listTasks(jobId) >>> [new LinkedList<Task>(), new LinkedList<Task>()]
Expand All @@ -619,20 +619,20 @@ class GoogleBatchTaskHandlerTest extends Specification {
makeJobStatus(JobStatus.State.FAILED, message)
]
then:
handler.getTaskState() == "PENDING"
handler.getTaskState() == null
handler.getTaskState() == "FAILED"
handler.getJobError().message == message
}

def 'should manage not found when getting task state '() {
def 'should manage not found when getting task state in task array'() {
given:
def jobId = '1'
def taskId = '1'
def client = Mock(BatchClient)
def task = Mock(TaskRun) {
lazyName() >> 'foo (1)'
}
def handler = Spy(new GoogleBatchTaskHandler(jobId: jobId, taskId: taskId, client: client, task: task))
def handler = Spy(new GoogleBatchTaskHandler(jobId: jobId, taskId: taskId, client: client, task: task, belongsToArray: true))

when:
client.generateTaskName(jobId, taskId) >> "$jobId/group0/$taskId"
Expand Down

0 comments on commit 959dd2b

Please sign in to comment.