Skip to content

Commit

Permalink
fix for #114, including a test case
Browse files Browse the repository at this point in the history
  • Loading branch information
Erik Steringer committed Apr 12, 2022
1 parent 1d969f0 commit 06f1dc1
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 11 deletions.
31 changes: 22 additions & 9 deletions principalmapper/graphing/sts_edges.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from principalmapper.querying import query_interface
from principalmapper.querying.local_policy_simulation import resource_policy_authorization, ResourcePolicyEvalResult, has_matching_statement
from principalmapper.util import arns
from principalmapper.util.case_insensitive_dict import CaseInsensitiveDict


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -66,6 +67,7 @@ def generate_edges_locally(nodes: List[Node], scps: Optional[List[List[dict]]] =
continue

# Check against resource policy
rp_mfa_required = False
sim_result = resource_policy_authorization(
node_source,
arns.get_account_id(node_source.arn),
Expand All @@ -75,11 +77,22 @@ def generate_edges_locally(nodes: List[Node], scps: Optional[List[List[dict]]] =
{},
)

if sim_result == ResourcePolicyEvalResult.DENY_MATCH:
continue # Node was explicitly denied from assuming the role

if sim_result == ResourcePolicyEvalResult.NO_MATCH:
continue # Resource policy must match for sts:AssumeRole, even in same-account scenarios
if sim_result == ResourcePolicyEvalResult.DENY_MATCH or sim_result == ResourcePolicyEvalResult.NO_MATCH:
sim_result = resource_policy_authorization(
node_source,
arns.get_account_id(node_source.arn),
node_destination.trust_policy,
'sts:AssumeRole',
node_destination.arn,
{
'aws:MultiFactorAuthAge': '1',
'aws:MultiFactorAuthPresent': 'true'
}
)
if sim_result == ResourcePolicyEvalResult.DENY_MATCH or sim_result == ResourcePolicyEvalResult.NO_MATCH:
continue
else:
rp_mfa_required = True # Resource Policy auth check passed when MFA elements set

assume_auth, need_mfa = query_interface.local_check_authorization_handling_mfa(
node_source, 'sts:AssumeRole', node_destination.arn, {}, service_control_policy_groups=scps
Expand All @@ -89,21 +102,21 @@ def generate_edges_locally(nodes: List[Node], scps: Optional[List[List[dict]]] =
'Deny',
'sts:AssumeRole',
node_destination.arn,
{},
CaseInsensitiveDict({}),
)
policy_denies_mfa = has_matching_statement(
node_source,
'Deny',
'sts:AssumeRole',
node_destination.arn,
{
CaseInsensitiveDict({
'aws:MultiFactorAuthAge': '1',
'aws:MultiFactorAuthPresent': 'true'
},
}),
)

if assume_auth:
if need_mfa:
if need_mfa or rp_mfa_required:
reason = '(requires MFA) can access via sts:AssumeRole'
else:
reason = 'can access via sts:AssumeRole'
Expand Down
22 changes: 20 additions & 2 deletions tests/build_test_graphs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

def build_empty_graph() -> Graph:
"""Constructs and returns a Graph object with no nodes, edges, policies, or groups"""
return Graph([], [], [], [], _get_default_metadata())
return Graph([], [], [], [], '000000000000', 'aws', _get_default_metadata())


def build_graph_with_one_admin() -> Graph:
Expand Down Expand Up @@ -72,6 +72,24 @@ def build_playground_graph() -> Graph:
nodes.append(Node(common_iam_prefix + 'role/s3_access_role', 'AIDA00000000000000003', [s3_full_access_policy], [], root_trusted_policy_doc,
None, 0, False, False, None, False, None))

# assumable role with s3 access and MFA required to assume
nodes.append(Node(common_iam_prefix + 'role/mfa_role_with_s3_access', 'AIDA0000000000000099', [s3_full_access_policy], [],
{
'Version': '2012-10-17',
'Statement': [
{
'Effect': 'Allow',
'Principal': {'AWS': 'arn:aws:iam::000000000000:root'},
'Action': 'sts:AssumeRole',
'Condition': {
'Bool': {
'aws:MultiFactorAuthPresent': 'true'
}
}
}
]
}, None, 0, False, False, None, False, None))

# second assumable role with s3 access with alternative trust policy
nodes.append(Node(common_iam_prefix + 'role/s3_access_role_alt', 'AIDA00000000000000004', [s3_full_access_policy], [],
alt_root_trusted_policy_doc, None, 0, False, False, None, False, None))
Expand All @@ -81,7 +99,7 @@ def build_playground_graph() -> Graph:
other_acct_trusted_policy_doc, None, 0, False, False, None, False, None))

# jump user with access to sts:AssumeRole
nodes.append(Node(common_iam_prefix + 'user/jumpuser', 'AIDA00000000000000006', [jump_policy], [], None, None, 1, True, False, None, False, None))
nodes.append(Node(common_iam_prefix + 'user/jumpuser', 'AIDA00000000000000006', [jump_policy], [], None, None, 1, True, False, None, True, None))

# user with S3 access, path in user's ARN
nodes.append(Node(common_iam_prefix + 'user/somepath/some_other_jumpuser', 'AIDA00000000000000007', [jump_policy],
Expand Down
6 changes: 6 additions & 0 deletions tests/test_edge_identification.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,9 @@ def test_admin_access(self):
self.assertTrue(is_connected(graph, admin_user_node, jump_user))
self.assertTrue(is_connected(graph, admin_user_node, nonassumable_role_node))
self.assertTrue(is_connected(graph, other_jump_user, other_assumable_role))

def test_mfa_assume_role(self):
graph = build_playground_graph()
source_node = graph.get_node_by_searchable_name('user/jumpuser')
mfa_target_node = graph.get_node_by_searchable_name('role/mfa_role_with_s3_access')
self.assertTrue(is_connected(graph, source_node, mfa_target_node))

0 comments on commit 06f1dc1

Please sign in to comment.