Skip to content

Commit

Permalink
iterate through all servers
Browse files Browse the repository at this point in the history
  • Loading branch information
tibrewalpratik17 committed Jun 27, 2024
1 parent 1fc8d3b commit 1b76d1c
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,5 @@ public static class UpsertCompactionTask {
* number of segments to query in one batch to fetch valid doc id metadata, by default 500
*/
public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = "numSegmentsBatchPerServerRequest";

/**
* skip crc mismatch as deepstore copies are not updated during schema / index changes
* so it's natural for crc to mismatch.
*/
public static final String SKIP_CRC_MISMATCH = "skipCrcMismatch";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@

import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixAdmin;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
import org.apache.pinot.common.utils.config.InstanceUtils;
import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
Expand Down Expand Up @@ -61,55 +66,66 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File
String validDocIdsTypeStr =
configs.getOrDefault(MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_TYPE, ValidDocIdsType.SNAPSHOT.name());
ValidDocIdsType validDocIdsType = ValidDocIdsType.valueOf(validDocIdsTypeStr.toUpperCase());
ValidDocIdsBitmapResponse validDocIdsBitmapResponse =
MinionTaskUtils.getValidDocIdsBitmap(tableNameWithType, segmentName, validDocIdsType.toString(),
MINION_CONTEXT);

// Check crc from the downloaded segment against the crc returned from the server along with the valid doc id
// bitmap. If this doesn't match, this means that we are hitting the race condition where the segment has been
// uploaded successfully while the server is still reloading the segment. Reloading can take a while when the
// offheap upsert is used because we will need to delete & add all primary keys.
// `BaseSingleSegmentConversionExecutor.executeTask()` already checks for the crc from the task generator
// against the crc from the current segment zk metadata, so we don't need to check that here.
String clusterName = MINION_CONTEXT.getHelixManager().getClusterName();
HelixAdmin helixAdmin = MINION_CONTEXT.getHelixManager().getClusterManagmentTool();
List<String> servers = MinionTaskUtils.getServers(segmentName, tableNameWithType, helixAdmin, clusterName);
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
String originalSegmentCrcFromTaskGenerator = configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY);
String crcFromDeepStorageSegment = segmentMetadata.getCrc();
String crcFromValidDocIdsBitmap = validDocIdsBitmapResponse.getSegmentCrc();
if (!originalSegmentCrcFromTaskGenerator.equals(crcFromValidDocIdsBitmap)) {
// In this scenario, the segment is refreshed or reloaded (due to a schema or index change) between the
// task generation and task execution phases. It is safer to skip this segment for this execution cycle.
// However, we prefer to return it as a task error state so that if this issue occurs in consecutive runs,
// we can identify and address such scenarios.
String message = String.format("CRC mismatch for segment: %s, expected value based on task generator: %s, "
+ "actual crc from validDocIdsBitmapResponse: %s", segmentName, originalSegmentCrcFromTaskGenerator,
crcFromValidDocIdsBitmap);
LOGGER.error(message);
throw new IllegalStateException(message);
}
RoaringBitmap validDocIds = null;
for (String server : servers) {
InstanceConfig instanceConfig = helixAdmin.getInstanceConfig(clusterName, server);
String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);

// We only need aggregated table size and the total number of docs/rows. Skipping column related stats, by
// passing an empty list.
ServerSegmentMetadataReader serverSegmentMetadataReader = new ServerSegmentMetadataReader();
ValidDocIdsBitmapResponse validDocIdsBitmapResponse;
try {
validDocIdsBitmapResponse = serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType,
segmentName, endpoint, validDocIdsType.toString(), 60_000);
} catch (Exception e) {
LOGGER.warn(
String.format("Unable to retrieve validDocIds bitmap for segment: %s from endpoint: %s", segmentName,
endpoint), e);
continue;
}

