Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][HUDI-8368] Remove unused logic of rollback log block creation in BaseRollbackHelper #12728

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,17 @@
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.util.CommonClientUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -46,11 +40,9 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -109,7 +101,6 @@ public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context
* May be delete interested files and collect stats or collect stats only.
*
* @param context instance of {@link HoodieEngineContext} to use.
* @param instantToRollback {@link HoodieInstant} of interest for which deletion or collect stats is requested.
* @param rollbackRequests List of {@link ListingBasedRollbackRequest} to be operated on.
* @param doDelete {@code true} if deletion has to be done. {@code false} if only stats are to be collected w/o performing any deletes.
* @return stats collected with or w/o actual deletions.
Expand All @@ -123,67 +114,13 @@ List<Pair<String, HoodieRollbackStat>> maybeDeleteAndCollectStats(HoodieEngineCo
metaClient.getTableConfig().getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)
? rollbackRequests
: groupSerializableRollbackRequestsBasedOnFileGroup(rollbackRequests);
final TaskContextSupplier taskContextSupplier = context.getTaskContextSupplier();
return context.flatMap(processedRollbackRequests, (SerializableFunction<SerializableHoodieRollbackRequest, Stream<Pair<String, HoodieRollbackStat>>>) rollbackRequest -> {
List<String> filesToBeDeleted = rollbackRequest.getFilesToBeDeleted();
if (!filesToBeDeleted.isEmpty()) {
List<HoodieRollbackStat> rollbackStats = deleteFiles(metaClient, filesToBeDeleted, doDelete);
List<Pair<String, HoodieRollbackStat>> partitionToRollbackStats = new ArrayList<>();
rollbackStats.forEach(entry -> partitionToRollbackStats.add(Pair.of(entry.getPartitionPath(), entry)));
return partitionToRollbackStats.stream();
} else if (!rollbackRequest.getLogBlocksToBeDeleted().isEmpty()) {
HoodieLogFormat.Writer writer = null;
final StoragePath filePath;
try {
String fileId = rollbackRequest.getFileId();
HoodieTableVersion tableVersion = metaClient.getTableConfig().getTableVersion();

writer = HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.constructAbsolutePath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
.withFileId(fileId)
.withLogWriteToken(CommonClientUtils.generateWriteToken(taskContextSupplier))
.withInstantTime(tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
? instantToRollback.requestedTime() : rollbackRequest.getLatestBaseInstant()
)
.withStorage(metaClient.getStorage())
.withTableVersion(tableVersion)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();

// generate metadata
if (doDelete) {
Map<HoodieLogBlock.HeaderMetadataType, String> header = generateHeader(instantToRollback.requestedTime());
// if update belongs to an existing log file
// use the log file path from AppendResult in case the file handle may roll over
filePath = writer.appendBlock(new HoodieCommandBlock(header)).logFile().getPath();
} else {
filePath = writer.getLogFile().getPath();
}
} catch (IOException | InterruptedException io) {
throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
} finally {
try {
if (writer != null) {
writer.close();
}
} catch (IOException io) {
throw new HoodieIOException("Error appending rollback block", io);
}
}

// This step is intentionally done after writer is closed. Guarantees that
// getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
// cloud-storage : HUDI-168
Map<StoragePathInfo, Long> filesToNumBlocksRollback = Collections.singletonMap(
metaClient.getStorage().getPathInfo(Objects.requireNonNull(filePath)),
1L
);

return Stream.of(
Pair.of(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder()
.withPartitionPath(rollbackRequest.getPartitionPath())
.withRollbackBlockAppendResults(filesToNumBlocksRollback)
.build()));
} else {
// no action needed.
return Stream.of(
Expand Down
Loading