From a021d1d272cc86fc124b4b3b9eb27999723c8655 Mon Sep 17 00:00:00 2001 From: Fabrice Bibonne Date: Mon, 5 Feb 2024 22:05:59 +0100 Subject: [PATCH] feat(step name unicity) #3757 the names of different steps in a job must be different --- .../batch/core/job/AbstractJob.java | 30 +- .../batch/core/job/SimpleJob.java | 11 +- .../batch/core/job/flow/FlowJob.java | 55 +- .../core/job/ExtendedAbstractJobTests.java | 7 + .../core/job/builder/FlowJobBuilderTests.java | 802 +++++++++--------- .../support/SimpleJobOperatorTests.java | 5 + 6 files changed, 485 insertions(+), 425 deletions(-) diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/job/AbstractJob.java b/spring-batch-core/src/main/java/org/springframework/batch/core/job/AbstractJob.java index 34d6d19f58..3b98aafb2b 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/job/AbstractJob.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/job/AbstractJob.java @@ -17,8 +17,7 @@ package org.springframework.batch.core.job; import java.time.LocalDateTime; -import java.util.Collection; -import java.util.List; +import java.util.*; import java.util.stream.Collectors; import io.micrometer.core.instrument.LongTaskTimer; @@ -43,6 +42,7 @@ import org.springframework.batch.core.StartLimitExceededException; import org.springframework.batch.core.Step; import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.job.builder.AlreadyUsedStepNameException; import org.springframework.batch.core.launch.NoSuchJobException; import org.springframework.batch.core.launch.support.ExitCodeMapper; import org.springframework.batch.core.listener.CompositeJobExecutionListener; @@ -300,6 +300,7 @@ public final void execute(JobExecution execution) { execution.setStartTime(LocalDateTime.now()); updateStatus(execution, BatchStatus.STARTED); + checkStepNamesUnicity(); listener.beforeJob(execution); @@ -368,9 +369,23 @@ public final void execute(JobExecution execution) { finally { JobSynchronizationManager.release(); } - } + } + + protected abstract void checkStepNamesUnicity() throws AlreadyUsedStepNameException ; + private Optional findFirstDoubleElementInList(List strings) { + if (strings==null){ + return Optional.empty(); + } + Set alreadyChecked=new HashSet<>(); + for (String value:strings){ + if (alreadyChecked.contains(value)){ + return Optional.of(value); + } + alreadyChecked.add(value); + } + return Optional.empty(); } private void stopObservation(JobExecution execution, Observation observation) { @@ -430,6 +445,15 @@ else if (ex instanceof NoSuchJobException || ex.getCause() instanceof NoSuchJobE return exitStatus; } + protected static void addToMapCheckingUnicity(Map map, Step step, String name) throws AlreadyUsedStepNameException { + map.merge(name, step, (old, value)->{ + if (!old.equals(value)){ + throw new AlreadyUsedStepNameException(name); + } + return old; + }); + } + private void updateStatus(JobExecution jobExecution, BatchStatus status) { jobExecution.setStatus(status); jobRepository.update(jobExecution); diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/job/SimpleJob.java b/spring-batch-core/src/main/java/org/springframework/batch/core/job/SimpleJob.java index b22317ef28..94aa2911de 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/job/SimpleJob.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/job/SimpleJob.java @@ -16,9 +16,7 @@ package org.springframework.batch.core.job; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; +import java.util.*; import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.Job; @@ -27,6 +25,7 @@ import org.springframework.batch.core.StartLimitExceededException; import org.springframework.batch.core.Step; import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.job.builder.AlreadyUsedStepNameException; import org.springframework.batch.core.repository.JobRestartException; import org.springframework.batch.core.step.StepLocator; @@ -145,4 +144,10 @@ protected void doExecute(JobExecution execution) } } + @Override + protected void checkStepNamesUnicity() throws AlreadyUsedStepNameException { + Map map = new HashMap<>(); + steps.forEach(step->{addToMapCheckingUnicity(map, step, step.getName());}); + } + } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/job/flow/FlowJob.java b/spring-batch-core/src/main/java/org/springframework/batch/core/job/flow/FlowJob.java index 33e2f491fe..37405db252 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/job/flow/FlowJob.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/job/flow/FlowJob.java @@ -15,19 +15,20 @@ */ package org.springframework.batch.core.job.flow; -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobExecutionException; import org.springframework.batch.core.Step; import org.springframework.batch.core.job.AbstractJob; import org.springframework.batch.core.job.SimpleStepHandler; +import org.springframework.batch.core.job.builder.AlreadyUsedStepNameException; import org.springframework.batch.core.step.StepHolder; import org.springframework.batch.core.step.StepLocator; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + /** * Implementation of the {@link Job} interface that allows for complex flows of steps, * rather than requiring sequential execution. In general, this job implementation was @@ -74,40 +75,47 @@ public void setFlow(Flow flow) { */ @Override public Step getStep(String stepName) { - if (!initialized) { - init(); - } + init(); return stepMap.get(stepName); } + /** * Initialize the step names */ private void init() { - findSteps(flow, stepMap); - initialized = true; + if (!initialized) { + findStepsThrowingIfNameNotUnique(flow, stepMap); + initialized = true; + } } - private void findSteps(Flow flow, Map map) { + private void findStepsThrowingIfNameNotUnique(Flow flow, Map map) { for (State state : flow.getStates()) { if (state instanceof StepLocator locator) { for (String name : locator.getStepNames()) { - map.put(name, locator.getStep(name)); + addToMapCheckingUnicity(map, locator.getStep(name), name); } } - else if (state instanceof StepHolder) { - Step step = ((StepHolder) state).getStep(); - String name = step.getName(); - stepMap.put(name, step); + //TODO remove this else bock ? not executed during tests : the only State wich implements StepHolder is StepState which implements also StepLocator + /* + Tests Coverage + Hits : 30 + state instanceof StepHolder + true hits: 0 + false hits : 30 + */ + else if (state instanceof StepHolder stepHolder) { + Step step = stepHolder.getStep(); + addToMapCheckingUnicity(map, step, step.getName()); } - else if (state instanceof FlowHolder) { - for (Flow subflow : ((FlowHolder) state).getFlows()) { - findSteps(subflow, map); + else if (state instanceof FlowHolder flowHolder) { + for (Flow subflow : flowHolder.getFlows()) { + findStepsThrowingIfNameNotUnique(subflow, map); } } } - } /** @@ -115,9 +123,7 @@ else if (state instanceof FlowHolder) { */ @Override public Collection getStepNames() { - if (!initialized) { - init(); - } + init(); return stepMap.keySet(); } @@ -139,4 +145,9 @@ protected void doExecute(final JobExecution execution) throws JobExecutionExcept } } + @Override + protected void checkStepNamesUnicity() throws AlreadyUsedStepNameException { + init(); + } + } diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/job/ExtendedAbstractJobTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/job/ExtendedAbstractJobTests.java index 79a5684b1f..cf04807204 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/job/ExtendedAbstractJobTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/job/ExtendedAbstractJobTests.java @@ -25,6 +25,7 @@ import org.springframework.batch.core.JobParametersInvalidException; import org.springframework.batch.core.Step; import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.job.builder.AlreadyUsedStepNameException; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; import org.springframework.batch.core.step.StepSupport; @@ -36,6 +37,7 @@ import java.time.LocalDateTime; import java.util.Collection; import java.util.Collections; +import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -215,6 +217,11 @@ public StubJob() { protected void doExecute(JobExecution execution) throws JobExecutionException { } + @Override + protected void checkStepNamesUnicity() throws AlreadyUsedStepNameException { + + } + @Override public Step getStep(String stepName) { return null; diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/job/builder/FlowJobBuilderTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/job/builder/FlowJobBuilderTests.java index 9dfbd7cb30..cd7f22ecf3 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/job/builder/FlowJobBuilderTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/job/builder/FlowJobBuilderTests.java @@ -27,7 +27,10 @@ import org.springframework.batch.core.job.flow.JobExecutionDecider; import org.springframework.batch.core.job.flow.support.SimpleFlow; import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; +import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.repository.JobRestartException; import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframework.batch.core.step.StepSupport; @@ -50,6 +53,7 @@ import javax.sql.DataSource; import java.util.Arrays; +import static org.junit.Assert.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -59,434 +63,438 @@ */ class FlowJobBuilderTests { - private JobRepository jobRepository; - - private JobExecution execution; - - private final StepSupport step1 = new StepSupport("step1") { - @Override - public void execute(StepExecution stepExecution) - throws JobInterruptedException, UnexpectedJobExecutionException { - stepExecution.upgradeStatus(BatchStatus.COMPLETED); - stepExecution.setExitStatus(ExitStatus.COMPLETED); - jobRepository.update(stepExecution); - } - }; - - private final StepSupport fails = new StepSupport("fails") { - @Override - public void execute(StepExecution stepExecution) - throws JobInterruptedException, UnexpectedJobExecutionException { - stepExecution.upgradeStatus(BatchStatus.FAILED); - stepExecution.setExitStatus(ExitStatus.FAILED); - jobRepository.update(stepExecution); - } - }; - - private final StepSupport step2 = new StepSupport("step2") { - @Override - public void execute(StepExecution stepExecution) - throws JobInterruptedException, UnexpectedJobExecutionException { - stepExecution.upgradeStatus(BatchStatus.COMPLETED); - stepExecution.setExitStatus(ExitStatus.COMPLETED); - jobRepository.update(stepExecution); - } - }; - - private final StepSupport step3 = new StepSupport("step3") { - @Override - public void execute(StepExecution stepExecution) - throws JobInterruptedException, UnexpectedJobExecutionException { - stepExecution.upgradeStatus(BatchStatus.COMPLETED); - stepExecution.setExitStatus(ExitStatus.COMPLETED); - jobRepository.update(stepExecution); - } - }; - - @BeforeEach - void init() throws Exception { - EmbeddedDatabase embeddedDatabase = new EmbeddedDatabaseBuilder() - .addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql") - .addScript("/org/springframework/batch/core/schema-hsqldb.sql") - .build(); - JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean(); - factory.setDataSource(embeddedDatabase); - factory.setTransactionManager(new JdbcTransactionManager(embeddedDatabase)); - factory.afterPropertiesSet(); - jobRepository = factory.getObject(); - execution = jobRepository.createJobExecution("flow", new JobParameters()); - } - - @Test - void testBuildOnOneLine() { - FlowJobBuilder builder = new JobBuilder("flow", jobRepository).start(step1) - .on("COMPLETED") - .to(step2) - .end() - .preventRestart(); - builder.build().execute(execution); - assertEquals(BatchStatus.COMPLETED, execution.getStatus()); - assertEquals(2, execution.getStepExecutions().size()); - } - - @Test - void testBuildSingleFlow() { - Flow flow = new FlowBuilder("subflow").from(step1).next(step2).build(); - FlowJobBuilder builder = new JobBuilder("flow", jobRepository).start(flow).end().preventRestart(); - builder.build().execute(execution); - assertEquals(BatchStatus.COMPLETED, execution.getStatus()); - assertEquals(2, execution.getStepExecutions().size()); - } - - @Test - void testBuildSingleFlowAddingStepsViaNext() { - Flow flow = new FlowBuilder("subflow").next(step1).next(step2).build(); - FlowJobBuilder builder = new JobBuilder("flow", jobRepository).start(flow).end().preventRestart(); - builder.build().execute(execution); - assertEquals(BatchStatus.COMPLETED, execution.getStatus()); - assertEquals(2, execution.getStepExecutions().size()); - } - - @Test - void testBuildOverTwoLines() { - FlowJobBuilder builder = new JobBuilder("flow", jobRepository).start(step1).on("COMPLETED").to(step2).end(); - builder.preventRestart(); - builder.build().execute(execution); - assertEquals(BatchStatus.COMPLETED, execution.getStatus()); - assertEquals(2, execution.getStepExecutions().size()); - } - - @Test - void testBuildSubflow() { - Flow flow = new FlowBuilder("subflow").from(step1).end(); - JobFlowBuilder builder = new JobBuilder("flow", jobRepository).start(flow); - builder.on("COMPLETED").to(step2); - builder.end().preventRestart().build().execute(execution); - assertEquals(BatchStatus.COMPLETED, execution.getStatus()); - assertEquals(2, execution.getStepExecutions().size()); - } - - @Test - void testBuildSplit() { - Flow flow = new FlowBuilder("subflow").from(step1).end(); - SimpleJobBuilder builder = new JobBuilder("flow", jobRepository).start(step2); - builder.split(new SimpleAsyncTaskExecutor()).add(flow).end(); - builder.preventRestart().build().execute(execution); - assertEquals(BatchStatus.COMPLETED, execution.getStatus()); - assertEquals(2, execution.getStepExecutions().size()); - } - - @Test - void testNestedSplitsWithSingleThread() { - SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(); - taskExecutor.setConcurrencyLimit(1); - - FlowBuilder flowBuilder = new FlowBuilder<>("flow"); - FlowBuilder.SplitBuilder splitBuilder = flowBuilder.split(taskExecutor); - splitBuilder.add(new FlowBuilder("subflow1").from(step1).end()); - splitBuilder.add(new FlowBuilder("subflow2").from(step2).end()); - Job job = new JobBuilder("job", jobRepository).start(flowBuilder.build()).end().build(); - job.execute(execution); - - assertEquals(BatchStatus.COMPLETED, execution.getStatus()); - assertEquals(2, execution.getStepExecutions().size()); - } - - @Test - void testBuildSplitUsingStartAndAdd_BATCH_2346() { - Flow subflow1 = new FlowBuilder("subflow1").from(step2).end(); - Flow subflow2 = new FlowBuilder("subflow2").from(step3).end(); - Flow splitflow = new FlowBuilder("splitflow").start(subflow1) - .split(new SimpleAsyncTaskExecutor()) - .add(subflow2) - .build(); - - FlowJobBuilder builder = new JobBuilder("flow", jobRepository).start(splitflow).end(); - builder.preventRestart().build().execute(execution); - assertEquals(BatchStatus.COMPLETED, execution.getStatus()); - assertEquals(2, execution.getStepExecutions().size()); - } - - @Test - void testBuildSplit_BATCH_2282() { - Flow flow1 = new FlowBuilder("subflow1").from(step1).end(); - Flow flow2 = new FlowBuilder("subflow2").from(step2).end(); - Flow splitFlow = new FlowBuilder("splitflow").split(new SimpleAsyncTaskExecutor()) - .add(flow1, flow2) - .build(); - FlowJobBuilder builder = new JobBuilder("flow", jobRepository).start(splitFlow).end(); - builder.preventRestart().build().execute(execution); - assertEquals(BatchStatus.COMPLETED, execution.getStatus()); - assertEquals(2, execution.getStepExecutions().size()); - } - - @Test - void testBuildDecision() { - JobExecutionDecider decider = new JobExecutionDecider() { - private int count = 0; - - @Override - public FlowExecutionStatus decide(JobExecution jobExecution, @Nullable StepExecution stepExecution) { - count++; - return count < 2 ? new FlowExecutionStatus("ONGOING") : FlowExecutionStatus.COMPLETED; - } - }; - step1.setAllowStartIfComplete(true); - SimpleJobBuilder builder = new JobBuilder("flow", jobRepository).start(step1); - builder.next(decider).on("COMPLETED").end().from(decider).on("*").to(step1).end(); - builder.preventRestart().build().execute(execution); - assertEquals(BatchStatus.COMPLETED, execution.getStatus()); - assertEquals(2, execution.getStepExecutions().size()); - } - - @Test - void testBuildWithDeciderAtStart() { - JobExecutionDecider decider = new JobExecutionDecider() { - private int count = 0; - - @Override - public FlowExecutionStatus decide(JobExecution jobExecution, @Nullable StepExecution stepExecution) { - count++; - return count < 2 ? new FlowExecutionStatus("ONGOING") : FlowExecutionStatus.COMPLETED; - } - }; - JobFlowBuilder builder = new JobBuilder("flow", jobRepository).start(decider); - builder.on("COMPLETED").end().from(decider).on("*").to(step1).end(); - builder.build().preventRestart().build().execute(execution); - assertEquals(BatchStatus.COMPLETED, execution.getStatus()); - assertEquals(1, execution.getStepExecutions().size()); - } - - @Test - void testBuildWithDeciderPriority() { - JobExecutionDecider decider = (jobExecution, stepExecution) -> new FlowExecutionStatus("COMPLETED_PARTIALLY"); - JobFlowBuilder builder = new JobBuilder("flow_priority", jobRepository).start(decider); - builder.on("COMPLETED_PARTIALLY").end(); - builder.on("COMPLETED*").fail(); - builder.build().preventRestart().build().execute(execution); - assertEquals(BatchStatus.COMPLETED, execution.getStatus()); - } - - @Test - void testBuildWithWildcardDeciderPriority() { - JobExecutionDecider decider = (jobExecution, stepExecution) -> new FlowExecutionStatus("COMPLETED_PARTIALLY"); - JobFlowBuilder builder = new JobBuilder("flow_priority", jobRepository).start(decider); - builder.on("COMPLETED_?ARTIALLY").end(); - builder.on("COMPLETED_*ARTIALLY").fail(); - builder.build().preventRestart().build().execute(execution); - assertEquals(BatchStatus.COMPLETED, execution.getStatus()); - } - - @Test - void testBuildWithDeciderPrioritySubstringAndWildcard() { - JobExecutionDecider decider = (jobExecution, stepExecution) -> new FlowExecutionStatus("CONTINUABLE"); - JobFlowBuilder builder = new JobBuilder("flow_priority", jobRepository).start(decider); - builder.on("CONTINUABLE").end(); - builder.on("CONTIN*").fail(); - builder.build().preventRestart().build().execute(execution); - assertEquals(BatchStatus.COMPLETED, execution.getStatus()); - } - - @Test - void testBuildWithIntermediateSimpleJob() { - SimpleJobBuilder builder = new JobBuilder("flow", jobRepository).start(step1); - builder.on("COMPLETED").to(step2).end(); - builder.preventRestart(); - builder.build().execute(execution); - assertEquals(BatchStatus.COMPLETED, execution.getStatus()); - assertEquals(2, execution.getStepExecutions().size()); - } - - @Test - void testBuildWithIntermediateSimpleJobTwoSteps() { - SimpleJobBuilder builder = new JobBuilder("flow", jobRepository).start(step1).next(step2); - builder.on("FAILED").to(step3).end(); - builder.build().execute(execution); - assertEquals(BatchStatus.COMPLETED, execution.getStatus()); - assertEquals(2, execution.getStepExecutions().size()); - } - - @Test - void testBuildWithCustomEndState() { - SimpleJobBuilder builder = new JobBuilder("flow", jobRepository).start(step1); - builder.on("COMPLETED").end("FOO"); - builder.preventRestart(); - builder.build().execute(execution); - assertEquals(BatchStatus.COMPLETED, execution.getStatus()); - assertEquals("FOO", execution.getExitStatus().getExitCode()); - assertEquals(1, execution.getStepExecutions().size()); - } - - @Test - void testBuildWithStop() { - SimpleJobBuilder builder = new JobBuilder("flow", jobRepository).start(step1); - builder.on("COMPLETED").stop(); - builder.preventRestart(); - builder.build().execute(execution); - assertEquals(BatchStatus.STOPPED, execution.getStatus()); - assertEquals("STOPPED", execution.getExitStatus().getExitCode()); - assertEquals(1, execution.getStepExecutions().size()); - } - - @Test - void testBuildWithStopAndRestart() throws Exception { - SimpleJobBuilder builder = new JobBuilder("flow", jobRepository).start(fails); - builder.on("FAILED").stopAndRestart(step2); - Job job = builder.build(); - job.execute(execution); - assertEquals(BatchStatus.STOPPED, execution.getStatus()); - assertEquals(1, execution.getStepExecutions().size()); - execution = jobRepository.createJobExecution("flow", new JobParameters()); - job.execute(execution); - assertEquals(BatchStatus.COMPLETED, execution.getStatus()); - assertEquals(1, execution.getStepExecutions().size()); - assertEquals("step2", execution.getStepExecutions().iterator().next().getStepName()); - } - - @Test - void testBuildWithJobScopedStep() throws Exception { - // given - ApplicationContext context = new AnnotationConfigApplicationContext(JobConfiguration.class); - JobLauncher jobLauncher = context.getBean(JobLauncher.class); - Job job = context.getBean(Job.class); - JobParameters jobParameters = new JobParametersBuilder().addLong("chunkSize", 2L).toJobParameters(); - - // when - JobExecution jobExecution = jobLauncher.run(job, jobParameters); - - // then - assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus()); - } + private JobRepository jobRepository; + + private JobExecution execution; + + private final StepSupport step1 = new StepSupport("step1") { + @Override + public void execute(StepExecution stepExecution) + throws JobInterruptedException, UnexpectedJobExecutionException { + stepExecution.upgradeStatus(BatchStatus.COMPLETED); + stepExecution.setExitStatus(ExitStatus.COMPLETED); + jobRepository.update(stepExecution); + } + }; + + private final StepSupport fails = new StepSupport("fails") { + @Override + public void execute(StepExecution stepExecution) + throws JobInterruptedException, UnexpectedJobExecutionException { + stepExecution.upgradeStatus(BatchStatus.FAILED); + stepExecution.setExitStatus(ExitStatus.FAILED); + jobRepository.update(stepExecution); + } + }; + + private final StepSupport step2 = new StepSupport("step2") { + @Override + public void execute(StepExecution stepExecution) + throws JobInterruptedException, UnexpectedJobExecutionException { + stepExecution.upgradeStatus(BatchStatus.COMPLETED); + stepExecution.setExitStatus(ExitStatus.COMPLETED); + jobRepository.update(stepExecution); + } + }; + + private final StepSupport step3 = new StepSupport("step3") { + @Override + public void execute(StepExecution stepExecution) + throws JobInterruptedException, UnexpectedJobExecutionException { + stepExecution.upgradeStatus(BatchStatus.COMPLETED); + stepExecution.setExitStatus(ExitStatus.COMPLETED); + jobRepository.update(stepExecution); + } + }; + + @BeforeEach + void init() throws Exception { + EmbeddedDatabase embeddedDatabase = new EmbeddedDatabaseBuilder() + .addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql") + .addScript("/org/springframework/batch/core/schema-hsqldb.sql") + .build(); + JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean(); + factory.setDataSource(embeddedDatabase); + factory.setTransactionManager(new JdbcTransactionManager(embeddedDatabase)); + factory.afterPropertiesSet(); + jobRepository = factory.getObject(); + execution = jobRepository.createJobExecution("flow", new JobParameters()); + } + + @Test + void testBuildOnOneLine() { + FlowJobBuilder builder = new JobBuilder("flow", jobRepository).start(step1) + .on("COMPLETED") + .to(step2) + .end() + .preventRestart(); + builder.build().execute(execution); + assertEquals(BatchStatus.COMPLETED, execution.getStatus()); + assertEquals(2, execution.getStepExecutions().size()); + } + + @Test + void testBuildSingleFlow() { + Flow flow = new FlowBuilder("subflow").from(step1).next(step2).build(); + FlowJobBuilder builder = new JobBuilder("flow", jobRepository).start(flow).end().preventRestart(); + builder.build().execute(execution); + assertEquals(BatchStatus.COMPLETED, execution.getStatus()); + assertEquals(2, execution.getStepExecutions().size()); + } + + @Test + void testBuildSingleFlowAddingStepsViaNext() { + Flow flow = new FlowBuilder("subflow").next(step1).next(step2).build(); + FlowJobBuilder builder = new JobBuilder("flow", jobRepository).start(flow).end().preventRestart(); + builder.build().execute(execution); + assertEquals(BatchStatus.COMPLETED, execution.getStatus()); + assertEquals(2, execution.getStepExecutions().size()); + } + + @Test + void testBuildOverTwoLines() { + FlowJobBuilder builder = new JobBuilder("flow", jobRepository).start(step1).on("COMPLETED").to(step2).end(); + builder.preventRestart(); + builder.build().execute(execution); + assertEquals(BatchStatus.COMPLETED, execution.getStatus()); + assertEquals(2, execution.getStepExecutions().size()); + } + + @Test + void testBuildSubflow() { + Flow flow = new FlowBuilder("subflow").from(step1).end(); + JobFlowBuilder builder = new JobBuilder("flow", jobRepository).start(flow); + builder.on("COMPLETED").to(step2); + builder.end().preventRestart().build().execute(execution); + assertEquals(BatchStatus.COMPLETED, execution.getStatus()); + assertEquals(2, execution.getStepExecutions().size()); + } + + @Test + void testBuildSplit() { + Flow flow = new FlowBuilder("subflow").from(step1).end(); + SimpleJobBuilder builder = new JobBuilder("flow", jobRepository).start(step2); + builder.split(new SimpleAsyncTaskExecutor()).add(flow).end(); + builder.preventRestart().build().execute(execution); + assertEquals(BatchStatus.COMPLETED, execution.getStatus()); + assertEquals(2, execution.getStepExecutions().size()); + } + + @Test + void testNestedSplitsWithSingleThread() { + SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(); + taskExecutor.setConcurrencyLimit(1); + + FlowBuilder flowBuilder = new FlowBuilder<>("flow"); + FlowBuilder.SplitBuilder splitBuilder = flowBuilder.split(taskExecutor); + splitBuilder.add(new FlowBuilder("subflow1").from(step1).end()); + splitBuilder.add(new FlowBuilder("subflow2").from(step2).end()); + Job job = new JobBuilder("job", jobRepository).start(flowBuilder.build()).end().build(); + job.execute(execution); + + assertEquals(BatchStatus.COMPLETED, execution.getStatus()); + assertEquals(2, execution.getStepExecutions().size()); + } + + @Test + void testBuildSplitUsingStartAndAdd_BATCH_2346() { + Flow subflow1 = new FlowBuilder("subflow1").from(step2).end(); + Flow subflow2 = new FlowBuilder("subflow2").from(step3).end(); + Flow splitflow = new FlowBuilder("splitflow").start(subflow1) + .split(new SimpleAsyncTaskExecutor()) + .add(subflow2) + .build(); + + FlowJobBuilder builder = new JobBuilder("flow", jobRepository).start(splitflow).end(); + builder.preventRestart().build().execute(execution); + assertEquals(BatchStatus.COMPLETED, execution.getStatus()); + assertEquals(2, execution.getStepExecutions().size()); + } + + @Test + void testBuildSplit_BATCH_2282() { + Flow flow1 = new FlowBuilder("subflow1").from(step1).end(); + Flow flow2 = new FlowBuilder("subflow2").from(step2).end(); + Flow splitFlow = new FlowBuilder("splitflow").split(new SimpleAsyncTaskExecutor()) + .add(flow1, flow2) + .build(); + FlowJobBuilder builder = new JobBuilder("flow", jobRepository).start(splitFlow).end(); + builder.preventRestart().build().execute(execution); + assertEquals(BatchStatus.COMPLETED, execution.getStatus()); + assertEquals(2, execution.getStepExecutions().size()); + } + + @Test + void testBuildDecision() { + JobExecutionDecider decider = new JobExecutionDecider() { + private int count = 0; + + @Override + public FlowExecutionStatus decide(JobExecution jobExecution, @Nullable StepExecution stepExecution) { + count++; + return count < 2 ? new FlowExecutionStatus("ONGOING") : FlowExecutionStatus.COMPLETED; + } + }; + step1.setAllowStartIfComplete(true); + SimpleJobBuilder builder = new JobBuilder("flow", jobRepository).start(step1); + builder.next(decider).on("COMPLETED").end().from(decider).on("*").to(step1).end(); + builder.preventRestart().build().execute(execution); + assertEquals(BatchStatus.COMPLETED, execution.getStatus()); + assertEquals(2, execution.getStepExecutions().size()); + } + + @Test + void testBuildWithDeciderAtStart() { + JobExecutionDecider decider = new JobExecutionDecider() { + private int count = 0; + + @Override + public FlowExecutionStatus decide(JobExecution jobExecution, @Nullable StepExecution stepExecution) { + count++; + return count < 2 ? new FlowExecutionStatus("ONGOING") : FlowExecutionStatus.COMPLETED; + } + }; + JobFlowBuilder builder = new JobBuilder("flow", jobRepository).start(decider); + builder.on("COMPLETED").end().from(decider).on("*").to(step1).end(); + builder.build().preventRestart().build().execute(execution); + assertEquals(BatchStatus.COMPLETED, execution.getStatus()); + assertEquals(1, execution.getStepExecutions().size()); + } + + @Test + void testBuildWithDeciderPriority() { + JobExecutionDecider decider = (jobExecution, stepExecution) -> new FlowExecutionStatus("COMPLETED_PARTIALLY"); + JobFlowBuilder builder = new JobBuilder("flow_priority", jobRepository).start(decider); + builder.on("COMPLETED_PARTIALLY").end(); + builder.on("COMPLETED*").fail(); + builder.build().preventRestart().build().execute(execution); + assertEquals(BatchStatus.COMPLETED, execution.getStatus()); + } + + @Test + void testBuildWithWildcardDeciderPriority() { + JobExecutionDecider decider = (jobExecution, stepExecution) -> new FlowExecutionStatus("COMPLETED_PARTIALLY"); + JobFlowBuilder builder = new JobBuilder("flow_priority", jobRepository).start(decider); + builder.on("COMPLETED_?ARTIALLY").end(); + builder.on("COMPLETED_*ARTIALLY").fail(); + builder.build().preventRestart().build().execute(execution); + assertEquals(BatchStatus.COMPLETED, execution.getStatus()); + } + + @Test + void testBuildWithDeciderPrioritySubstringAndWildcard() { + JobExecutionDecider decider = (jobExecution, stepExecution) -> new FlowExecutionStatus("CONTINUABLE"); + JobFlowBuilder builder = new JobBuilder("flow_priority", jobRepository).start(decider); + builder.on("CONTINUABLE").end(); + builder.on("CONTIN*").fail(); + builder.build().preventRestart().build().execute(execution); + assertEquals(BatchStatus.COMPLETED, execution.getStatus()); + } + + @Test + void testBuildWithIntermediateSimpleJob() { + SimpleJobBuilder builder = new JobBuilder("flow", jobRepository).start(step1); + builder.on("COMPLETED").to(step2).end(); + builder.preventRestart(); + builder.build().execute(execution); + assertEquals(BatchStatus.COMPLETED, execution.getStatus()); + assertEquals(2, execution.getStepExecutions().size()); + } + + @Test + void testBuildWithIntermediateSimpleJobTwoSteps() { + SimpleJobBuilder builder = new JobBuilder("flow", jobRepository).start(step1).next(step2); + builder.on("FAILED").to(step3).end(); + builder.build().execute(execution); + assertEquals(BatchStatus.COMPLETED, execution.getStatus()); + assertEquals(2, execution.getStepExecutions().size()); + } + + @Test + void testBuildWithCustomEndState() { + SimpleJobBuilder builder = new JobBuilder("flow", jobRepository).start(step1); + builder.on("COMPLETED").end("FOO"); + builder.preventRestart(); + builder.build().execute(execution); + assertEquals(BatchStatus.COMPLETED, execution.getStatus()); + assertEquals("FOO", execution.getExitStatus().getExitCode()); + assertEquals(1, execution.getStepExecutions().size()); + } + + @Test + void testBuildWithStop() { + SimpleJobBuilder builder = new JobBuilder("flow", jobRepository).start(step1); + builder.on("COMPLETED").stop(); + builder.preventRestart(); + builder.build().execute(execution); + assertEquals(BatchStatus.STOPPED, execution.getStatus()); + assertEquals("STOPPED", execution.getExitStatus().getExitCode()); + assertEquals(1, execution.getStepExecutions().size()); + } + + @Test + void testBuildWithStopAndRestart() throws Exception { + SimpleJobBuilder builder = new JobBuilder("flow", jobRepository).start(fails); + builder.on("FAILED").stopAndRestart(step2); + Job job = builder.build(); + job.execute(execution); + assertEquals(BatchStatus.STOPPED, execution.getStatus()); + assertEquals(1, execution.getStepExecutions().size()); + execution = jobRepository.createJobExecution("flow", new JobParameters()); + job.execute(execution); + assertEquals(BatchStatus.COMPLETED, execution.getStatus()); + assertEquals(1, execution.getStepExecutions().size()); + assertEquals("step2", execution.getStepExecutions().iterator().next().getStepName()); + } + + @Test + void testBuildWithJobScopedStep() throws Exception { + // given + ApplicationContext context = new AnnotationConfigApplicationContext(JobConfiguration.class); + JobLauncher jobLauncher = context.getBean(JobLauncher.class); + Job job = context.getBean(Job.class); + JobParameters jobParameters = new JobParametersBuilder().addLong("chunkSize", 2L).toJobParameters(); + + // when + JobExecution jobExecution = jobLauncher.run(job, jobParameters); + + // then + assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus()); + } // https://github.com/spring-projects/spring-batch/issues/3757#issuecomment-1821593539 @Test - void testStepNamesMustBeUniqueWithinFlowDefinition() { + void testStepNamesMustBeUniqueWithinFlowDefinition() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException { ApplicationContext context = new AnnotationConfigApplicationContext(JobConfigurationForStepNameUnique.class); JobLauncher jobLauncher = context.getBean(JobLauncher.class); Job job = context.getBean(Job.class); - assertThrows(AlreadyUsedStepNameException.class, ()->jobLauncher.run(job, new JobParametersBuilder().addLong("random", 2L).addString("stepTwo.name", JobConfigurationForStepNameUnique.SHARED_NAME).toJobParameters())); - assertThrows(AlreadyUsedStepNameException.class, ()->jobLauncher.run(job, new JobParametersBuilder().addLong("random", 1L).addString("stepTwo.name",JobConfigurationForStepNameUnique.SHARED_NAME).toJobParameters())); + JobExecution jobExecution=jobLauncher.run(job, new JobParametersBuilder().addLong("random", 2L).addString("stepTwo.name", JobConfigurationForStepNameUnique.SHARED_NAME).toJobParameters()); + assertTrue(jobExecution.getAllFailureExceptions().stream().map(Object::getClass).anyMatch(AlreadyUsedStepNameException.class::equals)); + assertEquals(ExitStatus.FAILED.getExitCode(), jobExecution.getExitStatus().getExitCode()); + jobExecution=jobLauncher.run(job, new JobParametersBuilder().addLong("random", 1L).addString("stepTwo.name", JobConfigurationForStepNameUnique.SHARED_NAME).toJobParameters()); + assertTrue(jobExecution.getAllFailureExceptions().stream().map(Object::getClass).anyMatch(AlreadyUsedStepNameException.class::equals)); + assertEquals(ExitStatus.FAILED.getExitCode(), jobExecution.getExitStatus().getExitCode()); } - @EnableBatchProcessing - @Configuration - static class JobConfigurationForStepNameUnique{ + @EnableBatchProcessing + @Configuration + static class JobConfigurationForStepNameUnique { - private static final String SHARED_NAME ="sharedName"; + private static final String SHARED_NAME = "sharedName"; - private static final Log logger = LogFactory.getLog(FlowJobBuilderTests.class); + private static final Log logger = LogFactory.getLog(FlowJobBuilderTests.class); - @Bean - @JobScope - public Step conditionalStep(JobRepository jobRepository, PlatformTransactionManager transactionManager, - @Value("#{jobParameters['random']}") Integer random) { - return new StepBuilder("conditionalStep", jobRepository).tasklet( - (StepContribution contribution, ChunkContext chunkContext) ->{ - String exitStatus = (random % 2 == 0) ? "EVEN" : "ODD"; - logger.info("'conditionalStep' with exitStatus "+exitStatus); - contribution.setExitStatus(new ExitStatus(exitStatus)); - return RepeatStatus.FINISHED; - }, transactionManager - ).build(); - } + @Bean + @JobScope + public Step conditionalStep(JobRepository jobRepository, PlatformTransactionManager transactionManager, + @Value("#{jobParameters['random']}") Integer random) { + return new StepBuilder("conditionalStep", jobRepository).tasklet( + (StepContribution contribution, ChunkContext chunkContext) -> { + String exitStatus = (random % 2 == 0) ? "EVEN" : "ODD"; + logger.info("'conditionalStep' with exitStatus " + exitStatus); + contribution.setExitStatus(new ExitStatus(exitStatus)); + return RepeatStatus.FINISHED; + }, transactionManager + ).build(); + } @Bean @JobScope public Step stepTwo(JobRepository jobRepository, PlatformTransactionManager transactionManager, @Value("#{jobParameters['stepTwo.name']}") String name) { return new StepBuilder(name, jobRepository) - .tasklet((StepContribution contribution, ChunkContext chunkContext) -> { - logger.info("Hello from stepTwo"); - return RepeatStatus.FINISHED; - }, transactionManager) - .build(); + .tasklet((StepContribution contribution, ChunkContext chunkContext) -> { + logger.info("Hello from stepTwo"); + return RepeatStatus.FINISHED; + }, transactionManager) + .build(); } @Bean public Step stepThree(JobRepository jobRepository, PlatformTransactionManager transactionManager) { return new StepBuilder(SHARED_NAME, jobRepository) - .tasklet((StepContribution contribution, ChunkContext chunkContext) -> { - logger.info("Hello from stepThree"); - return RepeatStatus.FINISHED; - }, transactionManager) - .build(); + .tasklet((StepContribution contribution, ChunkContext chunkContext) -> { + logger.info("Hello from stepThree"); + return RepeatStatus.FINISHED; + }, transactionManager) + .build(); } @Bean public Step stepFour(JobRepository jobRepository, PlatformTransactionManager transactionManager) { return new StepBuilder(SHARED_NAME, jobRepository) - .tasklet((StepContribution contribution, ChunkContext chunkContext) -> { - logger.info("Hello from stepFour"); - return RepeatStatus.FINISHED; - }, transactionManager) - .build(); + .tasklet((StepContribution contribution, ChunkContext chunkContext) -> { + logger.info("Hello from stepFour"); + return RepeatStatus.FINISHED; + }, transactionManager) + .build(); } @Bean public Job job(JobRepository jobRepository, @Qualifier("conditionalStep") Step conditionalStep, - @Qualifier("stepFour") Step step4, @Qualifier("stepTwo") Step step2, - @Qualifier("stepThree") Step step3) { - JobBuilder jobBuilder = new JobBuilder("flow", jobRepository); - return jobBuilder.start(conditionalStep) - .on("ODD").to(step2) - .from(conditionalStep).on("EVEN").to(step3) - .from(step3) - .next(step4) - .from(step2).next(step4).end().build(); - } - - @Bean - public DataSource dataSource() { - return new EmbeddedDatabaseBuilder().addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql") - .addScript("/org/springframework/batch/core/schema-hsqldb.sql") - .generateUniqueName(true) - .build(); - } - - @Bean - public JdbcTransactionManager transactionManager(DataSource dataSource) { - return new JdbcTransactionManager(dataSource); - } - - - } - - - @EnableBatchProcessing - @Configuration - static class JobConfiguration { - - @Bean - @JobScope - public Step step(JobRepository jobRepository, PlatformTransactionManager transactionManager, - @Value("#{jobParameters['chunkSize']}") Integer chunkSize) { - return new StepBuilder("step", jobRepository).chunk(chunkSize, transactionManager) - .reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4))) - .writer(items -> { - }) - .build(); - } - - @Bean - public Job job(JobRepository jobRepository, PlatformTransactionManager transactionManager) { - Step step = step(jobRepository, transactionManager, null); - return new JobBuilder("job", jobRepository).flow(step).build().build(); - } - - @Bean - public DataSource dataSource() { - return new EmbeddedDatabaseBuilder().addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql") - .addScript("/org/springframework/batch/core/schema-hsqldb.sql") - .generateUniqueName(true) - .build(); - } - - @Bean - public JdbcTransactionManager transactionManager(DataSource dataSource) { - return new JdbcTransactionManager(dataSource); - } - - } + @Qualifier("stepFour") Step step4, @Qualifier("stepTwo") Step step2, + @Qualifier("stepThree") Step step3) { + JobBuilder jobBuilder = new JobBuilder("flow", jobRepository); + return jobBuilder.start(conditionalStep) + .on("ODD").to(step2) + .from(conditionalStep).on("EVEN").to(step3) + .from(step3) + .next(step4) + .from(step2).next(step4).end().build(); + } + + @Bean + public DataSource dataSource() { + return new EmbeddedDatabaseBuilder().addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql") + .addScript("/org/springframework/batch/core/schema-hsqldb.sql") + .generateUniqueName(true) + .build(); + } + + @Bean + public JdbcTransactionManager transactionManager(DataSource dataSource) { + return new JdbcTransactionManager(dataSource); + } + + + } + + + @EnableBatchProcessing + @Configuration + static class JobConfiguration { + + @Bean + @JobScope + public Step step(JobRepository jobRepository, PlatformTransactionManager transactionManager, + @Value("#{jobParameters['chunkSize']}") Integer chunkSize) { + return new StepBuilder("step", jobRepository).chunk(chunkSize, transactionManager) + .reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4))) + .writer(items -> { + }) + .build(); + } + + @Bean + public Job job(JobRepository jobRepository, PlatformTransactionManager transactionManager) { + Step step = step(jobRepository, transactionManager, null); + return new JobBuilder("job", jobRepository).flow(step).build().build(); + } + + @Bean + public DataSource dataSource() { + return new EmbeddedDatabaseBuilder().addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql") + .addScript("/org/springframework/batch/core/schema-hsqldb.sql") + .generateUniqueName(true) + .build(); + } + + @Bean + public JdbcTransactionManager transactionManager(DataSource dataSource) { + return new JdbcTransactionManager(dataSource); + } + + } } diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/launch/support/SimpleJobOperatorTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/launch/support/SimpleJobOperatorTests.java index d5d3951c2c..daf961cbc1 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/launch/support/SimpleJobOperatorTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/launch/support/SimpleJobOperatorTests.java @@ -43,6 +43,7 @@ import org.springframework.batch.core.explore.JobExplorer; import org.springframework.batch.core.job.AbstractJob; import org.springframework.batch.core.job.JobSupport; +import org.springframework.batch.core.job.builder.AlreadyUsedStepNameException; import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException; import org.springframework.batch.core.launch.NoSuchJobException; import org.springframework.batch.core.launch.NoSuchJobExecutionException; @@ -455,6 +456,10 @@ protected void doExecute(JobExecution execution) throws JobExecutionException { } + @Override + protected void checkStepNamesUnicity() throws AlreadyUsedStepNameException { + } + } }