diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/JobClusterActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/JobClusterActor.java index c9414cb17..25689d1a3 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/JobClusterActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/jobcluster/JobClusterActor.java @@ -60,6 +60,7 @@ import io.mantisrx.master.jobcluster.job.MantisJobMetadataView; import io.mantisrx.master.jobcluster.job.worker.IMantisWorkerMetadata; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto; +import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.CreateJobClusterResponse; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.DeleteJobClusterResponse; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.DisableJobClusterRequest; import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.DisableJobClusterResponse; @@ -701,7 +702,7 @@ public void onJobClusterInitialize(JobClusterProto.InitializeJobClusterRequest i // create sla enforcer slaEnforcer = new SLAEnforcer(jobClusterMetadata.getJobClusterDefinition().getSLA()); long expireFrequency = ConfigurationProvider.getConfig().getCompletedJobPurgeFrequencySeqs(); - + String jobClusterName = jobClusterMetadata.getJobClusterDefinition().getName(); // If cluster is disabled if(jobClusterMetadata.isDisabled()) { logger.info("Cluster {} initialized but is Disabled", jobClusterMetadata @@ -718,7 +719,7 @@ public void onJobClusterInitialize(JobClusterProto.InitializeJobClusterRequest i int count = 50; if(!initReq.jobList.isEmpty()) { logger.info("Cluster {} is disabled however it has {} active/accepted jobs", - jobClusterMetadata.getJobClusterDefinition().getName(), initReq.jobList.size()); + jobClusterName, initReq.jobList.size()); 
for(IMantisJobMetadata jobMeta : initReq.jobList) { try { if(count == 0) { @@ -728,7 +729,7 @@ public void onJobClusterInitialize(JobClusterProto.InitializeJobClusterRequest i if(!JobState.isTerminalState(jobMeta.getState())) { logger.info("Job {} is in non terminal state {} for disabled cluster {}." + "Marking it complete", jobMeta.getJobId(), jobMeta.getState(), - jobClusterMetadata.getJobClusterDefinition().getName()); + jobClusterName); count--; jobManager.markCompleted(jobMeta); jobStore.archiveJob(jobMeta); @@ -758,7 +759,7 @@ public void onJobClusterInitialize(JobClusterProto.InitializeJobClusterRequest i eventPublisher.publishAuditEvent( new LifecycleEventsProto.AuditEvent( LifecycleEventsProto.AuditEvent.AuditEventType.JOB_CLUSTER_CREATE, - jobClusterMetadata.getJobClusterDefinition().getName(), + jobClusterName, "saved job cluster " + name) ); logger.info("successfully saved job cluster {}", name); @@ -790,6 +791,11 @@ public void onJobClusterInitialize(JobClusterProto.InitializeJobClusterRequest i cronManager = new CronManager(name, getSelf(), jobClusterMetadata.getJobClusterDefinition().getSLA()); } catch (Exception e) { logger.warn("Exception initializing cron", e); + getSender().tell(new CreateJobClusterResponse( + initReq.requestId, e instanceof SchedulerException?CLIENT_ERROR:SERVER_ERROR, + "Job Cluster " + jobClusterName + " could not be created due to cron initialization error: " + e.getMessage(), + jobClusterName), getSelf()); + return; } initRunningJobs(initReq, sender); @@ -916,7 +922,7 @@ public void onJobClusterUpdate(final UpdateJobClusterRequest request) { } catch (Exception e) { - logger.error("job cluster not created"); + logger.error("job cluster not updated ", e); sender.tell(new UpdateJobClusterResponse(request.requestId, SERVER_ERROR, name - + " Job cluster updation failed " + e.getMessage()), getSelf()); + + " Job cluster update failed " + e.getMessage()), getSelf()); numJobClusterUpdateErrors.increment(); } } @@ -1271,13 +1277,13 @@ public void 
onJobClusterEnable(final EnableJobClusterRequest req) { .withLastJobCount(this.jobClusterMetadata.getLastJobCount()) .withJobClusterDefinition((JobClusterDefinitionImpl)this.jobClusterMetadata.getJobClusterDefinition()) .build(); - //update store - jobStore.updateJobCluster(jobClusterMetadata); - this.jobClusterMetadata = jobClusterMetadata; if (cronManager == null) { cronManager = new CronManager(name, getSelf(), jobClusterMetadata.getJobClusterDefinition().getSLA()); } this.cronManager.initCron(); + //update store after cron init + jobStore.updateJobCluster(jobClusterMetadata); + this.jobClusterMetadata = jobClusterMetadata; // change behavior to enabled getContext().become(initializedBehavior); @@ -1294,7 +1300,7 @@ public void onJobClusterEnable(final EnableJobClusterRequest req) { } catch(Exception e) { String errorMsg = String.format("Exception enabling cluster %s due to %s", name, e.getMessage()); logger.error(errorMsg,e); - sender.tell(new EnableJobClusterResponse(req.requestId, SERVER_ERROR, errorMsg), getSelf()); + sender.tell(new EnableJobClusterResponse(req.requestId, e instanceof SchedulerException?CLIENT_ERROR:SERVER_ERROR, errorMsg), getSelf()); numJobClusterEnableErrors.increment(); } - if(logger.isTraceEnabled()) { logger.trace("Enter onJobClusterEnable"); } + if(logger.isTraceEnabled()) { logger.trace("Exit onJobClusterEnable"); } @@ -2122,24 +2128,23 @@ public void onJobClusterUpdateSLA(UpdateJobClusterSLARequest slaRequest) { .withJobClusterDefinition(updatedDefn) .build(); - updateAndSaveJobCluster(jobCluster); if(cronManager != null) cronManager.destroyCron(); this.cronManager = new CronManager(name, getSelf(), newSla); - + updateAndSaveJobCluster(jobCluster); //update after cron succeeds sender.tell(new UpdateJobClusterSLAResponse(slaRequest.requestId, SUCCESS, name + " SLA updated"), getSelf()); eventPublisher.publishAuditEvent( new LifecycleEventsProto.AuditEvent(LifecycleEventsProto.AuditEvent.AuditEventType.JOB_CLUSTER_UPDATE, jobClusterMetadata.getJobClusterDefinition().getName(), name+" SLA update") ); - } 
catch(IllegalArgumentException e) { + } catch(IllegalArgumentException | SchedulerException e) { - logger.error("Invalid arguement job cluster not updated ", e); + logger.error("Invalid argument job cluster not updated ", e); - sender.tell(new UpdateJobClusterSLAResponse(slaRequest.requestId, CLIENT_ERROR, name + " Job cluster SLA updation failed " + e.getMessage()), getSelf()); + sender.tell(new UpdateJobClusterSLAResponse(slaRequest.requestId, CLIENT_ERROR, name + " Job cluster SLA update failed " + e.getMessage()), getSelf()); } catch(Exception e) { logger.error("job cluster not updated ", e); - sender.tell(new UpdateJobClusterSLAResponse(slaRequest.requestId, SERVER_ERROR, name + " Job cluster SLA updation failed " + e.getMessage()), getSelf()); + sender.tell(new UpdateJobClusterSLAResponse(slaRequest.requestId, SERVER_ERROR, name + " Job cluster SLA update failed " + e.getMessage()), getSelf()); } if(logger.isTraceEnabled()) { logger.trace("Exit onJobClusterUpdateSLA {}", slaRequest); } } @@ -3217,7 +3222,7 @@ private void initCron() throws Exception{ isCronActive = true; } catch (IllegalArgumentException e) { destroyCron(); - logger.error("Failed to start cron for {}: {}", jobClusterName, e); + logger.error("Failed to start cron for {}: {}. The format of the cron schedule may be incorrect.", jobClusterName, e.getMessage(), e); throw new SchedulerException(e.getMessage(), e); }
diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/domain/JobClusterDefinitionImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/domain/JobClusterDefinitionImpl.java index 1ee9b0f20..ae55fee0f 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/domain/JobClusterDefinitionImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/domain/JobClusterDefinitionImpl.java @@ -16,6 +16,7 @@ package io.mantisrx.server.master.domain; +import com.netflix.fenzo.triggers.TriggerUtils; import io.mantisrx.common.Label; import io.mantisrx.master.jobcluster.LabelManager.SystemLabels; import io.mantisrx.master.jobcluster.job.JobState; @@ -64,6 +65,10 @@ public JobClusterDefinitionImpl(@JsonProperty("name") String name, ) { Preconditions.checkNotNull(jobClusterConfigs); Preconditions.checkArgument(!jobClusterConfigs.isEmpty()); + // Validate the cron expression eagerly so a malformed spec is rejected when the definition is constructed. + if (sla != null && sla.getCronSpec() != null) { + TriggerUtils.validateCronExpression(sla.getCronSpec()); + } this.owner = owner; this.name = name; this.sla = Optional.ofNullable(sla).orElse(new SLA(0, 0, null, CronPolicy.KEEP_EXISTING)); diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java index f9dad2467..9e3c5937d 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/JobClusterAkkaTest.java @@ -1568,6 +1568,52 @@ public void testCronTriggersSLAToKillOld() { } + 
@Test(expected = IllegalArgumentException.class) + public void testInvalidCronDefined() { + TestKit probe = new TestKit(system); + String clusterName = "testInvalidCronSubmit"; + MantisScheduler schedulerMock = mock(MantisScheduler.class); + MantisJobStore jobStoreMock = mock(MantisJobStore.class); + + SLA sla = new SLA(1,1,"a b * * * * * * *",IJobClusterDefinition.CronPolicy.KEEP_NEW); + final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName, Lists.newArrayList(),sla); //should throw IllegalArgumentException + } + + @Test + public void testInvalidCronSLAUpdate() throws Exception { + TestKit probe = new TestKit(system); + String clusterName = "testJobClusterInvalidSLAUpdateIgnored"; + MantisScheduler schedulerMock = mock(MantisScheduler.class); + MantisJobStore jobStoreMock = mock(MantisJobStore.class); + + final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName); + ActorRef jobClusterActor = system.actorOf(props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0)); + jobClusterActor.tell(new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()), probe.getRef()); + JobClusterProto.InitializeJobClusterResponse createResp = probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class); + assertEquals(SUCCESS, createResp.responseCode); + + UpdateJobClusterSLARequest updateSlaReq = new UpdateJobClusterSLARequest(clusterName, 2, 1,"a b * * * * * * *",IJobClusterDefinition.CronPolicy.KEEP_NEW,false,"user" ); + jobClusterActor.tell(updateSlaReq, probe.getRef()); + UpdateJobClusterSLAResponse resp = probe.expectMsgClass(UpdateJobClusterSLAResponse.class); + + assertEquals(CLIENT_ERROR, resp.responseCode); + assertEquals(jobClusterActor, probe.getLastSender()); + + jobClusterActor.tell(new GetJobClusterRequest(clusterName), probe.getRef()); + GetJobClusterResponse resp3 = probe.expectMsgClass(GetJobClusterResponse.class); + + 
assertEquals(SUCCESS, resp3.responseCode); + // The cluster must still be retrievable after the rejected SLA update. + assertTrue(resp3.getJobCluster().isPresent()); + assertEquals(clusterName, resp3.getJobCluster().get().getName()); + // No changes to original SLA + assertEquals(0, resp3.getJobCluster().get().getSla().getMin()); + assertEquals(0, resp3.getJobCluster().get().getSla().getMax()); + + verify(jobStoreMock, times(1)).createJobCluster(any()); + verify(jobStoreMock, times(0)).updateJobCluster(any()); + } + @Test public void testJobSubmitWithUnique() { TestKit probe = new TestKit(system); diff --git a/mantis-testcontainers/src/test/java/TestContainerHelloWorld.java b/mantis-testcontainers/src/test/java/TestContainerHelloWorld.java index 5d27be1a2..9e36e7789 100644 --- a/mantis-testcontainers/src/test/java/TestContainerHelloWorld.java +++ b/mantis-testcontainers/src/test/java/TestContainerHelloWorld.java @@ -211,7 +211,7 @@ public void testQuickSubmitJob() throws IOException, InterruptedException { agentId0, agent0, 5, - Duration.ofSeconds(3).toMillis())) { + Duration.ofSeconds(10).toMillis())) { fail("Failed to register agent: " + agent0.getContainerId()); } @@ -221,7 +221,7 @@ public void testQuickSubmitJob() throws IOException, InterruptedException { controlPlaneHost, controlPlanePort, 10, - Duration.ofSeconds(2).toMillis())) { + Duration.ofSeconds(10).toMillis())) { fail("Failed to start job worker."); } @@ -264,7 +264,7 @@ public void testRegularSubmitJob() throws IOException, InterruptedException { agentId0, agent0, 5, - Duration.ofSeconds(3).toMillis())) { + Duration.ofSeconds(10).toMillis())) { fail("Failed to register agent: " + agent0.getContainerId()); } @@ -274,7 +274,7 @@ public void testRegularSubmitJob() throws IOException, InterruptedException { controlPlaneHost, controlPlanePort, 5, - Duration.ofSeconds(2).toMillis())) { + Duration.ofSeconds(10).toMillis())) { fail("Failed to start job worker."); }