Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python: introducing Vector Search for Qdrant Collection #9621

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion python/.cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
"opentelemetry",
"SEMANTICKERNEL",
"OTEL",
"vectorizable"
"vectorizable",
"desync"
]
}
73 changes: 51 additions & 22 deletions python/samples/concepts/memory/new_memory.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Copyright (c) Microsoft. All rights reserved.

import argparse
import asyncio
import time
from collections.abc import Callable
from dataclasses import dataclass, field
from typing import Annotated
Expand All @@ -25,6 +27,8 @@
VectorStoreRecordVectorField,
vectorstoremodel,
)
from semantic_kernel.data.vector_search.vector_search_options import VectorSearchOptions
from semantic_kernel.data.vector_search.vectorized_search import VectorizedSearchMixin


@vectorstoremodel
Expand Down Expand Up @@ -88,7 +92,7 @@ class MyDataModelList:
# function which returns the store.
# Using a function allows for lazy initialization of the store,
# so that settings for unused stores do not cause validation errors.
stores: dict[str, Callable[[], VectorStoreRecordCollection]] = {
services: dict[str, Callable[[], VectorStoreRecordCollection]] = {
"ai_search": lambda: AzureAISearchCollection[MyDataModel](
data_model_type=MyDataModel,
),
Expand Down Expand Up @@ -120,43 +124,67 @@ class MyDataModelList:
}


async def main(store: str, use_azure_openai: bool, embedding_model: str):
async def main(service: str, use_azure_openai: bool, embedding_model: str):
print("-" * 30)
kernel = Kernel()
service_id = "embedding"
if use_azure_openai:
kernel.add_service(AzureTextEmbedding(service_id=service_id, deployment_name=embedding_model))
embedder = AzureTextEmbedding(service_id=service_id, deployment_name=embedding_model)
else:
kernel.add_service(OpenAITextEmbedding(service_id=service_id, ai_model_id=embedding_model))
async with stores[store]() as record_store:
await record_store.create_collection_if_not_exists()

record1 = MyDataModel(content="My text", id="e6103c03-487f-4d7d-9c23-4723651c17f4")
record2 = MyDataModel(content="My other text", id="09caec77-f7e1-466a-bcec-f1d51c5b15be")

embedder = OpenAITextEmbedding(service_id=service_id, ai_model_id=embedding_model)
kernel.add_service(embedder)
async with services[service]() as record_collection:
print(f"Creating {service} collection!")
await record_collection.create_collection_if_not_exists()

record1 = MyDataModel(content="Semantic Kernel is awesome", id="e6103c03-487f-4d7d-9c23-4723651c17f4")
record2 = MyDataModel(
content="Semantic Kernel is available in dotnet, python and Java.",
id="09caec77-f7e1-466a-bcec-f1d51c5b15be",
)
print("Adding records!")
records = await VectorStoreRecordUtils(kernel).add_vector_to_records(
[record1, record2], data_model_type=MyDataModel
)
keys = await record_store.upsert_batch(records)
print(f"upserted {keys=}")

results = await record_store.get_batch([record1.id, record2.id])
keys = await record_collection.upsert_batch(records)
print(f" Upserted {keys=}")
print("Getting records!")
results = await record_collection.get_batch([record1.id, record2.id])
if results:
for result in results:
print(f"found {result.id=}")
print(f"{result.content=}")
print(f" Found id: {result.id}")
print(f" Content: {result.content}")
if result.vector is not None:
print(f"{result.vector[:5]=}")
print(f" Vector (first five): {result.vector[:5]}")
else:
print("not found")
print("Nothing found...")
if isinstance(record_collection, VectorizedSearchMixin):
print("-" * 30)
print("Using vectorized search, the distance function is set to cosine_similarity.")
print("This means that the higher the score the more similar.")
search_results = await record_collection.vectorized_search(
vector=(await embedder.generate_raw_embeddings(["python"]))[0],
options=VectorSearchOptions(vector_field_name="vector", include_vectors=True),
)
results = [record async for record in search_results.results]
for result in results:
print(f" Found id: {result.record.id}")
print(f" Content: {result.record.content}")
if result.record.vector is not None:
print(f" Vector (first five): {result.record.vector[:5]}")
print(f" Score: {result.score:.4f}")
print("")
print("-" * 30)
print("Deleting collection!")
await record_collection.delete_collection()
print("Done!")


if __name__ == "__main__":
import asyncio

argparse.ArgumentParser()

parser = argparse.ArgumentParser()
parser.add_argument("--store", default="in_memory", choices=stores.keys(), help="What store to use.")
parser.add_argument("--service", default="in_memory", choices=services.keys(), help="What store to use.")
# Option of whether to use OpenAI or Azure OpenAI.
parser.add_argument("--use-azure-openai", action="store_true", help="Use Azure OpenAI instead of OpenAI.")
# Model
Expand All @@ -165,4 +193,5 @@ async def main(store: str, use_azure_openai: bool, embedding_model: str):
)
args = parser.parse_args()

