Skip to content

Commit

Permalink
update crc post segment upload
Browse files Browse the repository at this point in the history
  • Loading branch information
tibrewalpratik17 committed Nov 22, 2024
1 parent 4f01b87 commit 271c305
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1556,20 +1556,10 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
segmentName));
}

ServerSegmentMetadataReader serverSegmentMetadataReader = new ServerSegmentMetadataReader();
Collections.shuffle(peerSegmentURIs);
URI uriToUpload = peerSegmentURIs.get(0);
for (URI uri : peerSegmentURIs) {
uriToUpload = uri;
LOGGER.info("Get CRC from server {} for LLC segment {}", uri, segmentName);
String crcFromServer = serverSegmentMetadataReader.getCrcForSegmentFromServer(realtimeTableName,
segmentName, uri.toString());
if (crcFromServer != null && Long.parseLong(crcFromServer) == segmentZKMetadata.getCrc()) {
break;
}
}

String serverUploadRequestUrl = StringUtil.join("/", uriToUpload.toString(), "upload");
// Randomly ask one server to upload
URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
String crcFromServer = getSegmentCrcFromServer(realtimeTableName, segmentName, uri.toString());
String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload");
serverUploadRequestUrl =
String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl, _deepstoreUploadRetryTimeoutMs);
LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName,
Expand All @@ -1581,6 +1571,10 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe

// Update segment ZK metadata by adding the download URL
segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
// Update ZK crc to that of the server segment crc if unmatched
if (Long.parseLong(crcFromServer) != segmentZKMetadata.getCrc()) {
segmentZKMetadata.setCrc(Long.parseLong(crcFromServer));
}
// TODO: add version check when persist segment ZK metadata
persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1);
LOGGER.info("Successfully uploaded LLC segment {} to deep store with download url: {}", segmentName,
Expand All @@ -1605,6 +1599,16 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
}
}

@VisibleForTesting
String getSegmentCrcFromServer(String tableNameWithType, String segmentName, String endpoint) {
ServerSegmentMetadataReader serverSegmentMetadataReader = new ServerSegmentMetadataReader();
String crcFromServer = serverSegmentMetadataReader.getCrcForSegmentFromServer(tableNameWithType,
segmentName, endpoint);
Preconditions.checkState(crcFromServer != null,
"Failed to get CRC from endpoint %s for segment %s", endpoint, segmentName);
return crcFromServer;
}

@VisibleForTesting
boolean deepStoreUploadExecutorPendingSegmentsIsEmpty() {
return _deepStoreUploadExecutorPendingSegments.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,8 @@ public void testUploadToSegmentStore()

assertEquals(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(0), null).getDownloadUrl(),
expectedSegmentLocation);
assertEquals(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(0), null).getCrc(),
Long.parseLong("12345678"));
assertFalse(tempSegmentFileLocation.exists(),
"Deep-store retry task should move the file from temp location to permanent location");

Expand Down Expand Up @@ -1196,6 +1198,11 @@ void persistSegmentZKMetadata(String realtimeTableName, SegmentZKMetadata segmen
_segmentZKMetadataVersionMap.put(segmentName, version + 1);
}

@Override
String getSegmentCrcFromServer(String tableNameWithType, String segmentName, String endpoint) {
return "12345678";
}

@Override
public IdealState getIdealState(String realtimeTableName) {
return _idealState;
Expand Down

0 comments on commit 271c305

Please sign in to comment.