Skip to content

Commit

Permalink
update crc post segment upload
Browse files Browse the repository at this point in the history
  • Loading branch information
tibrewalpratik17 committed Nov 22, 2024
1 parent 4f01b87 commit 4888c37
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1556,20 +1556,10 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
segmentName));
}

ServerSegmentMetadataReader serverSegmentMetadataReader = new ServerSegmentMetadataReader();
Collections.shuffle(peerSegmentURIs);
URI uriToUpload = peerSegmentURIs.get(0);
for (URI uri : peerSegmentURIs) {
uriToUpload = uri;
LOGGER.info("Get CRC from server {} for LLC segment {}", uri, segmentName);
String crcFromServer = serverSegmentMetadataReader.getCrcForSegmentFromServer(realtimeTableName,
segmentName, uri.toString());
if (crcFromServer != null && Long.parseLong(crcFromServer) == segmentZKMetadata.getCrc()) {
break;
}
}

String serverUploadRequestUrl = StringUtil.join("/", uriToUpload.toString(), "upload");
// Randomly ask one server to upload
URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
String crcFromServer = getSegmentCrcFromServer(realtimeTableName, segmentName, uri.toString());
String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload");
serverUploadRequestUrl =
String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl, _deepstoreUploadRetryTimeoutMs);
LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName,
Expand All @@ -1581,6 +1571,10 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe

// Update segment ZK metadata by adding the download URL
segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
// Update ZK crc to that of the server segment crc if unmatched
if (Long.parseLong(crcFromServer) != segmentZKMetadata.getCrc()) {
segmentZKMetadata.setCrc(Long.parseLong(crcFromServer));
}
// TODO: add version check when persist segment ZK metadata
persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1);
LOGGER.info("Successfully uploaded LLC segment {} to deep store with download url: {}", segmentName,
Expand All @@ -1605,6 +1599,16 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
}
}

@VisibleForTesting
String getSegmentCrcFromServer(String tableNameWithType, String segmentName, String endpoint) {
ServerSegmentMetadataReader serverSegmentMetadataReader = new ServerSegmentMetadataReader();
String crcFromServer = serverSegmentMetadataReader.getCrcForSegmentFromServer(tableNameWithType,
segmentName, endpoint);
Preconditions.checkState(crcFromServer != null,
"Failed to get CRC from endpoint %s for segment %s", endpoint, segmentName);
return crcFromServer;
}

@VisibleForTesting
boolean deepStoreUploadExecutorPendingSegmentsIsEmpty() {
return _deepStoreUploadExecutorPendingSegments.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,7 @@ public void testUploadToSegmentStore()
// Verify later the download url is fixed after upload success.
segmentsZKMetadata.get(0).setStatus(Status.DONE);
segmentsZKMetadata.get(0).setDownloadUrl(METADATA_URI_FOR_PEER_DOWNLOAD);
segmentsZKMetadata.get(0).setCrc(12345678);
// set up the external view for 1st segment
String instance0 = "instance0";
int adminPort = 2077;
Expand All @@ -970,10 +971,11 @@ public void testUploadToSegmentStore()
instanceConfig0.setHostName(instance0);
instanceConfig0.getRecord().setIntField(Instance.ADMIN_PORT_KEY, adminPort);
when(helixAdmin.getInstanceConfig(CLUSTER_NAME, instance0)).thenReturn(instanceConfig0);
String instanceEndpoint = String.format("http://%s:%d/segments/%s/%s", instance0, adminPort, REALTIME_TABLE_NAME,
segmentsZKMetadata.get(0).getSegmentName());
// mock the request/response for 1st segment upload
String serverUploadRequestUrl0 =
String.format("http://%s:%d/segments/%s/%s/upload?uploadTimeoutMs=-1", instance0, adminPort,
REALTIME_TABLE_NAME, segmentsZKMetadata.get(0).getSegmentName());
String.format("%s/upload?uploadTimeoutMs=-1", instanceEndpoint);
// tempSegmentFileLocation is the location where the segment uploader will upload the segment. This usually ends
// with a random UUID
File tempSegmentFileLocation = new File(TEMP_DIR, segmentsZKMetadata.get(0).getSegmentName() + UUID.randomUUID());
Expand All @@ -984,6 +986,8 @@ public void testUploadToSegmentStore()
segmentManager.createSegmentPath(RAW_TABLE_NAME, segmentsZKMetadata.get(0).getSegmentName()).toString();
when(segmentManager._mockedFileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl0)).thenReturn(
tempSegmentFileLocation.getPath());
when(segmentManager.getSegmentCrcFromServer(REALTIME_TABLE_NAME, segmentsZKMetadata.get(0).getSegmentName(),
instanceEndpoint)).thenReturn("12345678");

// Change 2nd segment status to be DONE, but with default peer download url.
// Verify later the download url isn't fixed after upload failure.
Expand Down

0 comments on commit 4888c37

Please sign in to comment.