Skip to content

core package

Core authentication logic.

TokenType

Type of JWTs available.

  • ACCESS : used to access a specific resource (usually a query or a mutation)
  • REFRESH : needed to refresh an expired access token

decode_jwt(jwt)

Decode JSON web token.

Parameters:

Name Type Description Default
jwt str

The JSON web token

required

Exceptions:

Type Description
JSONWebTokenError

The signature has expired or the token is invalid

Returns:

Type Description
Tuple

The decoded token, as returned by process_jwt (verification disabled) or verify_jwt (verification enabled)

Source code in turbulette/apps/auth/core.py
def decode_jwt(jwt: str) -> Tuple:
    """Decrypt (when enabled) and decode a JSON web token.

    When `settings.JWT_ENCRYPT` is truthy, the input is first parsed as a
    serialized JWE and decrypted to recover the inner JWT. The JWT is then
    either parsed without verification (`settings.JWT_VERIFY` falsy) or
    verified against the secret key.

    Args:
        jwt: The serialized token (a JWE wrapping the JWT when encryption
            is enabled)

    Raises:
        JWEInvalidToken: Deserializing the JWE raised `InvalidJWEData`
        JWEDecryptionError: Decrypting the JWE raised `InvalidJWEData`
        JWTDecodeError: Verification raised `InvalidJWSObject` or
            `UnicodeDecodeError`
        JWTInvalidSignature: Verification raised `InvalidJWSSignature`
        JWTExpired: Verification raised any other exception

    Returns:
        The value returned by `process_jwt` (no verification) or
        `verify_jwt` (verification enabled)
    """
    if settings.JWT_ENCRYPT:
        jwe = JWE()
        # Backslashes are removed before parsing (presumably escaping left
        # over from transporting the serialized JWE -- confirm with callers).
        serialized = jwt.replace("\\", "")
        try:
            jwe.deserialize(serialized)
        except InvalidJWEData as exc:
            raise JWEInvalidToken from exc
        try:
            jwe.decrypt(_encryption_key)
        except InvalidJWEData as exc:
            raise JWEDecryptionError from exc
        # From here on `jwt` is the decrypted inner token.
        jwt = jwe.payload.decode("utf-8")

    if settings.JWT_VERIFY:
        # The settings lookups stay inside the `try` so that any failure
        # while building the call is mapped exactly like a verify failure.
        try:
            return verify_jwt(
                jwt,
                _secret_key,
                checks_optional=settings.JWT_VERIFY_EXPIRATION,
                iat_skew=settings.JWT_LEEWAY,
                allowed_algs=[settings.JWT_ALGORITHM],
            )
        except (InvalidJWSObject, UnicodeDecodeError) as exc:
            raise JWTDecodeError from exc
        except InvalidJWSSignature as exc:
            raise JWTInvalidSignature from exc
        except Exception as exc:
            # Catch-all: every remaining failure is reported as expiry.
            raise JWTExpired from exc

    # Verification disabled: only parse the token.
    return process_jwt(jwt)

encode_jwt(payload, token_type)

Encode a JWT from the given payload and token type.

Parameters:

Name Type Description Default
payload dict

JWT payload

required
token_type TokenType

Type of the encoded token

required

Returns:

Type Description
str

str: The encoded JWT

Source code in turbulette/apps/auth/core.py
def encode_jwt(payload: dict, token_type: TokenType) -> str:
    """Encode a JWT from the given payload and token type.

    The token lifetime is `settings.JWT_EXPIRATION_DELTA` for access tokens
    and `settings.JWT_REFRESH_EXPIRATION_DELTA` otherwise. A `"type"` claim
    holding `token_type.value` is added to the encoded claims. When
    `settings.JWT_ENCRYPT` is truthy, the signed JWT is wrapped in a JWE.

    The `payload` argument is not modified: the `"type"` claim is added to
    a copy, so the same dict can safely be reused to encode several tokens.

    Args:
        payload: JWT payload
        token_type: Type of the encoded token

    Returns:
        The encoded JWT, or the serialized JWE wrapping it when encryption
        is enabled
    """
    if token_type is TokenType.ACCESS:
        lifetime = settings.JWT_EXPIRATION_DELTA
    else:
        lifetime = settings.JWT_REFRESH_EXPIRATION_DELTA

    # A non-zero JTI size is only requested when blacklisting is enabled
    # and this token type is among the blacklist-checked types.
    jti_size = (
        settings.JWT_JTI_SIZE
        if settings.JWT_BLACKLIST_ENABLED
        and token_type.value in settings.JWT_BLACKLIST_TOKEN_CHECKS
        else 0
    )

    # Build a new dict instead of writing into the caller's payload.
    claims = {**payload, "type": token_type.value}
    signed = generate_jwt(
        claims,
        _secret_key,
        algorithm=settings.JWT_ALGORITHM,
        lifetime=lifetime,
        jti_size=jti_size,
    )

    if not settings.JWT_ENCRYPT:
        return signed

    jwe = JWE(
        plaintext=signed.encode("utf-8"),
        protected={
            "alg": settings.JWE_ALGORITHM,
            "enc": settings.JWE_ENCRYPTION,
            "typ": "JWE",
        },
    )
    jwe.add_recipient(_encryption_key)
    return jwe.serialize()

