Skip to content

Commit

Permalink
Fix crc mismatch during deepstore upload retry task
Browse files Browse the repository at this point in the history
  • Loading branch information
tibrewalpratik17 committed Nov 20, 2024
1 parent c964d7c commit 249a4ee
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1555,26 +1555,38 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
segmentName));
}

// Randomly ask one server to upload
URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload");
serverUploadRequestUrl =
String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl, _deepstoreUploadRetryTimeoutMs);
LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName,
serverUploadRequestUrl);
String tempSegmentDownloadUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
String segmentDownloadUrl =
moveSegmentFile(rawTableName, segmentName, tempSegmentDownloadUrl, pinotFS);
LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl);

// Update segment ZK metadata by adding the download URL
segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
// TODO: add version check when persist segment ZK metadata
persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1);
LOGGER.info("Successfully uploaded LLC segment {} to deep store with download url: {}", segmentName,
segmentDownloadUrl);
_controllerMetrics.addMeteredTableValue(realtimeTableName,
ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_SUCCESS, 1L);
int iteration = 0;
// Try the servers one by one until one with the correct crc uploads the segment successfully.
// For the last remaining server URI, skip the crc check (expectedCrc=-1), because getting a copy into deep store
// takes priority over the crc match.
for (URI uri: peerSegmentURIs) {
String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload");
serverUploadRequestUrl =
String.format("%s?uploadTimeoutMs=%d&expectedCrc=%d", serverUploadRequestUrl,
_deepstoreUploadRetryTimeoutMs,
(iteration == (peerSegmentURIs.size() - 1) ? -1 : segmentZKMetadata.getCrc()));
LOGGER.info("Ask server {} to upload LLC segment {} to deep store by this path: {}", uri, segmentName,
serverUploadRequestUrl);
String tempSegmentDownloadUrl;
try {
tempSegmentDownloadUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
} catch (Exception e) {
LOGGER.warn("Failed to upload LLC segment {} to deepstore from server {}", segmentName, uri);
iteration++;
continue;
}

String segmentDownloadUrl = moveSegmentFile(rawTableName, segmentName, tempSegmentDownloadUrl, pinotFS);
// Update segment ZK metadata by adding the download URL
LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl);
segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
// TODO: add version check when persist segment ZK metadata
persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1);
LOGGER.info("Successfully uploaded LLC segment {} to deep store with download url: {}", segmentName,
segmentDownloadUrl);
_controllerMetrics.addMeteredTableValue(realtimeTableName,
ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_SUCCESS, 1L);
}
} catch (Exception e) {
_controllerMetrics.addMeteredTableValue(realtimeTableName,
ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR, 1L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ public void testUploadToSegmentStore()
when(helixAdmin.getInstanceConfig(CLUSTER_NAME, instance0)).thenReturn(instanceConfig0);
// mock the request/response for 1st segment upload
String serverUploadRequestUrl0 =
String.format("http://%s:%d/segments/%s/%s/upload?uploadTimeoutMs=-1", instance0, adminPort,
String.format("http://%s:%d/segments/%s/%s/upload?uploadTimeoutMs=-1&expectedCrc=-1", instance0, adminPort,
REALTIME_TABLE_NAME, segmentsZKMetadata.get(0).getSegmentName());
// tempSegmentFileLocation is the location where the segment uploader will upload the segment. This usually ends
// with a random UUID
Expand All @@ -998,7 +998,7 @@ public void testUploadToSegmentStore()
when(helixAdmin.getInstanceConfig(CLUSTER_NAME, instance1)).thenReturn(instanceConfig1);
// mock the request/response for 2nd segment upload
String serverUploadRequestUrl1 =
String.format("http://%s:%d/segments/%s/%s/upload?uploadTimeoutMs=-1", instance1, adminPort,
String.format("http://%s:%d/segments/%s/%s/upload?uploadTimeoutMs=-1&expectedCrc=-1", instance1, adminPort,
REALTIME_TABLE_NAME, segmentsZKMetadata.get(1).getSegmentName());
when(segmentManager._mockedFileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl1)).thenThrow(
new HttpErrorStatusException("failed to upload segment",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,7 @@ public String uploadLLCSegment(
String realtimeTableName,
@ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") String segmentName,
@QueryParam("uploadTimeoutMs") @DefaultValue("-1") int timeoutMs,
@QueryParam("expectedCrc") @DefaultValue("-1") long expectedCrc,
@Context HttpHeaders headers)
throws Exception {
realtimeTableName = DatabaseUtils.translateTableName(realtimeTableName, headers);
Expand Down Expand Up @@ -811,6 +812,14 @@ public String uploadLLCSegment(
Response.Status.NOT_FOUND);
}

// Check that the expected crc (when provided) matches the crc of this server's segment copy; this ensures the
// crc in ZK metadata matches the crc of the copy uploaded to deep store.
if (expectedCrc != -1
&& expectedCrc != Long.parseLong(segmentDataManager.getSegment().getSegmentMetadata().getCrc())) {
throw new WebApplicationException(
String.format("Table %s segment %s crc does not match the expected crc", realtimeTableName, segmentName),
Response.Status.NOT_FOUND);
}

File segmentTarFile = null;
try {
// Create the tar.gz segment file in the server's segmentTarUploadDir folder with a unique file name.
Expand Down

0 comments on commit 249a4ee

Please sign in to comment.