Skip to content

Commit

Permalink
Introduce RemoteIndexBuilder, refactor NativeIndexWriter into a separate interface
Browse files Browse the repository at this point in the history

Signed-off-by: Jay Deng <[email protected]>
  • Loading branch information
jed326 authored and Jay Deng committed Feb 13, 2025
1 parent 349a715 commit 105ae13
Show file tree
Hide file tree
Showing 16 changed files with 731 additions and 341 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class KNNFeatureFlags {

// Feature flags
private static final String KNN_FORCE_EVICT_CACHE_ENABLED = "knn.feature.cache.force_evict.enabled";
private static final String KNN_REMOTE_VECTOR_BUILD = "knn.feature.remote_index_build.enabled";

@VisibleForTesting
public static final Setting<Boolean> KNN_FORCE_EVICT_CACHE_ENABLED_SETTING = Setting.boolSetting(
Expand All @@ -35,8 +36,15 @@ public class KNNFeatureFlags {
Dynamic
);

/**
 * Feature-flag setting registered under the key held by {@code KNN_REMOTE_VECTOR_BUILD}
 * ({@code "knn.feature.remote_index_build.enabled"}).
 * Declared as a boolean setting with a default of {@code false} and the
 * {@code NodeScope} and {@code Dynamic} properties.
 */
public static final Setting<Boolean> KNN_REMOTE_VECTOR_BUILD_SETTING = Setting.boolSetting(
KNN_REMOTE_VECTOR_BUILD,
false,
NodeScope,
Dynamic
);

public static List<Setting<?>> getFeatureFlags() {
return ImmutableList.of(KNN_FORCE_EVICT_CACHE_ENABLED_SETTING);
return ImmutableList.of(KNN_FORCE_EVICT_CACHE_ENABLED_SETTING, KNN_REMOTE_VECTOR_BUILD_SETTING);
}

/**
Expand All @@ -46,4 +54,11 @@ public static List<Setting<?>> getFeatureFlags() {
public static boolean isForceEvictCacheEnabled() {
    // Fetch the value stored under the force-evict flag key via KNNSettings.state(), take its
    // string form, and parse it as a boolean with false as the fallback.
    final String flagValue = KNNSettings.state().getSettingValue(KNN_FORCE_EVICT_CACHE_ENABLED).toString();
    return Booleans.parseBoolean(flagValue, false);
}

/**
 * Reports whether the remote vector index build feature flag is switched on.
 * <p>
 * The value stored under {@code KNN_REMOTE_VECTOR_BUILD} is looked up through
 * {@code KNNSettings.state()}, converted to its string form and parsed with
 * {@code Booleans.parseBooleanStrict}, passing {@code false} as the default.
 *
 * @return true if remote vector index build feature flag is enabled
 */
public static boolean isKNNRemoteVectorBuildEnabled() {
    final String flagValue = KNNSettings.state().getSettingValue(KNN_REMOTE_VECTOR_BUILD).toString();
    // NOTE(review): isForceEvictCacheEnabled() parses with Booleans.parseBoolean rather than
    // parseBooleanStrict; confirm the difference between the two flag readers is intentional.
    return Booleans.parseBooleanStrict(flagValue, false);
}
}
25 changes: 23 additions & 2 deletions src/main/java/org/opensearch/knn/index/KNNSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@

import static java.util.stream.Collectors.toUnmodifiableMap;
import static org.opensearch.common.settings.Setting.Property.Dynamic;
import static org.opensearch.common.settings.Setting.Property.Final;
import static org.opensearch.common.settings.Setting.Property.IndexScope;
import static org.opensearch.common.settings.Setting.Property.NodeScope;
import static org.opensearch.common.settings.Setting.Property.Final;
import static org.opensearch.common.settings.Setting.Property.UnmodifiableOnRestore;
import static org.opensearch.common.unit.MemorySizeValue.parseBytesSizeValueOrHeapRatio;
import static org.opensearch.core.common.unit.ByteSizeValue.parseBytesSizeValue;
Expand Down Expand Up @@ -94,6 +94,8 @@ public class KNNSettings {
public static final String KNN_FAISS_AVX512_SPR_DISABLED = "knn.faiss.avx512_spr.disabled";
public static final String KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED = "index.knn.disk.vector.shard_level_rescoring_disabled";
public static final String KNN_DERIVED_SOURCE_ENABLED = "index.knn.derived_source.enabled";
public static final String KNN_INDEX_REMOTE_VECTOR_BUILD = "index.knn.remote_index_build.enabled";
public static final String KNN_REMOTE_VECTOR_REPO = "knn.remote_index_build.vector_repo";

/**
* Default setting values
Expand Down Expand Up @@ -371,6 +373,15 @@ public class KNNSettings {
NodeScope
);

/**
 * Boolean setting registered under the key held by {@code KNN_INDEX_REMOTE_VECTOR_BUILD}
 * ({@code "index.knn.remote_index_build.enabled"}).
 * Declared with a default of {@code false} and the {@code Dynamic} and {@code IndexScope} properties.
 */
public static final Setting<Boolean> KNN_INDEX_REMOTE_VECTOR_BUILD_SETTING = Setting.boolSetting(
KNN_INDEX_REMOTE_VECTOR_BUILD,
false,
Dynamic,
IndexScope
);

/**
 * String setting registered under the key held by {@code KNN_REMOTE_VECTOR_REPO}
 * ({@code "knn.remote_index_build.vector_repo"}), declared with the {@code Dynamic} and
 * {@code NodeScope} properties. No explicit default is passed here.
 * NOTE(review): presumably names the repository used for remote vector index builds; confirm against its readers.
 */
public static final Setting<String> KNN_REMOTE_VECTOR_REPO_SETTING = Setting.simpleString(KNN_REMOTE_VECTOR_REPO, Dynamic, NodeScope);

/**
* Dynamic settings
*/
Expand Down Expand Up @@ -525,6 +536,14 @@ private Setting<?> getSetting(String key) {
return KNN_DERIVED_SOURCE_ENABLED_SETTING;
}

if (KNN_INDEX_REMOTE_VECTOR_BUILD.equals(key)) {
return KNN_INDEX_REMOTE_VECTOR_BUILD_SETTING;
}

if (KNN_REMOTE_VECTOR_REPO.equals(key)) {
return KNN_REMOTE_VECTOR_REPO_SETTING;
}

throw new IllegalArgumentException("Cannot find setting by key [" + key + "]");
}

Expand All @@ -550,7 +569,9 @@ public List<Setting<?>> getSettings() {
QUANTIZATION_STATE_CACHE_SIZE_LIMIT_SETTING,
QUANTIZATION_STATE_CACHE_EXPIRY_TIME_MINUTES_SETTING,
KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED_SETTING,
KNN_DERIVED_SOURCE_ENABLED_SETTING
KNN_DERIVED_SOURCE_ENABLED_SETTING,
KNN_INDEX_REMOTE_VECTOR_BUILD_SETTING,
KNN_REMOTE_VECTOR_REPO_SETTING
);
return Stream.concat(settings.stream(), Stream.concat(getFeatureFlags().stream(), dynamicCacheSettings.values().stream()))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.knn.index.engine.KNNMethodContext;
import org.opensearch.knn.index.mapper.KNNMappingConfig;
import org.opensearch.knn.index.mapper.KNNVectorFieldType;
import org.opensearch.knn.index.remote.RemoteIndexBuilder;

import java.util.Map;
import java.util.Optional;
Expand All @@ -44,6 +45,7 @@ public abstract class BasePerFieldKnnVectorsFormat extends PerFieldKnnVectorsFor
private final Supplier<KnnVectorsFormat> defaultFormatSupplier;
private final Function<KNNVectorsFormatParams, KnnVectorsFormat> vectorsFormatSupplier;
private Function<KNNScalarQuantizedVectorsFormatParams, KnnVectorsFormat> scalarQuantizedVectorsFormatSupplier;
private final RemoteIndexBuilder remoteIndexBuilder;
private static final String MAX_CONNECTIONS = "max_connections";
private static final String BEAM_WIDTH = "beam_width";

Expand All @@ -54,11 +56,26 @@ public BasePerFieldKnnVectorsFormat(
Supplier<KnnVectorsFormat> defaultFormatSupplier,
Function<KNNVectorsFormatParams, KnnVectorsFormat> vectorsFormatSupplier
) {
this.mapperService = mapperService;
this.defaultMaxConnections = defaultMaxConnections;
this.defaultBeamWidth = defaultBeamWidth;
this.defaultFormatSupplier = defaultFormatSupplier;
this.vectorsFormatSupplier = vectorsFormatSupplier;
this(mapperService, defaultMaxConnections, defaultBeamWidth, defaultFormatSupplier, vectorsFormatSupplier, null, null);
}

/**
 * Convenience constructor that takes a scalar-quantized vectors format supplier in addition to the
 * base arguments. Every argument is forwarded unchanged to the seven-argument constructor, and
 * {@code null} is passed for that constructor's final parameter.
 * NOTE(review): the final parameter is presumably the {@code RemoteIndexBuilder} kept in the
 * {@code remoteIndexBuilder} field; the target constructor is not visible here, so confirm.
 */
public BasePerFieldKnnVectorsFormat(
Optional<MapperService> mapperService,
int defaultMaxConnections,
int defaultBeamWidth,
Supplier<KnnVectorsFormat> defaultFormatSupplier,
Function<KNNVectorsFormatParams, KnnVectorsFormat> vectorsFormatSupplier,
Function<KNNScalarQuantizedVectorsFormatParams, KnnVectorsFormat> scalarQuantizedVectorsFormatSupplier
) {
this(
mapperService,
defaultMaxConnections,
defaultBeamWidth,
defaultFormatSupplier,
vectorsFormatSupplier,
scalarQuantizedVectorsFormatSupplier,
null
);
}

@Override
Expand Down Expand Up @@ -141,7 +158,8 @@ private NativeEngines990KnnVectorsFormat nativeEngineVectorsFormat() {
int approximateThreshold = getApproximateThresholdValue();
return new NativeEngines990KnnVectorsFormat(
new Lucene99FlatVectorsFormat(FlatVectorScorerUtil.getLucene99FlatVectorsScorer()),
approximateThreshold
approximateThreshold,
remoteIndexBuilder
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.knn.index.SpaceType;
import org.opensearch.knn.index.codec.BasePerFieldKnnVectorsFormat;
import org.opensearch.knn.index.engine.KNNEngine;
import org.opensearch.knn.index.remote.RemoteIndexBuilder;

import java.util.Optional;
import java.util.concurrent.ExecutorService;
Expand All @@ -25,6 +26,10 @@ public class KNN9120PerFieldKnnVectorsFormat extends BasePerFieldKnnVectorsForma
private static final Tuple<Integer, ExecutorService> DEFAULT_MERGE_THREAD_COUNT_AND_EXECUTOR_SERVICE = Tuple.tuple(1, null);

/**
 * Creates the format without a remote index builder by delegating to the
 * {@code (Optional<MapperService>, RemoteIndexBuilder)} constructor with {@code null}
 * as the second argument.
 *
 * @param mapperService forwarded unchanged to the two-argument constructor
 */
public KNN9120PerFieldKnnVectorsFormat(final Optional<MapperService> mapperService) {
this(mapperService, null);
}

public KNN9120PerFieldKnnVectorsFormat(final Optional<MapperService> mapperService, RemoteIndexBuilder remoteIndexBuilder) {
super(
mapperService,
Lucene99HnswVectorsFormat.DEFAULT_MAX_CONN,
Expand Down Expand Up @@ -67,7 +72,8 @@ public KNN9120PerFieldKnnVectorsFormat(final Optional<MapperService> mapperServi
// Executor service
mergeThreadCountAndExecutorService.v2()
);
}
},
remoteIndexBuilder
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.lucene.index.SegmentWriteState;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.engine.KNNEngine;
import org.opensearch.knn.index.remote.RemoteIndexBuilder;

import java.io.IOException;

Expand All @@ -33,6 +34,7 @@ public class NativeEngines990KnnVectorsFormat extends KnnVectorsFormat {
private static FlatVectorsFormat flatVectorsFormat;
private static final String FORMAT_NAME = "NativeEngines990KnnVectorsFormat";
private static int approximateThreshold;
private final RemoteIndexBuilder remoteIndexBuilder;

public NativeEngines990KnnVectorsFormat() {
this(new Lucene99FlatVectorsFormat(new DefaultFlatVectorScorer()));
Expand All @@ -47,9 +49,18 @@ public NativeEngines990KnnVectorsFormat(final FlatVectorsFormat flatVectorsForma
}

/**
 * Creates the format without a remote index builder by delegating to the three-argument
 * constructor with {@code null} as the {@code RemoteIndexBuilder}.
 *
 * @param flatVectorsFormat    forwarded unchanged to the three-argument constructor
 * @param approximateThreshold forwarded unchanged to the three-argument constructor
 */
public NativeEngines990KnnVectorsFormat(final FlatVectorsFormat flatVectorsFormat, int approximateThreshold) {
this(flatVectorsFormat, approximateThreshold, null);
}

/**
 * Primary constructor: registers the format under {@code FORMAT_NAME} and records its collaborators.
 * <p>
 * The flat vectors format and the approximate threshold are written to static fields of this
 * class, so they are shared by every instance; only the remote index builder is kept per instance.
 *
 * @param flatVectorsFormat    value assigned to the static {@code flatVectorsFormat} field
 * @param approximateThreshold value assigned to the static {@code approximateThreshold} field
 * @param remoteIndexBuilder   value stored on this instance; the delegating constructor passes {@code null}
 */
public NativeEngines990KnnVectorsFormat(
    final FlatVectorsFormat flatVectorsFormat,
    int approximateThreshold,
    RemoteIndexBuilder remoteIndexBuilder
) {
    super(FORMAT_NAME);
    // Per-instance state.
    this.remoteIndexBuilder = remoteIndexBuilder;
    // Class-wide state: each construction overwrites the values seen by all instances.
    NativeEngines990KnnVectorsFormat.approximateThreshold = approximateThreshold;
    NativeEngines990KnnVectorsFormat.flatVectorsFormat = flatVectorsFormat;
}

/**
Expand All @@ -59,7 +70,7 @@ public NativeEngines990KnnVectorsFormat(final FlatVectorsFormat flatVectorsForma
*/
@Override
public KnnVectorsWriter fieldsWriter(final SegmentWriteState state) throws IOException {
return new NativeEngines990KnnVectorsWriter(state, flatVectorsFormat.fieldsWriter(state), approximateThreshold);
return new NativeEngines990KnnVectorsWriter(state, flatVectorsFormat.fieldsWriter(state), approximateThreshold, remoteIndexBuilder);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.knn.index.VectorDataType;
import org.opensearch.knn.index.codec.nativeindex.NativeIndexWriter;
import org.opensearch.knn.index.quantizationservice.QuantizationService;
import org.opensearch.knn.index.remote.RemoteIndexBuilder;
import org.opensearch.knn.index.vectorvalues.KNNVectorValues;
import org.opensearch.knn.plugin.stats.KNNGraphValue;
import org.opensearch.knn.quantization.models.quantizationParams.QuantizationParams;
Expand All @@ -54,15 +55,26 @@ public class NativeEngines990KnnVectorsWriter extends KnnVectorsWriter {
private final List<NativeEngineFieldVectorsWriter<?>> fields = new ArrayList<>();
private boolean finished;
private final Integer approximateThreshold;
private final RemoteIndexBuilder remoteIndexBuilder;

/**
 * Creates the writer without a remote index builder by delegating to the four-argument
 * constructor with {@code null} as the {@code RemoteIndexBuilder}.
 *
 * @param segmentWriteState    forwarded unchanged to the four-argument constructor
 * @param flatVectorsWriter    forwarded unchanged to the four-argument constructor
 * @param approximateThreshold forwarded unchanged to the four-argument constructor
 */
public NativeEngines990KnnVectorsWriter(
SegmentWriteState segmentWriteState,
FlatVectorsWriter flatVectorsWriter,
Integer approximateThreshold
) {
this(segmentWriteState, flatVectorsWriter, approximateThreshold, null);
}

/**
 * Primary constructor: stores each argument in the instance field of the same name.
 *
 * @param segmentWriteState    segment write state kept for later use by this writer
 * @param flatVectorsWriter    flat vectors writer kept for later use by this writer
 * @param approximateThreshold threshold value kept for later use by this writer
 * @param remoteIndexBuilder   remote index builder kept for later use; the delegating constructor passes {@code null}
 */
public NativeEngines990KnnVectorsWriter(
    SegmentWriteState segmentWriteState,
    FlatVectorsWriter flatVectorsWriter,
    Integer approximateThreshold,
    RemoteIndexBuilder remoteIndexBuilder
) {
    this.remoteIndexBuilder = remoteIndexBuilder;
    this.approximateThreshold = approximateThreshold;
    this.flatVectorsWriter = flatVectorsWriter;
    this.segmentWriteState = segmentWriteState;
}

/**
Expand Down Expand Up @@ -114,7 +126,13 @@ public void flush(int maxDoc, final Sorter.DocMap sortMap) throws IOException {
);
continue;
}
final NativeIndexWriter writer = NativeIndexWriter.getWriter(fieldInfo, segmentWriteState, quantizationState);
final NativeIndexWriter writer = NativeIndexWriter.getWriter(
fieldInfo,
segmentWriteState,
quantizationState,
remoteIndexBuilder,
knnVectorValuesSupplier
);
final KNNVectorValues<?> knnVectorValues = knnVectorValuesSupplier.get();

StopWatch stopWatch = new StopWatch().start();
Expand Down Expand Up @@ -153,7 +171,13 @@ public void mergeOneField(final FieldInfo fieldInfo, final MergeState mergeState
);
return;
}
final NativeIndexWriter writer = NativeIndexWriter.getWriter(fieldInfo, segmentWriteState, quantizationState);
final NativeIndexWriter writer = NativeIndexWriter.getWriter(
fieldInfo,
segmentWriteState,
quantizationState,
remoteIndexBuilder,
knnVectorValuesSupplier
);
final KNNVectorValues<?> knnVectorValues = knnVectorValuesSupplier.get();

StopWatch stopWatch = new StopWatch().start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,24 @@

package org.opensearch.knn.index.codec;

import org.opensearch.index.codec.CodecServiceConfig;
import org.apache.lucene.codecs.Codec;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.codec.CodecServiceConfig;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.knn.index.remote.RemoteIndexBuilder;

/**
* KNNCodecService to inject the right KNNCodec version
*/
public class KNNCodecService extends CodecService {

private final MapperService mapperService;
private final RemoteIndexBuilder remoteIndexBuilder;

public KNNCodecService(CodecServiceConfig codecServiceConfig) {
public KNNCodecService(CodecServiceConfig codecServiceConfig, RemoteIndexBuilder remoteIndexBuilder) {
super(codecServiceConfig.getMapperService(), codecServiceConfig.getIndexSettings(), codecServiceConfig.getLogger());
mapperService = codecServiceConfig.getMapperService();
this.remoteIndexBuilder = remoteIndexBuilder;
}

/**
Expand All @@ -30,6 +33,6 @@ public KNNCodecService(CodecServiceConfig codecServiceConfig) {
*/
@Override
public Codec codec(String name) {
return KNNCodecVersion.current().getKnnCodecSupplier().apply(super.codec(name), mapperService);
return KNNCodecVersion.current().getKnnCodecSupplier().apply(super.codec(name), mapperService, remoteIndexBuilder);
}
}
Loading

0 comments on commit 105ae13

Please sign in to comment.