Skip to content

Commit

Permalink
Introduce RemoteIndexBuilder skeleton
Browse files Browse the repository at this point in the history
Signed-off-by: Jay Deng <[email protected]>
  • Loading branch information
jed326 authored and Jay Deng committed Feb 13, 2025
1 parent 349a715 commit 25981af
Show file tree
Hide file tree
Showing 21 changed files with 2,460 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class KNNFeatureFlags {

// Feature flags
private static final String KNN_FORCE_EVICT_CACHE_ENABLED = "knn.feature.cache.force_evict.enabled";
private static final String KNN_REMOTE_VECTOR_BUILD = "knn.feature.remote_index_build.enabled";

@VisibleForTesting
public static final Setting<Boolean> KNN_FORCE_EVICT_CACHE_ENABLED_SETTING = Setting.boolSetting(
Expand All @@ -35,8 +36,15 @@ public class KNNFeatureFlags {
Dynamic
);

/**
 * Boolean feature flag registered under the key held in {@code KNN_REMOTE_VECTOR_BUILD}
 * ({@code knn.feature.remote_index_build.enabled}). It is declared with a default of {@code false}
 * and the {@code NodeScope} and {@code Dynamic} properties.
 */
public static final Setting<Boolean> KNN_REMOTE_VECTOR_BUILD_SETTING = Setting.boolSetting(
KNN_REMOTE_VECTOR_BUILD,
false,
NodeScope,
Dynamic
);

/**
 * Lists the feature-flag settings declared in this class.
 *
 * @return list of feature-flag settings
 */
public static List<Setting<?>> getFeatureFlags() {
// NOTE(review): two return statements are visible here. The first looks like the pre-change line kept by the
// diff view and the second like its replacement, which additionally lists KNN_REMOTE_VECTOR_BUILD_SETTING —
// confirm against the actual file that only the second one remains.
return ImmutableList.of(KNN_FORCE_EVICT_CACHE_ENABLED_SETTING);
return ImmutableList.of(KNN_FORCE_EVICT_CACHE_ENABLED_SETTING, KNN_REMOTE_VECTOR_BUILD_SETTING);
}

/**
Expand All @@ -46,4 +54,11 @@ public static List<Setting<?>> getFeatureFlags() {
public static boolean isForceEvictCacheEnabled() {
    // Look up the current value of the force-evict flag and hand its string form to the boolean parser,
    // passing false as the default argument.
    final String flagValue = KNNSettings.state().getSettingValue(KNN_FORCE_EVICT_CACHE_ENABLED).toString();
    return Booleans.parseBoolean(flagValue, false);
}

/**
 * Reads the remote vector index build feature flag from {@code KNNSettings}.
 *
 * @return true if remote vector index build feature flag is enabled
 */
public static boolean isKNNRemoteVectorBuildEnabled() {
    // The setting value is converted to its string form and parsed strictly, with false as the default argument.
    final String flagValue = KNNSettings.state().getSettingValue(KNN_REMOTE_VECTOR_BUILD).toString();
    return Booleans.parseBooleanStrict(flagValue, false);
}
}
25 changes: 23 additions & 2 deletions src/main/java/org/opensearch/knn/index/KNNSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@

import static java.util.stream.Collectors.toUnmodifiableMap;
import static org.opensearch.common.settings.Setting.Property.Dynamic;
import static org.opensearch.common.settings.Setting.Property.Final;
import static org.opensearch.common.settings.Setting.Property.IndexScope;
import static org.opensearch.common.settings.Setting.Property.NodeScope;
import static org.opensearch.common.settings.Setting.Property.Final;
import static org.opensearch.common.settings.Setting.Property.UnmodifiableOnRestore;
import static org.opensearch.common.unit.MemorySizeValue.parseBytesSizeValueOrHeapRatio;
import static org.opensearch.core.common.unit.ByteSizeValue.parseBytesSizeValue;
Expand Down Expand Up @@ -94,6 +94,8 @@ public class KNNSettings {
public static final String KNN_FAISS_AVX512_SPR_DISABLED = "knn.faiss.avx512_spr.disabled";
public static final String KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED = "index.knn.disk.vector.shard_level_rescoring_disabled";
public static final String KNN_DERIVED_SOURCE_ENABLED = "index.knn.derived_source.enabled";
// Key of the index-scoped boolean setting that toggles remote index build (see KNN_INDEX_REMOTE_VECTOR_BUILD_SETTING).
public static final String KNN_INDEX_REMOTE_VECTOR_BUILD = "index.knn.remote_index_build.enabled";
// Key of the node-scoped string setting for the remote index build vector repo (see KNN_REMOTE_VECTOR_REPO_SETTING).
public static final String KNN_REMOTE_VECTOR_REPO = "knn.remote_index_build.vector_repo";

/**
* Default setting values
Expand Down Expand Up @@ -371,6 +373,15 @@ public class KNNSettings {
NodeScope
);

/**
 * Boolean setting registered under {@code KNN_INDEX_REMOTE_VECTOR_BUILD}
 * ({@code index.knn.remote_index_build.enabled}). It is declared with a default of {@code false}
 * and the {@code Dynamic} and {@code IndexScope} properties.
 */
public static final Setting<Boolean> KNN_INDEX_REMOTE_VECTOR_BUILD_SETTING = Setting.boolSetting(
KNN_INDEX_REMOTE_VECTOR_BUILD,
false,
Dynamic,
IndexScope
);

/**
 * String setting registered under {@code KNN_REMOTE_VECTOR_REPO} ({@code knn.remote_index_build.vector_repo}),
 * declared with the {@code Dynamic} and {@code NodeScope} properties and no explicit default.
 * NOTE(review): presumably names the repository used by the remote index build — confirm against its consumers.
 */
public static final Setting<String> KNN_REMOTE_VECTOR_REPO_SETTING = Setting.simpleString(KNN_REMOTE_VECTOR_REPO, Dynamic, NodeScope);

/**
* Dynamic settings
*/
Expand Down Expand Up @@ -525,6 +536,14 @@ private Setting<?> getSetting(String key) {
return KNN_DERIVED_SOURCE_ENABLED_SETTING;
}

if (KNN_INDEX_REMOTE_VECTOR_BUILD.equals(key)) {
return KNN_INDEX_REMOTE_VECTOR_BUILD_SETTING;
}

if (KNN_REMOTE_VECTOR_REPO.equals(key)) {
return KNN_REMOTE_VECTOR_REPO_SETTING;
}

throw new IllegalArgumentException("Cannot find setting by key [" + key + "]");
}

Expand All @@ -550,7 +569,9 @@ public List<Setting<?>> getSettings() {
QUANTIZATION_STATE_CACHE_SIZE_LIMIT_SETTING,
QUANTIZATION_STATE_CACHE_EXPIRY_TIME_MINUTES_SETTING,
KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED_SETTING,
KNN_DERIVED_SOURCE_ENABLED_SETTING
KNN_DERIVED_SOURCE_ENABLED_SETTING,
KNN_INDEX_REMOTE_VECTOR_BUILD_SETTING,
KNN_REMOTE_VECTOR_REPO_SETTING
);
return Stream.concat(settings.stream(), Stream.concat(getFeatureFlags().stream(), dynamicCacheSettings.values().stream()))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public KnnVectorsFormat getKnnVectorsFormatForField(final String field) {
return nativeEngineVectorsFormat();
}

private NativeEngines990KnnVectorsFormat nativeEngineVectorsFormat() {
protected KnnVectorsFormat nativeEngineVectorsFormat() {
// mapperService is already checked for null or valid instance type at caller, hence we don't need
// addition isPresent check here.
int approximateThreshold = getApproximateThresholdValue();
Expand All @@ -145,7 +145,7 @@ private NativeEngines990KnnVectorsFormat nativeEngineVectorsFormat() {
);
}

private int getApproximateThresholdValue() {
protected int getApproximateThresholdValue() {
// This is private method and mapperService is already checked for null or valid instance type before this call
// at caller, hence we don't need additional isPresent check here.
final IndexSettings indexSettings = mapperService.get().getIndexSettings();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.index.codec.KNN10010Codec;

import org.apache.lucene.codecs.KnnVectorsFormat;
import org.apache.lucene.codecs.hnsw.FlatVectorScorerUtil;
import org.apache.lucene.codecs.lucene99.Lucene99FlatVectorsFormat;
import org.apache.lucene.codecs.lucene99.Lucene99HnswScalarQuantizedVectorsFormat;
import org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsFormat;
import org.opensearch.common.Nullable;
import org.opensearch.common.collect.Tuple;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.SpaceType;
import org.opensearch.knn.index.codec.BasePerFieldKnnVectorsFormat;
import org.opensearch.knn.index.codec.KNN9120Codec.KNN9120HnswBinaryVectorsFormat;
import org.opensearch.knn.index.engine.KNNEngine;
import org.opensearch.knn.index.remote.RemoteIndexBuilder;

import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Class provides per field format implementation for Lucene Knn vector type.
 *
 * <p>Lucene-engine fields are served by the format suppliers handed to the base class constructor; native-engine
 * fields are served by {@link NativeEngines10010KnnVectorsFormat}, which additionally receives the optional
 * {@link RemoteIndexBuilder} held by this instance.
 */
public class KNN10010PerFieldKnnVectorsFormat extends BasePerFieldKnnVectorsFormat {
    /** Returned when the configured thread quantity is {@code <= 1}: one merge thread and a {@code null} executor. */
    private static final Tuple<Integer, ExecutorService> DEFAULT_MERGE_THREAD_COUNT_AND_EXECUTOR_SERVICE = Tuple.tuple(1, null);

    /** Passed through to every native-engine format created by this instance; {@code null} when none was supplied. */
    @Nullable
    private final RemoteIndexBuilder remoteIndexBuilder;

    /**
     * Creates the per-field format.
     *
     * @param mapperService      mapper service handed to the base class
     * @param remoteIndexBuilder builder forwarded to {@link NativeEngines10010KnnVectorsFormat}; may be {@code null}
     */
    public KNN10010PerFieldKnnVectorsFormat(final Optional<MapperService> mapperService, final RemoteIndexBuilder remoteIndexBuilder) {
        super(
            mapperService,
            Lucene99HnswVectorsFormat.DEFAULT_MAX_CONN,
            Lucene99HnswVectorsFormat.DEFAULT_BEAM_WIDTH,
            Lucene99HnswVectorsFormat::new,
            knnVectorsFormatParams -> {
                final Tuple<Integer, ExecutorService> mergeThreadCountAndExecutorService = getMergeThreadCountAndExecutorService();
                // There is an assumption here that hamming space will only be used for binary vectors. This will need to be fixed if that
                // changes in the future.
                if (knnVectorsFormatParams.getSpaceType() == SpaceType.HAMMING) {
                    return new KNN9120HnswBinaryVectorsFormat(
                        knnVectorsFormatParams.getMaxConnections(),
                        knnVectorsFormatParams.getBeamWidth(),
                        // number of merge threads
                        mergeThreadCountAndExecutorService.v1(),
                        // executor service
                        mergeThreadCountAndExecutorService.v2()
                    );
                } else {
                    return new Lucene99HnswVectorsFormat(
                        knnVectorsFormatParams.getMaxConnections(),
                        knnVectorsFormatParams.getBeamWidth(),
                        // number of merge threads
                        mergeThreadCountAndExecutorService.v1(),
                        // executor service
                        mergeThreadCountAndExecutorService.v2()
                    );
                }
            },
            knnScalarQuantizedVectorsFormatParams -> {
                final Tuple<Integer, ExecutorService> mergeThreadCountAndExecutorService = getMergeThreadCountAndExecutorService();
                return new Lucene99HnswScalarQuantizedVectorsFormat(
                    knnScalarQuantizedVectorsFormatParams.getMaxConnections(),
                    knnScalarQuantizedVectorsFormatParams.getBeamWidth(),
                    // Number of merge threads
                    mergeThreadCountAndExecutorService.v1(),
                    knnScalarQuantizedVectorsFormatParams.getBits(),
                    knnScalarQuantizedVectorsFormatParams.isCompressFlag(),
                    knnScalarQuantizedVectorsFormatParams.getConfidenceInterval(),
                    // Executor service
                    mergeThreadCountAndExecutorService.v2()
                );
            }
        );
        this.remoteIndexBuilder = remoteIndexBuilder;
    }

    /**
     * Creates the per-field format without a {@link RemoteIndexBuilder}.
     *
     * @param mapperService mapper service handed to the base class
     */
    public KNN10010PerFieldKnnVectorsFormat(final Optional<MapperService> mapperService) {
        this(mapperService, null);
    }

    /**
     * This method returns the maximum dimension allowed from KNNEngine for Lucene codec
     *
     * @param fieldName Name of the field, ignored
     * @return Maximum constant dimension set by KNNEngine
     */
    @Override
    public int getMaxDimensions(String fieldName) {
        return KNNEngine.getMaxDimensionByEngine(KNNEngine.LUCENE);
    }

    /**
     * Resolves the merge thread count from {@link KNNSettings#getIndexThreadQty()} and pairs it with an executor.
     *
     * @return the shared default tuple (1 thread, {@code null} executor) when the count is {@code <= 1}; otherwise
     *         the count together with a newly created fixed thread pool of that size
     */
    private static Tuple<Integer, ExecutorService> getMergeThreadCountAndExecutorService() {
        // To ensure that only once we are fetching the settings per segment, we are fetching the num threads once while
        // creating the executors
        int mergeThreadCount = KNNSettings.getIndexThreadQty();
        // We need to return null whenever the merge threads are <=1, as lucene assumes that if number of threads are 1
        // then we should be giving a null value of the executor
        if (mergeThreadCount <= 1) {
            return DEFAULT_MERGE_THREAD_COUNT_AND_EXECUTOR_SERVICE;
        } else {
            // NOTE(review): a new fixed thread pool is created on every call and nothing in this class shuts it
            // down — confirm who owns the lifecycle of these executors.
            return Tuple.tuple(mergeThreadCount, Executors.newFixedThreadPool(mergeThreadCount));
        }
    }

    /**
     * Builds the format used for native-engine fields.
     *
     * @return a {@link NativeEngines10010KnnVectorsFormat} configured with the approximate threshold from
     *         {@code getApproximateThresholdValue()} and this instance's {@link RemoteIndexBuilder}
     */
    @Override
    protected KnnVectorsFormat nativeEngineVectorsFormat() {
        int approximateThreshold = getApproximateThresholdValue();
        return new NativeEngines10010KnnVectorsFormat(
            new Lucene99FlatVectorsFormat(FlatVectorScorerUtil.getLucene99FlatVectorsScorer()),
            approximateThreshold,
            remoteIndexBuilder
        );
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.knn.index.codec.KNN10010Codec;

import org.apache.lucene.codecs.KnnVectorsFormat;
import org.apache.lucene.codecs.KnnVectorsReader;
import org.apache.lucene.codecs.KnnVectorsWriter;
import org.apache.lucene.codecs.hnsw.DefaultFlatVectorScorer;
import org.apache.lucene.codecs.hnsw.FlatVectorsFormat;
import org.apache.lucene.codecs.lucene99.Lucene99FlatVectorsFormat;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.index.SegmentWriteState;
import org.opensearch.common.Nullable;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.codec.KNN990Codec.NativeEngines990KnnVectorsReader;
import org.opensearch.knn.index.engine.KNNEngine;
import org.opensearch.knn.index.remote.RemoteIndexBuilder;

import java.io.IOException;

/**
 * This is a Vector format that will be used for Native engines like Faiss and Nmslib for reading and writing vector
 * related data structures.
 *
 * <p>The flat vectors format and the approximate threshold are held per instance, so constructing another format
 * with different arguments cannot alter the configuration of an instance that already exists.
 */
public class NativeEngines10010KnnVectorsFormat extends KnnVectorsFormat {
    private static final String FORMAT_NAME = "NativeEngines10010KnnVectorsFormat";

    /** The format for storing, reading, merging vectors on disk */
    private final FlatVectorsFormat flatVectorsFormat;
    /** Threshold value handed to every writer created by {@link #fieldsWriter(SegmentWriteState)}. */
    private final int approximateThreshold;
    /** Handed to every writer created by this format; {@code null} when none was supplied. */
    @Nullable
    private final RemoteIndexBuilder remoteIndexBuilder;

    // For Testing Only
    public NativeEngines10010KnnVectorsFormat() {
        this(new Lucene99FlatVectorsFormat(new DefaultFlatVectorScorer()));
    }

    // For Testing Only
    public NativeEngines10010KnnVectorsFormat(int approximateThreshold) {
        this(new Lucene99FlatVectorsFormat(new DefaultFlatVectorScorer()), approximateThreshold, null);
    }

    // For Testing Only
    public NativeEngines10010KnnVectorsFormat(final FlatVectorsFormat flatVectorsFormat) {
        this(flatVectorsFormat, KNNSettings.INDEX_KNN_ADVANCED_APPROXIMATE_THRESHOLD_DEFAULT_VALUE, null);
    }

    /**
     * Creates the format.
     *
     * @param flatVectorsFormat    format that supplies the flat vectors writer and reader
     * @param approximateThreshold threshold value forwarded to the writer
     * @param remoteIndexBuilder   builder forwarded to the writer; may be {@code null}
     */
    public NativeEngines10010KnnVectorsFormat(
        final FlatVectorsFormat flatVectorsFormat,
        int approximateThreshold,
        RemoteIndexBuilder remoteIndexBuilder
    ) {
        super(FORMAT_NAME);
        this.flatVectorsFormat = flatVectorsFormat;
        this.approximateThreshold = approximateThreshold;
        this.remoteIndexBuilder = remoteIndexBuilder;
    }

    /**
     * Returns a {@link org.apache.lucene.codecs.KnnVectorsWriter} to write the vectors to the index.
     *
     * @param state {@link org.apache.lucene.index.SegmentWriteState}
     * @return a {@link NativeEngines10010KnnVectorsWriter} wrapping the flat vectors writer for {@code state}
     * @throws IOException if the flat vectors writer cannot be created
     */
    @Override
    public KnnVectorsWriter fieldsWriter(final SegmentWriteState state) throws IOException {
        return new NativeEngines10010KnnVectorsWriter(
            state,
            flatVectorsFormat.fieldsWriter(state),
            approximateThreshold,
            remoteIndexBuilder
        );
    }

    /**
     * Returns a {@link org.apache.lucene.codecs.KnnVectorsReader} to read the vectors from the index.
     *
     * @param state {@link org.apache.lucene.index.SegmentReadState}
     * @return a {@link NativeEngines990KnnVectorsReader} wrapping the flat vectors reader for {@code state}
     * @throws IOException if the flat vectors reader cannot be created
     */
    @Override
    public KnnVectorsReader fieldsReader(final SegmentReadState state) throws IOException {
        return new NativeEngines990KnnVectorsReader(state, flatVectorsFormat.fieldsReader(state));
    }

    /**
     * Returns the maximum dimension reported by {@link KNNEngine} for {@link KNNEngine#LUCENE}.
     *
     * @param s name of the field, ignored
     * @return the maximum dimension for the Lucene engine
     */
    @Override
    public int getMaxDimensions(String s) {
        return KNNEngine.getMaxDimensionByEngine(KNNEngine.LUCENE);
    }

    @Override
    public String toString() {
        // NOTE(review): the "NativeEngines99KnnVectorsFormat" prefix does not match this class name and looks
        // copied from an older format; left unchanged in case anything compares this text — confirm and rename.
        return "NativeEngines99KnnVectorsFormat(name="
            + this.getClass().getSimpleName()
            + ", flatVectorsFormat="
            + flatVectorsFormat
            + ", approximateThreshold="
            + approximateThreshold
            + ")";
    }
}
Loading

0 comments on commit 25981af

Please sign in to comment.