-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsubscribers.py
707 lines (591 loc) · 26 KB
/
subscribers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
import asyncio
import base64
import functools
import logging
import multiprocessing as mp
import os
import random
import signal
from asgiref.sync import sync_to_async, async_to_sync
import channels.consumer
from channels import DEFAULT_CHANNEL_LAYER
from channels.db import database_sync_to_async
import channels.layers
import channels.utils
import django
import django.apps
from django.core.exceptions import AppRegistryNotReady
from django.db import IntegrityError, transaction
from django.db.models import Q
from djangorestframework_camel_case.util import underscoreize
import setproctitle
import zephyr
import _zephyr
from . import utils
_LOGGER = logging.getLogger(__name__)
class _MPDjangoSetupMixin:
    """Mixin that runs django.setup() during __init__ when the app registry
    is not yet ready.  Intended for classes that are mp.Process targets,
    where Django must be (re)initialized inside the child process."""
    # pylint: disable=too-few-public-methods

    @staticmethod
    def _ensure_django_setup():
        # Probe the app registry; AppRegistryNotReady means this process
        # has not run django.setup() yet.
        try:
            django.apps.apps.check_models_ready()
        except AppRegistryNotReady:
            django.setup()

    def __init__(self):
        self._ensure_django_setup()
        super().__init__()
class _ChannelLayerMixin:
    """This mixin can be used to add Django Channels Layers support to a class.

    To use it, inherit from it and define a member `groups` or property `groups` of no
    arguments that returns an iterable of group names to subscribe to. Then start a task
    to run the `channel_layer_handler`, and cancel it when you want to stop. Subclasses
    must also provide a `stop_event` (an object with `is_set()`), which is consulted
    between dispatch attempts. This may be worth extracting to a utility module."""

    # Which configured channel layer to use.
    channel_layer_alias = DEFAULT_CHANNEL_LAYER

    def __init__(self):
        super().__init__()
        # These are populated by channel_layer_handler once the layer is up.
        self.channel_layer = None
        self.channel_name = None
        self.channel_receive = None

    @property
    def groups(self):
        # Subclasses must supply the iterable of channel-layer group names.
        raise NotImplementedError()

    @property
    def log_prefix(self):
        # Prefix used in log messages; subclasses override for context.
        return '?'

    async def channel_layer_resubscribe(self):
        # Group membership expires after channel_layer.group_expiry seconds;
        # re-add ourselves to every group at twice that frequency so our
        # membership never lapses.
        while True:
            for group_name in self.groups:
                await self.channel_layer.group_add(group_name, self.channel_name)
            await asyncio.sleep(self.channel_layer.group_expiry / 2)

    async def channel_layer_handler(self):
        """Receive and dispatch channel-layer messages until stopped/cancelled."""
        # Initialize channel layer.
        self.channel_layer = channels.layers.get_channel_layer(self.channel_layer_alias)
        self.channel_name = await self.channel_layer.new_channel()
        self.channel_receive = functools.partial(self.channel_layer.receive, self.channel_name)
        # Subscribe to groups
        resubscriber_task = asyncio.create_task(self.channel_layer_resubscribe())
        await asyncio.sleep(0)
        # wait for and dispatch messages until we get cancelled.
        while not self.stop_event.is_set():
            try:
                await channels.utils.await_many_dispatch([self.channel_receive], self.dispatch)
            except ValueError as exc:
                # dispatch() raises ValueError for unhandled message types;
                # log it and keep receiving.
                _LOGGER.error('[%s] Dispatch failed: %s', self.log_prefix, exc)
        resubscriber_task.cancel()

    async def dispatch(self, message):
        # Let's use the same dispatching mechanism that django channels consumers
        # use: route message['type'] to a method of the corresponding name.
        handler_name = channels.consumer.get_handler_name(message)
        handler = getattr(self, handler_name, None)
        if handler:
            await handler(message)
        else:
            raise ValueError(f'No handler for message type "{message["type"]}"')
