Subscribe, act, publish.
This app is intended to streamline integration with Metro for all Django+Celery users by:
- Handling subscriptions and messages asynchronously with one command
- Executing Celery tasks based on message topics, defined in settings.py
- Retrying failed tasks through your admin dashboard when using the MetroidTask base

Requirements:
- python >= 3.10
- django >= 4.2 - For asgiref, settings
- django-guid >= 3.2.0 - Storing correlation IDs for failed tasks in the database, making debugging easy
- Choose one:
  - celery >= 5.3.0 - Execute tasks based on a subject
  - django-rq >= 2.4.1 - Execute tasks based on a subject

The python manage.py metroid app is fully asynchronous, and has no blocking code. It utilizes Celery to execute tasks.
It works by:
- Going through all your configured subscriptions and starting a new async connection for each of them
- Metro sends messages on the subscriptions
- This app picks out the messages whose subject matches one you have defined, and queues a Celery task to execute the function specified for that subject
  - If no task is found for that subject, the message is marked as complete
- The message is marked as complete after the Celery task has successfully been queued
- If the task fails, an entry is automatically created in your database
- All failed tasks can be retried manually through the admin dashboard
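
The flow above can be pictured with a small, self-contained sketch. It is not Metroid's implementation: the `listen` coroutine, the in-memory queues standing in for Metro, and the `queued` and `completed` lists are all hypothetical names chosen for illustration. It only shows the shape of "one concurrent listener per subscription, queue a task on a matching subject, then complete the message".

```python
import asyncio

# Hypothetical stand-ins: in the real app, messages arrive from Metro and
# matching handlers are queued as Celery tasks instead of collected in lists.
queued: list[tuple[str, dict]] = []
completed: list[dict] = []


async def listen(subscription: str, inbox: asyncio.Queue, subjects: set[str]) -> None:
    """Consume one subscription until a None sentinel arrives."""
    while (message := await inbox.get()) is not None:
        if message['subject'] in subjects:
            queued.append((subscription, message))  # stand-in for queueing a task
        completed.append(message)  # completed whether or not a handler matched


async def main() -> None:
    inboxes = {'sub-a': asyncio.Queue(), 'sub-b': asyncio.Queue()}
    inboxes['sub-a'].put_nowait({'subject': 'MetroDemo/Type/GeekJokes'})
    inboxes['sub-b'].put_nowait({'subject': 'Unhandled/Subject'})
    for inbox in inboxes.values():
        inbox.put_nowait(None)  # sentinel so the sketch terminates
    # One concurrent listener per configured subscription.
    await asyncio.gather(
        *(listen(name, inbox, {'MetroDemo/Type/GeekJokes'}) for name, inbox in inboxes.items())
    )


asyncio.run(main())
```

Because each listener awaits its own inbox, a quiet subscription never blocks the others, which is the property the asynchronous design is after.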
Note: For a complete example, have a look in demoproj/settings.py.
- Create a METROID key in settings.py with all your subscriptions and handlers. Example settings:

    METROID = {
        'subscriptions': [
            {
                'topic_name': 'metro-demo',
                'subscription_name': 'sub-metrodemo-metrodemoerfett',
                'connection_string': config('CONNECTION_STRING_METRO_DEMO', None),
                'handlers': [
                    {
                        'subject': 'MetroDemo/Type/GeekJokes',
                        'regex': False,
                        'handler_function': 'demoproj.demoapp.services.my_func'
                    }
                ],
            },
        ],
        'worker_type': 'celery',  # default
    }

The handler_function is defined by providing the full dotted path as a string. For example, from demoproj.demoapp.services import my_func is provided as 'demoproj.demoapp.services.my_func'.
The handler's subject can be a regular expression or a string. If a regular expression is provided, regex must be set to True. Example:

    'handlers': [{'subject': r'^MetroDemo/Type/.*$', 'regex': True, 'handler_function': 'demoproj.demoapp.services.my_func'}],

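
To make the two handler options concrete, here is a minimal sketch of how a subject could be matched against a handler entry and how a dotted path could be turned into a callable. The helpers `matches` and `load` are hypothetical and not part of Metroid, and using `re.match` for the regex case is an assumption about the matching semantics. `'os.path.basename'` is only a placeholder path that is importable everywhere.

```python
import re
from importlib import import_module


def matches(handler: dict, subject: str) -> bool:
    """Exact comparison by default; regex match when 'regex' is True."""
    if handler.get('regex'):
        # Assumption: the pattern is matched from the start of the subject.
        return re.match(handler['subject'], subject) is not None
    return handler['subject'] == subject


def load(dotted_path: str):
    """Import 'package.module.attr' and return attr."""
    module_path, _, name = dotted_path.rpartition('.')
    return getattr(import_module(module_path), name)


handlers = [
    # Placeholder handler path; a real project would point at its own function.
    {'subject': r'^MetroDemo/Type/.*$', 'regex': True, 'handler_function': 'os.path.basename'},
]
handler = next(h for h in handlers if matches(h, 'MetroDemo/Type/GeekJokes'))
func = load(handler['handler_function'])
```

Keeping the handler as a string means settings.py never has to import application code, which avoids circular imports while Django is still loading.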
- Configure Django-GUID by adding the app to your installed apps and your middlewares, and by configuring logging as described in the Django-GUID documentation. Make sure you enable the CeleryIntegration:

    from django_guid.integrations import CeleryIntegration

    DJANGO_GUID = {
        'INTEGRATIONS': [
            CeleryIntegration(
                use_django_logging=True,
                log_parent=True,
            )
        ],
    }

Your functions will be called with keyword arguments for message, topic_name, subscription_name and subject. In other words, your function should look something like this:

With Celery:

    @app.task(base=MetroidTask)
    def my_func(*, message: dict, topic_name: str, subscription_name: str, subject: str) -> None:
        ...

With django-rq, the function needs no decorator:

    def my_func(*, message: dict, topic_name: str, subscription_name: str, subject: str) -> None:
        ...

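
The keyword-only signature can be tried out without a worker. The sketch below is a plain function with a hypothetical body that records what it receives. The `received` list and the sample message are illustrative assumptions, not something Metroid provides.

```python
received: list[dict] = []


def my_func(*, message: dict, topic_name: str, subscription_name: str, subject: str) -> None:
    # Hypothetical body: record the call so it can be inspected afterwards.
    received.append(
        {
            'message': message,
            'topic_name': topic_name,
            'subscription_name': subscription_name,
            'subject': subject,
        }
    )


# The bare * in the signature means every argument must be passed by name.
my_func(
    message={'joke': 'placeholder'},
    topic_name='metro-demo',
    subscription_name='sub-metrodemo-metrodemoerfett',
    subject='MetroDemo/Type/GeekJokes',
)
```

Passing the same values positionally raises a TypeError, so a handler cannot silently receive its arguments in the wrong order.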
- Ensure you have redis running:
    docker-compose up
- Run migrations:
    python manage.py migrate
- Create an admin account:
    python manage.py createsuperuser
- Start a worker:
    celery -A demoproj worker -l info
- Run the subscriber:
    python manage.py metroid
- Send messages to Metro. Example code can be found in demoproj/demoapp/services.py
- Run the webserver:
    python manage.py runserver 8000
- See failed messages under http://localhost:8000/admin
To contribute, please see CONTRIBUTING.md