Skip to content

Policy

Policy conditions

A policy can define conditions under which queries or fields are allowed.

  • claim

    Must be a JSON object containing a name key indicating the name of the claim to look for, an includes array that sppecifies values that the claim must includes:

      "claim": {
        "name": "scopes",
        "includes": ["_staff"]
      }
    
  • is_claim_present

    The name of a JWT claim that must be present:

    "is_claim_present": "iss"
    

Policy principals

The "principal" array specifies a subset of user who are concerned by the policy.

  • authenticated

    Math all authenticated user i.e: any user with a valid JWT

  • perm:<permission>

    Match all users with the given permission

  • role:<role>

    Match all users belonging to this role

  • staff

    Match staff user only

Example:

"principal": ["perm:book:borrow", "role:customer"]

Match users with "book:borrow" permission and "customer" role.

Expose functions to evaluate policies.

authorized(claims, info) async

Evaluate authorization policies with the JWT claims and the query infos.

Parameters:

Name Type Description Default
claims dict

JWT claims

required
info GraphQLResolveInfo

Query infos

required

Returns:

Type Description
bool

bool: True if authorized, False otherwise

Source code in turbulette/apps/auth/policy/base.py
async def authorized(claims: dict, info: GraphQLResolveInfo) -> bool:
    """Evaluate authorization policies with the JWT claims and the query infos.

    Args:
        claims (dict): JWT claims
        info (GraphQLResolveInfo): Query infos

    Returns:
        bool: True if authorized, False otherwise
    """
    policies = get_policy_config()
    policies = await policy.involved(claims, policies, info)
    if not policies:  # pragma: no cover ### can't cover both all policy types and none
        return False
    valid_policies = await policy.with_valid_conditions(claims, policies, info)
    if (
        not valid_policies
    ):  # pragma: no cover ### can't cover both all context types and none
        return False
    applied_policies = policy.apply(info, valid_policies)
    authorized_ = bool(applied_policies) and all(applied_policies)
    return authorized_

Core policy logic.

PolicyType

Store policy resolvers and handle core logic to apply policies.

Policy evaluation follows the principle of least privilege : Access is authorized if any policy allows it, and no policy denies it

condition(self, key)

Decorator to add a condition resolver.

Parameters:

Name Type Description Default
key str

Condition key to use in the policy config

required
Source code in turbulette/apps/auth/policy/policy.py
def condition(self, key: str) -> Callable[[Callable], ConditionResolver]:
    """Decorator to add a condition resolver.

    Args:
        key (str): Condition key to use in the policy config
    """
    if not isinstance(key, str):
        raise ValueError(
            "policy condition decorator should be passed a key: "
            '@policy.condition("bar")'
        )
    return self._create_register_conditions(key)

involved(self, claims, policies, info) async

Given a list of policies, return only those where one of the principal match.

All principal resolvers will be tested. For a policy to be involved, at least one of the resolvers must return True.

Parameters:

Name Type Description Default
claims Dict[str, Any]

Current JWT claims

required
policies List[Dict[str, Dict]]

The policy list to filter

required

Returns:

Type Description
List[Dict[str, Dict]]

List[Policy]: Involved policies

Source code in turbulette/apps/auth/policy/policy.py
async def involved(
    self, claims: Claims, policies: List[Policy], info: GraphQLResolveInfo
) -> List[Policy]:
    """Given a list of policies, return only those where one of the principal match.

    All principal resolvers will be tested. For a policy to be involved,
    at least one of the resolvers must return True.

    Args:
        claims (Claims): Current JWT claims
        policies (List[Policy]): The policy list to filter

    Returns:
        List[Policy]: Involved policies
    """
    res = []
    for policy in policies:
        for statement in policy[KEY_PRINCIPAL]:
            parsed = statement.split(":", 1)
            val = parsed[0] if len(parsed) == 1 else parsed[1]
            if await self._principals[parsed[0]](val, claims, info):
                res.append(policy)
                break
    return res

principal(self, key)

Decorator to add principal resolver.

Parameters:

Name Type Description Default
key str

Principal key to use in the policy config

required
Source code in turbulette/apps/auth/policy/policy.py
def principal(self, key: str) -> Callable[[Callable], PrincipalResolver]:
    """Decorator to add principal resolver.

    Args:
        key (str): Principal key to use in the policy config
    """
    if not isinstance(key, str):
        raise ValueError(
            "policy principal decorator should be passed a key: "
            '@policy.principal("foo")'
        )
    return self._create_register_principal(key)

with_valid_conditions(self, claims, policies, info) async

Given a list of policies, return only those with valid conditions.

All condition resolvers will be tested. For a policy to have valid conditions, all resolvers must return True.

Parameters:

Name Type Description Default
claims Dict[str, Any]

Current JWT claims

required
policies List[Dict[str, Dict]]

The policy to filter

required

Returns:

Type Description
List[Dict[str, Dict]]

List[Policy]: Policies where all conditions are valid

Source code in turbulette/apps/auth/policy/policy.py
async def with_valid_conditions(
    self, claims: Claims, policies: List[Policy], info: GraphQLResolveInfo
) -> List[Policy]:
    """Given a list of policies, return only those with valid conditions.

    All condition resolvers will be tested. For a policy to have valid conditions,
    all resolvers must return True.

    Args:
        claims (Claims): Current JWT claims
        policies (List[Policy]): The policy to filter

    Returns:
        List[Policy]: Policies where all conditions are valid
    """
    valid_policies = []
    for policy in policies:
        if KEY_CONDITIONS not in policy:
            valid_policies.append(policy)
        else:
            for statement, val in policy[KEY_CONDITIONS].items():
                if not await self._conditions[statement](val, claims, info):
                    break
            else:
                valid_policies.append(policy)
    return valid_policies

Last update: 2021-02-18