class _ZephyrProcessMixin(_ChannelLayerMixin):
    """This mixin contains the core zephyr support for the User Processes and Server Process.

    Subclasses must provide `principal`, `get_subs_qs()`, a `stop_event`, and a `uid`
    attribute (used when persisting per-user state)."""

    def __init__(self):
        super().__init__()
        # Event to indicate that zephyr has been initialized
        self.z_initialized = mp.Event()
        # Lock to be used around non-threadsafe bits of libzephyr.
        self.zephyr_lock = None
        # Set to ask resync_handler to reconcile subscriptions with libzephyr.
        self.resync_event = None
        # Maps ZUID key -> reply channel awaiting a SERVACK/SERVNAK for a zwrite.
        self.waiting_for_acks = {}
        # zephyr.Subscriptions instance, created lazily on first resync.
        self._zsubs = None

    @property
    def principal(self):
        # Kerberos principal this process acts as; None for the server process.
        raise NotImplementedError()

    @property
    def log_prefix(self):
        return self.principal

    def get_subs_qs(self):
        # Queryset of Subscription rows this process should hold with the servers.
        raise NotImplementedError()

    def _initialize_memory_ccache(self):
        utils.kerberos.initialize_memory_ccache(self.principal)

    def _add_credential_to_ccache(self, creds):
        """Inject `creds` into the in-memory ccache, (re)initialize libzephyr,
        and request a subscription resync."""
        _LOGGER.debug('[%s] injecting credentials.', self.log_prefix)
        utils.kerberos.add_credential_to_ccache(creds, self.principal)
        _LOGGER.debug('[%s] (re)initializing zephyr.', self.log_prefix)
        self.zinit()
        _LOGGER.debug('[%s] asking for subs resync.', self.log_prefix)
        self.resync_event.set()

    def _have_valid_zephyr_creds(self):
        return utils.kerberos.have_valid_zephyr_creds(_zephyr.realm())

    def zinit(self):
        """Initialize libzephyr once; no-op if already initialized."""
        if self.z_initialized.is_set():
            return
        _LOGGER.debug('[%s] zinit...', self.log_prefix)
        try:
            zephyr.init()
            self.z_initialized.set()
        except OSError:
            # Initialization can fail (e.g. no credentials yet); callers will
            # retry when credentials arrive.
            pass

    @database_sync_to_async
    def get_subs_to_resync(self):
        # Snapshot of desired subs as (class, instance, recipient) triples.
        subs_qs = self.get_subs_qs()
        return set(subs_qs.values_list('class_key', 'instance_key', 'zrecipient'))

    async def resync_handler(self):
        """Task: each time resync_event fires, reconcile libzephyr's
        subscriptions with what the database says we should hold."""
        _LOGGER.debug('[%s] resync task started.', self.log_prefix)
        try:
            while True:
                await self.resync_event.wait()
                self.resync_event.clear()
                _LOGGER.debug('[%s] resync task triggered.', self.log_prefix)
                if not self._have_valid_zephyr_creds():
                    _LOGGER.debug('[%s] resync skipped due to lack of credentials.', self.log_prefix)
                    continue
                if self._zsubs is None:
                    _LOGGER.debug('[%s] instantiating subs object.', self.log_prefix)
                    self._zsubs = zephyr.Subscriptions()
                    self._zsubs.cleanup = False  # Don't cancel subs on delete.
                async with self.zephyr_lock:
                    _LOGGER.debug('[%s] resyncing subs object with libzephyr.', self.log_prefix)
                    self._zsubs.resync()
                _LOGGER.debug('[%s] resync got %i subs.', self.log_prefix, len(self._zsubs))
                subs = await self.get_subs_to_resync()
                good_subs = set()
                # Add every subscription the DB says we should have...
                for sub in subs:
                    async with self.zephyr_lock:
                        self._zsubs.add(sub)
                        good_subs.add(self._zsubs._fixTuple(sub))  # pylint: disable=protected-access
                # ...and drop any we hold that the DB no longer lists.
                for sub in self._zsubs - good_subs:
                    async with self.zephyr_lock:
                        self._zsubs.remove(sub)
        except asyncio.CancelledError:
            _LOGGER.debug('[%s] resync task cancelled.', self.log_prefix)
            raise

    async def zephyr_handler(self):
        """Task: receive zephyr notices, routing ACKs to waiting zwrite callers
        and persisting incoming messages, until cancelled."""
        self.zephyr_lock = asyncio.Lock()
        self.resync_event = asyncio.Event()
        receive_event = asyncio.Event()
        loop = asyncio.get_running_loop()
        zephyr_fd = None
        resync_task = asyncio.create_task(self.resync_handler())
        _LOGGER.debug('[%s] zephyr handler started.', self.log_prefix)
        try:
            await self.load_user_data()
            # No need to start looking for incoming messages until we have initialized zephyr.
            await sync_to_async(self.z_initialized.wait, thread_sensitive=False)()
            zephyr_fd = _zephyr.getFD()
            # Wake receive_event whenever the zephyr socket has data.
            loop.add_reader(zephyr_fd, receive_event.set)
            _LOGGER.debug('[%s] zephyr handler now receiving...', self.log_prefix)
            while True:
                async with self.zephyr_lock:
                    # Since we're calling this non-blocking, not bothering to wrap and await.
                    notice = zephyr.receive()
                if notice is None:
                    # Nothing pending; sleep until the fd reader wakes us.
                    await receive_event.wait()
                    _LOGGER.debug('[%s] zephyr handler receive event...', self.log_prefix)
                    receive_event.clear()
                    continue
                _LOGGER.debug('[%s] got: %s, %s', self.log_prefix, notice, notice.kind)
                if notice.kind == zephyr.ZNotice.Kind.hmack:
                    # Ignore HM Acks
                    continue
                if notice.kind in (zephyr.ZNotice.Kind.servnak,
                                   zephyr.ZNotice.Kind.servack):
                    # TODO: maybe do something different for servnak?
                    # Forward the ack to whichever zwrite caller is waiting on it.
                    key = utils.notice_to_zuid_key(notice)
                    ack_reply_channel = self.waiting_for_acks.pop(key, None)
                    if ack_reply_channel:
                        await self.channel_layer.send(ack_reply_channel, {
                            'ack': notice.fields[0].decode('utf-8')
                        })
                    continue
                if notice.opcode.lower() == b'ping':
                    # Ignoring pings
                    continue
                # This appears to be an incoming message.
                msg = django.apps.apps.get_model('roost_backend', 'Message').from_notice(notice)
                _LOGGER.debug('%s', msg)
                await database_sync_to_async(msg.save)()
        except asyncio.CancelledError:
            _LOGGER.debug('[%s] zephyr handler cancelled.', self.log_prefix)
            # NOTE(review): truthiness also skips fd 0; `zephyr_fd is not None`
            # would be safer if libzephyr can ever hand back fd 0 — confirm.
            if zephyr_fd:
                loop.remove_reader(zephyr_fd)
            # Persist session/credentials so a restart can resume cleanly.
            await self.save_user_data()
            raise
        finally:
            resync_task.cancel()
            if not self.z_initialized.is_set():
                # unstick sync_to_async on z_initialized.wait if it's still there.
                self.z_initialized.set()
            _LOGGER.debug('[%s] zephyr handler done.', self.log_prefix)

    @database_sync_to_async
    def _load_user_data(self):
        """Fetch this process's persisted state dict from the DB, or None."""
        if self.principal is None:
            # Server process
            obj = django.apps.apps.get_model('roost_backend', 'ServerProcessState').load()
        else:
            obj = django.apps.apps.get_model('roost_backend', 'UserProcessState').objects.filter(
                user__principal=self.principal).first()
        _LOGGER.debug('[%s] user data: %s', self.log_prefix, obj)
        if obj:
            return obj.data
        return None

    async def load_user_data(self):
        """Restore the libzephyr session and kerberos credentials from saved state."""
        data = await self._load_user_data()
        if data:
            if 'session_data' in data:
                # If we have session data, reinitialize libzephyr with it.
                session_data = base64.b64decode(data['session_data'])
                try:
                    async with self.zephyr_lock:
                        zephyr.init(session_data=session_data)
                    self.z_initialized.set()
                except OSError:
                    pass
            if 'kerberos_data' in data and data['kerberos_data']:
                # If we have credentials, inject them into our ccache.
                # This will also initialize libzephyr if there was no session data.
                # TODO: filter out expired credentials?
                # TODO: support importing a list of credentials.
                await database_sync_to_async(self._add_credential_to_ccache)(data['kerberos_data'])
        if self.principal is None:
            # The server process always has credentials; if we did not load state, initialize things now.
            await sync_to_async(self.zinit)()
        self.resync_event.set()

    @database_sync_to_async
    def _save_user_data(self, data):
        """Persist `data` for this process (server-wide or per-user row)."""
        if self.principal is None:
            obj = django.apps.apps.get_model('roost_backend', 'ServerProcessState').load()
            if 'kerberos_data' in data:
                # The server process renews credentials from its keytab;
                # no need to persist them.
                del data['kerberos_data']
            obj.data = data
            obj.save()
        else:
            ups = django.apps.apps.get_model('roost_backend', 'UserProcessState')
            try:
                with transaction.atomic():
                    ups.objects.update_or_create(user_id=self.uid, defaults={
                        'data': data,
                    })
            except IntegrityError:
                _LOGGER.debug('[%s] saving user data failed; user deleted?', self.log_prefix)
                return
        _LOGGER.debug('[%s] saving user data done.', self.log_prefix)

    async def save_user_data(self):
        """Dump the libzephyr session and current credentials, retrying on
        transient database errors."""
        # TODO: support exporting multiple credentials.
        if not self.z_initialized.is_set():
            return
        _LOGGER.debug('[%s] saving user data...', self.log_prefix)
        async with self.zephyr_lock:
            zephyr_session = _zephyr.dumpSession()
            zephyr_realm = _zephyr.realm()
        data = {
            'session_data': base64.b64encode(zephyr_session).decode('ascii'),
            'kerberos_data': underscoreize(utils.kerberos.get_zephyr_creds_dict(zephyr_realm)),
        }
        for _ in range(4):
            try:
                await self._save_user_data(data)
                break
            except django.db.utils.OperationalError:
                _LOGGER.warning('[%s] saving user data failed, trying again...', self.log_prefix)
                await asyncio.sleep(random.random())  # jitter
        else:
            _LOGGER.error('[%s] saving user data failed, giving up.', self.log_prefix)

    # Start of Channel Layer message handlers
    async def zwrite(self, message):
        """Handle a 'zwrite' message: send a zephyr notice on behalf of the user.

        If the request carries a `_reply_to` channel, the eventual
        SERVACK/SERVNAK is forwarded there (see zephyr_handler)."""
        msg_args = message['message']
        reply_channel = message.pop('_reply_to', None)
        await sync_to_async(self.zinit)()
        if not self.z_initialized.is_set():
            if reply_channel:
                await self.channel_layer.send(reply_channel, {
                    'error': 'failed to initialize zephyr library.'
                })
            return
        notice_args = {
            k: v.encode()
            for k, v in msg_args.items()
        }
        if 'signature' in notice_args:
            # Zephyr carries the signature as the first NUL-separated field
            # of the message body.
            sig = notice_args.pop('signature')
            if isinstance(sig, bytes):
                sig = sig.split(b'\x00', 1)[0]
            notice_args['message'] = b'\x00'.join([
                sig,
                notice_args['message'],
            ])
        if notice_args['recipient'].startswith(b'*'):
            # Strip the leading '*' wildcard marker from the recipient.
            notice_args['recipient'] = notice_args['recipient'][1:]
        notice_args['cls'] = notice_args.pop('class')
        notice = zephyr.ZNotice(**notice_args)
        async with self.zephyr_lock:
            await sync_to_async(notice.send)()
            if reply_channel is not None:
                # Doing this under the lock ensures that we put the reply_channel in the dict before
                # we can process any ACK.
                self.waiting_for_acks[utils.notice_to_zuid_key(notice)] = reply_channel
        msg = django.apps.apps.get_model('roost_backend', 'Message').from_notice(notice, is_outgoing=True)
        _LOGGER.debug('%s', msg)
        if msg.is_personal:
            # Only save outbound personals.
            # TODO: re-evaluate this decision.
            await database_sync_to_async(msg.save)()

    async def resync_subscriptions(self, _message):
        """Handle a 'resync_subscriptions' message by scheduling a resync."""
        self.resync_event.set()

    async def retrieve_subscriptions(self, message):
        """Handle a 'retrieve_subscriptions' debug message: reply with our subs."""
        # This is a debugging endpoint to query for the set of subs held by this subscriber.
        _LOGGER.debug('[%s] retrieving subscriptions...', self.log_prefix)
        reply_channel = message.pop('_reply_to')
        ret = set()
        if reply_channel and self._zsubs is not None:
            async with self.zephyr_lock:
                # While this does not actually call libzephyr, we don't want subs changing out from
                # under us.
                ret.update(tuple(elt.decode('utf-8') for elt in sub) for sub in self._zsubs)
            _LOGGER.debug('[%s] retrieved %i subscriptions.', self.log_prefix, len(ret))
        if reply_channel:
            await self.channel_layer.send(reply_channel, {'subscriptions': sorted(list(ret))})
    # End message handlers
