Python: Azure Cosmos DB NoSQL Vector Store & Collection implementation #9296

Open
wants to merge 28 commits into base: main
Changes from 24 commits
Commits
28 commits
6eefa06
Init: Azure Cosmos DB NoSQL Vector Collection Impl
TaoChenOSU Oct 14, 2024
4855c3c
Add serialize and deserialize implementations
TaoChenOSU Oct 15, 2024
e1dcc8f
Cosmos DB NoSQL store
TaoChenOSU Oct 15, 2024
8501dfb
WIP: Integration tests
TaoChenOSU Oct 16, 2024
3398ad8
DONE: Integration test
TaoChenOSU Oct 16, 2024
a5990c3
Merge branch 'main' into local-branch-cosmos-db-no-sql-vector-impl
TaoChenOSU Oct 23, 2024
d69bd94
Merge branch 'main' into taochen/python-azure-cosmos-db-nosql-vector-…
TaoChenOSU Oct 24, 2024
d215aa2
Add integration tests
TaoChenOSU Oct 25, 2024
be33681
Unit tests; Next: more integration tests for Cosmos DB NoSQL specific…
TaoChenOSU Oct 26, 2024
048c9e8
Complete integration tests; next: workflow file update
TaoChenOSU Oct 28, 2024
1e4529e
Workflow
TaoChenOSU Oct 28, 2024
2dac177
Merge branch 'main' into taochen/python-azure-cosmos-db-nosql-vector-…
TaoChenOSU Oct 28, 2024
1f3b6de
Sample
TaoChenOSU Oct 28, 2024
a0896be
Fix unit test
TaoChenOSU Oct 28, 2024
49039f2
fix unit test 2
TaoChenOSU Oct 28, 2024
16ce4a7
Address comments
TaoChenOSU Oct 29, 2024
bee5b8a
Restructure cosmos db module
TaoChenOSU Oct 29, 2024
d4f4488
Handle cosmos client lifetime
TaoChenOSU Oct 30, 2024
d4e591f
Fix unit test 3
TaoChenOSU Oct 31, 2024
a8693cb
Merge branch 'main' into taochen/python-azure-cosmos-db-nosql-vector-…
TaoChenOSU Nov 8, 2024
d68f0b8
Fix distance function
TaoChenOSU Nov 8, 2024
96875cf
Fix distance function 2
TaoChenOSU Nov 8, 2024
eca01ee
Fix unit tests
TaoChenOSU Nov 8, 2024
6f421d1
Fix integration tests
TaoChenOSU Nov 8, 2024
f4df5b9
Address comments 1
TaoChenOSU Nov 8, 2024
530b0ca
Address comments 2
TaoChenOSU Nov 9, 2024
93bd96f
Address comments 3
TaoChenOSU Nov 9, 2024
c1c8eaa
Various fixes
TaoChenOSU Nov 9, 2024
18 changes: 17 additions & 1 deletion .github/workflows/python-integration-tests.yml
@@ -100,6 +100,8 @@ jobs:
VERTEX_AI_GEMINI_MODEL_ID: ${{ vars.VERTEX_AI_GEMINI_MODEL_ID }}
VERTEX_AI_EMBEDDING_MODEL_ID: ${{ vars.VERTEX_AI_EMBEDDING_MODEL_ID }}
REDIS_CONNECTION_STRING: ${{ vars.REDIS_CONNECTION_STRING }}
COSMOS_DB_NOSQL_URL: ${{ vars.COSMOS_DB_NOSQL_URL }}
COSMOS_DB_NOSQL_KEY: ${{ secrets.COSMOS_DB_NOSQL_KEY }}
steps:
- uses: actions/checkout@v4
- name: Set up uv
@@ -150,6 +152,12 @@ jobs:
run: docker run -d --name redis-stack-server -p 6379:6379 redis/redis-stack-server:latest
- name: Setup Weaviate docker deployment
run: docker run -d -p 8080:8080 -p 50051:50051 cr.weaviate.io/semitechnologies/weaviate:1.26.6
- name: Start Azure Cosmos DB emulator
if: matrix.os == 'windows-latest'
run: |
Write-Host "Launching Cosmos DB Emulator"
Import-Module "$env:ProgramFiles\Azure Cosmos DB Emulator\PSModules\Microsoft.Azure.CosmosDB.Emulator"
Start-CosmosDbEmulator
- name: Azure CLI Login
if: github.event_name != 'pull_request'
uses: azure/login@v2
@@ -255,6 +263,8 @@ jobs:
VERTEX_AI_GEMINI_MODEL_ID: ${{ vars.VERTEX_AI_GEMINI_MODEL_ID }}
VERTEX_AI_EMBEDDING_MODEL_ID: ${{ vars.VERTEX_AI_EMBEDDING_MODEL_ID }}
REDIS_CONNECTION_STRING: ${{ vars.REDIS_CONNECTION_STRING }}
COSMOS_DB_NOSQL_URL: ${{ vars.COSMOS_DB_NOSQL_URL }}
COSMOS_DB_NOSQL_KEY: ${{ secrets.COSMOS_DB_NOSQL_KEY }}
steps:
- uses: actions/checkout@v4
- name: Set up uv
@@ -305,6 +315,12 @@ jobs:
run: docker run -d --name redis-stack-server -p 6379:6379 redis/redis-stack-server:latest
- name: Setup Weaviate docker deployment
run: docker run -d -p 8080:8080 -p 50051:50051 cr.weaviate.io/semitechnologies/weaviate:1.26.6
- name: Start Azure Cosmos DB emulator
if: matrix.os == 'windows-latest'
run: |
Write-Host "Launching Cosmos DB Emulator"
Import-Module "$env:ProgramFiles\Azure Cosmos DB Emulator\PSModules\Microsoft.Azure.CosmosDB.Emulator"
Start-CosmosDbEmulator
- name: Azure CLI Login
if: github.event_name != 'pull_request'
uses: azure/login@v2
@@ -418,4 +434,4 @@ jobs:
dry_run: ${{ env.run_type != 'Daily' && env.run_type != 'Manual'}}
job: ${{ toJson(job) }}
steps: ${{ toJson(steps) }}
overwrite: "{title: ` ${{ env.run_type }}: ${{ env.date }} `, text: ` ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}`}"
overwrite: "{title: ` ${{ env.run_type }}: ${{ env.date }} `, text: ` ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}`}"
1 change: 1 addition & 0 deletions python/.cspell.json
@@ -46,6 +46,7 @@
"mongocluster",
"ndarray",
"nopep",
"NOSQL",
"ollama",
"onyourdatatest",
"OPENAI",
171 changes: 97 additions & 74 deletions python/samples/concepts/memory/new_memory.py
@@ -12,6 +12,9 @@
from semantic_kernel.connectors.ai.open_ai import OpenAIEmbeddingPromptExecutionSettings, OpenAITextEmbedding
from semantic_kernel.connectors.ai.open_ai.services.azure_text_embedding import AzureTextEmbedding
from semantic_kernel.connectors.memory.azure_ai_search import AzureAISearchCollection
from semantic_kernel.connectors.memory.azure_cosmos_db.azure_cosmos_db_no_sql_collection import (
AzureCosmosDBNoSQLCollection,
)
from semantic_kernel.connectors.memory.in_memory import InMemoryVectorCollection
from semantic_kernel.connectors.memory.postgres.postgres_collection import PostgresCollection
from semantic_kernel.connectors.memory.qdrant import QdrantCollection
@@ -25,55 +28,64 @@
VectorStoreRecordVectorField,
vectorstoremodel,
)


@vectorstoremodel
@dataclass
class MyDataModelArray:
vector: Annotated[
np.ndarray | None,
VectorStoreRecordVectorField(
embedding_settings={"embedding": OpenAIEmbeddingPromptExecutionSettings(dimensions=1536)},
index_kind="hnsw",
dimensions=1536,
distance_function="cosine_similarity",
property_type="float",
serialize_function=np.ndarray.tolist,
deserialize_function=np.array,
),
] = None
other: str | None = None
id: Annotated[str, VectorStoreRecordKeyField()] = field(default_factory=lambda: str(uuid4()))
content: Annotated[
str, VectorStoreRecordDataField(has_embedding=True, embedding_property_name="vector", property_type="str")
] = "content1"


@vectorstoremodel
@dataclass
class MyDataModelList:
vector: Annotated[
list[float] | None,
VectorStoreRecordVectorField(
embedding_settings={"embedding": OpenAIEmbeddingPromptExecutionSettings(dimensions=1536)},
index_kind="hnsw",
dimensions=1536,
distance_function="cosine_similarity",
property_type="float",
),
] = None
other: str | None = None
id: Annotated[str, VectorStoreRecordKeyField()] = field(default_factory=lambda: str(uuid4()))
content: Annotated[
str, VectorStoreRecordDataField(has_embedding=True, embedding_property_name="vector", property_type="str")
] = "content1"
from semantic_kernel.data.const import DistanceFunction, IndexKind


def get_data_model_array(index_kind: IndexKind, distance_function: DistanceFunction) -> type:
@vectorstoremodel
@dataclass
class DataModelArray:
vector: Annotated[
np.ndarray | None,
VectorStoreRecordVectorField(
embedding_settings={"embedding": OpenAIEmbeddingPromptExecutionSettings(dimensions=1536)},
index_kind=index_kind,
dimensions=1536,
distance_function=distance_function,
property_type="float",
serialize_function=np.ndarray.tolist,
deserialize_function=np.array,
),
] = None
other: str | None = None
id: Annotated[str, VectorStoreRecordKeyField()] = field(default_factory=lambda: str(uuid4()))
content: Annotated[
str, VectorStoreRecordDataField(has_embedding=True, embedding_property_name="vector", property_type="str")
] = "content1"

return DataModelArray


def get_data_model_list(index_kind: IndexKind, distance_function: DistanceFunction) -> type:
@vectorstoremodel
@dataclass
class DataModelList:
vector: Annotated[
list[float] | None,
VectorStoreRecordVectorField(
embedding_settings={"embedding": OpenAIEmbeddingPromptExecutionSettings(dimensions=1536)},
index_kind=index_kind,
dimensions=1536,
distance_function=distance_function,
property_type="float",
),
] = None
other: str | None = None
id: Annotated[str, VectorStoreRecordKeyField()] = field(default_factory=lambda: str(uuid4()))
content: Annotated[
str, VectorStoreRecordDataField(has_embedding=True, embedding_property_name="vector", property_type="str")
] = "content1"

return DataModelList


collection_name = "test"
MyDataModel = MyDataModelArray
# Depending on the vector database, the index kind and distance function may need to be adjusted,
# since not all combinations are supported by all databases.
DataModel = get_data_model_array(IndexKind.HNSW, DistanceFunction.COSINE)

# A list of VectorStoreRecordCollection that can be used.
# Available stores are:
# Available collections are:
# - ai_search: Azure AI Search
# - postgres: PostgreSQL
# - redis_json: Redis JSON
@@ -83,63 +95,74 @@ class MyDataModelList:
# - weaviate: Weaviate
# Please either configure the weaviate settings via environment variables or provide them through the constructor.
# Note that embed mode is not supported on Windows: https://github.com/weaviate/weaviate/issues/3315
#
# This is represented as a mapping from the store name to a
# function which returns the store.
# Using a function allows for lazy initialization of the store,
# so that settings for unused stores do not cause validation errors.
stores: dict[str, Callable[[], VectorStoreRecordCollection]] = {
"ai_search": lambda: AzureAISearchCollection[MyDataModel](
data_model_type=MyDataModel,
# - azure_cosmos_nosql: Azure Cosmos NoSQL
# https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/how-to-create-account?tabs=azure-portal
# Please see the link above to learn how to set up an Azure Cosmos NoSQL account.
# https://learn.microsoft.com/en-us/azure/cosmos-db/how-to-develop-emulator?tabs=windows%2Cpython&pivots=api-nosql
# Please see the link above to learn how to set up the Azure Cosmos NoSQL emulator on your machine.
# For this sample to work with Azure Cosmos NoSQL, please adjust the index_kind of the data model to QUANTIZED_FLAT.
# This is represented as a mapping from the collection name to a
# function which returns the collection.
# Using a function allows for lazy initialization of the collection,
# so that settings for unused collections do not cause validation errors.
collections: dict[str, Callable[[], VectorStoreRecordCollection]] = {
"ai_search": lambda: AzureAISearchCollection[DataModel](
data_model_type=DataModel,
),
"postgres": lambda: PostgresCollection[str, MyDataModel](
data_model_type=MyDataModel,
"postgres": lambda: PostgresCollection[str, DataModel](
data_model_type=DataModel,
collection_name=collection_name,
),
"redis_json": lambda: RedisJsonCollection[MyDataModel](
data_model_type=MyDataModel,
"redis_json": lambda: RedisJsonCollection[DataModel](
data_model_type=DataModel,
collection_name=collection_name,
prefix_collection_name_to_key_names=True,
),
"redis_hashset": lambda: RedisHashsetCollection[MyDataModel](
data_model_type=MyDataModel,
"redis_hashset": lambda: RedisHashsetCollection[DataModel](
data_model_type=DataModel,
collection_name=collection_name,
prefix_collection_name_to_key_names=True,
),
"qdrant": lambda: QdrantCollection[MyDataModel](
data_model_type=MyDataModel, collection_name=collection_name, prefer_grpc=True, named_vectors=False
"qdrant": lambda: QdrantCollection[DataModel](
data_model_type=DataModel, collection_name=collection_name, prefer_grpc=True, named_vectors=False
),
"in_memory": lambda: InMemoryVectorCollection[DataModel](
data_model_type=DataModel,
collection_name=collection_name,
),
"in_memory": lambda: InMemoryVectorCollection[MyDataModel](
data_model_type=MyDataModel,
"weaviate": lambda: WeaviateCollection[DataModel](
data_model_type=DataModel,
collection_name=collection_name,
),
"weaviate": lambda: WeaviateCollection[MyDataModel](
data_model_type=MyDataModel,
"azure_cosmos_nosql": lambda: AzureCosmosDBNoSQLCollection(
data_model_type=DataModel,
database_name="sample_database",
collection_name=collection_name,
create_database=True,
),
}


async def main(store: str, use_azure_openai: bool, embedding_model: str):
async def main(collection: str, use_azure_openai: bool, embedding_model: str):
kernel = Kernel()
service_id = "embedding"
if use_azure_openai:
kernel.add_service(AzureTextEmbedding(service_id=service_id, deployment_name=embedding_model))
else:
kernel.add_service(OpenAITextEmbedding(service_id=service_id, ai_model_id=embedding_model))
async with stores[store]() as record_store:
await record_store.create_collection_if_not_exists()
async with collections[collection]() as record_collection:
await record_collection.create_collection_if_not_exists()

record1 = MyDataModel(content="My text", id="e6103c03-487f-4d7d-9c23-4723651c17f4")
record2 = MyDataModel(content="My other text", id="09caec77-f7e1-466a-bcec-f1d51c5b15be")
record1 = DataModel(content="My text", id="e6103c03-487f-4d7d-9c23-4723651c17f4")
record2 = DataModel(content="My other text", id="09caec77-f7e1-466a-bcec-f1d51c5b15be")

records = await VectorStoreRecordUtils(kernel).add_vector_to_records(
[record1, record2], data_model_type=MyDataModel
[record1, record2], data_model_type=DataModel
)
keys = await record_store.upsert_batch(records)
keys = await record_collection.upsert_batch(records)
print(f"upserted {keys=}")

results = await record_store.get_batch([record1.id, record2.id])
results = await record_collection.get_batch([record1.id, record2.id])
if results:
for result in results:
print(f"found {result.id=}")
@@ -156,7 +179,7 @@ async def main(store: str, use_azure_openai: bool, embedding_model: str):
argparse.ArgumentParser()

parser = argparse.ArgumentParser()
parser.add_argument("--store", default="in_memory", choices=stores.keys(), help="What store to use.")
parser.add_argument("--collection", default="in_memory", choices=collections.keys(), help="What collection to use.")
# Option of whether to use OpenAI or Azure OpenAI.
parser.add_argument("--use-azure-openai", action="store_true", help="Use Azure OpenAI instead of OpenAI.")
# Model
@@ -165,4 +188,4 @@ async def main(store: str, use_azure_openai: bool, embedding_model: str):
)
args = parser.parse_args()

asyncio.run(main(store=args.store, use_azure_openai=args.use_azure_openai, embedding_model=args.model))
asyncio.run(main(collection=args.collection, use_azure_openai=args.use_azure_openai, embedding_model=args.model))
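
To make the new azure_cosmos_nosql option concrete, here is a minimal sketch of driving the collection directly, following the comments in the sample above. It reuses the get_data_model_array factory defined in this file, assumes IndexKind.QUANTIZED_FLAT is the index kind to use for Cosmos DB NoSQL (as the sample comment advises), and expects COSMOS_DB_NOSQL_URL and COSMOS_DB_NOSQL_KEY (or the local emulator) to be configured.

import asyncio

import numpy as np

from semantic_kernel.connectors.memory.azure_cosmos_db.azure_cosmos_db_no_sql_collection import (
    AzureCosmosDBNoSQLCollection,
)
from semantic_kernel.data.const import DistanceFunction, IndexKind

# get_data_model_array is the factory defined in the sample above; QUANTIZED_FLAT replaces
# HNSW, as the sample comment advises for Azure Cosmos DB NoSQL.
CosmosDataModel = get_data_model_array(IndexKind.QUANTIZED_FLAT, DistanceFunction.COSINE)


async def run() -> None:
    async with AzureCosmosDBNoSQLCollection(
        data_model_type=CosmosDataModel,
        database_name="sample_database",
        collection_name="test",
        create_database=True,
    ) as collection:
        await collection.create_collection_if_not_exists()
        # A zero vector stands in for a real embedding; the sample's main() shows how to
        # generate embeddings with VectorStoreRecordUtils instead.
        record = CosmosDataModel(content="My text", vector=np.zeros(1536))
        keys = await collection.upsert_batch([record])
        print(await collection.get_batch(keys))


if __name__ == "__main__":
    asyncio.run(run())
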
Empty file.
@@ -0,0 +1,99 @@
# Copyright (c) Microsoft. All rights reserved.

from azure.cosmos.aio import ContainerProxy, CosmosClient, DatabaseProxy
from azure.cosmos.exceptions import CosmosResourceNotFoundError
from pydantic import ValidationError

from semantic_kernel.connectors.memory.azure_cosmos_db.azure_cosmos_db_no_sql_settings import AzureCosmosDBNoSQLSettings
from semantic_kernel.connectors.memory.azure_cosmos_db.utils import CosmosClientWrapper, DefaultAzureCredentialWrapper
from semantic_kernel.exceptions.memory_connector_exceptions import (
MemoryConnectorInitializationError,
MemoryConnectorResourceNotFound,
)
from semantic_kernel.kernel_pydantic import KernelBaseModel
from semantic_kernel.utils.experimental_decorator import experimental_class


@experimental_class
class AzureCosmosDBNoSQLBase(KernelBaseModel):
"""An Azure Cosmos DB NoSQL collection stores documents in a Azure Cosmos DB NoSQL account."""

cosmos_client: CosmosClient
database_name: str
# If create_database is True, the database will be created
# if it does not exist when an operation requires a database.
create_database: bool

def __init__(
self,
database_name: str,
url: str | None = None,
key: str | None = None,
cosmos_client: CosmosClient | None = None,
create_database: bool = False,
**kwargs,
):
"""Initialize the AzureCosmosDBNoSQLBase.

Args:
database_name (str): The name of the database. The database may not exist yet.
If it does not exist, it will be created when the first collection is created.
url (str): The URL of the Azure Cosmos DB NoSQL account. Defaults to None.
key (str): The key of the Azure Cosmos DB NoSQL account. Defaults to None.
cosmos_client (CosmosClient): The custom Azure Cosmos DB NoSQL client whose lifetime is managed by the user.
Defaults to None.
create_database (bool): If True, the database will be created if it does not exist.
Defaults to False.
kwargs: Additional keyword arguments.
"""
try:
cosmos_db_nosql_settings = AzureCosmosDBNoSQLSettings.create(url=url, key=key)
except ValidationError as e:
raise MemoryConnectorInitializationError("Failed to validate Azure Cosmos DB NoSQL settings.") from e

if cosmos_client is None:
if cosmos_db_nosql_settings.key is not None:
cosmos_client = CosmosClientWrapper(
str(cosmos_db_nosql_settings.url), credential=cosmos_db_nosql_settings.key.get_secret_value()
)
else:
cosmos_client = CosmosClientWrapper(
str(cosmos_db_nosql_settings.url), credential=DefaultAzureCredentialWrapper()
)

super().__init__(
database_name=database_name,
cosmos_client=cosmos_client,
create_database=create_database,
**kwargs,
)

async def _does_database_exist(self) -> bool:
"""Checks if the database exists."""
try:
await self.cosmos_client.get_database_client(self.database_name).read()
return True
except CosmosResourceNotFoundError:
return False
except Exception as e:
raise MemoryConnectorResourceNotFound(f"Failed to check if database '{self.database_name}' exists.") from e

async def _get_database_proxy(self) -> DatabaseProxy:
"""Gets the database proxy."""
try:
if await self._does_database_exist():
return self.cosmos_client.get_database_client(self.database_name)

if self.create_database:
return await self.cosmos_client.create_database(self.database_name)
raise MemoryConnectorResourceNotFound(f"Database '{self.database_name}' does not exist.")
except Exception as e:
raise MemoryConnectorResourceNotFound(f"Failed to get database proxy for '{id}'.") from e

async def _get_container_proxy(self, container_name: str) -> ContainerProxy:
"""Gets the container proxy."""
try:
database_proxy = await self._get_database_proxy()
return database_proxy.get_container_client(container_name)
except Exception as e:
raise MemoryConnectorResourceNotFound(f"Failed to get container proxy for '{container_name}'.") from e