core package
Core authentication logic.
TokenType
Type of JWTs available.
ACCESS
: used to access a specific resource (usually a query or a mutation)
REFRESH
: needed to refresh an expired access token
decode_jwt(jwt)
Decode JSON web token.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
jwt |
str |
The JSON web token |
required |
Exceptions:
Type | Description |
---|---|
JSONWebTokenError |
The signature has expired or the token is invalid |
Returns:
Type | Description |
---|---|
Tuple |
The decoded token, as returned by `process_jwt` (verification disabled) or `verify_jwt` |
Source code in turbulette/apps/auth/core.py
def decode_jwt(jwt: str) -> Tuple:
    """Decrypt (when enabled) and decode a JSON web token.

    When `settings.JWT_ENCRYPT` is truthy the input is handled as a
    serialized JWE: backslashes are stripped from it, the envelope is
    deserialized and decrypted with the module-level encryption key, and its
    UTF-8 payload becomes the JWT to decode.

    The JWT is then passed to `process_jwt` when `settings.JWT_VERIFY` is
    falsy, and to `verify_jwt` otherwise.

    Args:
        jwt: The serialized token (a JWE when encryption is enabled).

    Raises:
        JWEInvalidToken: Deserializing the JWE raised `InvalidJWEData`.
        JWEDecryptionError: Decrypting the JWE raised `InvalidJWEData`.
        JWTDecodeError: `verify_jwt` raised `InvalidJWSObject` or
            `UnicodeDecodeError`.
        JWTInvalidSignature: `verify_jwt` raised `InvalidJWSSignature`.
        JWTExpired: `verify_jwt` raised any other exception.

    Returns:
        The tuple returned by `process_jwt` or `verify_jwt`
        (presumably `(header, claims)` -- confirm against python_jwt).
    """
    raw_jwt = jwt

    if settings.JWT_ENCRYPT:
        envelope = JWE()
        try:
            envelope.deserialize(jwt.replace("\\", ""))
        except InvalidJWEData as exc:
            raise JWEInvalidToken from exc
        try:
            envelope.decrypt(_encryption_key)
        except InvalidJWEData as exc:
            raise JWEDecryptionError from exc
        # The decrypted payload is bytes; the JWT helpers below take text.
        raw_jwt = envelope.payload.decode("utf-8")

    if settings.JWT_VERIFY:
        try:
            return verify_jwt(
                raw_jwt,
                _secret_key,
                checks_optional=settings.JWT_VERIFY_EXPIRATION,
                iat_skew=settings.JWT_LEEWAY,
                allowed_algs=[settings.JWT_ALGORITHM],
            )
        except (InvalidJWSObject, UnicodeDecodeError) as exc:
            raise JWTDecodeError from exc
        except InvalidJWSSignature as exc:
            raise JWTInvalidSignature from exc
        except Exception as exc:
            # Catch-all: every remaining failure is reported as an expired
            # token, whatever its actual cause.
            raise JWTExpired from exc

    # Verification disabled: decode without checking signature or claims.
    return process_jwt(raw_jwt)
encode_jwt(payload, token_type)
Encode a JWT from the given payload and token type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
payload |
dict |
JWT payload |
required |
token_type |
TokenType |
Type of the encoded token |
required |
Returns:
Type | Description |
---|---|
str |
The encoded JWT |
Source code in turbulette/apps/auth/core.py
def encode_jwt(payload: dict, token_type: TokenType) -> str:
    """Encode a JWT from the given payload and token type.

    The token lifetime is `settings.JWT_EXPIRATION_DELTA` for access tokens
    and `settings.JWT_REFRESH_EXPIRATION_DELTA` for any other type. A JTI of
    `settings.JWT_JTI_SIZE` is requested only when blacklisting is enabled
    and `token_type.value` is listed in `settings.JWT_BLACKLIST_TOKEN_CHECKS`;
    otherwise the size passed is 0. When `settings.JWT_ENCRYPT` is truthy the
    signed token is additionally wrapped in a JWE.

    Args:
        payload: JWT payload. It is not modified; the `"type"` claim is added
            to a copy.
        token_type: Type of the encoded token; its `value` is stored under
            the `"type"` claim.

    Returns:
        The encoded JWT, or the serialized JWE wrapping it when encryption
        is enabled.
    """
    lifetime = (
        settings.JWT_EXPIRATION_DELTA
        if token_type is TokenType.ACCESS
        else settings.JWT_REFRESH_EXPIRATION_DELTA
    )
    jti_size = (
        settings.JWT_JTI_SIZE
        if settings.JWT_BLACKLIST_ENABLED
        and token_type.value in settings.JWT_BLACKLIST_TOKEN_CHECKS
        else 0
    )

    # Sign a shallow copy so the caller's dict is left untouched: the same
    # payload can then be reused for several token types without a "type"
    # claim leaking back into it.
    claims = dict(payload)
    claims["type"] = token_type.value

    signed = generate_jwt(
        claims,
        _secret_key,
        algorithm=settings.JWT_ALGORITHM,
        lifetime=lifetime,
        jti_size=jti_size,
    )
    if not settings.JWT_ENCRYPT:
        return signed

    envelope = JWE(
        plaintext=signed.encode("utf-8"),
        protected={
            "alg": settings.JWE_ALGORITHM,
            "enc": settings.JWE_ENCRYPTION,
            "typ": "JWE",
        },
    )
    envelope.add_recipient(_encryption_key)
    return envelope.serialize()
