Skip to content

Commit

Permalink
report error creating cron (#732)
Browse files Browse the repository at this point in the history
Co-authored-by: Andy Zhang <[email protected]>
  • Loading branch information
dtrager02 and Andyz26 authored Jan 16, 2025
1 parent 86a0916 commit 2de4671
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import io.mantisrx.master.jobcluster.job.MantisJobMetadataView;
import io.mantisrx.master.jobcluster.job.worker.IMantisWorkerMetadata;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.CreateJobClusterResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.DeleteJobClusterResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.DisableJobClusterRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.DisableJobClusterResponse;
Expand Down Expand Up @@ -701,7 +702,7 @@ public void onJobClusterInitialize(JobClusterProto.InitializeJobClusterRequest i
// create sla enforcer
slaEnforcer = new SLAEnforcer(jobClusterMetadata.getJobClusterDefinition().getSLA());
long expireFrequency = ConfigurationProvider.getConfig().getCompletedJobPurgeFrequencySeqs();

String jobClusterName = jobClusterMetadata.getJobClusterDefinition().getName();
// If cluster is disabled
if(jobClusterMetadata.isDisabled()) {
logger.info("Cluster {} initialized but is Disabled", jobClusterMetadata
Expand All @@ -718,7 +719,7 @@ public void onJobClusterInitialize(JobClusterProto.InitializeJobClusterRequest i
int count = 50;
if(!initReq.jobList.isEmpty()) {
logger.info("Cluster {} is disabled however it has {} active/accepted jobs",
jobClusterMetadata.getJobClusterDefinition().getName(), initReq.jobList.size());
jobClusterName, initReq.jobList.size());
for(IMantisJobMetadata jobMeta : initReq.jobList) {
try {
if(count == 0) {
Expand All @@ -728,7 +729,7 @@ public void onJobClusterInitialize(JobClusterProto.InitializeJobClusterRequest i
if(!JobState.isTerminalState(jobMeta.getState())) {
logger.info("Job {} is in non terminal state {} for disabled cluster {}."
+ "Marking it complete", jobMeta.getJobId(), jobMeta.getState(),
jobClusterMetadata.getJobClusterDefinition().getName());
jobClusterName);
count--;
jobManager.markCompleted(jobMeta);
jobStore.archiveJob(jobMeta);
Expand Down Expand Up @@ -758,7 +759,7 @@ public void onJobClusterInitialize(JobClusterProto.InitializeJobClusterRequest i
eventPublisher.publishAuditEvent(
new LifecycleEventsProto.AuditEvent(
LifecycleEventsProto.AuditEvent.AuditEventType.JOB_CLUSTER_CREATE,
jobClusterMetadata.getJobClusterDefinition().getName(),
jobClusterName,
"saved job cluster " + name)
);
logger.info("successfully saved job cluster {}", name);
Expand Down Expand Up @@ -790,6 +791,11 @@ public void onJobClusterInitialize(JobClusterProto.InitializeJobClusterRequest i
cronManager = new CronManager(name, getSelf(), jobClusterMetadata.getJobClusterDefinition().getSLA());
} catch (Exception e) {
logger.warn("Exception initializing cron", e);
getSender().tell(new JobClusterManagerProto.CreateJobClusterResponse(
initReq.requestId, e instanceof SchedulerException?CLIENT_ERROR:SERVER_ERROR,
"Job Cluster " + jobClusterName + " could not be created due to cron initialization error" + e.getMessage(),
jobClusterName), getSelf());
return;
}
initRunningJobs(initReq, sender);

Expand Down Expand Up @@ -916,7 +922,7 @@ public void onJobClusterUpdate(final UpdateJobClusterRequest request) {
} catch (Exception e) {
logger.error("job cluster not created");
sender.tell(new UpdateJobClusterResponse(request.requestId, SERVER_ERROR, name
+ " Job cluster updation failed " + e.getMessage()), getSelf());
+ " Job cluster update failed " + e.getMessage()), getSelf());
numJobClusterUpdateErrors.increment();
}
}
Expand Down Expand Up @@ -1271,13 +1277,13 @@ public void onJobClusterEnable(final EnableJobClusterRequest req) {
.withLastJobCount(this.jobClusterMetadata.getLastJobCount())
.withJobClusterDefinition((JobClusterDefinitionImpl)this.jobClusterMetadata.getJobClusterDefinition())
.build();
//update store
jobStore.updateJobCluster(jobClusterMetadata);
this.jobClusterMetadata = jobClusterMetadata;
if (cronManager == null) {
cronManager = new CronManager(name, getSelf(), jobClusterMetadata.getJobClusterDefinition().getSLA());
}
this.cronManager.initCron();
//update store after cron init
jobStore.updateJobCluster(jobClusterMetadata);
this.jobClusterMetadata = jobClusterMetadata;
// change behavior to enabled
getContext().become(initializedBehavior);

Expand All @@ -1294,7 +1300,7 @@ public void onJobClusterEnable(final EnableJobClusterRequest req) {
} catch(Exception e) {
String errorMsg = String.format("Exception enabling cluster %s due to %s", name, e.getMessage());
logger.error(errorMsg,e);
sender.tell(new EnableJobClusterResponse(req.requestId, SERVER_ERROR, errorMsg), getSelf());
sender.tell(new EnableJobClusterResponse(req.requestId, e instanceof SchedulerException?CLIENT_ERROR:SERVER_ERROR, errorMsg), getSelf());
numJobClusterEnableErrors.increment();
}
if(logger.isTraceEnabled()) { logger.trace("Enter onJobClusterEnable"); }
Expand Down Expand Up @@ -2122,24 +2128,23 @@ public void onJobClusterUpdateSLA(UpdateJobClusterSLARequest slaRequest) {
.withJobClusterDefinition(updatedDefn)
.build();

updateAndSaveJobCluster(jobCluster);
if(cronManager != null)
cronManager.destroyCron();
this.cronManager = new CronManager(name, getSelf(), newSla);

updateAndSaveJobCluster(jobCluster); //update after cron succeeds
sender.tell(new UpdateJobClusterSLAResponse(slaRequest.requestId, SUCCESS, name + " SLA updated"), getSelf());

eventPublisher.publishAuditEvent(
new LifecycleEventsProto.AuditEvent(LifecycleEventsProto.AuditEvent.AuditEventType.JOB_CLUSTER_UPDATE,
jobClusterMetadata.getJobClusterDefinition().getName(), name+" SLA update")
);
} catch(IllegalArgumentException e) {
} catch(IllegalArgumentException | SchedulerException e) {
logger.error("Invalid arguement job cluster not updated ", e);
sender.tell(new UpdateJobClusterSLAResponse(slaRequest.requestId, CLIENT_ERROR, name + " Job cluster SLA updation failed " + e.getMessage()), getSelf());
sender.tell(new UpdateJobClusterSLAResponse(slaRequest.requestId, CLIENT_ERROR, name + " Job cluster SLA update failed " + e.getMessage()), getSelf());

} catch(Exception e) {
logger.error("job cluster not updated ", e);
sender.tell(new UpdateJobClusterSLAResponse(slaRequest.requestId, SERVER_ERROR, name + " Job cluster SLA updation failed " + e.getMessage()), getSelf());
sender.tell(new UpdateJobClusterSLAResponse(slaRequest.requestId, SERVER_ERROR, name + " Job cluster SLA update failed " + e.getMessage()), getSelf());
}
if(logger.isTraceEnabled()) { logger.trace("Exit onJobClusterUpdateSLA {}", slaRequest); }
}
Expand Down Expand Up @@ -3217,7 +3222,7 @@ private void initCron() throws Exception{
isCronActive = true;
} catch (IllegalArgumentException e) {
destroyCron();
logger.error("Failed to start cron for {}: {}", jobClusterName, e);
logger.error("Failed to start cron for {}: {}. The format of the cron schedule may be incorrect.", jobClusterName, e.getStackTrace());
throw new SchedulerException(e.getMessage(), e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.mantisrx.server.master.domain;

import com.netflix.fenzo.triggers.TriggerUtils;
import io.mantisrx.common.Label;
import io.mantisrx.master.jobcluster.LabelManager.SystemLabels;
import io.mantisrx.master.jobcluster.job.JobState;
Expand Down Expand Up @@ -64,6 +65,8 @@ public JobClusterDefinitionImpl(@JsonProperty("name") String name,
) {
Preconditions.checkNotNull(jobClusterConfigs);
Preconditions.checkArgument(!jobClusterConfigs.isEmpty());
if (sla != null && sla.getCronSpec() != null)
TriggerUtils.validateCronExpression(sla.getCronSpec());
this.owner = owner;
this.name = name;
this.sla = Optional.ofNullable(sla).orElse(new SLA(0, 0, null, CronPolicy.KEEP_EXISTING));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1568,6 +1568,52 @@ public void testCronTriggersSLAToKillOld() {

}

/**
 * Verifies that building a job cluster definition whose SLA carries a malformed cron
 * spec fails with an {@link IllegalArgumentException}.
 *
 * <p>No actor, scheduler, or job store is needed: the exception is expected while the
 * definition is being constructed, before any actor would be involved.
 */
@Test(expected = IllegalArgumentException.class)
public void testInvalidCronDefined() {
    String clusterName = "testInvalidCronSubmit";

    // Deliberately malformed cron spec: the first two fields are letters.
    SLA sla = new SLA(1, 1, "a b * * * * * * *", IJobClusterDefinition.CronPolicy.KEEP_NEW);

    // Should throw IllegalArgumentException; the result is intentionally discarded.
    createFakeJobClusterDefn(clusterName, Lists.newArrayList(), sla);
}

/**
 * Verifies that an SLA update carrying a malformed cron spec is answered with
 * {@code CLIENT_ERROR} and that the job cluster is left unchanged.
 *
 * <p>Flow: initialize a cluster actor, send the invalid SLA update, then read the
 * cluster back and check that its SLA still reports min/max of 0 and that the store
 * was never asked to persist an update.
 *
 * @throws Exception if any mocked store interaction declared as throwing does so
 */
@Test
public void testInvalidCronSLAUpdate() throws Exception {
    TestKit probe = new TestKit(system);
    String clusterName = "testJobClusterInvalidSLAUpdateIgnored";
    MantisScheduler schedulerMock = mock(MantisScheduler.class);
    MantisJobStore jobStoreMock = mock(MantisJobStore.class);

    // Bring the cluster actor up with the default fake definition.
    final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName);
    ActorRef jobClusterActor = system.actorOf(
        props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0));
    jobClusterActor.tell(
        new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()),
        probe.getRef());
    JobClusterProto.InitializeJobClusterResponse createResp =
        probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class);
    assertEquals(SUCCESS, createResp.responseCode);

    // Send an SLA update whose cron spec is malformed (first two fields are letters).
    UpdateJobClusterSLARequest updateSlaReq = new UpdateJobClusterSLARequest(
        clusterName, 2, 1, "a b * * * * * * *", IJobClusterDefinition.CronPolicy.KEEP_NEW, false, "user");
    jobClusterActor.tell(updateSlaReq, probe.getRef());
    UpdateJobClusterSLAResponse updateResp = probe.expectMsgClass(UpdateJobClusterSLAResponse.class);

    assertEquals(CLIENT_ERROR, updateResp.responseCode);
    assertEquals(jobClusterActor, probe.getLastSender());

    // Read the cluster back to confirm the rejected update had no effect.
    jobClusterActor.tell(new GetJobClusterRequest(clusterName), probe.getRef());
    GetJobClusterResponse getResp = probe.expectMsgClass(GetJobClusterResponse.class);

    assertEquals(SUCCESS, getResp.responseCode);
    // Check presence explicitly so a missing cluster fails as an assertion rather than
    // as an exception from the get() calls below.
    assertTrue(getResp.getJobCluster().isPresent());
    System.out.println("Job cluster " + getResp.getJobCluster());
    assertEquals(clusterName, getResp.getJobCluster().get().getName());
    // No changes to original SLA
    assertEquals(0, getResp.getJobCluster().get().getSla().getMin());
    assertEquals(0, getResp.getJobCluster().get().getSla().getMax());

    // The cluster was created once, and the failed SLA update was never persisted.
    verify(jobStoreMock, times(1)).createJobCluster(any());
    verify(jobStoreMock, times(0)).updateJobCluster(any());
}

@Test
public void testJobSubmitWithUnique() {
TestKit probe = new TestKit(system);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public void testQuickSubmitJob() throws IOException, InterruptedException {
agentId0,
agent0,
5,
Duration.ofSeconds(3).toMillis())) {
Duration.ofSeconds(10).toMillis())) {
fail("Failed to register agent: " + agent0.getContainerId());
}

Expand All @@ -221,7 +221,7 @@ public void testQuickSubmitJob() throws IOException, InterruptedException {
controlPlaneHost,
controlPlanePort,
10,
Duration.ofSeconds(2).toMillis())) {
Duration.ofSeconds(10).toMillis())) {
fail("Failed to start job worker.");
}

Expand Down Expand Up @@ -264,7 +264,7 @@ public void testRegularSubmitJob() throws IOException, InterruptedException {
agentId0,
agent0,
5,
Duration.ofSeconds(3).toMillis())) {
Duration.ofSeconds(10).toMillis())) {
fail("Failed to register agent: " + agent0.getContainerId());
}

Expand All @@ -274,7 +274,7 @@ public void testRegularSubmitJob() throws IOException, InterruptedException {
controlPlaneHost,
controlPlanePort,
5,
Duration.ofSeconds(2).toMillis())) {
Duration.ofSeconds(10).toMillis())) {
fail("Failed to start job worker.");
}

Expand Down

0 comments on commit 2de4671

Please sign in to comment.