-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathauthentication.py
69 lines (57 loc) · 2.49 KB
/
authentication.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
from types import MappingProxyType
from rest_framework import authentication, exceptions
import jwt
from . import models, serializers, secrets
class JWTAuthentication(authentication.TokenAuthentication):
    """Token authentication that accepts an HS256-signed JWT as a ``Bearer`` token.

    On success the authenticated ``(user, auth)`` pair is the matching
    ``models.User`` and a read-only mapping of the token's claims. If the
    request body carries a ``credentials`` entry that validates against
    ``KerberosCredentialsSerializer``, it is moved onto
    ``request.zephyr_credentials`` and passed to the user's subscriber.
    """

    keyword = 'Bearer'

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Request currently being authenticated; set by authenticate().
        self.request = None
        # Initialised here; not assigned anywhere else in this class.
        self.claims = None

    def authenticate(self, request):
        """Remember *request*, reset its zephyr credentials, then defer to the base class.

        The request is stored on the instance because ``authenticate_credentials``
        only receives the token string but needs access to the request body.
        """
        self.request = request
        self.request.zephyr_credentials = None
        return super().authenticate(request)

    @staticmethod
    def _decode_token(token):
        """Decode *token* with the shared auth-token key using HS256.

        Returns:
            The decoded claims, or ``None`` if the token cannot be decoded or
            its signature is invalid.

        Raises:
            exceptions.AuthenticationFailed: the token is expired
                (``'Expired token'``) or has an invalid audience
                (``'Invalid token'``).

        Any other exception raised by ``jwt.decode`` is not handled here and
        propagates to the caller.
        """
        try:
            return jwt.decode(token, secrets.AUTHTOKEN_KEY, algorithms=['HS256'])
        except (jwt.DecodeError, jwt.InvalidSignatureError):
            return None
        except jwt.ExpiredSignatureError as exc:
            raise exceptions.AuthenticationFailed('Expired token') from exc
        except jwt.InvalidAudienceError as exc:
            raise exceptions.AuthenticationFailed('Invalid token') from exc

    @classmethod
    def validate_token(cls, token, raise_on_jwt_error=True):
        """Decode *token* and look up the user named by its ``identity`` claim.

        Args:
            token: The encoded JWT.
            raise_on_jwt_error: When true (the default), a
                ``jwt.exceptions.InvalidTokenError`` escaping ``_decode_token``
                is re-raised; when false it is treated as an undecodable token.
                The except clause here only names ``InvalidTokenError``, so an
                ``AuthenticationFailed`` raised by ``_decode_token`` is not
                intercepted by this flag.

        Returns:
            A ``(claims, user)`` tuple. ``(None, None)`` when the token could
            not be decoded. ``user`` is ``None`` when the claims have no
            ``identity`` entry or no ``models.User`` matches it.
        """
        try:
            claims = cls._decode_token(token)
        except jwt.exceptions.InvalidTokenError:
            if raise_on_jwt_error:
                raise
            claims = None

        if claims is None:
            return None, None

        user = None
        if 'identity' in claims:
            # The identity claim is used directly as the filter keyword arguments.
            user = models.User.objects.filter(**claims['identity']).first()
        return claims, user

    def authenticate_credentials(self, key):
        """Authenticate the bearer token *key* for the current request.

        Returns:
            ``None`` if the token did not parse as a JWT, so that other
            authentication schemes may be tried; otherwise a
            ``(user, claims)`` tuple where ``claims`` is a read-only mapping.

        Raises:
            exceptions.AuthenticationFailed: the token is expired, has an
                invalid audience, or does not resolve to a user.

        Other ``jwt`` token errors propagate unchanged, because
        ``validate_token`` is called with its default ``raise_on_jwt_error``.
        """
        claims, user = self.validate_token(key)
        if claims is None:
            # JWT did not parse; allow other authentication schemes to be tried, if any.
            return None

        # Reject unknown users before touching the request payload. Previously
        # this check ran after the credentials handling, so a token without a
        # matching user plus a valid credentials payload hit
        # ``None.send_to_user_subscriber`` and crashed with AttributeError.
        if not user:
            raise exceptions.AuthenticationFailed('User inactive or deleted')

        # Check for and extract zephyr credentials that may be coming in with the payload.
        if 'credentials' in self.request.data:
            serializer = serializers.KerberosCredentialsSerializer(
                data=self.request.data['credentials'],
                context={'user': user},
            )
            if serializer.is_valid():
                self.request.zephyr_credentials = serializer.validated_data
                # Remove the credentials from the request body once they have
                # been captured on the request object.
                del self.request.data['credentials']
                user.send_to_user_subscriber({
                    'type': 'inject_credentials',
                    'creds': serializer.validated_data,
                })

        return (user, MappingProxyType(claims))