diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java index 4beb2057437f..2535d5b9a7c2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java @@ -21,23 +21,17 @@ import org.apache.hudi.avro.model.HoodieRollbackRequest; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.function.SerializableFunction; -import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; -import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.block.HoodieCommandBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.storage.StoragePath; -import org.apache.hudi.storage.StoragePathInfo; -import org.apache.hudi.util.CommonClientUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,11 +40,9 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -109,7 +101,6 @@ public List collectRollbackStats(HoodieEngineContext context * May be delete interested files and collect stats or collect stats only. * * @param context instance of {@link HoodieEngineContext} to use. - * @param instantToRollback {@link HoodieInstant} of interest for which deletion or collect stats is requested. * @param rollbackRequests List of {@link ListingBasedRollbackRequest} to be operated on. * @param doDelete {@code true} if deletion has to be done. {@code false} if only stats are to be collected w/o performing any deletes. * @return stats collected with or w/o actual deletions. @@ -123,7 +114,6 @@ List> maybeDeleteAndCollectStats(HoodieEngineCo metaClient.getTableConfig().getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT) ? rollbackRequests : groupSerializableRollbackRequestsBasedOnFileGroup(rollbackRequests); - final TaskContextSupplier taskContextSupplier = context.getTaskContextSupplier(); return context.flatMap(processedRollbackRequests, (SerializableFunction>>) rollbackRequest -> { List filesToBeDeleted = rollbackRequest.getFilesToBeDeleted(); if (!filesToBeDeleted.isEmpty()) { @@ -131,59 +121,6 @@ List> maybeDeleteAndCollectStats(HoodieEngineCo List> partitionToRollbackStats = new ArrayList<>(); rollbackStats.forEach(entry -> partitionToRollbackStats.add(Pair.of(entry.getPartitionPath(), entry))); return partitionToRollbackStats.stream(); - } else if (!rollbackRequest.getLogBlocksToBeDeleted().isEmpty()) { - HoodieLogFormat.Writer writer = null; - final StoragePath filePath; - try { - String fileId = rollbackRequest.getFileId(); - HoodieTableVersion tableVersion = metaClient.getTableConfig().getTableVersion(); - - writer = HoodieLogFormat.newWriterBuilder() - .onParentPath(FSUtils.constructAbsolutePath(metaClient.getBasePath(), rollbackRequest.getPartitionPath())) - .withFileId(fileId) - .withLogWriteToken(CommonClientUtils.generateWriteToken(taskContextSupplier)) - .withInstantTime(tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT) - ? instantToRollback.requestedTime() : rollbackRequest.getLatestBaseInstant() - ) - .withStorage(metaClient.getStorage()) - .withTableVersion(tableVersion) - .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); - - // generate metadata - if (doDelete) { - Map header = generateHeader(instantToRollback.requestedTime()); - // if update belongs to an existing log file - // use the log file path from AppendResult in case the file handle may roll over - filePath = writer.appendBlock(new HoodieCommandBlock(header)).logFile().getPath(); - } else { - filePath = writer.getLogFile().getPath(); - } - } catch (IOException | InterruptedException io) { - throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io); - } finally { - try { - if (writer != null) { - writer.close(); - } - } catch (IOException io) { - throw new HoodieIOException("Error appending rollback block", io); - } - } - - // This step is intentionally done after writer is closed. Guarantees that - // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in - // cloud-storage : HUDI-168 - Map filesToNumBlocksRollback = Collections.singletonMap( - metaClient.getStorage().getPathInfo(Objects.requireNonNull(filePath)), - 1L - ); - - return Stream.of( - Pair.of(rollbackRequest.getPartitionPath(), - HoodieRollbackStat.newBuilder() - .withPartitionPath(rollbackRequest.getPartitionPath()) - .withRollbackBlockAppendResults(filesToNumBlocksRollback) - .build())); } else { // no action needed. return Stream.of(