get_password_hash(password)

Get the password hash.

Parameters:

Name Type Description Default
password str

The password to hash

required

Returns:

Type Description
str

The resulting hash

Source code in turbulette/apps/auth/core.py
def get_password_hash(password: str) -> str:
    """Hash a plain-text password using the module's `pwd_context`.

    Args:
        password: The password to hash

    Returns:
        The hash produced by `pwd_context.hash`
    """
    hashed = pwd_context.hash(password)
    return hashed

get_token_from_user(user) async

A shortcut to get the token directly from a user model instance.

Parameters:

Name Type Description Default
user BaseUser

GINO model instance of AUTH_USER_MODEL

required

Returns:

Type Description
str

The user JWT

Source code in turbulette/apps/auth/core.py
async def get_token_from_user(user: user_model) -> str:
    """Build an access token straight from a user model instance.

    Args:
        user: GINO model instance of `AUTH_USER_MODEL`

    Returns:
        The user's JWT, encoded as a `TokenType.ACCESS` token
    """
    claims = await jwt_payload(user)
    return encode_jwt(claims, TokenType.ACCESS)

jwt_payload(user) async

Get the JWT payload from a user object.

Parameters:

Name Type Description Default
user BaseUser

An instance of AUTH_USER_MODEL

required

Returns:

Type Description
dict

dict: The JWT payload

Source code in turbulette/apps/auth/core.py
async def jwt_payload(user: user_model) -> dict:
    """Build the JWT payload for a user object.

    The payload is assembled by `_jwt_payload` from the user's username,
    the scopes returned by `_get_scopes`, and the user's `is_staff` flag.

    Args:
        user: An instance of `AUTH_USER_MODEL`

    Returns:
        The JWT payload
    """
    # Values are gathered in the same order the original call evaluated them.
    username = user.get_username()
    scopes = await _get_scopes(user)
    is_staff = user.is_staff
    return _jwt_payload(username, scopes, is_staff)

jwt_payload_from_claims(claims)

Get the full JWT payload from JWT claims.

Parameters:

Name Type Description Default
claims dict

Holds custom JWT claims (scopes)

required

Returns:

Type Description
dict

The full JWT payload built from the claims

Source code in turbulette/apps/auth/core.py
def jwt_payload_from_claims(claims: dict) -> dict:
    """Rebuild the full JWT payload from existing JWT claims.

    The `"sub"` and `"scopes"` claims are forwarded to `_jwt_payload`; the
    staff flag is derived from whether `STAFF_SCOPE` is among the scopes.

    Args:
        claims: JWT claims; must contain the `"sub"` and `"scopes"` keys

    Returns:
        The full JWT payload
    """
    subject = claims["sub"]
    scopes = claims["scopes"]
    is_staff = STAFF_SCOPE in claims["scopes"]
    return _jwt_payload(subject, scopes, is_staff)

verify_password(plain_password, hashed_password)

Check the password against an existing hash.

Parameters:

Name Type Description Default
plain_password str

Plain password to check

required
hashed_password str

Hashed password to compare to

required

Returns:

Type Description
bool

True if the password matched the hash, else False

Source code in turbulette/apps/auth/core.py
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plain-text password against an existing hash.

    Args:
        plain_password: Plain password to check
        hashed_password: Hashed password to compare to

    Returns:
        The result of `pwd_context.verify`: `True` if the password matched
        the hash, else `False`
    """
    matches = pwd_context.verify(plain_password, hashed_password)
    return matches

Last update: 2021-02-18