if (!originalSegmentCrcFromTaskGenerator.equals(crcFromDeepStorageSegment)) {
// We are introducing a skipCrcMismatch option because deepstore copies are not refreshed after schema or
// indexing changes, leading to natural CRC mismatches. If skipCrcMismatch enabled, we can allow all
// segments to be processed once and then disable it again. However, we should develop an intelligent way
// to detect this situation in a self-serve manner.
// Additionally, if skipCrcMismatch is disabled and a CRC mismatch is found, we will mark it as a task-fail error
// instead of marking the task as a success. Otherwise the segment would continue to be picked up in subsequent
// compaction cycles without any execution being actually carried out.
boolean skipCrcMismatch =
Boolean.parseBoolean(configs.get(MinionConstants.UpsertCompactionTask.SKIP_CRC_MISMATCH));
String message = String.format("CRC mismatch for segment: %s, "
+ "expected value based on task generator: %s, actual crc based on deepstore / server metadata copy: %s",
segmentName, originalSegmentCrcFromTaskGenerator, crcFromDeepStorageSegment);
if (skipCrcMismatch) {
// Check crc from the downloaded segment against the crc returned from the server along with the valid doc id
// bitmap. If this doesn't match, this means that we are hitting the race condition where the segment has been
// uploaded successfully while the server is still reloading the segment. Reloading can take a while when the
// offheap upsert is used because we will need to delete & add all primary keys.
// `BaseSingleSegmentConversionExecutor.executeTask()` already checks for the crc from the task generator
// against the crc from the current segment zk metadata, so we don't need to check that here.
String crcFromValidDocIdsBitmap = validDocIdsBitmapResponse.getSegmentCrc();
if (!originalSegmentCrcFromTaskGenerator.equals(crcFromValidDocIdsBitmap)) {
// In this scenario, we are hitting the other replica of the segment which did not commit to ZK or deepstore.
// We will skip processing this bitmap to query other server to confirm if there is a valid matching CRC.
String message = String.format("CRC mismatch for segment: %s, expected value based on task generator: %s, "
+ "actual crc from validDocIdsBitmapResponse from endpoint %s: %s", segmentName,
originalSegmentCrcFromTaskGenerator, endpoint, crcFromValidDocIdsBitmap);
LOGGER.warn(message);
continue;
}
if (!crcFromValidDocIdsBitmap.equals(crcFromDeepStorageSegment)) {
// Deepstore copies might not always have the same CRC as that from the server we queried for ValidDocIdsBitmap
// It can happen due to CRC mismatch issues due to replicas diverging, lucene index issues.
String message = String.format("CRC mismatch for segment: %s, "
+ "expected crc from validDocIdsBitmapResponse from endpoint %s: %s, "
+ "actual crc based on deepstore / server metadata copy: %s", segmentName, endpoint,
crcFromValidDocIdsBitmap, crcFromDeepStorageSegment);
LOGGER.warn(message);
} else {
LOGGER.error(message);
throw new IllegalStateException(message);
continue;
}
validDocIds = RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap());
}
if (validDocIds == null) {
// no valid crc match found or no validDocIds obtained from all servers
// error out the task instead of silently failing so that we can track it via task-error metrics
LOGGER.error("No validDocIds found from all servers. They either failed to download or did not match crc from"
+ "segment copy obtained from deepstore / servers.");
throw new IllegalStateException("No valid validDocIds found from all servers.");
}

RoaringBitmap validDocIds = RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap());

if (validDocIds.isEmpty()) {
// prevents empty segment generation
LOGGER.info("validDocIds is empty, skip the task. Table: {}, segment: {}", tableNameWithType, segmentName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator {
private static final double DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT = 0.0;
private static final long DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT = 1;
private static final int DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = 500;
private static final String DEFAULT_SKIP_CRC_MISMATCH = "false";

public static class SegmentSelectionResult {

Expand Down Expand Up @@ -165,8 +164,6 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
String.format("deleteRecordColumn must be provided for " + "UpsertCompactionTask with validDocIdsType = %s",
validDocIdsType));
}
String skipCrcMismatch = taskConfigs.getOrDefault(UpsertCompactionTask.SKIP_CRC_MISMATCH,
DEFAULT_SKIP_CRC_MISMATCH);

List<ValidDocIdsMetadataInfo> validDocIdsMetadataList =
serverSegmentMetadataReader.getValidDocIdsMetadataFromServer(tableNameWithType, serverToSegments,
Expand Down Expand Up @@ -202,7 +199,6 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments");
configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, String.valueOf(segment.getCrc()));
configs.put(UpsertCompactionTask.VALID_DOC_IDS_TYPE, validDocIdsType.toString());
configs.put(UpsertCompactionTask.SKIP_CRC_MISMATCH, String.valueOf(skipCrcMismatch));
pinotTaskConfigs.add(new PinotTaskConfig(UpsertCompactionTask.TASK_TYPE, configs));
numTasks++;
}
Expand Down

0 comments on commit 1b76d1c

Please sign in to comment.