Skip to content

Commit

Permalink
Merge pull request #352 from conductor-oss/fix/join-latency-issues
Browse files Browse the repository at this point in the history
Reduce FORK/JOIN latency
  • Loading branch information
v1r3n authored Jan 7, 2025
2 parents fa9420f + 95e8c12 commit 8a6c3f7
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.springframework.util.unit.DataSize;
import org.springframework.util.unit.DataUnit;

import com.netflix.conductor.model.TaskModel;

@ConfigurationProperties("conductor.app")
public class ConductorProperties {

Expand Down Expand Up @@ -226,6 +228,24 @@ public class ConductorProperties {
/** Used to limit the size of task execution logs. */
private int taskExecLogSizeLimit = 10;

/**
* This property defines the number of poll counts (executions) after which SystemTasks
* implementing getEvaluationOffset should begin postponing the next execution.
*
* @see
* com.netflix.conductor.core.execution.tasks.WorkflowSystemTask#getEvaluationOffset(TaskModel,
* long)
* @see com.netflix.conductor.core.execution.tasks.Join#getEvaluationOffset(TaskModel, long)
*/
private int systemTaskPostponeThreshold = 200;

/**
* Timeout used by {@link com.netflix.conductor.core.execution.tasks.SystemTaskWorker} when
* polling, i.e.: call to {@link com.netflix.conductor.dao.QueueDAO#pop(String, int, int)}.
*/
@DurationUnit(ChronoUnit.MILLIS)
private Duration systemTaskQueuePopTimeout = Duration.ofMillis(100);

public String getStack() {
return stack;
}
Expand Down Expand Up @@ -567,4 +587,20 @@ public Map<String, Object> getAll() {
props.forEach((key, value) -> map.put(key.toString(), value));
return map;
}

public void setSystemTaskPostponeThreshold(int systemTaskPostponeThreshold) {
this.systemTaskPostponeThreshold = systemTaskPostponeThreshold;
}

public int getSystemTaskPostponeThreshold() {
return systemTaskPostponeThreshold;
}

public Duration getSystemTaskQueuePopTimeout() {
return systemTaskQueuePopTimeout;
}

public void setSystemTaskQueuePopTimeout(Duration systemTaskQueuePopTimeout) {
this.systemTaskQueuePopTimeout = systemTaskQueuePopTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import org.springframework.stereotype.Component;

import com.netflix.conductor.annotations.VisibleForTesting;
import com.netflix.conductor.common.utils.TaskUtils;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;
Expand All @@ -29,8 +31,13 @@
@Component(TASK_TYPE_JOIN)
public class Join extends WorkflowSystemTask {

public Join() {
@VisibleForTesting static final double EVALUATION_OFFSET_BASE = 1.2;

private final ConductorProperties properties;

public Join(ConductorProperties properties) {
super(TASK_TYPE_JOIN);
this.properties = properties;
}

@Override
Expand Down Expand Up @@ -117,12 +124,17 @@ public boolean execute(
}

@Override
public Optional<Long> getEvaluationOffset(TaskModel taskModel, long defaultOffset) {
int index = taskModel.getPollCount() > 0 ? taskModel.getPollCount() - 1 : 0;
if (index == 0) {
public Optional<Long> getEvaluationOffset(TaskModel taskModel, long maxOffset) {
int pollCount = taskModel.getPollCount();
// Assuming pollInterval = 50ms and evaluationOffsetThreshold = 200 this will cause
// a JOIN task to be evaluated continuously during the first 10 seconds and the FORK/JOIN
// will end with minimal delay.
if (pollCount <= properties.getSystemTaskPostponeThreshold()) {
return Optional.of(0L);
}
return Optional.of(Math.min((long) Math.pow(2, index), defaultOffset));

double exp = pollCount - properties.getSystemTaskPostponeThreshold();
return Optional.of(Math.min((long) Math.pow(EVALUATION_OFFSET_BASE, exp), maxOffset));
}

public boolean isAsync() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class SystemTaskWorker extends LifecycleAwareComponent {
private final AsyncSystemTaskExecutor asyncSystemTaskExecutor;
private final ConductorProperties properties;
private final ExecutionService executionService;
private final int queuePopTimeout;

ConcurrentHashMap<String, ExecutionConfig> queueExecutionConfigMap = new ConcurrentHashMap<>();

Expand All @@ -67,6 +68,7 @@ public SystemTaskWorker(
this.queueDAO = queueDAO;
this.pollInterval = properties.getSystemTaskWorkerPollInterval().toMillis();
this.executionService = executionService;
this.queuePopTimeout = (int) properties.getSystemTaskQueuePopTimeout().toMillis();

LOGGER.info("SystemTaskWorker initialized with {} threads", threadCount);
}
Expand Down Expand Up @@ -114,7 +116,8 @@ void pollAndExecute(WorkflowSystemTask systemTask, String queueName) {

LOGGER.debug("Polling queue: {} with {} slots acquired", queueName, messagesToAcquire);

List<String> polledTaskIds = queueDAO.pop(queueName, messagesToAcquire, 200);
List<String> polledTaskIds =
queueDAO.pop(queueName, messagesToAcquire, queuePopTimeout);

Monitors.recordTaskPoll(queueName);
LOGGER.debug("Polling queue:{}, got {} tasks", queueName, polledTaskIds.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,19 @@ public boolean execute(
*/
public void cancel(WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) {}

public Optional<Long> getEvaluationOffset(TaskModel taskModel, long defaultOffset) {
/**
* Determines the time in seconds by which the next execution of a task will be postponed after
* an execution. By default, this method returns {@code Optional.empty()}.
*
* <p>WorkflowSystemTasks may override this method to define a custom evaluation offset based on
* the task's behavior or requirements.
*
* @param taskModel task model
* @param maxOffset the max recommended offset value to use
* @return an {@code Optional<Long>} specifying the evaluation offset in seconds, or {@code
* Optional.empty()} if no postponement is required
*/
public Optional<Long> getEvaluationOffset(TaskModel taskModel, long maxOffset) {
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public Switch switchTask() {

@Bean(TASK_TYPE_JOIN)
public Join join() {
return new Join();
return new Join(new ConductorProperties());
}

@Bean
Expand Down Expand Up @@ -595,7 +595,8 @@ public void testOptionalWithDynamicFork() {

assertEquals(TaskModel.Status.SCHEDULED, outcome.tasksToBeScheduled.get(0).getStatus());
System.out.println(outcome.tasksToBeScheduled.get(0));
new Join().execute(workflow, outcome.tasksToBeScheduled.get(0), null);
new Join(new ConductorProperties())
.execute(workflow, outcome.tasksToBeScheduled.get(0), null);
assertEquals(TaskModel.Status.COMPLETED, outcome.tasksToBeScheduled.get(0).getStatus());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.junit.Test;

import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;
Expand All @@ -27,6 +28,9 @@
import static org.mockito.Mockito.mock;

public class TestJoin {

private final ConductorProperties properties = new ConductorProperties();

private final WorkflowExecutor executor = mock(WorkflowExecutor.class);

private TaskModel createTask(
Expand Down Expand Up @@ -65,7 +69,7 @@ public void testShouldNotMarkJoinAsCompletedWithErrorsWhenNotDone() {
// task2 is not scheduled yet, so the join is not completed
var wfJoinPair = createJoinWorkflow(List.of(task1), "task2");

var join = new Join();
var join = new Join(properties);
var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor);
assertFalse(result);
}
Expand All @@ -77,7 +81,7 @@ public void testJoinCompletesSuccessfullyWhenAllTasksSucceed() {

var wfJoinPair = createJoinWorkflow(List.of(task1, task2));

var join = new Join();
var join = new Join(properties);
var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor);
assertTrue("Join task should execute successfully when all tasks succeed", result);
assertEquals(
Expand All @@ -93,7 +97,7 @@ public void testJoinWaitsWhenAnyTaskIsNotTerminal() {

var wfJoinPair = createJoinWorkflow(List.of(task1, task2));

var join = new Join();
var join = new Join(properties);
var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor);
assertFalse("Join task should wait when any task is not in terminal state", result);
}
Expand All @@ -107,7 +111,7 @@ public void testJoinFailsWhenMandatoryTaskFails() {

var wfJoinPair = createJoinWorkflow(List.of(task1, task2));

var join = new Join();
var join = new Join(properties);
var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor);
assertTrue("Join task should be executed when a mandatory task fails", result);
assertEquals(
Expand All @@ -125,7 +129,7 @@ public void testJoinCompletesWithErrorsWhenOnlyOptionalTasksFail() {

var wfJoinPair = createJoinWorkflow(List.of(task1, task2));

var join = new Join();
var join = new Join(properties);
var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor);
assertTrue("Join task should be executed when only optional tasks fail", result);
assertEquals(
Expand All @@ -143,7 +147,7 @@ public void testJoinAggregatesFailureReasonsCorrectly() {

var wfJoinPair = createJoinWorkflow(List.of(task1, task2));

var join = new Join();
var join = new Join(properties);
var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor);
assertTrue("Join task should be executed when tasks fail", result);
assertEquals(
Expand Down Expand Up @@ -174,7 +178,7 @@ public void testJoinWaitsForAllTasksBeforeFailingDueToPermissiveTaskFailure() {
var wfJoinPair = createJoinWorkflow(List.of(task1, task2));

// First execution: Task 2 is not yet terminal.
var join = new Join();
var join = new Join(properties);
boolean result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor);
assertFalse("Join task should wait as not all tasks are terminal", result);

Expand All @@ -189,4 +193,33 @@ public void testJoinWaitsForAllTasksBeforeFailingDueToPermissiveTaskFailure() {
TaskModel.Status.FAILED,
wfJoinPair.getRight().getStatus());
}

@Test
public void testEvaluationOffsetWhenPollCountIsBelowThreshold() {
var join = new Join(properties);
var taskModel = createTask("join1", TaskModel.Status.COMPLETED, false, false);
taskModel.setPollCount(properties.getSystemTaskPostponeThreshold() - 1);
var opt = join.getEvaluationOffset(taskModel, 30L);
assertEquals(0L, (long) opt.orElseThrow());
}

@Test
public void testEvaluationOffsetWhenPollCountIsAboveThreshold() {
final var maxOffset = 30L;
var join = new Join(properties);
var taskModel = createTask("join1", TaskModel.Status.COMPLETED, false, false);

taskModel.setPollCount(properties.getSystemTaskPostponeThreshold() + 1);
var opt = join.getEvaluationOffset(taskModel, maxOffset);
assertEquals(1L, (long) opt.orElseThrow());

taskModel.setPollCount(properties.getSystemTaskPostponeThreshold() + 10);
opt = join.getEvaluationOffset(taskModel, maxOffset);
long expected = (long) Math.pow(Join.EVALUATION_OFFSET_BASE, 10);
assertEquals(expected, (long) opt.orElseThrow());

taskModel.setPollCount(properties.getSystemTaskPostponeThreshold() + 40);
opt = join.getEvaluationOffset(taskModel, maxOffset);
assertEquals(maxOffset, (long) opt.orElseThrow());
}
}

0 comments on commit 8a6c3f7

Please sign in to comment.