class Manager:
    """Used by an outside caller to start and stop the set of user processes.

    Works as a context manager: entering starts the Overseer child process
    (unless constructed with enabled=False), exiting stops it."""

    def __init__(self, enabled=True):
        self._proc = None          # Overseer mp.Process, when running.
        self._stop_event = None    # mp.Event used to tell the Overseer to stop.
        self._enabled = enabled

    def __enter__(self):
        if self._enabled:
            self.start()
        # Fix: return self so `with Manager() as m:` binds the manager
        # instead of None.
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop()

    @property
    def log_prefix(self):
        return 'Manager'

    def start(self):
        """Fork the Overseer process if it is not already running."""
        if self._proc:
            if self._proc.is_alive():
                # Alive and well; nothing to do.
                return
            # Clean up after an unexpectedly dead process.
            self.stop()
        self._stop_event = mp.Event()
        self._proc = mp.Process(target=Overseer, args=(self._stop_event,))
        self._proc.start()

    def stop(self):
        """Signal the Overseer to stop and join it.

        Returns True if a live process was stopped, False otherwise."""
        ret = False
        if self._proc:
            if self._proc.is_alive():
                ret = True
                self._stop_event.set()
            self._proc.join()
            self._proc = None
            self._stop_event = None
        return ret
class Overseer(_MPDjangoSetupMixin, _ChannelLayerMixin):
    """This class is forked by the Manager class. It is responsible for
    forking off the individual user processes and restarting them if
    necessary, as well as for telling them to stop upon request from
    the Manager."""
    # TODO: make this more async
    # TODO: hook into channels layer to alert about new/deleted users.

    # The _ChannelLayerMixin requires us to define this.
    groups = ['OVERSEER']

    def __init__(self, stop_event, start=True):
        super().__init__()
        # NOTE(review): SIGINT is ignored (shutdown is coordinated via
        # stop_event) and SIGTERM reset to the default — confirm intent.
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        self.stop_event = stop_event           # mp.Event set by the Manager.
        self.pid = os.getpid()
        self.user_tasks = {}                   # principal -> watcher asyncio.Task
        self.user_stop_events = {}             # principal -> per-process mp.Event
        # Use a forkserver context so subscriber children start from a clean state.
        self.ctx = mp.get_context('forkserver')
        self.server_stop_event = None
        if start:
            self.start()

    def __str__(self):
        return f'Overseer<{self.pid}>'

    @property
    def log_prefix(self):
        return 'OVERSEER'

    def start(self):
        """Set the process title and run the oversee loop to completion."""
        setproctitle.setproctitle('roost:OVERSEER')
        async_to_sync(self.oversee)()

    @database_sync_to_async
    def get_users(self):
        # (id, principal) pairs for every user we should run a subscriber for.
        return list(django.apps.apps.get_model('roost_backend', 'User').objects.all().values_list('id', 'principal'))

    async def wait_for_stop_event(self):
        # Wrapper around sync_to_async to make asyncio happy.
        await sync_to_async(self.stop_event.wait, thread_sensitive=False)()

    async def oversee(self):
        """Main loop: spawn watchers for the server process and every user,
        wait for the stop event, then shut everything down."""
        _LOGGER.debug('[OVERSEER] starting...')
        channel_task = asyncio.create_task(self.channel_layer_handler())
        server_task = asyncio.create_task(self.server_process_watcher())
        user_list = await self.get_users()
        for (uid, principal) in user_list:
            self.user_tasks[principal] = asyncio.create_task(self.user_process_watcher(principal, uid))
        # Yield once so the freshly created tasks get a chance to start.
        await asyncio.sleep(0)
        _LOGGER.debug('[OVERSEER] waiting for stop event...')
        await sync_to_async(self.stop_event.wait, thread_sensitive=False)()
        _LOGGER.debug('[OVERSEER] received stop event...')
        # Propagate the stop to every child process...
        if self.server_stop_event:
            self.server_stop_event.set()
        for event in self.user_stop_events.values():
            event.set()
        # ...then wait for all watcher tasks to drain.
        tasks = [task for task in self.user_tasks.values() if task is not None]
        tasks.append(server_task)
        await asyncio.wait(tasks)
        channel_task.cancel()
        try:
            await channel_task
        except asyncio.CancelledError:
            pass
        _LOGGER.debug('[OVERSEER] done.')

    async def server_process_watcher(self):
        """Keep a ServerSubscriber process running, restarting it if it dies
        unexpectedly; stop once its stop event was the cause of exit."""
        while not self.stop_event.is_set():
            stop_event = self.server_stop_event = self.ctx.Event()
            proc = self.ctx.Process(target=ServerSubscriber, args=(stop_event,))
            proc.start()
            await sync_to_async(proc.join)()
            if stop_event.is_set():
                # Deliberate shutdown; do not restart.
                break

    async def user_process_watcher(self, principal, uid):
        """Keep a UserSubscriber process for `principal` running, restarting
        it if it dies unexpectedly; stop once its stop event was the cause."""
        while not self.stop_event.is_set():
            stop_event = self.user_stop_events[principal] = self.ctx.Event()
            proc = self.ctx.Process(target=UserSubscriber, args=(principal, uid, stop_event))
            proc.start()
            await sync_to_async(proc.join)()
            if stop_event.is_set():
                # Deliberate shutdown; do not restart.
                break

    # Start of Channel Layer message handlers
    async def add_user(self, message):
        # {'type': 'add_user',
        #  'principal': '<principal of new user>',
        #  'uid': '<db id of new user>'}
        # Spawns user process for user if not already running.
        principal = message['principal']
        uid = message['uid']
        if principal not in self.user_tasks:
            self.user_tasks[principal] = asyncio.create_task(self.user_process_watcher(principal, uid))

    async def del_user(self, message):
        # {'type': 'del_user', 'principal': '<principal of deleted user>'}
        # Kills user process for user if running.
        principal = message['principal']
        cancel_event = self.user_stop_events.pop(principal, None)
        if cancel_event:
            cancel_event.set()
        task = self.user_tasks.pop(principal, None)
        if task:
            await task
    # End message handlers
