Skip to content

Commit

Permalink
functioning prototype
Browse files Browse the repository at this point in the history
This is good enough to interact with it as a dev instance with a
slightly modified snipe.

WARNING: This commit reinitializes migrations. Not that anyone besides
me was using this before since it didn't really work. Old databases
should be discarded.
  • Loading branch information
asedeno committed Sep 13, 2020
1 parent 94ea66c commit ae6c4c1
Show file tree
Hide file tree
Showing 13 changed files with 660 additions and 168 deletions.
6 changes: 6 additions & 0 deletions roost_backend/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,9 @@

class RoostBackendConfig(AppConfig):
name = 'roost_backend'

def ready(self):
# pylint: disable=import-outside-toplevel, unused-import
# This is for side-effects of hooking up signals.
from . import signals # noqa: F401
super().ready()
130 changes: 104 additions & 26 deletions roost_backend/consumers.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
import logging

from asgiref.sync import async_to_sync
from channels.generic.websocket import JsonWebsocketConsumer
from djangorestframework_camel_case.util import camelize

from .authentication import JWTAuthentication
from . import filters, serializers, utils

_LOGGER = logging.getLogger(__name__)


class UserSocketConsumer(JsonWebsocketConsumer):
groups = ['broadcast']

class BadMessage(Exception):
pass

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.user = None
self.tails = {}

def connect(self):
self.accept()
self.active_tails = set()

def receive_json(self, content, **kwargs):
msg_type = content.get('type')
Expand All @@ -33,6 +32,9 @@ def receive_json(self, content, **kwargs):
self.close(code=4002)
return
self.user = user
async_to_sync(self.channel_layer.group_add)(
utils.principal_to_user_socket_group_name(user.principal),
self.channel_name)
self.send_json({'type': 'ready'})
return

Expand All @@ -56,40 +58,35 @@ def on_new_tail(self, content):
inclusive = content.get('inclusive', False)

if not all((isinstance(tail_id, int),
start is None or not isinstance(start, str),
start is None or isinstance(start, str),
isinstance(inclusive, bool))):
raise self.BadMessage()

if start is None:
start = 0
else:
# TODO: unseal message_id `start`
start = utils.unseal_message_id(start)
if inclusive:
# TODO: if `inclusive`, decrement `start`
pass
start -= 1

# TODO: construct filter from `content`
t_filter = None
# t_filter = Filter(content)
t_filter = filters.MessageFilter(**content)

if tail_id in self.tails:
# Roost frowned upon reusing tail ids in comments, and then closed the existing tail
# before clobbering it. Let's do the same.
_LOGGER.debug('User "%s" has reused tail id "%i".', self.user, tail_id)
self.tails[tail_id].close()

self.tails['tail_id'] = Tail(self, tail_id, start, t_filter)

raise NotImplementedError()
self.tails[tail_id] = Tail(self, tail_id, start, t_filter)

def on_extend_tail(self, content):
tail_id = content.get('id')
count = content.get('count')
if not all((isinstance(tail_id, int),
isinstance(count, int))):
isinstance(count, int),
tail_id in self.tails)):
raise self.BadMessage()

raise NotImplementedError()
self.tails[tail_id].extend(count)

def on_close_tail(self, content):
tail_id = content.get('id')
Expand All @@ -99,10 +96,27 @@ def on_close_tail(self, content):
if tail_id in self.tails:
self.tails.pop(tail_id).close()

def disconenct(self, close_code):
_LOGGER.debug('WebSocket for user "%s" closed by client with code "%s".', self.user, close_code)
def disconnect(self, code):
_LOGGER.debug('WebSocket for user "%s" closed by client with code "%s".', self.user, code)

if self.user is not None:
async_to_sync(self.channel_layer.group_discard)(
utils.principal_to_user_socket_group_name(self.user.principal),
self.channel_name)

for tail in self.tails.values():
tail.close()
self.tails = {}

self.close()

# Start of Channel Layer message handlers
def incoming_message(self, message):
# don't iterate over active_tails itself as its size may change while we do that.
for tail in list(self.active_tails):
tail.on_message(message['message'])
# End message handlers


