Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-8901] Fix Timeline Server to process requests from multiple storage lakes #12696

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.util.NetworkUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.timeline.service.TimelineService;

Expand Down Expand Up @@ -180,16 +178,15 @@ private void startServer(TimelineServiceCreator timelineServiceCreator) throws I

this.serviceConfig = timelineServiceConfBuilder.build();

server = timelineServiceCreator.create(context, storageConf.newInstance(), serviceConfig,
HoodieStorageUtils.getStorage(writeConfig.getBasePath(), storageConf.newInstance()), viewManager);
server = timelineServiceCreator.create(context, storageConf.newInstance(), serviceConfig, viewManager);
serverPort = server.startService();
LOG.info("Started embedded timeline server at " + hostAddr + ":" + serverPort);
}

@FunctionalInterface
interface TimelineServiceCreator {
TimelineService create(HoodieEngineContext context, StorageConfiguration<?> storageConf, TimelineService.Config timelineServerConf,
HoodieStorage storage, FileSystemViewManager globalFileSystemViewManager) throws IOException;
FileSystemViewManager globalFileSystemViewManager) throws IOException;
}

private void setHostAddr(String embeddedTimelineServiceHostAddr) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void embeddedTimelineServiceReused() throws Exception {
.build();
EmbeddedTimelineService.TimelineServiceCreator mockCreator = Mockito.mock(EmbeddedTimelineService.TimelineServiceCreator.class);
TimelineService mockService = Mockito.mock(TimelineService.class);
when(mockCreator.create(any(), any(), any(), any(), any())).thenReturn(mockService);
when(mockCreator.create(any(), any(), any(), any())).thenReturn(mockService);
when(mockService.startService()).thenReturn(123);
EmbeddedTimelineService service1 = EmbeddedTimelineService.getOrStartEmbeddedTimelineService(engineContext, null, writeConfig1, mockCreator);

Expand Down Expand Up @@ -96,7 +96,7 @@ public void embeddedTimelineServiceCreatedForDifferentMetadataConfig() throws Ex
.build();
EmbeddedTimelineService.TimelineServiceCreator mockCreator = Mockito.mock(EmbeddedTimelineService.TimelineServiceCreator.class);
TimelineService mockService = Mockito.mock(TimelineService.class);
when(mockCreator.create(any(), any(), any(), any(), any())).thenReturn(mockService);
when(mockCreator.create(any(), any(), any(), any())).thenReturn(mockService);
when(mockService.startService()).thenReturn(321);
EmbeddedTimelineService service1 = EmbeddedTimelineService.getOrStartEmbeddedTimelineService(engineContext, null, writeConfig1, mockCreator);

Expand All @@ -110,7 +110,7 @@ public void embeddedTimelineServiceCreatedForDifferentMetadataConfig() throws Ex
.build();
EmbeddedTimelineService.TimelineServiceCreator mockCreator2 = Mockito.mock(EmbeddedTimelineService.TimelineServiceCreator.class);
TimelineService mockService2 = Mockito.mock(TimelineService.class);
when(mockCreator2.create(any(), any(), any(), any(), any())).thenReturn(mockService2);
when(mockCreator2.create(any(), any(), any(), any())).thenReturn(mockService2);
when(mockService2.startService()).thenReturn(456);
EmbeddedTimelineService service2 = EmbeddedTimelineService.getOrStartEmbeddedTimelineService(engineContext, null, writeConfig2, mockCreator2);
assertNotSame(service1, service2);
Expand All @@ -133,7 +133,7 @@ public void embeddedTimelineServerNotReusedIfReuseDisabled() throws Exception {
.build();
EmbeddedTimelineService.TimelineServiceCreator mockCreator = Mockito.mock(EmbeddedTimelineService.TimelineServiceCreator.class);
TimelineService mockService = Mockito.mock(TimelineService.class);
when(mockCreator.create(any(), any(), any(), any(), any())).thenReturn(mockService);
when(mockCreator.create(any(), any(), any(), any())).thenReturn(mockService);
when(mockService.startService()).thenReturn(789);
EmbeddedTimelineService service1 = EmbeddedTimelineService.getOrStartEmbeddedTimelineService(engineContext, null, writeConfig1, mockCreator);

Expand All @@ -144,7 +144,7 @@ public void embeddedTimelineServerNotReusedIfReuseDisabled() throws Exception {
.build();
EmbeddedTimelineService.TimelineServiceCreator mockCreator2 = Mockito.mock(EmbeddedTimelineService.TimelineServiceCreator.class);
TimelineService mockService2 = Mockito.mock(TimelineService.class);
when(mockCreator2.create(any(), any(), any(), any(), any())).thenReturn(mockService2);
when(mockCreator2.create(any(), any(), any(), any())).thenReturn(mockService2);
when(mockService2.startService()).thenReturn(987);
EmbeddedTimelineService service2 = EmbeddedTimelineService.getOrStartEmbeddedTimelineService(engineContext, null, writeConfig2, mockCreator2);
assertNotSame(service1, service2);
Expand All @@ -169,7 +169,7 @@ public void embeddedTimelineServerIsNotReusedAfterStopped() throws Exception {
.build();
EmbeddedTimelineService.TimelineServiceCreator mockCreator = Mockito.mock(EmbeddedTimelineService.TimelineServiceCreator.class);
TimelineService mockService = Mockito.mock(TimelineService.class);
when(mockCreator.create(any(), any(), any(), any(), any())).thenReturn(mockService);
when(mockCreator.create(any(), any(), any(), any())).thenReturn(mockService);
when(mockService.startService()).thenReturn(555);
EmbeddedTimelineService service1 = EmbeddedTimelineService.getOrStartEmbeddedTimelineService(engineContext, null, writeConfig1, mockCreator);

Expand All @@ -182,7 +182,7 @@ public void embeddedTimelineServerIsNotReusedAfterStopped() throws Exception {
.build();
EmbeddedTimelineService.TimelineServiceCreator mockCreator2 = Mockito.mock(EmbeddedTimelineService.TimelineServiceCreator.class);
TimelineService mockService2 = Mockito.mock(TimelineService.class);
when(mockCreator2.create(any(), any(), any(), any(), any())).thenReturn(mockService2);
when(mockCreator2.create(any(), any(), any(), any())).thenReturn(mockService2);
when(mockService2.startService()).thenReturn(111);
EmbeddedTimelineService service2 = EmbeddedTimelineService.getOrStartEmbeddedTimelineService(engineContext, null, writeConfig2, mockCreator2);
// a new service will be started since the original was shutdown already
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ public void setup() throws IOException {
try {
timelineService = new TimelineService(localEngineContext, HadoopFSUtils.getStorageConf(),
TimelineService.Config.builder().serverPort(0).enableMarkerRequests(true).build(),
storage,
FileSystemViewManager.createViewManager(localEngineContext, storageConf, HoodieCommonConfig.newBuilder().build()));
timelineService.startService();
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.timeline.service.TimelineService;
Expand Down Expand Up @@ -288,7 +287,6 @@ public static TimelineService initTimelineService(
TimelineService timelineService = new TimelineService(context, HadoopFSUtils.getStorageConf(),
TimelineService.Config.builder().enableMarkerRequests(true)
.serverPort(config.getViewStorageConfig().getRemoteViewServerPort()).build(),
HoodieStorageUtils.getStorage(HoodieTestUtils.getDefaultStorageConf()),
FileSystemViewManager.createViewManager(context, config.getViewStorageConfig(), config.getCommonConfig()));
timelineService.startService();
LOG.info("Timeline service server port: " + timelineServicePort);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.metadata.HoodieBackedTestDelayedTableMetadata;
import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.hudi.timeline.service.TimelineService;

Expand Down Expand Up @@ -73,7 +72,6 @@
import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
import static org.apache.hudi.common.table.view.FileSystemViewStorageConfig.REMOTE_PORT_NUM;
import static org.apache.hudi.common.testutils.HoodieTestUtils.COMMIT_METADATA_SER_DE;
import static org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
Expand Down Expand Up @@ -116,7 +114,6 @@ public void initTimelineService() {
timelineService = new TimelineService(localEngineContext, HadoopFSUtils.getStorageConf(),
TimelineService.Config.builder().enableMarkerRequests(true)
.serverPort(config.getViewStorageConfig().getRemoteViewServerPort()).build(),
HoodieStorageUtils.getStorage(getDefaultStorageConf()),
FileSystemViewManager.createViewManager(
context, config.getViewStorageConfig(),
config.getCommonConfig(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.timeline.service.handlers.BaseFileHandler;
import org.apache.hudi.timeline.service.handlers.FileSliceHandler;
Expand Down Expand Up @@ -91,22 +90,22 @@ public class RequestHandler {
private final ScheduledExecutorService asyncResultService;

public RequestHandler(Javalin app, StorageConfiguration<?> conf, TimelineService.Config timelineServiceConfig,
HoodieEngineContext hoodieEngineContext, HoodieStorage storage,
FileSystemViewManager viewManager) throws IOException {
HoodieEngineContext hoodieEngineContext,
FileSystemViewManager viewManager) {
this.timelineServiceConfig = timelineServiceConfig;
this.viewManager = viewManager;
this.app = app;
this.instantHandler = new TimelineHandler(conf, timelineServiceConfig, storage, viewManager);
this.sliceHandler = new FileSliceHandler(conf, timelineServiceConfig, storage, viewManager);
this.dataFileHandler = new BaseFileHandler(conf, timelineServiceConfig, storage, viewManager);
this.instantHandler = new TimelineHandler(conf, timelineServiceConfig, viewManager);
this.sliceHandler = new FileSliceHandler(conf, timelineServiceConfig, viewManager);
this.dataFileHandler = new BaseFileHandler(conf, timelineServiceConfig, viewManager);
if (timelineServiceConfig.enableMarkerRequests) {
this.markerHandler = new MarkerHandler(
conf, timelineServiceConfig, hoodieEngineContext, storage, viewManager, metricsRegistry);
conf, timelineServiceConfig, hoodieEngineContext, viewManager, metricsRegistry);
} else {
this.markerHandler = null;
}
if (timelineServiceConfig.enableInstantStateRequests) {
this.instantStateHandler = new InstantStateHandler(conf, timelineServiceConfig, storage, viewManager);
this.instantStateHandler = new InstantStateHandler(conf, timelineServiceConfig, viewManager);
} else {
this.instantStateHandler = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;

import com.beust.jcommander.JCommander;
Expand Down Expand Up @@ -56,7 +54,6 @@ public class TimelineService {
private final Config timelineServerConf;
private final StorageConfiguration<?> storageConf;
private transient HoodieEngineContext context;
private transient HoodieStorage storage;
private transient Javalin app = null;
private transient FileSystemViewManager fsViewsManager;
private transient RequestHandler requestHandler;
Expand All @@ -66,12 +63,11 @@ public int getServerPort() {
}

public TimelineService(HoodieEngineContext context, StorageConfiguration<?> storageConf, Config timelineServerConf,
HoodieStorage storage, FileSystemViewManager globalFileSystemViewManager) throws IOException {
FileSystemViewManager globalFileSystemViewManager) {
this.storageConf = storageConf;
this.timelineServerConf = timelineServerConf;
this.serverPort = timelineServerConf.serverPort;
this.context = context;
this.storage = storage;
this.fsViewsManager = globalFileSystemViewManager;
}

Expand Down Expand Up @@ -370,7 +366,7 @@ public int startService() throws IOException {
return realServerPort;
}

private void createApp() throws IOException {
private void createApp() {
// if app needs to be recreated, stop the existing one
if (app != null) {
app.stop();
Expand All @@ -390,7 +386,7 @@ private void createApp() throws IOException {
});

requestHandler = new RequestHandler(
app, storageConf, timelineServerConf, context, storage, fsViewsManager);
app, storageConf, timelineServerConf, context, fsViewsManager);
app.get("/", ctx -> ctx.result("Hello Hudi"));
requestHandler.register();
}
Expand Down Expand Up @@ -450,10 +446,6 @@ public StorageConfiguration<?> getStorageConf() {
return storageConf;
}

public HoodieStorage getStorage() {
return storage;
}

public static void main(String[] args) throws Exception {
final Config cfg = new Config();
JCommander cmd = new JCommander(cfg, null, args);
Expand All @@ -469,7 +461,6 @@ public static void main(String[] args) throws Exception {
new HoodieLocalEngineContext(storageConf.newInstance()),
storageConf.newInstance(),
cfg,
HoodieStorageUtils.getStorage(storageConf),
viewManager);
service.run();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@

import org.apache.hudi.common.table.timeline.dto.BaseFileDTO;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.timeline.service.TimelineService;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -36,8 +34,8 @@
public class BaseFileHandler extends Handler {

public BaseFileHandler(StorageConfiguration<?> conf, TimelineService.Config timelineServiceConfig,
HoodieStorage storage, FileSystemViewManager viewManager) throws IOException {
super(conf, timelineServiceConfig, storage, viewManager);
FileSystemViewManager viewManager) {
super(conf, timelineServiceConfig, viewManager);
}

public List<BaseFileDTO> getLatestDataFiles(String basePath, String partitionPath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@
import org.apache.hudi.common.table.timeline.dto.FileGroupDTO;
import org.apache.hudi.common.table.timeline.dto.FileSliceDTO;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.timeline.service.TimelineService;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand All @@ -42,8 +40,8 @@
public class FileSliceHandler extends Handler {

public FileSliceHandler(StorageConfiguration<?> conf, TimelineService.Config timelineServiceConfig,
HoodieStorage storage, FileSystemViewManager viewManager) throws IOException {
super(conf, timelineServiceConfig, storage, viewManager);
FileSystemViewManager viewManager) {
super(conf, timelineServiceConfig, viewManager);
}

public List<FileSliceDTO> getAllFileSlices(String basePath, String partitionPath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,24 @@

import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.timeline.service.TimelineService;

import java.io.IOException;

public abstract class Handler {

protected final StorageConfiguration<?> conf;
protected final TimelineService.Config timelineServiceConfig;
protected final HoodieStorage storage;
protected final FileSystemViewManager viewManager;

public Handler(StorageConfiguration<?> conf, TimelineService.Config timelineServiceConfig,
HoodieStorage storage, FileSystemViewManager viewManager) throws IOException {
this.conf = conf;
FileSystemViewManager viewManager) {
this.conf = conf.newInstance();
this.timelineServiceConfig = timelineServiceConfig;
this.storage = storage;
this.viewManager = viewManager;
}

public HoodieStorage getStorage(String path) {
return HoodieStorageUtils.getStorage(path, conf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,8 @@ public class InstantStateHandler extends Handler {
private final AtomicLong requestCount;

public InstantStateHandler(StorageConfiguration<?> conf, TimelineService.Config timelineServiceConfig,
HoodieStorage storage,
FileSystemViewManager viewManager) throws IOException {
super(conf, timelineServiceConfig, storage, viewManager);
FileSystemViewManager viewManager) {
super(conf, timelineServiceConfig, viewManager);
this.cachedInstantStates = new ConcurrentHashMap<>();
this.requestCount = new AtomicLong();
}
Expand Down Expand Up @@ -118,8 +117,9 @@ public boolean refresh(String instantStatePath) {
public List<InstantStateDTO> scanInstantState(StoragePath instantStatePath) {
try {
// Check instantStatePath exists before list status, see HUDI-5915
if (this.storage.exists(instantStatePath)) {
return this.storage.listDirectEntries(instantStatePath).stream()
HoodieStorage storage = getStorage(instantStatePath.toUri().toString());
if (storage.exists(instantStatePath)) {
return storage.listDirectEntries(instantStatePath).stream()
.map(InstantStateDTO::fromStoragePathInfo).collect(Collectors.toList());
} else {
return Collections.emptyList();
Expand Down
Loading
Loading