diff --git a/principalmapper/graphing/sts_edges.py b/principalmapper/graphing/sts_edges.py index ce31632..40b72c1 100644 --- a/principalmapper/graphing/sts_edges.py +++ b/principalmapper/graphing/sts_edges.py @@ -25,6 +25,7 @@ from principalmapper.querying import query_interface from principalmapper.querying.local_policy_simulation import resource_policy_authorization, ResourcePolicyEvalResult, has_matching_statement from principalmapper.util import arns +from principalmapper.util.case_insensitive_dict import CaseInsensitiveDict logger = logging.getLogger(__name__) @@ -66,6 +67,7 @@ def generate_edges_locally(nodes: List[Node], scps: Optional[List[List[dict]]] = continue # Check against resource policy + rp_mfa_required = False sim_result = resource_policy_authorization( node_source, arns.get_account_id(node_source.arn), @@ -75,11 +77,22 @@ def generate_edges_locally(nodes: List[Node], scps: Optional[List[List[dict]]] = {}, ) - if sim_result == ResourcePolicyEvalResult.DENY_MATCH: - continue # Node was explicitly denied from assuming the role - - if sim_result == ResourcePolicyEvalResult.NO_MATCH: - continue # Resource policy must match for sts:AssumeRole, even in same-account scenarios + if sim_result == ResourcePolicyEvalResult.DENY_MATCH or sim_result == ResourcePolicyEvalResult.NO_MATCH: + sim_result = resource_policy_authorization( + node_source, + arns.get_account_id(node_source.arn), + node_destination.trust_policy, + 'sts:AssumeRole', + node_destination.arn, + { + 'aws:MultiFactorAuthAge': '1', + 'aws:MultiFactorAuthPresent': 'true' + } + ) + if sim_result == ResourcePolicyEvalResult.DENY_MATCH or sim_result == ResourcePolicyEvalResult.NO_MATCH: + continue + else: + rp_mfa_required = True # Resource Policy auth check passed when MFA elements set assume_auth, need_mfa = query_interface.local_check_authorization_handling_mfa( node_source, 'sts:AssumeRole', node_destination.arn, {}, service_control_policy_groups=scps @@ -89,21 +102,21 @@ def generate_edges_locally(nodes: List[Node], scps: Optional[List[List[dict]]] = 'Deny', 'sts:AssumeRole', node_destination.arn, - {}, + CaseInsensitiveDict({}), ) policy_denies_mfa = has_matching_statement( node_source, 'Deny', 'sts:AssumeRole', node_destination.arn, - { + CaseInsensitiveDict({ 'aws:MultiFactorAuthAge': '1', 'aws:MultiFactorAuthPresent': 'true' - }, + }), ) if assume_auth: - if need_mfa: + if need_mfa or rp_mfa_required: reason = '(requires MFA) can access via sts:AssumeRole' else: reason = 'can access via sts:AssumeRole' diff --git a/tests/build_test_graphs.py b/tests/build_test_graphs.py index fffb74f..dba7f86 100644 --- a/tests/build_test_graphs.py +++ b/tests/build_test_graphs.py @@ -24,7 +24,7 @@ def build_empty_graph() -> Graph: """Constructs and returns a Graph object with no nodes, edges, policies, or groups""" - return Graph([], [], [], [], _get_default_metadata()) + return Graph([], [], [], [], '000000000000', 'aws', _get_default_metadata()) def build_graph_with_one_admin() -> Graph: @@ -72,6 +72,24 @@ def build_playground_graph() -> Graph: nodes.append(Node(common_iam_prefix + 'role/s3_access_role', 'AIDA00000000000000003', [s3_full_access_policy], [], root_trusted_policy_doc, None, 0, False, False, None, False, None)) + # assumable role with s3 access and MFA required to assume + nodes.append(Node(common_iam_prefix + 'role/mfa_role_with_s3_access', 'AIDA0000000000000099', [s3_full_access_policy], [], + { + 'Version': '2012-10-17', + 'Statement': [ + { + 'Effect': 'Allow', + 'Principal': {'AWS': 'arn:aws:iam::000000000000:root'}, + 'Action': 'sts:AssumeRole', + 'Condition': { + 'Bool': { + 'aws:MultiFactorAuthPresent': 'true' + } + } + } + ] + }, None, 0, False, False, None, False, None)) + # second assumable role with s3 access with alternative trust policy nodes.append(Node(common_iam_prefix + 'role/s3_access_role_alt', 'AIDA00000000000000004', [s3_full_access_policy], [], alt_root_trusted_policy_doc, None, 0, False, False, None, False, None)) @@ -81,7 +99,7 @@ def build_playground_graph() -> Graph: other_acct_trusted_policy_doc, None, 0, False, False, None, False, None)) # jump user with access to sts:AssumeRole - nodes.append(Node(common_iam_prefix + 'user/jumpuser', 'AIDA00000000000000006', [jump_policy], [], None, None, 1, True, False, None, False, None)) + nodes.append(Node(common_iam_prefix + 'user/jumpuser', 'AIDA00000000000000006', [jump_policy], [], None, None, 1, True, False, None, True, None)) # user with S3 access, path in user's ARN nodes.append(Node(common_iam_prefix + 'user/somepath/some_other_jumpuser', 'AIDA00000000000000007', [jump_policy], diff --git a/tests/test_edge_identification.py b/tests/test_edge_identification.py index bfe44bf..750ff63 100644 --- a/tests/test_edge_identification.py +++ b/tests/test_edge_identification.py @@ -47,3 +47,9 @@ def test_admin_access(self): self.assertTrue(is_connected(graph, admin_user_node, jump_user)) self.assertTrue(is_connected(graph, admin_user_node, nonassumable_role_node)) self.assertTrue(is_connected(graph, other_jump_user, other_assumable_role)) + + def test_mfa_assume_role(self): + graph = build_playground_graph() + source_node = graph.get_node_by_searchable_name('user/jumpuser') + mfa_target_node = graph.get_node_by_searchable_name('role/mfa_role_with_s3_access') + self.assertTrue(is_connected(graph, source_node, mfa_target_node))