Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix crc mismatch during deepstore upload retry task #14506

Merged
merged 5 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.restlet.resources;

import com.fasterxml.jackson.annotation.JsonProperty;


public class TableSegmentUploadV2Response {
private final String _segmentName;
private final String _segmentCrc;
tibrewalpratik17 marked this conversation as resolved.
Show resolved Hide resolved
private final String _downloadUrl;

public TableSegmentUploadV2Response(@JsonProperty("segmentName") String segmentName,
@JsonProperty("segmentCrc") String crc, @JsonProperty("downloadUrl") String downloadUrl) {
_segmentName = segmentName;
_segmentCrc = crc;
_downloadUrl = downloadUrl;
}

public String getSegmentName() {
return _segmentName;
}

public String getSegmentCrc() {
return _segmentCrc;
}

public String getDownloadUrl() {
return _downloadUrl;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.restlet.resources.EndReplaceSegmentsRequest;
import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
import org.apache.pinot.common.restlet.resources.TableSegmentUploadV2Response;
import org.apache.pinot.common.utils.http.HttpClient;
import org.apache.pinot.common.utils.http.HttpClientConfig;
import org.apache.pinot.spi.auth.AuthProvider;
Expand Down Expand Up @@ -963,6 +964,32 @@ public String uploadToSegmentStore(String uri)
return downloadUrl;
}

/**
 * Used by controllers to send requests to servers: Controller periodic task uses this endpoint to ask servers
 * to upload committed llc segment to segment store if missing.
 *
 * <p>The response body is deserialized into a {@link TableSegmentUploadV2Response}. A response that is missing
 * altogether, or whose download url is null or empty, is rejected with an {@link HttpErrorStatusException}
 * carrying the status code of the received response.
 *
 * @param uri The uri to ask servers to upload segment to segment store
 * @return {@link TableSegmentUploadV2Response} - segment download url, crc, other metadata
 * @throws URISyntaxException if {@code uri} cannot be parsed as a {@link URI}
 * @throws IOException if sending the request or reading/parsing the response fails
 * @throws HttpErrorStatusException if the response status check fails, or the response carries no download url
 */
public TableSegmentUploadV2Response uploadToSegmentStoreV2(String uri)
    throws URISyntaxException, IOException, HttpErrorStatusException {
  ClassicRequestBuilder requestBuilder = ClassicRequestBuilder.post(new URI(uri)).setVersion(HttpVersion.HTTP_1_1);
  // sendRequest checks the response status code
  SimpleHttpResponse response = HttpClient.wrapAndThrowHttpException(
      _httpClient.sendRequest(requestBuilder.build(), HttpClient.DEFAULT_SOCKET_TIMEOUT_MS));
  TableSegmentUploadV2Response tableSegmentUploadV2Response = JsonUtils.stringToObject(response.getResponse(),
      TableSegmentUploadV2Response.class);
  // NOTE(review): presumably stringToObject can yield null (e.g. a literal JSON "null" body) - guard against it so
  // callers get the documented HttpErrorStatusException instead of a NullPointerException
  String downloadUrl = tableSegmentUploadV2Response != null ? tableSegmentUploadV2Response.getDownloadUrl() : null;
  if (downloadUrl == null || downloadUrl.isEmpty()) {
    throw new HttpErrorStatusException(
        String.format("Returned segment download url is empty after requesting servers to upload by the path: %s",
            uri), response.getStatusCode());
  }
  return tableSegmentUploadV2Response;
}

/**
* Send segment uri.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.restlet.resources.TableSegmentUploadV2Response;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.URIUtils;
Expand Down Expand Up @@ -1557,22 +1558,41 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe

// Randomly ask one server to upload
URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload");
serverUploadRequestUrl =
String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl, _deepstoreUploadRetryTimeoutMs);
LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName,
serverUploadRequestUrl);
String tempSegmentDownloadUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
String segmentDownloadUrl =
moveSegmentFile(rawTableName, segmentName, tempSegmentDownloadUrl, pinotFS);
LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl);

// Update segment ZK metadata by adding the download URL
segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
try {
String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "uploadV2");
serverUploadRequestUrl =
String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl, _deepstoreUploadRetryTimeoutMs);
LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName,
serverUploadRequestUrl);
TableSegmentUploadV2Response tableSegmentUploadV2Response
= _fileUploadDownloadClient.uploadToSegmentStoreV2(serverUploadRequestUrl);
String segmentDownloadUrl =
moveSegmentFile(rawTableName, segmentName, tableSegmentUploadV2Response.getDownloadUrl(), pinotFS);
LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl);
// Update segment ZK metadata by adding the download URL
segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
// Update ZK crc to that of the server segment crc if unmatched
if (Long.parseLong(tableSegmentUploadV2Response.getSegmentCrc()) != segmentZKMetadata.getCrc()) {
segmentZKMetadata.setCrc(Long.parseLong(tableSegmentUploadV2Response.getSegmentCrc()));
tibrewalpratik17 marked this conversation as resolved.
Show resolved Hide resolved
}
} catch (Exception e) {
// this is a fallback call for backward compatibility to the original API /upload in pinot-server
// should be deprecated in the long run
String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload");
serverUploadRequestUrl =
String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl, _deepstoreUploadRetryTimeoutMs);
LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName,
serverUploadRequestUrl);
String tempSegmentDownloadUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
String segmentDownloadUrl = moveSegmentFile(rawTableName, segmentName, tempSegmentDownloadUrl, pinotFS);
LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl);
// Update segment ZK metadata by adding the download URL
segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
}
// TODO: add version check when persist segment ZK metadata
persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1);
LOGGER.info("Successfully uploaded LLC segment {} to deep store with download url: {}", segmentName,
segmentDownloadUrl);
segmentZKMetadata.getDownloadUrl());
_controllerMetrics.addMeteredTableValue(realtimeTableName,
ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_SUCCESS, 1L);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.restlet.resources.TableSegmentUploadV2Response;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.URIUtils;
Expand Down Expand Up @@ -1045,6 +1046,134 @@ public void testUploadToSegmentStore()
assertNull(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(4), null).getDownloadUrl());
}

/**
 * Test cases for fixing LLC segment by uploading to segment store if missing, using the server's
 * {@code uploadV2} endpoint.
 *
 * <p>Five segments are set up to cover: a successful upload (download url and crc are updated), a failed upload,
 * a segment with no ONLINE replica, a segment that already has a download url, and a segment still IN_PROGRESS.
 */
@Test
public void testUploadToSegmentStoreV2()
    throws HttpErrorStatusException, IOException, URISyntaxException {
  // mock the behavior for PinotHelixResourceManager
  PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
  HelixManager helixManager = mock(HelixManager.class);
  HelixAdmin helixAdmin = mock(HelixAdmin.class);
  ZkHelixPropertyStore<ZNRecord> zkHelixPropertyStore =
      (ZkHelixPropertyStore<ZNRecord>) mock(ZkHelixPropertyStore.class);
  when(pinotHelixResourceManager.getHelixZkManager()).thenReturn(helixManager);
  when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
  when(helixManager.getClusterName()).thenReturn(CLUSTER_NAME);
  when(pinotHelixResourceManager.getPropertyStore()).thenReturn(zkHelixPropertyStore);

  // init fake PinotLLCRealtimeSegmentManager
  ControllerConf controllerConfig = new ControllerConf();
  controllerConfig.setProperty(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT,
      true);
  controllerConfig.setDataDir(TEMP_DIR.toString());
  FakePinotLLCRealtimeSegmentManager segmentManager =
      new FakePinotLLCRealtimeSegmentManager(pinotHelixResourceManager, controllerConfig);
  Assert.assertTrue(segmentManager.isDeepStoreLLCSegmentUploadRetryEnabled());

  // Set up a new table with 2 replicas, 5 instances, 5 partition.
  setUpNewTable(segmentManager, 2, 5, 5);
  SegmentsValidationAndRetentionConfig segmentsValidationAndRetentionConfig =
      new SegmentsValidationAndRetentionConfig();
  segmentsValidationAndRetentionConfig.setRetentionTimeUnit(TimeUnit.DAYS.toString());
  segmentsValidationAndRetentionConfig.setRetentionTimeValue("3");
  segmentManager._tableConfig.setValidationConfig(segmentsValidationAndRetentionConfig);
  List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>(segmentManager._segmentZKMetadataMap.values());
  Assert.assertEquals(segmentsZKMetadata.size(), 5);

  // Set up external view for this table
  ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME);
  when(helixAdmin.getResourceExternalView(CLUSTER_NAME, REALTIME_TABLE_NAME)).thenReturn(externalView);