class Tail:
def __init__(self, socket, t_id, start, t_filter):
Expand All @@ -114,20 +128,84 @@ def __init__(self, socket, t_id, start, t_filter):
self.active = False
self.messages_sent = 0
self.messages_wanted = 0
self.message_buffer = None

def close(self):
self.deactivate()
self.socket = None
# TODO: stop doing things, once we figure out what things are.
raise NotImplementedError()

def extend(self, count):
pass
_LOGGER.debug('tail: extending %i', count)

self.messages_wanted = max(count - self.messages_sent,
self.messages_wanted)
self.do_query()

def activate(self):
pass
if not self.active:
self.active = True
self.socket.active_tails.add(self)

def deactivate(self):
pass
if self.active:
self.active = False
self.socket.active_tails.remove(self)

def do_query(self):
pass
if self.socket is None:
return

if self.active:
return

if self.messages_wanted == 0:
return

self.activate()
self.message_buffer = []
qs = self.user.message_set.filter(id__gt=self.last_sent)
qs = self.t_filter.apply_to_queryset(qs)[:self.messages_wanted]
messages = [{'id': msg.id,
'payload': serializers.MessageSerializer(msg).data,
} for msg in list(qs)]
_LOGGER.debug('tail query returned %i messages', len(messages))
self.emit_messages(messages)

if self.messages_wanted:
message_buffer, self.message_buffer = self.message_buffer, None
message_buffer = [msg for msg in message_buffer if msg.id > self.last_sent]
self.emit_messages(message_buffer)

if not self.messages_wanted:
self.deactivate()

def on_message(self, message):
if not self.socket:
return
if not self.t_filter.matches_message(message):
return
if self.last_sent >= message['id']:
return
if isinstance(self.message_buffer, list):
self.message_buffer.append(message)
return

self.emit_messages([message])
if self.messages_wanted == 0:
self.deactivate()

def emit_messages(self, messages):
if messages:
self.socket.send_json({
'type': 'messages',
'id': self.t_id,
'messages': [camelize(msg['payload']) for msg in messages],
'isDone': True,
})
count = len(messages)
self.messages_sent += count
if count >= self.messages_wanted:
self.messages_wanted = 0
else:
self.messages_wanted -= count
self.last_sent = messages[-1]['id']
24 changes: 19 additions & 5 deletions roost_backend/migrations/0001_initial.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Generated by Django 3.0.8 on 2020-07-18 15:08
# Generated by Django 3.1.1 on 2020-09-13 17:49

from django.db import migrations, models
import django.db.models.deletion
Expand All @@ -12,15 +12,29 @@ class Migration(migrations.Migration):
]

operations = [
migrations.CreateModel(
name='ServerProcessState',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('data', models.JSONField()),
],
),
migrations.CreateModel(
name='User',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('principal', models.CharField(max_length=255, unique=True)),
('info', models.TextField(default='{}')),
('info', models.JSONField(default=dict)),
('info_version', models.BigIntegerField(default=1)),
],
),
migrations.CreateModel(
name='UserProcessState',
fields=[
('user', models.OneToOneField(on_delete=django.db.models.deletion.CASCADE, primary_key=True, related_name='process_state', serialize=False, to='roost_backend.user')),
('data', models.JSONField()),
],
),
migrations.CreateModel(
name='Subscription',
fields=[
Expand All @@ -30,7 +44,7 @@ class Migration(migrations.Migration):
('zrecipient', models.CharField(max_length=255)),
('class_key', models.CharField(max_length=255)),
('instance_key', models.CharField(max_length=255)),
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='roost_backend.User')),
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='roost_backend.user')),
],
options={
'unique_together': {('user', 'zrecipient', 'class_key', 'instance_key')},
Expand All @@ -57,12 +71,12 @@ class Migration(migrations.Migration):
('uid', models.CharField(max_length=16)),
('opcode', models.CharField(blank=True, max_length=255)),
('signature', models.CharField(max_length=255)),
('message', models.BinaryField()),
('message', models.TextField()),
('users', models.ManyToManyField(to='roost_backend.User')),
],
options={
'ordering': ['id'],
'index_together': {('class_key_base', 'instance_key_base'), ('class_key', 'instance_key')},
'index_together': {('class_key', 'instance_key'), ('class_key_base', 'instance_key_base')},
},
),
]
Loading

0 comments on commit ae6c4c1

Please sign in to comment.