Install from PyPi, as ravendb.
pip install ravendb
Python client API (v5.2) for RavenDB , a NoSQL document database.
Although new API isn't compatible with the previous one, it comes with many improvements and new features.
Package has been reworked to match Java and other RavenDB clients
Type-hinted entire project and API results - using the API is now much more comfortable with IntelliSense
-
All client versions 5.2.x are fully compatible with and support RavenDB server releases 5.4 and 6.0.
-
Click here to view all Releases and Changelog.
- Changes available in the releases section.
- Bulk insert dependencies bugfix
- Counters
- Counters indexes
- New feature - Bulk insert
- Bugfixes - Cluster-wide operations (here)
- Bugfixes - Serialization while loading/querying (here)
-
Subscriptions
- Document streams
- Secured subscriptions
-
Querying
- Major bugfixes
- Spatial querying and indexing
- Highlighting fixes
- Custom document parsers & loaders
-
New features
- Conditional Load
- SelectFields & Facets
- Projections
- MoreLikeThis
- Suggestions
-
Improvements
- Compare exchange
- Querying
- DocumentConventions
- Patching
- Spatial queries
- Aggregations
-
Lazy Operations
- Lazy loading
- Lazy querying
- Lazy compare exchange operations
-
Structure
- Important classes are now available to import from the top level
ravendb
module
- Important classes are now available to import from the top level
...and many bugfixes
-
Querying
- Simpler, well type hinted querying
- Group by, aggregations
- Spatial querying
- Boost, fuzzy, proximity
- Subclauses support
-
Static Indexes
- Store fields, index fields, pick analyzers & more using
AbstractIndexCreationTask
- Full indexes CRUD
- Index related commands (priority, erros, start/stop, pause, lock)
- Additional assemblies, map-reduce, index query with results "of_type"
- Store fields, index fields, pick analyzers & more using
-
CRUD
- Type hints for results and includes
- Support for dataclasses
-
Attachments
- New attachments API
- Better type hints
-
HTTPS
- Support for https connection
- Certificates CRUD operations
-
Lazy load
- New feature
-
Cluster Transactions, Compare Exchange
- New feature
- Time Series
- Replication & ETL Commands
- Streaming (ready, will be merged on v5.4 - #168)
-
This readme provides short examples for the following:
Getting started,
Crud example,
Query documents,
Attachments,
Changes API,
Suggestions,
Patching,
Subscriptions,
Counters,
Bulk Insert,
Using classes,
Working with secure server,
Building & running tests -
For more information go to the online RavenDB Documentation.
- Import the
DocumentStore
class from the ravendb module
from ravendb import DocumentStore
- Initialize the document store (you should have a single DocumentStore instance per application)
store = DocumentStore('http://live-test.ravendb.net', 'databaseName')
store.initialize()
- Open a session
with store.open_session() as session:
- Call
save_changes()
when you're done
user = session.load('users/1-A')
user.name = "Gracjan"
session.save_changes()
# Data is now persisted
# You can proceed e.g. finish web request
product = Product(
Id=None,
title='iPhone 14 Pro Max',
price=1199.99,
currency='USD',
storage=256,
manufacturer='Apple',
in_stock=True,
last_update=datetime.datetime.now(),
)
session.store(product, 'products/1-A')
print(product.Id) # products/1-A
session.save_changes()
store()
ID generation - session.store()
store document with @metadata
storing docs with same ID in same session should throw
product = session.load('products/1-A')
print(product.title) # iPhone 14 Pro Max
print(product.Id) # products/1-A
# users/1
# {
# "name": "John",
# "kids": ["users/2", "users/3"]
# }
session = store.open_session()
user1 = session.include("kids").load("users/1")
# Document users/1 and all docs referenced in "kids"
# will be fetched from the server in a single request.
user2 = session.load("users/2")
# this won't call server again
assert(user1 is not None)
assert(user2 is not None)
assert(session.advanced.number_of_requests == 1)
import datetime
product = session.load('products/1-A')
product.in_stock = False
product.last_update = datetime.datetime.now()
session.save_changes()
# ...
product = session.load('products/1-A')
print(product.in_stock) # false
print(product.last_update) # the current date
- Using entity
product = session.load('products/1-A')
session.delete(product)
session.save_changes()
product = session.load('products/1-A')
print(product) # None
- Using document ID
session.delete('products/1-A')
delete doc by entity
delete doc by ID
cannot delete after change
loading deleted doc returns null
- Use
query()
session method:
Query by collection:
import datetime
from ravendb import DocumentStore, QueryOperator
store = DocumentStore()
store.initialize()
session = store.open_session()
session.query_collection(str()).not_()
Query by index name:
query = session.query_index("productsByCategory")
Query by index:
query = session.query_index_type(Product_ByName, Product) # the second argument (object_type) is optional, as always
Query by entity type:
query = session.query(object_type=User) # object_type is an optional argument, but we can call this method only with it
- Build up the query - apply search conditions, set ordering, etc.
Query supports chaining calls:
query = session.query_collection("Users")
.wait_for_non_stale_results()
.using_default_operator(QueryOperator.AND)
.where_equals("manufacturer", "Apple")
.where_equals("in_stock", True)
.where_between("last_update", datetime.datetime(2022,11,1), datetime.datetime.now()).order_by("price")
- Execute the query to get results:
results = list(query) # get all results
# ...
first_result = query.first() # gets first result
# ...
single = query.single() # gets single result
# RQL
# from users select name
# Query
class UserProj:
def __init__(self, name: str = None, age: int = None):
self.name = name
self.age = age
user_names = [user_proj.name for user_proj in session.query_collection("Users").select_fields(UserProj, "name")]
# Sample results
# John, Stefanie, Thomas
# RQL
# from users select name, age
# Query
results = list(session.query_collection("Users").select_fields(UserProj, "name", "age"))
# Sample results
# [ { name: 'John', age: 30 },
# { name: 'Stefanie', age: 25 },
# { name: 'Thomas', age: 25 } ]
query with projections (query only two fields)
can_project_id_field
# RQL
# from users select distinct age
# Query
[user_proj.age for user_proj in session.query_collection("Users").select_fields(UserProj, "age").distinct()]
# Sample results
# [ 30, 25 ]
# RQL
# from users where age = 30
# Query
list(session.query_collection("Users").where_equals("age", 30))
# Saple results
# [ User {
# name: 'John',
# age: 30,
# kids: [...],
# registered_at: 2017-11-10T23:00:00.000Z } ]
# RQL
# from users where name in ("John", "Thomas")
# Query
list(session.query_collection("Users").where_in("name", ["John", "Thomas"]))
# Sample results
# [ User {
# name: 'John',
# age: 30,
# registered_at: 2017-11-10T23:00:00.000Z,
# kids: [...],
# id: 'users/1-A' },
# User {
# name: 'Thomas',
# age: 25,
# registered_at: 2016-04-24T22:00:00.000Z,
# id: 'users/3-A' } ]
# RQL
# from users where startsWith(name, 'J')
# Query
list(session.query_collection("Users").where_starts_with("name", "J"))
# Sample results
# [ User {
# name: 'John',
# age: 30,
# kids: [...],
# registered_at: 2017-11-10T23:00:00.000Z } ]
# RQL
# from users where registeredAt between '2016-01-01' and '2017-01-01'
# Query
import datetime
list(session.query_collection("Users").where_between("registered_at", datetime.datetime(2016, 1, 1), datetime.datetime(2017,1,1)))
# Sample results
# [ User {
# name: 'Thomas',
# age: 25,
# registered_at: 2016-04-24T22:00:00.000Z,
# id: 'users/3-A' } ]
where_greater_than() / where_greater_than_or_equal() / where_less_than() / where_less_than_or_equal()
# RQL
# from users where age > 29
# Query
list(session.query_collection("Users").where_greater_than("age", 29))
# Sample results
# [ User {
# name: 'John',
# age: 30,
# registered_at: 2017-11-10T23:00:00.000Z,
# kids: [...],
# id: 'users/1-A' } ]
query with where less than
query with where less than or equal
query with where greater than
query with where greater than or equal
Checks if the field exists.
# RQL
# from users where exists("age")
# Query
session.query_collection("Users").where_exists("kids")
# Sample results
# [ User {
# name: 'John',
# age: 30,
# registered_at: 2017-11-10T23:00:00.000Z,
# kids: [...],
# id: 'users/1-A' } ]
# RQL
# from users where kids in ('Mara')
# Query
list(session.query_collection("Users").contains_all("kids", ["Mara", "Dmitri"]))
# Sample results
# [ User {
# name: 'John',
# age: 30,
# registered_at: 2017-11-10T23:00:00.000Z,
# kids: ["Dmitri", "Mara"]
# id: 'users/1-A' } ]
Perform full-text search.
# RQL
# from users where search(kids, 'Mara')
# Query
list(session.query_collection("Users").search("kids", "Mara Dmitri"))
# Sample results
# [ User {
# name: 'John',
# age: 30,
# registered_at: 2017-11-10T23:00:00.000Z,
# kids: ["Dmitri", "Mara"]
# id: 'users/1-A' } ]
# RQL
# from users where exists(kids) or (age = 25 and name != Thomas)
# Query
list(session.query_collection("Users").where_exists("kids").or_else()
.open_subclause()
.where_equals("age", 25)
.where_not_equals("name", "Thomas")
.close_subclause())
# Sample results
# [ User {
# name: 'John',
# age: 30,
# registered_at: 2017-11-10T23:00:00.000Z,
# kids: ["Dmitri", "Mara"]
# id: 'users/1-A' },
# User {
# name: 'Stefanie',
# age: 25,
# registered_at: 2015-07-29T22:00:00.000Z,
# id: 'users/2-A' } ]
# RQL
# from users where age != 25
# Query
list(session.query_collection("Users").not_().where_equals("age", 25))
# Sample results
# [ User {
# name: 'John',
# age: 30,
# registered_at: 2017-11-10T23:00:00.000Z,
# kids: ["Dmitri", "Mara"]
# id: 'users/1-A' } ]
# RQL
# from users where exists(kids) or age < 30
# Query
list(session.query_collection("Users")
.where_exists("kids")
.or_else()
.where_less_than("age", 30))
# Sample results
# [ User {
# name: 'John',
# age: 30,
# registered_at: 2017-11-10T23:00:00.000Z,
# kids: [ 'Dmitri', 'Mara' ],
# id: 'users/1-A' },
# User {
# name: 'Thomas',
# age: 25,
# registered_at: 2016-04-24T22:00:00.000Z,
# id: 'users/3-A' },
# User {
# name: 'Stefanie',
# age: 25,
# registered_at: 2015-07-29T22:00:00.000Z,
# id: 'users/2-A' } ]
If neither and_also()
nor or_else()
is called then the default operator between the query filtering conditions will be AND
.
You can override that with using_default_operator
which must be called before any other where conditions.
# RQL
# from users where exists(kids) or age < 29
# Query
from ravendb import QueryOperator
list(session.query_collection("Users")
.using_default_operator(QueryOperator.OR) # override the default 'AND' operator
.where_exists("kids")
.where_less_than("age", 29))
# Sample results
# [ User {
# name: 'John',
# age: 30,
# registered_at: 2017-11-10T23:00:00.000Z,
# kids: [ 'Dmitri', 'Mara' ],
# id: 'users/1-A' },
# User {
# name: 'Thomas',
# age: 25,
# registered_at: 2016-04-24T22:00:00.000Z,
# id: 'users/3-A' },
# User {
# name: 'Stefanie',
# age: 25,
# registered_at: 2015-07-29T22:00:00.000Z,
# id: 'users/2-A' } ]
# RQL
# from users order by age
# Query
list(session.query_collection("Users").order_by("age"))
# Sample results
# [ User {
# name: 'Stefanie',
# age: 25,
# registered_at: 2015-07-29T22:00:00.000Z,
# id: 'users/2-A' },
# User {
# name: 'Thomas',
# age: 25,
# registered_at: 2016-04-24T22:00:00.000Z,
# id: 'users/3-A' },
# User {
# name: 'John',
# age: 30,
# registered_at: 2017-11-10T23:00:00.000Z,
# kids: [ 'Dmitri', 'Mara' ],
# id: 'users/1-A' } ]
order_by()
order_by_desc()
query random order
order by AlphaNumeric
query with boost - order by score
Limit the number of query results.
# RQL
# from users order by age
# Query
list(session.query_collection("Users")
.order_by("age")
.take(2)) # only the first 2 entries will be returned
# Sample results
# [ User {
# name: 'Stefanie',
# age: 25,
# registered_at: 2015-07-29T22:00:00.000Z,
# id: 'users/2-A' },
# User {
# name: 'Thomas',
# age: 25,
# registered_at: 2016-04-24T22:00:00.000Z,
# id: 'users/3-A' } ]
Skip a specified number of results from the start.
# RQL
# from users order by age
# Query
list(session.query_collection("Users").order_by("age")
.take(1) # return only 1 result
.skip(1)) # skip the first result, return the second result
# Sample results
# [ User {
# name: 'Thomas',
# age: 25,
# registered_at: 2016-04-24T22:00:00.000Z,
# id: 'users/3-A' } ]
Use the statistics()
method to obtain query statistics.
# Query
statistics: QueryStatistics = None
def __statistics_callback(stats: QueryStatistics):
nonlocal statistics
statistics = stats # plug-in the reference, value will be changed later
results = list(session.query_collection("Users")
.where_greater_than("age", 29)
.statistics(__statistics_callback))
# Sample results
# QueryStatistics {
# is_stale: false,
# duration_in_ms: 744,
# total_results: 1,
# skipped_results: 0,
# timestamp: 2018-09-24T05:34:15.260Z,
# index_name: 'Auto/users/Byage',
# index_timestamp: 2018-09-24T05:34:15.260Z,
# last_query_time: 2018-09-24T05:34:15.260Z,
# result_etag: 8426908718162809000 }
####) / first() / single() / count() )` - returns all results
first()
- first result only
single()
- first result, throws error if there's more entries
count()
- returns the number of entries in the results (not affected by take()
)
doc = User(name="John")
# Store a document, the entity will be tracked.
session.store(doc)
with open("photo.png", "rb+") as file:
session.advanced.attachments.store(doc, "photo.png", file.read(), "image/png")
# OR store attachment using document ID
session.advanced.attachments.store(doc.Id, "photo.png", file.read(), "image/png")
# Persist all changes
session.save_changes()
# Get an attachment
attachment = session.advanced.attachments.get(document_id, "photo.png")
# Attachment.details contains information about the attachment:
# {
# name: 'photo.png',
# document_id: 'users/1-A',
# content_type: 'image/png',
# hash: 'MvUEcrFHSVDts5ZQv2bQ3r9RwtynqnyJzIbNYzu1ZXk=',
# change_vector: '"A:3-K5TR36dafUC98AItzIa6ow"',
# size: 4579
# }
session.advanced.attachments.exists(doc.Id, "photo.png")
# True
session.advanced.attachments.exists(doc.Id, "not_there.avi")
# False
# Use a loaded entity to determine attachments' names
session.advanced.attachments.get_names(doc)
# Sample results:
# [ { name: 'photo.png',
# hash: 'MvUEcrFHSVDts5ZQv2bQ3r9RwtynqnyJzIbNYzu1ZXk=',
# content_type: 'image/png',
# size: 4579 } ]
Listen for database changes e.g. document changes.
# Subscribe to change notifications
changes = store.changes()
all_documents_changes = []
# Subscribe for all documents, or for specific collection (or other database items)
all_observer = self.store.changes().for_all_documents()
close_method = all_observer.subscribe_with_observer(ActionObserver(on_next=all_documents_changes.append))
all_observer.ensure_subscribe_now()
session = store.open_session()
session.store(User("Starlord"))
session.save_changes()
# ...
# Dispose the changes instance when you're done
close_method()
can obtain single document changes
can obtain all documents changes
can obtain notification about documents starting with
Suggest options for similar/misspelled terms
from ravendb.documents.indexes.definitions import FieldIndexing
from ravendb import AbstractIndexCreationTask
# Some documents in users collection with misspelled name term
# [ User {
# name: 'Johne',
# age: 30,
# ...
# id: 'users/1-A' },
# User {
# name: 'Johm',
# age: 31,
# ...
# id: 'users/2-A' },
# User {
# name: 'Jon',
# age: 32,
# ...
# id: 'users/3-A' },
# ]
# Static index definition
class UsersIndex(AbstractIndexCreationTask):
def __init__(self):
super().__init__()
self.map = "from u in docs.Users select new { u.name }"
# Enable the suggestion feature on index-field 'name'
self._index("name", FieldIndexing.SEARCH)
self._index_suggestions.add("name")
# ...
session = store.open_session()
# Query for similar terms to 'John'
# Note: the term 'John' itself will Not be part of the results
suggestedNameTerms = list(session.query_index_type(UsersIndex, User)
.suggest_using(lambda x: x.by_field("name", "John"))
.execute())
# Sample results:
# { name: { name: 'name', suggestions: [ 'johne', 'johm', 'jon' ] } }
can suggest
canChainSuggestions
canUseAliasInSuggestions
canUseSuggestionsWithAutoIndex
can suggest using linq
can suggest using multiple words
can get suggestions with options
# Increment 'age' field by 1
session.advanced.increment("users/1", "age", 1)
# Set 'underAge' field to false
session.advanced.patch("users/1", "underAge", False)
session.save_changes()
can patch
can patch complex
can add to array
can increment
patchWillUpdateTrackedDocumentAfterSaveChanges
can patch multiple documents
# Create a subscription task on the server
# Documents that match the query will be send to the client worker upon opening a connection
from ravendb import DocumentStore
from ravendb.documents.subscriptions.worker import SubscriptionBatch
from ravendb.documents.subscriptions.options import SubscriptionCreationOptions, SubscriptionWorkerOptions
store = DocumentStore("http://live-test.ravendb.net", "TestDatabase")
store.initialize()
subscription_name = store.subscriptions.create_for_options(SubscriptionCreationOptions(query="from users where age >= 30"))
# Open a connection
# Create a subscription worker that will consume document batches sent from the server
# Documents are sent from the last document that was processed for this subscription
with store.subscriptions.get_subscription_worker(SubscriptionWorkerOptions(subscription_name)) as subscription_worker:
def __callback(x: SubscriptionBatch):
# Process the incoming batch items
# Sample batch.items:
# [ Item {
# change_vector: 'A:2-r6nkF5nZtUKhcPEk6/LL+Q',
# id: 'users/1-A',
# raw_result:
# { name: 'John',
# age: 30,
# registered_at: '2017-11-11T00:00:00.0000000',
# kids: [Array],
# '@metadata': [Object],
# id: 'users/1-A' },
# rawMetadata:
# { '@collection': 'Users',
# '@nested-object-types': [Object],
# 'Raven-Node-Type': 'User',
# '@change-vector': 'A:2-r6nkF5nZtUKhcPEk6/LL+Q',
# '@id': 'users/1-A',
# '@last-modified': '2018-10-18T11:15:51.4882011Z' },
# exception_message: undefined } ]
# ...
def __exception_callback(ex: Exception):
# Handle exceptions here
subscription_worker.add_on_unexpected_subscription_error(__exception_callback)
subscription_worker.run(__callback)
can subscribe to index and document
should stream all documents
should send all new and modified docs
should respect max doc count in batch
can disable subscription
can delete subscription
There are many ways to play with counters. The most common path is to use session API (session.counters_for()
).
with store.open_session() as session:
user1 = User("Aviv1")
user2 = User("Aviv2")
session.store(user1, "users/1-A")
session.store(user2, "users/2-A")
session.save_changes()
# storing counters via session API
with store.open_session() as session:
session.counters_for("users/1-A").increment("likes", 100)
session.counters_for("users/1-A").increment("downloads", 500)
session.counters_for("users/2-A").increment("votes", 1000)
session.save_changes()
# alternatively, loading counters via GetCountersOperation
counters = store.operations.send(GetCountersOperation("users/1-A", ["likes", "downloads"])).counters
# loading counters via session API
with store.open_session() as session:
user1_likes = session.counters_for("users/1-A").get("likes")
# deleting counters via session API
with store.open_session() as session:
session.counters_for("users/1-A").delete("likes")
session.counters_for("users/1-A").delete("downloads")
session.counters_for("users/2-A").delete("votes")
session.save_changes()
counter_operation = DocumentCountersOperation(document_id="users/1-A", operations=[])
counter_operation.add_operations(
CounterOperation("Likes", counter_operation_type=CounterOperationType.INCREMENT, delta=4)
)
counter_operation.add_operations(
CounterOperation(
"Shares",
counter_operation_type=CounterOperationType.INCREMENT,
delta=422,
)
)
counter_operation.add_operations(CounterOperation("Likes", counter_operation_type=CounterOperationType.DELETE))
counter_batch = CounterBatch(documents=[counter_operation])
results = self.store.operations.send(CounterBatchOperation(counter_batch))
incrementing counters
document counters operation
including counters
counters indexes
Bulk insert is the efficient way to store a large amount of documents.
For example:
foo_bar1 = FooBar("John Doe")
foo_bar2 = FooBar("Jane Doe")
foo_bar3 = FooBar("John")
foo_bar4 = FooBar("Jane")
with store.bulk_insert() as bulk_insert:
bulk_insert.store(foo_bar1)
bulk_insert.store(foo_bar2)
bulk_insert.store_as(foo_bar3, "foobars/66")
bulk_insert.store_as(foo_bar4, "foobars/99", MetadataAsDictionary({"some_metadata_value": 75}))
The 3rd insert will store document named "foobars/66".
The 4th insert will store document with custom name and extra metadata.
- Define your model as class.
import datetime
class Product:
def __init__(self, Id: str = None, title: str = None, price: int = None, currency: str = None, storage: int = None,
manufacturer: str = None, in_stock: bool = False, last_update: datetime.datetime = None):
self.Id = Id
self.title = title
self.price = price
self.currency = currency
self.storage = storage
self.manufacturer = manufacturer
self.in_stock = in_stock
self.last_update = last_update
- To store a document pass its instance to
store()
.
The collection name will automatically be detected from the entity's class name.
import datetime
from models import Product
product = Product(None, 'iPhone X', 999.99, 'USD', 64, 'Apple', True, datetime.datetime(2017,10,1))
product = session.store(product)
print(isinstance(product, Product)) # True
print('products/' in product.Id) # True
session.save_changes()
- Loading a document
product = session.load('products/1-A')
print(isinstance(product, Product)) # True
print(product.Id) # products/1-A
- Querying for documents
products = list(session.query_collection("Products"))
for product in products:
print(isinstance(product, Product)) # True
print("products/" in product.Id) # True
P.S Python client does support dataclasses
from ravendb import DocumentStore
URLS = ["https://raven.server.url"]
DB_NAME = "SecuredDemo"
CERT_PATH = "path\\to\\cert.pem"
class User:
def __init__(self, name: str, tag: str):
self.name = name
self.tag = tag
store = DocumentStore(URLS, DB_NAME)
store.certificate_pem_path = CERT_PATH
store.initialize()
user = User("Gracjan", "Admin")
with store.open_session() as session:
session.store(user, "users/1")
session.save_changes()
with store.open_session() as session:
user = session.load("users/1", User)
assert user.name == "Gracjan"
assert user.tag == "Admin"
# To run the suite, set the following environment variables:
#
# - Location of RavenDB server binary:
# RAVENDB_TEST_SERVER_PATH="C:\\work\\test\\Server\\Raven.Server.exe"
#
# - Certificates paths for tests requiring a secure server:
# RAVENDB_TEST_SERVER_CERTIFICATE_PATH="C:\\work\\test\\cluster.server.certificate.pfx"
# RAVENDB_TEST_CLIENT_CERTIFICATE_PATH="C:\\work\\test\\python.pem"
# RAVENDB_TEST_CA_PATH="C:\\work\\test\\ca.crt"
#
# - Certificate hostname:
# RAVENDB_TEST_HTTPS_SERVER_URL="https://a.nodejstest.development.run:7326"
#
python -m unittest discover
https://ravendb.net/docs/article-page/5.3/python