  // Change 1st segment status to be DONE, but with default peer download url.
  // Verify later the download url and crc are fixed after upload success.
  segmentsZKMetadata.get(0).setStatus(Status.DONE);
  segmentsZKMetadata.get(0).setDownloadUrl(METADATA_URI_FOR_PEER_DOWNLOAD);
  // set up the external view for 1st segment
  String instance0 = "instance0";
  int adminPort = 2077;
  externalView.setState(segmentsZKMetadata.get(0).getSegmentName(), instance0, "ONLINE");
  InstanceConfig instanceConfig0 = new InstanceConfig(instance0);
  instanceConfig0.setHostName(instance0);
  instanceConfig0.getRecord().setIntField(Instance.ADMIN_PORT_KEY, adminPort);
  when(helixAdmin.getInstanceConfig(CLUSTER_NAME, instance0)).thenReturn(instanceConfig0);
  // mock the request/response for 1st segment upload
  String serverUploadRequestUrl0 =
      String.format("http://%s:%d/segments/%s/%s/uploadV2?uploadTimeoutMs=-1", instance0, adminPort,
          REALTIME_TABLE_NAME, segmentsZKMetadata.get(0).getSegmentName());
  // tempSegmentFileLocation is the location where the segment uploader will upload the segment. This usually ends
  // with a random UUID
  File tempSegmentFileLocation = new File(TEMP_DIR, segmentsZKMetadata.get(0).getSegmentName() + UUID.randomUUID());
  FileUtils.write(tempSegmentFileLocation, "test");
  // After the deep-store retry task gets the segment location returned by Pinot server, it will move the segment to
  // its final location. This is the expected segment location.
  String expectedSegmentLocation =
      segmentManager.createSegmentPath(RAW_TABLE_NAME, segmentsZKMetadata.get(0).getSegmentName()).toString();
  // crc reported by the server for the uploaded segment; the ZK metadata crc is expected to end up equal to it
  String serverSegmentCrc = "12345678";
  when(segmentManager._mockedFileUploadDownloadClient.uploadToSegmentStoreV2(serverUploadRequestUrl0)).thenReturn(
      new TableSegmentUploadV2Response(segmentsZKMetadata.get(0).getSegmentName(), serverSegmentCrc,
          tempSegmentFileLocation.getPath()));