get_password_hash(password)
Get the password hash.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
password |
str |
The password to hash |
required |
Returns:
Type | Description |
---|---|
str |
The resulting hash |
Source code in turbulette/apps/auth/core.py
def get_password_hash(password: str) -> str:
    """Hash a plain password with the module-level `pwd_context`.

    Args:
        password: The password to hash

    Returns:
        The value produced by `pwd_context.hash`
    """
    hashed = pwd_context.hash(password)
    return hashed
get_token_from_user(user)
async
A shortcut to get the token directly from a user model instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user |
BaseUser |
GINO model instance of `AUTH_USER_MODEL` |
required |
Returns:
Type | Description |
---|---|
str |
The user JWT |
Source code in turbulette/apps/auth/core.py
async def get_token_from_user(user: user_model) -> str:
    """Build an access token straight from a user model instance.

    The payload is obtained from `jwt_payload(user)` and encoded with
    `encode_jwt` as a `TokenType.ACCESS` token.

    Args:
        user: GINO model instance of `AUTH_USER_MODEL`

    Returns:
        The user JWT
    """
    payload = await jwt_payload(user)
    return encode_jwt(payload, TokenType.ACCESS)
jwt_payload(user)
async
Get the JWT payload from a user object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user |
BaseUser |
An instance of `AUTH_USER_MODEL` |
required |
Returns:
Type | Description |
---|---|
dict |
The JWT payload |
Source code in turbulette/apps/auth/core.py
async def jwt_payload(user: user_model) -> dict:
    """Build the JWT payload for a user object.

    The payload is assembled by `_jwt_payload` from the user's username, the
    scopes returned by `_get_scopes(user)`, and the user's `is_staff` flag.

    Args:
        user (user_model): An instance of `AUTH_USER_MODEL`

    Returns:
        dict: The JWT payload
    """
    username = user.get_username()
    scopes = await _get_scopes(user)
    is_staff = user.is_staff
    return _jwt_payload(username, scopes, is_staff)
jwt_payload_from_claims(claims)
Get the full JWT payload from JWT claims.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
claims |
dict |
Holds custom JWT claims (scopes) |
required |
Returns:
Type | Description |
---|---|
dict |
The full JWT payload built from the claims |
Source code in turbulette/apps/auth/core.py
def jwt_payload_from_claims(claims: dict) -> dict:
    """Rebuild the full JWT payload from already-decoded JWT claims.

    The staff flag passed to `_jwt_payload` is derived from the claims
    themselves: it is true when `STAFF_SCOPE` is among the scopes.

    Args:
        claims (dict): JWT claims; must contain the `"sub"` and `"scopes"` keys

    Returns:
        dict: The payload returned by `_jwt_payload`
    """
    subject = claims["sub"]
    scopes = claims["scopes"]
    is_staff = STAFF_SCOPE in scopes
    return _jwt_payload(subject, scopes, is_staff)
verify_password(plain_password, hashed_password)
Check the password against an existing hash.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
plain_password |
str |
Plain password to check |
required |
hashed_password |
str |
Hashed password to compare to |
required |
Returns:
Type | Description |
---|---|
bool |
`True` if the password matched the hash, else `False` |
Source code in turbulette/apps/auth/core.py
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Verify a plain password against a stored hash via `pwd_context`.

    Args:
        plain_password: Plain password to check
        hashed_password: Hashed password to compare to

    Returns:
        The result of `pwd_context.verify`: `True` if the password matched
        the hash, else `False`
    """
    matches = pwd_context.verify(plain_password, hashed_password)
    return matches