Skip to content

Commit

Permalink
Address review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
vvivekiyer committed Nov 18, 2024
1 parent 9a7814c commit 8a395ea
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,13 @@ public void testFirstSegmentRefresh() {
waitForTaskToComplete();

// Check that metadata contains expected values
Map<String, Long> segmentRefreshTime = new HashMap<>();
Map<String, String> segmentRefreshTime = new HashMap<>();
String refreshKey = MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX;
for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
// Get the value in segment metadata
Map<String, String> customMap = metadata.getCustomMap();
assertTrue(customMap.containsKey(refreshKey));
segmentRefreshTime.put(metadata.getSegmentName(), Long.parseLong(customMap.get(refreshKey)));
segmentRefreshTime.put(metadata.getSegmentName(), customMap.get(refreshKey));
}

// This should be no-op as nothing changes.
Expand All @@ -135,7 +135,7 @@ public void testFirstSegmentRefresh() {
Map<String, String> customMap = metadata.getCustomMap();
assertTrue(
customMap.containsKey(MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX));
assertEquals(segmentRefreshTime.get(metadata.getSegmentName()), Long.parseLong(customMap.get(refreshKey)),
assertEquals(segmentRefreshTime.get(metadata.getSegmentName()), customMap.get(refreshKey),
"Refresh Time doesn't match");
}
}
Expand Down Expand Up @@ -387,10 +387,15 @@ public void checkColumnAddition() throws Exception {
assertTrue(derivedNullStringColumnIndex.has(StandardIndexes.NULL_VALUE_VECTOR_ID));
}


@Test(priority = 5)
public void checkRefreshNotNecessary() throws Exception {
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());

Map<String, Long> segmentCrc = new HashMap<>();
for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
segmentCrc.put(metadata.getSegmentName(), metadata.getCrc());
}

TableConfig tableConfig = getOfflineTableConfig();
tableConfig.setQuotaConfig(new QuotaConfig(null, "10"));

Expand All @@ -406,13 +411,15 @@ public void checkRefreshNotNecessary() throws Exception {
waitForTaskToComplete();

// Check that metadata contains expected values
Map<String, Long> segmentRefreshTime = new HashMap<>();
Map<String, String> segmentRefreshTime = new HashMap<>();

String refreshKey = MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX;
for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
// Get the value in segment metadata
Map<String, String> customMap = metadata.getCustomMap();
assertTrue(customMap.containsKey(refreshKey));
segmentRefreshTime.put(metadata.getSegmentName(), Long.parseLong(customMap.get(refreshKey)));
segmentRefreshTime.put(metadata.getSegmentName(), customMap.get(refreshKey));
assertEquals(segmentCrc.get(metadata.getSegmentName()), metadata.getCrc(), "CRC does not match");
}

// This should be no-op as nothing changes.
Expand All @@ -423,7 +430,7 @@ public void checkRefreshNotNecessary() throws Exception {
Map<String, String> customMap = metadata.getCustomMap();
assertTrue(
customMap.containsKey(MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX));
assertEquals(segmentRefreshTime.get(metadata.getSegmentName()), Long.parseLong(customMap.get(refreshKey)),
assertEquals(segmentRefreshTime.get(metadata.getSegmentName()), customMap.get(refreshKey),
"Refresh Time doesn't match");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@
package org.apache.pinot.plugin.minion.tasks;

import java.net.URI;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import javax.annotation.Nullable;
import org.apache.helix.HelixAdmin;
import org.apache.helix.model.ExternalView;
Expand Down Expand Up @@ -54,6 +58,9 @@ public class MinionTaskUtils {

private static final String DEFAULT_DIR_PATH_TERMINATOR = "/";

public static final String DATETIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
public static final String UTC = "UTC";

private MinionTaskUtils() {
}

Expand Down Expand Up @@ -235,4 +242,15 @@ public static RoaringBitmap getValidDocIdFromServerMatchingCrc(String tableNameW
}
return validDocIds;
}

public static String toUTCString(long epochMillis) {
Date date = new Date(epochMillis);
SimpleDateFormat isoFormat = new SimpleDateFormat(DATETIME_PATTERN);
isoFormat.setTimeZone(TimeZone.getTimeZone(UTC));
return isoFormat.format(date);
}

public static long fromUTCString(String utcString) {
return Instant.parse(utcString).toEpochMilli();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
Expand Down Expand Up @@ -202,6 +203,6 @@ protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifi
SegmentConversionResult segmentConversionResult) {
return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
Collections.singletonMap(MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX,
String.valueOf(_taskStartTime)));
MinionTaskUtils.toUTCString(_taskStartTime)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@


@TaskExecutorFactory
public class SegmentRefreshTaskExecutorFactory implements PinotTaskExecutorFactory {
public class RefreshSegmentTaskExecutorFactory implements PinotTaskExecutorFactory {

@Override
public void init(MinionTaskZkMetadataManager zkMetadataManager) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.common.MinionConstants.RefreshSegmentTask;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
import org.apache.pinot.spi.annotations.minion.TaskGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
Expand Down Expand Up @@ -149,7 +150,7 @@ private boolean shouldRefreshSegment(SegmentZKMetadata segmentZKMetadata, TableC

long lastProcessedTime = 0L;
if (segmentZKMetadata.getCustomMap() != null && segmentZKMetadata.getCustomMap().containsKey(timestampKey)) {
lastProcessedTime = Long.parseLong(segmentZKMetadata.getCustomMap().get(timestampKey));
lastProcessedTime = MinionTaskUtils.fromUTCString(segmentZKMetadata.getCustomMap().get(timestampKey));
}

if (tableStat == null || schemaStat == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@


@EventObserverFactory
public class SegmentRefreshTaskProgressObserverFactory extends BaseMinionProgressObserverFactory {
public class RefreshSegmentTaskProgressObserverFactory extends BaseMinionProgressObserverFactory {

@Override
public String getTaskType() {
Expand Down

0 comments on commit 8a395ea

Please sign in to comment.