  // Change 2nd segment status to be DONE, but with default peer download url.
  // Verify later the download url isn't fixed after upload failure.
  segmentsZKMetadata.get(1).setStatus(Status.DONE);
  segmentsZKMetadata.get(1).setDownloadUrl(METADATA_URI_FOR_PEER_DOWNLOAD);
  // set up the external view for 2nd segment
  String instance1 = "instance1";
  externalView.setState(segmentsZKMetadata.get(1).getSegmentName(), instance1, "ONLINE");
  InstanceConfig instanceConfig1 = new InstanceConfig(instance1);
  instanceConfig1.setHostName(instance1);
  instanceConfig1.getRecord().setIntField(Instance.ADMIN_PORT_KEY, adminPort);
  when(helixAdmin.getInstanceConfig(CLUSTER_NAME, instance1)).thenReturn(instanceConfig1);
  // mock the request/response for 2nd segment upload
  String serverUploadRequestUrl1 =
      String.format("http://%s:%d/segments/%s/%s/uploadV2?uploadTimeoutMs=-1", instance1, adminPort,
          REALTIME_TABLE_NAME, segmentsZKMetadata.get(1).getSegmentName());
  when(segmentManager._mockedFileUploadDownloadClient.uploadToSegmentStoreV2(serverUploadRequestUrl1)).thenThrow(
      new HttpErrorStatusException("failed to upload segment",
          Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()));

  // Change 3rd segment status to be DONE, but with default peer download url.
  // Verify later the download url isn't fixed because no ONLINE replica found in any server.
  segmentsZKMetadata.get(2).setStatus(Status.DONE);
  segmentsZKMetadata.get(2).setDownloadUrl(METADATA_URI_FOR_PEER_DOWNLOAD);
  // set up the external view for 3rd segment
  String instance2 = "instance2";
  externalView.setState(segmentsZKMetadata.get(2).getSegmentName(), instance2, "OFFLINE");

  // Change 4th segment status to be DONE and with segment download url.
  // Verify later the download url is still the same.
  String defaultDownloadUrl = "canItBeDownloaded";
  segmentsZKMetadata.get(3).setStatus(Status.DONE);
  segmentsZKMetadata.get(3).setDownloadUrl(defaultDownloadUrl);

  // Keep 5th segment status as IN_PROGRESS.

  List<String> segmentNames =
      segmentsZKMetadata.stream().map(SegmentZKMetadata::getSegmentName).collect(Collectors.toList());
  when(pinotHelixResourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(segmentManager._tableConfig);

  // Verify the result
  segmentManager.uploadToDeepStoreIfMissing(segmentManager._tableConfig, segmentsZKMetadata);

  // Block until all tasks have been able to complete
  TestUtils.waitForCondition(aVoid -> segmentManager.deepStoreUploadExecutorPendingSegmentsIsEmpty(), 30_000L,
      "Timed out waiting for upload retry tasks to finish");

  assertEquals(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(0), null).getDownloadUrl(),
      expectedSegmentLocation);
  // The crc stored in ZK must match the crc reported by the server that uploaded the segment
  assertEquals(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(0), null).getCrc(),
      Long.parseLong(serverSegmentCrc));
  assertFalse(tempSegmentFileLocation.exists(),
      "Deep-store retry task should move the file from temp location to permanent location");

  assertEquals(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(1), null).getDownloadUrl(),
      METADATA_URI_FOR_PEER_DOWNLOAD);
  assertEquals(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(2), null).getDownloadUrl(),
      METADATA_URI_FOR_PEER_DOWNLOAD);
  assertEquals(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(3), null).getDownloadUrl(),
      defaultDownloadUrl);
  assertNull(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(4), null).getDownloadUrl());
}

@Test
public void testDeleteTmpSegmentFiles()
throws Exception {
Expand Down
Loading
Loading