class UserSubscriber(_MPDjangoSetupMixin, _ZephyrProcessMixin):
    """Kerberos and zephyr are not particularly threadsafe, so each user
    will have their own process."""

    def __init__(self, principal, uid, stop_event, start=True):
        # pylint: disable=too-many-arguments
        super().__init__()
        self._principal = principal
        self.uid = uid                # DB id of the user; used when saving state.
        self.stop_event = stop_event  # mp.Event the Overseer uses to stop us.
        if start:
            self.start()

    def __str__(self):
        return f'UserSubscriber<{self.principal}>'

    @property
    def groups(self):
        # The _ChannelLayerMixin requires us to define this.
        return [utils.principal_to_user_subscriber_group_name(self.principal)]

    @property
    def principal(self):
        # The _ZephyrProcessMixin requires us to define this.
        return self._principal

    def get_subs_qs(self):
        # The _ZephyrProcessMixin requires us to define this: only this
        # user's personal subscriptions.
        subs_qs = django.apps.apps.get_model('roost_backend', 'Subscription').objects.all()
        subs_qs = subs_qs.filter(user__principal=self.principal, zrecipient=self.principal)
        return subs_qs

    def start(self):
        """Set the process title, create the in-memory ccache, and run the main loop."""
        _LOGGER.debug('%s starting...', self)
        setproctitle.setproctitle(f'roost:{self.principal}')
        self._initialize_memory_ccache()
        async_to_sync(self.run)()

    async def wait_for_stop_event(self):
        # Wrapper around sync_to_async to make asyncio happy.
        await sync_to_async(self.stop_event.wait, thread_sensitive=False)()
        _LOGGER.debug('[%s] done waiting for stop_event.', self.log_prefix)

    async def run(self):
        """Run the zephyr and channel-layer handlers until any task (or the
        stop-event waiter) finishes, then cancel and drain the rest."""
        tasks = [
            asyncio.create_task(self.wait_for_stop_event()),
            asyncio.create_task(self.zephyr_handler()),
            asyncio.create_task(self.channel_layer_handler()),
        ]
        # Busy-yield until channel_layer_handler has brought the layer up.
        while self.channel_layer is None:
            await asyncio.sleep(0)
        # Announce our activation.
        _LOGGER.debug('[%s] announcing activation...', self.log_prefix)
        await self.channel_layer.send(utils.principal_to_user_subscriber_announce_channel(self.principal), {
            'type': 'announce_user_subscriber',
            'principal': self.principal,
        })
        _LOGGER.debug('[%s] announced.', self.log_prefix)
        await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in tasks:
            task.cancel()
        for task in tasks:
            try:
                await task
            except asyncio.CancelledError:
                pass
        _LOGGER.debug('[%s] done.', self.log_prefix)

    # Start of Channel Layer message handlers
    async def inject_credentials(self, message):
        """Handle an 'inject_credentials' message carrying fresh kerberos creds."""
        await database_sync_to_async(self._add_credential_to_ccache)(message['creds'])

    async def have_valid_credentials(self, message):
        """Handle a 'have_valid_credentials' query; reply with {'valid': bool}."""
        reply_channel = message.pop('_reply_to')
        valid_creds = await sync_to_async(self._have_valid_zephyr_creds)()
        await self.channel_layer.send(reply_channel, {'valid': valid_creds})
    # End message handlers