asyncio.run(main(store=args.store, use_azure_openai=args.use_azure_openai, embedding_model=args.model))
asyncio.run(main(service=args.service, use_azure_openai=args.use_azure_openai, embedding_model=args.model))
time.sleep(1)
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import logging
import sys
from collections.abc import Mapping, Sequence
from typing import Any, ClassVar, TypeVar
from typing import Any, ClassVar, Generic, TypeVar

from semantic_kernel.exceptions.search_exceptions import VectorSearchExecutionException

if sys.version_info >= (3, 12):
from typing import override # pragma: no cover
Expand All @@ -12,12 +14,16 @@

from pydantic import ValidationError
from qdrant_client.async_qdrant_client import AsyncQdrantClient
from qdrant_client.models import PointStruct, VectorParams
from qdrant_client.models import FieldCondition, Filter, MatchAny, PointStruct, QueryResponse, ScoredPoint, VectorParams

from semantic_kernel.connectors.memory.qdrant.const import DISTANCE_FUNCTION_MAP, TYPE_MAPPER_VECTOR
from semantic_kernel.connectors.memory.qdrant.utils import AsyncQdrantClientWrapper
from semantic_kernel.data.kernel_search_results import KernelSearchResults
from semantic_kernel.data.record_definition import VectorStoreRecordDefinition, VectorStoreRecordVectorField
from semantic_kernel.data.vector_storage import VectorStoreRecordCollection
from semantic_kernel.data.vector_search.vector_search import VectorSearchBase
from semantic_kernel.data.vector_search.vector_search_options import VectorSearchOptions
from semantic_kernel.data.vector_search.vector_search_result import VectorSearchResult
from semantic_kernel.data.vector_search.vectorized_search import VectorizedSearchMixin
from semantic_kernel.exceptions import (
MemoryConnectorInitializationError,
VectorStoreModelValidationError,
Expand All @@ -34,7 +40,11 @@


@experimental_class
class QdrantCollection(VectorStoreRecordCollection[str | int, TModel]):
class QdrantCollection(
VectorSearchBase[str | int, TModel],
VectorizedSearchMixin[TModel],
Generic[TModel],
):
"""A QdrantCollection is a memory collection that uses Qdrant as the backend."""

qdrant_client: AsyncQdrantClient
Expand Down Expand Up @@ -70,6 +80,8 @@ def __init__(
When nothing is supplied, it defaults to an in-memory qdrant instance.
You can also supply a async qdrant client directly.

If you want to use the vectorizable_text_search you will need to install `qdrant_client[fastembed]`.

Args:
data_model_type (type[TModel]): The type of the data model.
data_model_definition (VectorStoreRecordDefinition): The model fields, optional.
Expand Down Expand Up @@ -163,6 +175,53 @@ async def _inner_delete(self, keys: Sequence[TKey], **kwargs: Any) -> None:
**kwargs,
)

@override
async def _inner_search(
    self,
    options: VectorSearchOptions,
    search_text: str | None = None,
    vectorizable_text: str | None = None,
    vector: list[float | int] | None = None,
    **kwargs: Any,
) -> KernelSearchResults[VectorSearchResult[TModel]]:
    """Run a vector search against the Qdrant collection.

    Args:
        options: Search options; ``vector_field_name`` selects the named vector
            when the collection uses named vectors, ``top``/``skip`` page the
            results, and ``include_vectors`` controls whether vectors are returned.
        search_text: Not supported by this connector; ignored.
        vectorizable_text: Not supported by this connector; ignored.
        vector: The query vector; required for this connector.
        **kwargs: Passed through to ``AsyncQdrantClient.search``.

    Returns:
        The matching points wrapped as ``KernelSearchResults``.

    Raises:
        VectorSearchExecutionException: If no vector was supplied.
    """
    # Fail fast: Qdrant vectorized search cannot run without an explicit query vector.
    if vector is None:
        raise VectorSearchExecutionException("Search requires a vector.")
    query_vector: tuple[str, list[float | int]] | list[float | int]
    if self.named_vectors and options.vector_field_name:
        # Named-vector collections address the target vector by field name.
        query_vector = (options.vector_field_name, vector)
    else:
        query_vector = vector
    results = await self.qdrant_client.search(
        collection_name=self.collection_name,
        query_vector=query_vector,
        query_filter=self._create_filter(options),
        with_vectors=options.include_vectors,
        limit=options.top,
        offset=options.skip,
        **kwargs,
    )
    return KernelSearchResults(
        results=self._get_vector_search_results_from_results(results),
        # ``search`` returns a plain list, so the count is simply its length.
        total_count=len(results) if options.include_total_count else None,
    )

@override
def _get_record_from_result(self, result: ScoredPoint | QueryResponse) -> Any:
return result

@override
def _get_score_from_result(self, result: ScoredPoint) -> float:
return result.score

def _create_filter(self, options: VectorSearchOptions) -> Filter:
    """Build a Qdrant ``Filter`` from the search options.

    Each clause in ``options.filter.filters`` becomes a ``FieldCondition``
    in the ``must`` list, so all clauses have to match (logical AND).

    NOTE(review): ``MatchAny`` matches when the field equals any of the given
    values — confirm ``clause.value`` is always list-like for the callers.
    """
    # Loop variable renamed from ``filter`` to avoid shadowing the builtin.
    return Filter(
        must=[
            FieldCondition(key=clause.field_name, match=MatchAny(any=clause.value))
            for clause in options.filter.filters
        ]
    )

@override
def _serialize_dicts_to_store_models(
self,
Expand All @@ -183,15 +242,17 @@ def _serialize_dicts_to_store_models(
@override
def _deserialize_store_models_to_dicts(
self,
records: Sequence[PointStruct],
records: Sequence[PointStruct] | Sequence[ScoredPoint],
**kwargs: Any,
) -> Sequence[dict[str, Any]]:
return [
{
self._key_field_name: record.id,
**(record.payload if record.payload else {}),
**(
record.vector
{}
if not record.vector
else record.vector
if isinstance(record.vector, dict)
else {self.data_model_definition.vector_field_names[0]: record.vector}
),
Expand Down
7 changes: 5 additions & 2 deletions python/semantic_kernel/data/vector_search/vector_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import logging
from abc import abstractmethod
from collections.abc import AsyncIterable
from collections.abc import AsyncIterable, Sequence
from typing import Any, Generic, TypeVar

from semantic_kernel.data.kernel_search_results import KernelSearchResults
Expand All @@ -11,6 +11,7 @@
from semantic_kernel.data.vector_search.vector_search_result import VectorSearchResult
from semantic_kernel.data.vector_storage.vector_store_record_collection import VectorStoreRecordCollection
from semantic_kernel.utils.experimental_decorator import experimental_class
from semantic_kernel.utils.list_handler import desync_list

TModel = TypeVar("TModel")
TKey = TypeVar("TKey")
Expand Down Expand Up @@ -100,8 +101,10 @@ def _get_score_from_result(self, result: Any) -> float | None:
# region: New methods

async def _get_vector_search_results_from_results(
self, results: AsyncIterable[Any]
self, results: AsyncIterable[Any] | Sequence[Any]
) -> AsyncIterable[VectorSearchResult[TModel]]:
if isinstance(results, Sequence):
results = desync_list(results)
async for result in results:
record = self.deserialize(self._get_record_from_result(result))
score = self._get_score_from_result(result)
Expand Down
13 changes: 13 additions & 0 deletions python/semantic_kernel/utils/list_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright (c) Microsoft. All rights reserved.


from collections.abc import AsyncIterable, Sequence
from typing import TypeVar

_T = TypeVar("_T")


async def desync_list(sync_list: Sequence[_T]) -> AsyncIterable[_T]:  # noqa: RUF029
    """Wrap a synchronous sequence in an async iterator.

    Lets code written against ``AsyncIterable`` (e.g. search-result
    streaming) also consume a plain in-memory sequence.

    Args:
        sync_list: Any sequence of items to yield.

    Yields:
        The items of ``sync_list``, in order.
    """
    for item in sync_list:
        yield item
Loading