Policy
Policy conditions
A policy can define conditions under which queries or fields are allowed.
-
claim
Must be a JSON object containing a
name
key indicating the name of the claim to look for, anincludes
array that sppecifies values that the claim must includes:"claim": { "name": "scopes", "includes": ["_staff"] }
-
is_claim_present
The name of a JWT claim that must be present:
"is_claim_present": "iss"
Policy principals
The "principal"
array specifies a subset of user who are concerned by the policy.
-
authenticated
Math all authenticated user i.e: any user with a valid JWT
-
perm:<permission>
Match all users with the given permission
-
role:<role>
Match all users belonging to this role
-
staff
Match staff user only
Example:
"principal": ["perm:book:borrow", "role:customer"]
Match users with "book:borrow"
permission and "customer"
role.
Expose functions to evaluate policies.
authorized(claims, info)
async
Evaluate authorization policies with the JWT claims and the query infos.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
claims |
dict |
JWT claims |
required |
info |
GraphQLResolveInfo |
Query infos |
required |
Returns:
Type | Description |
---|---|
bool |
bool: True if authorized, False otherwise |
Source code in turbulette/apps/auth/policy/base.py
async def authorized(claims: dict, info: GraphQLResolveInfo) -> bool:
"""Evaluate authorization policies with the JWT claims and the query infos.
Args:
claims (dict): JWT claims
info (GraphQLResolveInfo): Query infos
Returns:
bool: True if authorized, False otherwise
"""
policies = get_policy_config()
policies = await policy.involved(claims, policies, info)
if not policies: # pragma: no cover ### can't cover both all policy types and none
return False
valid_policies = await policy.with_valid_conditions(claims, policies, info)
if (
not valid_policies
): # pragma: no cover ### can't cover both all context types and none
return False
applied_policies = policy.apply(info, valid_policies)
authorized_ = bool(applied_policies) and all(applied_policies)
return authorized_
Core policy logic.
PolicyType
Store policy resolvers and handle core logic to apply policies.
Policy evaluation follows the principle of least privilege : Access is authorized if any policy allows it, and no policy denies it
condition(self, key)
Decorator to add a condition resolver.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
str |
Condition key to use in the policy config |
required |
Source code in turbulette/apps/auth/policy/policy.py
def condition(self, key: str) -> Callable[[Callable], ConditionResolver]:
"""Decorator to add a condition resolver.
Args:
key (str): Condition key to use in the policy config
"""
if not isinstance(key, str):
raise ValueError(
"policy condition decorator should be passed a key: "
'@policy.condition("bar")'
)
return self._create_register_conditions(key)
involved(self, claims, policies, info)
async
Given a list of policies, return only those where one of the principal match.
All principal resolvers will be tested. For a policy to be involved, at least one of the resolvers must return True.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
claims |
Dict[str, Any] |
Current JWT claims |
required |
policies |
List[Dict[str, Dict]] |
The policy list to filter |
required |
Returns:
Type | Description |
---|---|
List[Dict[str, Dict]] |
List[Policy]: Involved policies |
Source code in turbulette/apps/auth/policy/policy.py
async def involved(
self, claims: Claims, policies: List[Policy], info: GraphQLResolveInfo
) -> List[Policy]:
"""Given a list of policies, return only those where one of the principal match.
All principal resolvers will be tested. For a policy to be involved,
at least one of the resolvers must return True.
Args:
claims (Claims): Current JWT claims
policies (List[Policy]): The policy list to filter
Returns:
List[Policy]: Involved policies
"""
res = []
for policy in policies:
for statement in policy[KEY_PRINCIPAL]:
parsed = statement.split(":", 1)
val = parsed[0] if len(parsed) == 1 else parsed[1]
if await self._principals[parsed[0]](val, claims, info):
res.append(policy)
break
return res
principal(self, key)
Decorator to add principal resolver.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
str |
Principal key to use in the policy config |
required |
Source code in turbulette/apps/auth/policy/policy.py
def principal(self, key: str) -> Callable[[Callable], PrincipalResolver]:
"""Decorator to add principal resolver.
Args:
key (str): Principal key to use in the policy config
"""
if not isinstance(key, str):
raise ValueError(
"policy principal decorator should be passed a key: "
'@policy.principal("foo")'
)
return self._create_register_principal(key)
with_valid_conditions(self, claims, policies, info)
async
Given a list of policies, return only those with valid conditions.
All condition resolvers will be tested. For a policy to have valid conditions, all resolvers must return True.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
claims |
Dict[str, Any] |
Current JWT claims |
required |
policies |
List[Dict[str, Dict]] |
The policy to filter |
required |
Returns:
Type | Description |
---|---|
List[Dict[str, Dict]] |
List[Policy]: Policies where all conditions are valid |
Source code in turbulette/apps/auth/policy/policy.py
async def with_valid_conditions(
self, claims: Claims, policies: List[Policy], info: GraphQLResolveInfo
) -> List[Policy]:
"""Given a list of policies, return only those with valid conditions.
All condition resolvers will be tested. For a policy to have valid conditions,
all resolvers must return True.
Args:
claims (Claims): Current JWT claims
policies (List[Policy]): The policy to filter
Returns:
List[Policy]: Policies where all conditions are valid
"""
valid_policies = []
for policy in policies:
if KEY_CONDITIONS not in policy:
valid_policies.append(policy)
else:
for statement, val in policy[KEY_CONDITIONS].items():
if not await self._conditions[statement](val, claims, info):
break
else:
valid_policies.append(policy)
return valid_policies