Fix crc mismatch during deepstore upload retry task #14506

Merged (5 commits) on Dec 3, 2024
Changes from 3 commits
@@ -82,6 +82,7 @@
import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater;
import org.apache.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
import org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionStrategy;
import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
import org.apache.pinot.core.util.PeerServerSegmentFinder;
@@ -1557,6 +1558,7 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe

// Randomly ask one server to upload
URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
String crcFromServer = getSegmentCrcFromServer(realtimeTableName, segmentName, uri.toString());
Contributor

Currently we read the CRC and ask the server to upload as two separate steps. A better solution would be to enhance the server upload API to return both the segment location (i.e. tempSegmentDownloadUrl) and the CRC. The response can be a JSON object wrapping more information than just the download URL.

Contributor Author

> enhance the server upload API to return both segment location (i.e. tempSegmentDownloadUrl) and also crc.

This change would require a new server API: users might be calling the existing API directly, so changing its response format would be backward-incompatible. If introducing a new API is acceptable, I can proceed with its implementation.

Contributor

A new API is okay since the response format is changing. To keep it backward compatible, the controller can first try the new API and fall back to the old API if the new API returns an error.

Contributor Author

Sgtm! Updated accordingly.
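
For readers following the thread, the "try the new API, fall back to the old one" idea can be sketched roughly as below. This is a minimal illustration, not the code merged in this PR: `UploadFallbackSketch`, `UploadResult`, and `uploadWithFallback` are hypothetical names, the two server calls are passed in as plain functions, and it assumes an old server surfaces the unknown endpoint as a runtime exception.

```java
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical sketch; none of these names are Pinot APIs. */
final class UploadFallbackSketch {

  /** Outcome of a server-side upload. The CRC is empty when only the old API answered. */
  static final class UploadResult {
    final String downloadUrl;
    final Optional<Long> crc;

    UploadResult(String downloadUrl, Optional<Long> crc) {
      this.downloadUrl = downloadUrl;
      this.crc = crc;
    }
  }

  /**
   * Calls the new API first (returns URL and CRC together). If that call fails,
   * falls back to the old API, which returns only the download URL.
   */
  static UploadResult uploadWithFallback(String segmentName,
      Function<String, UploadResult> newApi, Function<String, String> oldApi) {
    try {
      return newApi.apply(segmentName);
    } catch (RuntimeException e) {
      // Assumption: an old server rejects the unknown endpoint with an error.
      String downloadUrl = oldApi.apply(segmentName);
      return new UploadResult(downloadUrl, Optional.empty());
    }
  }

  public static void main(String[] args) {
    // Simulate an old server: the new endpoint fails, the old one works.
    UploadResult result = uploadWithFallback("seg_0",
        s -> { throw new IllegalStateException("endpoint not found"); },
        s -> "file:///tmp/" + s);
    System.out.println(result.downloadUrl + " crcKnown=" + result.crc.isPresent());
  }
}
```

With this shape the caller would update the ZK CRC only when `crc` is present, so a fallback to the old API leaves the existing ZK value untouched.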

String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload");
serverUploadRequestUrl =
    String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl, _deepstoreUploadRetryTimeoutMs);
@@ -1569,6 +1571,10 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe

// Update segment ZK metadata by adding the download URL
segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
// Update ZK crc to that of the server segment crc if unmatched
if (Long.parseLong(crcFromServer) != segmentZKMetadata.getCrc()) {
  segmentZKMetadata.setCrc(Long.parseLong(crcFromServer));
}
// TODO: add version check when persist segment ZK metadata
persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1);
LOGGER.info("Successfully uploaded LLC segment {} to deep store with download url: {}", segmentName,
@@ -1593,6 +1599,16 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
}
}

@VisibleForTesting
String getSegmentCrcFromServer(String tableNameWithType, String segmentName, String endpoint) {
Contributor

Is this backward compatible when the server is still running the old pinot version?

Contributor Author

> Is this backward compatible when the server is still running the old pinot version?

The proposed change is not fully backward compatible during the interim period when the controller has been rolled out but the server is still running an older version. This is a common challenge for changes that involve interactions between the controller and server. Below are the considerations and potential solutions to address this issue:

  1. Soft Validation on Controller: Modify the controller to perform a soft validation during this transitional period. If the controller cannot connect to the older server, it proceeds with uploading to deep store as per the existing flow.
    Downside: This approach may result in a mismatch between the CRC in ZooKeeper and the CRC of the segment in deep store, introducing a risk of data inconsistency.
  2. Config-Driven Rollout: Introduce a configuration flag to enable the change in controller only after the server rollout is complete. This ensures that the new flow is activated only when both controller and server are running the compatible versions.
    Downside: Adds more configuration options for features that are meant to be defaults, leading to potential clutter and management overhead.
  3. Pause Deep Store Uploads During Rollout (Proposed):
    In the worst-case scenario, it might be acceptable to halt deep store uploads temporarily during the period between the controller and server rollouts. Once the rollout is complete, the process resumes normally.
    Rationale: This minimizes risk (e.g., CRC mismatches or partial uploads) and avoids introducing temporary configurations or complex validation mechanisms.
    Downside: There is a short period where new segments may not be uploaded to deep store, but this is generally manageable if the window is small.
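
To make the trade-off between the three options concrete, here is a rough sketch of how each would behave when the server CRC is or is not available. It is only an illustration under assumptions: `CrcRolloutSketch`, `Policy`, `Action`, and `decide` are hypothetical names, and "CRC unavailable" stands in for an older server that does not expose the CRC endpoint.

```java
import java.util.OptionalLong;

/** Hypothetical sketch of the three rollout options; not Pinot code. */
final class CrcRolloutSketch {

  enum Policy { SOFT_VALIDATION, CONFIG_GATED, PAUSE_UPLOADS }

  enum Action { UPLOAD_AND_SYNC_CRC, UPLOAD_KEEP_ZK_CRC, SKIP_UPLOAD }

  /**
   * @param policy          which of the three options is in effect
   * @param crcCheckEnabled the config flag; only consulted for CONFIG_GATED
   * @param serverCrc       empty when the server could not return a CRC
   */
  static Action decide(Policy policy, boolean crcCheckEnabled, OptionalLong serverCrc) {
    if (policy == Policy.CONFIG_GATED && !crcCheckEnabled) {
      // Option 2 with the flag off: keep the existing flow untouched.
      return Action.UPLOAD_KEEP_ZK_CRC;
    }
    if (serverCrc.isPresent()) {
      // Server is on the new version: upload and align the ZK CRC with it.
      return Action.UPLOAD_AND_SYNC_CRC;
    }
    switch (policy) {
      case SOFT_VALIDATION:
        // Option 1: proceed anyway, accepting a possible ZK/deep-store mismatch.
        return Action.UPLOAD_KEEP_ZK_CRC;
      default:
        // Option 3 (and option 2 with the flag on): wait until servers are upgraded.
        return Action.SKIP_UPLOAD;
    }
  }

  public static void main(String[] args) {
    System.out.println(decide(Policy.PAUSE_UPLOADS, false, OptionalLong.empty()));
    System.out.println(decide(Policy.SOFT_VALIDATION, false, OptionalLong.empty()));
  }
}
```

Under the proposed option 3, a skipped segment is simply picked up again on a later run of the retry task once the server can return its CRC.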

  ServerSegmentMetadataReader serverSegmentMetadataReader = new ServerSegmentMetadataReader();
  String crcFromServer = serverSegmentMetadataReader.getCrcForSegmentFromServer(tableNameWithType,
      segmentName, endpoint);
  Preconditions.checkState(crcFromServer != null,
      "Failed to get CRC from endpoint %s for segment %s", endpoint, segmentName);
  return crcFromServer;
}

@VisibleForTesting
boolean deepStoreUploadExecutorPendingSegmentsIsEmpty() {
  return _deepStoreUploadExecutorPendingSegments.isEmpty();
@@ -397,6 +397,24 @@ public ValidDocIdsBitmapResponse getValidDocIdsBitmapFromServer(String tableName
return response;
}

/**
 * Returns the crc value of the segment hosted on the server.
 */
@Nullable
public String getCrcForSegmentFromServer(String tableNameWithType, String segmentName, String endpoint) {
  try {
    // build the url
    tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8);
    segmentName = URLEncoder.encode(segmentName, StandardCharsets.UTF_8);
    String url = String.format("%s/segments/%s/%s/crc", endpoint, tableNameWithType, segmentName);
    ClientConfig clientConfig = new ClientConfig();
    return ClientBuilder.newClient(clientConfig).target(url).request(MediaType.APPLICATION_JSON).get(String.class);
  } catch (Exception e) {
    LOGGER.error("Error in fetching crc from server {} for segment {}: {}", endpoint, segmentName, e.getMessage());
  }
  return null;
}

private String generateAggregateSegmentMetadataServerURL(String tableNameWithType, List<String> columns,
    String endpoint) {
  tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8);
@@ -1033,6 +1033,8 @@ public void testUploadToSegmentStore()

assertEquals(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(0), null).getDownloadUrl(),
    expectedSegmentLocation);
assertEquals(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(0), null).getCrc(),
    Long.parseLong("12345678"));
assertFalse(tempSegmentFileLocation.exists(),
    "Deep-store retry task should move the file from temp location to permanent location");

@@ -1196,6 +1198,11 @@ void persistSegmentZKMetadata(String realtimeTableName, SegmentZKMetadata segmen
  _segmentZKMetadataVersionMap.put(segmentName, version + 1);
}

@Override
String getSegmentCrcFromServer(String tableNameWithType, String segmentName, String endpoint) {
  return "12345678";
}

@Override
public IdealState getIdealState(String realtimeTableName) {
  return _idealState;
@@ -542,6 +542,35 @@ public ValidDocIdsBitmapResponse downloadValidDocIdsBitmap(
}
}

@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/segments/{tableNameWithType}/{segmentName}/crc")
@ApiOperation(value = "Return crc value for segment in the server.", notes =
    "Return crc value for segment in the server.")
public String getCrcForSegment(
    @ApiParam(value = "Name of the table with type REALTIME", required = true, example = "myTable_REALTIME")
    @PathParam("tableNameWithType") String tableNameWithType,
    @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
    @Context HttpHeaders httpHeaders) {
  tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, httpHeaders);
  segmentName = URIUtils.decode(segmentName);
  LOGGER.info("Received a request to get Crc for segment {} table {}", segmentName, tableNameWithType);
  // Validate data access
  ServerResourceUtils.validateDataAccess(_accessControlFactory, tableNameWithType, httpHeaders);

  TableDataManager tableDataManager =
      ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableNameWithType);
  SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
  if (segmentDataManager == null) {
    throw new WebApplicationException(
        String.format("Table %s segment %s does not exist", tableNameWithType, segmentName),
        Response.Status.NOT_FOUND);
  }
  String crc = segmentDataManager.getSegment().getSegmentMetadata().getCrc();
  tableDataManager.releaseSegment(segmentDataManager);
  return crc;
}

/**
 * Download snapshot for the given immutable segment for upsert table. This endpoint is used when get snapshot from
 * peer to avoid recompute when reload segments.