class ServerSubscriber(_MPDjangoSetupMixin, _ZephyrProcessMixin):
    """Like the UserSubscriber, but for shared subscriptions."""

    # The _ChannelLayerMixin requires us to define this.
    groups = ['ROOST_SERVER_PROCESS']

    def __init__(self, stop_event, start=True):
        super().__init__()
        self.uid = None
        self.stop_event = stop_event
        if start:
            self.start()

    def __str__(self):
        return 'ServerSubscriber'

    @property
    def principal(self):
        # The _ZephyrProcessMixin requires us to define this; the server
        # process acts without a user principal.
        return None

    @property
    def log_prefix(self):
        return 'ServerSubscriber'

    def get_subs_qs(self):
        # The _ZephyrProcessMixin requires us to define this: all shared
        # (non-personal) subscriptions.
        subscription_model = django.apps.apps.get_model('roost_backend', 'Subscription')
        shared_filter = Q(zrecipient='') | Q(zrecipient__startswith='@')
        return subscription_model.objects.filter(shared_filter)

    def start(self):
        """Set the process title, build the ccache from the keytab, and run."""
        _LOGGER.debug('%s starting...', self)
        setproctitle.setproctitle('roost:server_subscriber')
        utils.kerberos.initialize_memory_ccache_from_client_keytab()
        async_to_sync(self.run)()

    async def wait_for_stop_event(self):
        # Wrapper around sync_to_async to make asyncio happy.
        await sync_to_async(self.stop_event.wait, thread_sensitive=False)()

    async def run(self):
        """Run all handler tasks until one finishes, then cancel and drain the rest."""
        pending = [
            asyncio.create_task(coro)
            for coro in (self.wait_for_stop_event(),
                         self.zephyr_handler(),
                         self.channel_layer_handler())
        ]
        await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in pending:
            task.cancel()
        for task in pending:
            try:
                await task
            except asyncio.CancelledError:
                pass
        _LOGGER.debug('[%s] done.', self.log_prefix)

    def _have_valid_zephyr_creds(self):
        # The server subscriber can renew its own credentials as needed, so
        # let's do that when we check to see if we have valid creds and find
        # we don't.
        if super()._have_valid_zephyr_creds():
            return True
        utils.kerberos.initialize_memory_ccache_from_client_keytab(reinit=True)
        return super()._have_valid_zephyr_creds()