From 8a395ea9c1d8acc7b3a5b7b45148735acb0e5ae9 Mon Sep 17 00:00:00 2001 From: Vivek Iyer Vaidyanathan Iyer Date: Tue, 19 Nov 2024 02:04:13 +0530 Subject: [PATCH] Address review comments. --- ...shSegmentMinionClusterIntegrationTest.java | 21 ++++++++++++------- .../plugin/minion/tasks/MinionTaskUtils.java | 18 ++++++++++++++++ .../RefreshSegmentTaskExecutor.java | 3 ++- ...=> RefreshSegmentTaskExecutorFactory.java} | 2 +- .../RefreshSegmentTaskGenerator.java | 3 ++- ...shSegmentTaskProgressObserverFactory.java} | 2 +- 6 files changed, 38 insertions(+), 11 deletions(-) rename pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/{SegmentRefreshTaskExecutorFactory.java => RefreshSegmentTaskExecutorFactory.java} (96%) rename pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/{SegmentRefreshTaskProgressObserverFactory.java => RefreshSegmentTaskProgressObserverFactory.java} (95%) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java index 0620f45c6239..7f91a8671ed1 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java @@ -118,13 +118,13 @@ public void testFirstSegmentRefresh() { waitForTaskToComplete(); // Check that metadata contains expected values - Map segmentRefreshTime = new HashMap<>(); + Map segmentRefreshTime = new HashMap<>(); String refreshKey = MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX; for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) { // Get the value in segment metadata Map customMap = metadata.getCustomMap(); assertTrue(customMap.containsKey(refreshKey)); - segmentRefreshTime.put(metadata.getSegmentName(), Long.parseLong(customMap.get(refreshKey))); + segmentRefreshTime.put(metadata.getSegmentName(), customMap.get(refreshKey)); } // This should be no-op as nothing changes. @@ -135,7 +135,7 @@ public void testFirstSegmentRefresh() { Map customMap = metadata.getCustomMap(); assertTrue( customMap.containsKey(MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX)); - assertEquals(segmentRefreshTime.get(metadata.getSegmentName()), Long.parseLong(customMap.get(refreshKey)), + assertEquals(segmentRefreshTime.get(metadata.getSegmentName()), customMap.get(refreshKey), "Refresh Time doesn't match"); } } @@ -387,10 +387,15 @@ public void checkColumnAddition() throws Exception { assertTrue(derivedNullStringColumnIndex.has(StandardIndexes.NULL_VALUE_VECTOR_ID)); } - @Test(priority = 5) public void checkRefreshNotNecessary() throws Exception { String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName()); + + Map segmentCrc = new HashMap<>(); + for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) { + segmentCrc.put(metadata.getSegmentName(), metadata.getCrc()); + } + TableConfig tableConfig = getOfflineTableConfig(); tableConfig.setQuotaConfig(new QuotaConfig(null, "10")); @@ -406,13 +411,15 @@ public void checkRefreshNotNecessary() throws Exception { waitForTaskToComplete(); // Check that metadata contains expected values - Map segmentRefreshTime = new HashMap<>(); + Map segmentRefreshTime = new HashMap<>(); + String refreshKey = MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX; for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) { // Get the value in segment metadata Map customMap = metadata.getCustomMap(); assertTrue(customMap.containsKey(refreshKey)); - segmentRefreshTime.put(metadata.getSegmentName(), Long.parseLong(customMap.get(refreshKey))); + segmentRefreshTime.put(metadata.getSegmentName(), customMap.get(refreshKey)); + assertEquals(segmentCrc.get(metadata.getSegmentName()), metadata.getCrc(), "CRC does not match"); } // This should be no-op as nothing changes. @@ -423,7 +430,7 @@ public void checkRefreshNotNecessary() throws Exception { Map customMap = metadata.getCustomMap(); assertTrue( customMap.containsKey(MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX)); - assertEquals(segmentRefreshTime.get(metadata.getSegmentName()), Long.parseLong(customMap.get(refreshKey)), + assertEquals(segmentRefreshTime.get(metadata.getSegmentName()), customMap.get(refreshKey), "Refresh Time doesn't match"); } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java index 55dfb97f981e..5e41720cdea2 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java @@ -19,10 +19,14 @@ package org.apache.pinot.plugin.minion.tasks; import java.net.URI; +import java.text.SimpleDateFormat; +import java.time.Instant; import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.TimeZone; import javax.annotation.Nullable; import org.apache.helix.HelixAdmin; import org.apache.helix.model.ExternalView; @@ -54,6 +58,9 @@ public class MinionTaskUtils { private static final String DEFAULT_DIR_PATH_TERMINATOR = "/"; + public static final String DATETIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + public static final String UTC = "UTC"; + private MinionTaskUtils() { } @@ -235,4 +242,15 @@ public static RoaringBitmap getValidDocIdFromServerMatchingCrc(String tableNameW } return validDocIds; } + + public static String toUTCString(long epochMillis) { + Date date = new Date(epochMillis); + SimpleDateFormat isoFormat = new SimpleDateFormat(DATETIME_PATTERN); + isoFormat.setTimeZone(TimeZone.getTimeZone(UTC)); + return isoFormat.format(date); + } + + public static long fromUTCString(String utcString) { + return Instant.parse(utcString).toEpochMilli(); + } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutor.java index e554a6e24731..a4a77818ae1d 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutor.java @@ -27,6 +27,7 @@ import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.core.minion.PinotTaskConfig; import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor; +import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils; import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; @@ -202,6 +203,6 @@ protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifi SegmentConversionResult segmentConversionResult) { return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, Collections.singletonMap(MinionConstants.RefreshSegmentTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX, - String.valueOf(_taskStartTime))); + MinionTaskUtils.toUTCString(_taskStartTime))); } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/SegmentRefreshTaskExecutorFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutorFactory.java similarity index 96% rename from pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/SegmentRefreshTaskExecutorFactory.java rename to pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutorFactory.java index 2f8f93be6589..5214d466456b 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/SegmentRefreshTaskExecutorFactory.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutorFactory.java @@ -27,7 +27,7 @@ @TaskExecutorFactory -public class SegmentRefreshTaskExecutorFactory implements PinotTaskExecutorFactory { +public class RefreshSegmentTaskExecutorFactory implements PinotTaskExecutorFactory { @Override public void init(MinionTaskZkMetadataManager zkMetadataManager) { diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java index 6e28dcca307c..59e85c1b1e8e 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java @@ -32,6 +32,7 @@ import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.core.common.MinionConstants.RefreshSegmentTask; import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils; import org.apache.pinot.spi.annotations.minion.TaskGenerator; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableTaskConfig; @@ -149,7 +150,7 @@ private boolean shouldRefreshSegment(SegmentZKMetadata segmentZKMetadata, TableC long lastProcessedTime = 0L; if (segmentZKMetadata.getCustomMap() != null && segmentZKMetadata.getCustomMap().containsKey(timestampKey)) { - lastProcessedTime = Long.parseLong(segmentZKMetadata.getCustomMap().get(timestampKey)); + lastProcessedTime = MinionTaskUtils.fromUTCString(segmentZKMetadata.getCustomMap().get(timestampKey)); } if (tableStat == null || schemaStat == null) { diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/SegmentRefreshTaskProgressObserverFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskProgressObserverFactory.java similarity index 95% rename from pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/SegmentRefreshTaskProgressObserverFactory.java rename to pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskProgressObserverFactory.java index 8387c2d71fa4..b10db9490194 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/SegmentRefreshTaskProgressObserverFactory.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskProgressObserverFactory.java @@ -24,7 +24,7 @@ @EventObserverFactory -public class SegmentRefreshTaskProgressObserverFactory extends BaseMinionProgressObserverFactory { +public class RefreshSegmentTaskProgressObserverFactory extends BaseMinionProgressObserverFactory { @Override public String getTaskType() {