Skip to content

Commit

Permalink
Add RoleAlias support for IoT, enhance search_index() response (#8292)
Browse files Browse the repository at this point in the history
  • Loading branch information
jsuchenia authored Nov 7, 2024
1 parent bf49e22 commit 092bd10
Show file tree
Hide file tree
Showing 7 changed files with 560 additions and 6 deletions.
5 changes: 5 additions & 0 deletions CLOUDFORMATION_COVERAGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,11 @@ Please let us know if you'd like support for a resource not yet listed here.
- [x] update implemented
- [x] delete implemented
- [x] Fn::GetAtt implemented
- AWS::IoT::RoleAlias:
- [x] create implemented
- [x] update implemented
- [x] delete implemented
- [x] Fn::GetAtt implemented
- AWS::KMS::Key:
- [x] create implemented
- [ ] update implemented
Expand Down
10 changes: 5 additions & 5 deletions IMPLEMENTATION_COVERAGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -4539,7 +4539,7 @@
- [ ] create_provisioning_claim
- [ ] create_provisioning_template
- [ ] create_provisioning_template_version
- [ ] create_role_alias
- [X] create_role_alias
- [ ] create_scheduled_audit
- [ ] create_security_profile
- [ ] create_stream
Expand Down Expand Up @@ -4572,7 +4572,7 @@
- [ ] delete_provisioning_template
- [ ] delete_provisioning_template_version
- [ ] delete_registration_code
- [ ] delete_role_alias
- [X] delete_role_alias
- [ ] delete_scheduled_audit
- [ ] delete_security_profile
- [ ] delete_stream
Expand Down Expand Up @@ -4609,7 +4609,7 @@
- [ ] describe_mitigation_action
- [ ] describe_provisioning_template
- [ ] describe_provisioning_template_version
- [ ] describe_role_alias
- [X] describe_role_alias
- [ ] describe_scheduled_audit
- [ ] describe_security_profile
- [ ] describe_stream
Expand Down Expand Up @@ -4682,7 +4682,7 @@
- [ ] list_provisioning_template_versions
- [ ] list_provisioning_templates
- [ ] list_related_resources_for_audit_finding
- [ ] list_role_aliases
- [X] list_role_aliases
- [ ] list_sbom_validation_results
- [ ] list_scheduled_audits
- [ ] list_security_profiles
Expand Down Expand Up @@ -4749,7 +4749,7 @@
- [ ] update_package_configuration
- [ ] update_package_version
- [ ] update_provisioning_template
- [ ] update_role_alias
- [X] update_role_alias
- [ ] update_scheduled_audit
- [ ] update_security_profile
- [ ] update_stream
Expand Down
189 changes: 188 additions & 1 deletion moto/iot/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def __init__(
region_name: str,
):
self.region_name = region_name
self.account_id = account_id
self.thing_id = str(random.uuid4())
self.thing_name = thing_name
self.thing_type = thing_type
Expand Down Expand Up @@ -80,6 +81,8 @@ def to_dict(
include_default_client_id: bool = False,
include_connectivity: bool = False,
include_thing_id: bool = False,
include_thing_group_names: bool = False,
include_shadows_as_json: bool = False,
) -> Dict[str, Any]:
obj = {
"thingName": self.thing_name,
Expand All @@ -98,6 +101,29 @@ def to_dict(
}
if include_thing_id:
obj["thingId"] = self.thing_id
if include_thing_group_names:
iot_backend = iot_backends[self.account_id][self.region_name]
obj["thingGroupNames"] = [
thing_group.thing_group_name
for thing_group in iot_backend.list_thing_groups(None, None, None)
if self.arn in thing_group.things
]
if include_shadows_as_json:
named_shadows = {
shadow_name: shadow.to_dict()
for shadow_name, shadow in self.thing_shadows.items()
if shadow_name is not None
}
converted_response_format = {
shadow_name: {
**shadow_data["state"],
"metadata": shadow_data["metadata"],
"version": shadow_data["version"],
"hasDelta": "delta" in shadow_data["state"],
}
for shadow_name, shadow_data in named_shadows.items()
}
obj["shadow"] = json.dumps({"name": converted_response_format})
return obj

@staticmethod
Expand Down Expand Up @@ -899,6 +925,108 @@ def to_dict(self) -> Dict[str, Any]:
}


class FakeRoleAlias(CloudFormationModel):
def __init__(
self,
role_alias: str,
role_arn: str,
account_id: str,
region_name: str,
credential_duration_seconds: int = 3600,
):
self.role_alias = role_alias
self.role_arn = role_arn
self.credential_duration_seconds = credential_duration_seconds
self.account_id = account_id
self.region_name = region_name
self.creation_date = time.time()
self.last_modified_date = self.creation_date
self.arn = f"arn:{get_partition(self.region_name)}:iot:{self.region_name}:{self.account_id}:rolealias/{role_alias}"

def to_dict(self) -> Dict[str, Any]:
return {
"roleAlias": self.role_alias,
"roleAliasArn": self.arn,
"roleArn": self.role_arn,
"owner": self.account_id,
"credentialDurationSeconds": self.credential_duration_seconds,
"creationDate": self.creation_date,
"lastModifiedDate": self.last_modified_date,
}

@staticmethod
def cloudformation_name_type() -> str:
return "RoleAlias"

@staticmethod
def cloudformation_type() -> str:
return "AWS::IoT::RoleAlias"

@classmethod
def has_cfn_attr(cls, attr: str) -> bool:
return attr in [
"RoleAliasArn",
]

def get_cfn_attribute(self, attribute_name: str) -> Any:
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException

if attribute_name == "RoleAliasArn":
return self.arn
raise UnformattedGetAttTemplateException()

@classmethod
def create_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
**kwargs: Any,
) -> "FakeRoleAlias":
iot_backend = iot_backends[account_id][region_name]
properties = cloudformation_json["Properties"]

role_alias_name = properties.get("RoleAlias", resource_name)
role_arn = properties.get("RoleArn")
credential_duration_seconds = properties.get("CredentialDurationSeconds", 3600)

return iot_backend.create_role_alias(
role_alias_name=role_alias_name,
role_arn=role_arn,
credential_duration_seconds=credential_duration_seconds,
)

@classmethod
def update_from_cloudformation_json( # type: ignore[misc]
cls,
original_resource: "FakeRoleAlias",
new_resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
) -> "FakeRoleAlias":
iot_backend = iot_backends[account_id][region_name]
iot_backend.delete_role_alias(role_alias_name=original_resource.role_alias)
return cls.create_from_cloudformation_json(
new_resource_name, cloudformation_json, account_id, region_name
)

@classmethod
def delete_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
) -> None:
properties = cloudformation_json["Properties"]
role_alias_name = properties.get("RoleAlias", resource_name)

iot_backend = iot_backends[account_id][region_name]
iot_backend.delete_role_alias(role_alias_name=role_alias_name)


class IoTBackend(BaseBackend):
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
Expand All @@ -917,6 +1045,7 @@ def __init__(self, region_name: str, account_id: str):
OrderedDict()
)
self.rules: Dict[str, FakeRule] = OrderedDict()
self.role_aliases: Dict[str, FakeRoleAlias] = OrderedDict()
self.endpoint: Optional[FakeEndpoint] = None
self.domain_configurations: Dict[str, FakeDomainConfiguration] = OrderedDict()

Expand Down Expand Up @@ -2195,7 +2324,65 @@ def search_index(self, query_string: str) -> List[Dict[str, Any]]:
things = [
thing for thing in self.things.values() if thing.matches(query_string)
]
return [t.to_dict(include_connectivity=True) for t in things]
return [
t.to_dict(
include_connectivity=True,
include_thing_id=True,
include_thing_group_names=True,
include_shadows_as_json=True,
)
for t in things
]

def create_role_alias(
self,
role_alias_name: str,
role_arn: str,
credential_duration_seconds: int = 3600,
) -> FakeRoleAlias:
if role_alias_name in self.role_aliases:
current_role_alias = self.role_aliases[role_alias_name]
raise ResourceAlreadyExistsException(
f"RoleAlias cannot be created - already exists (name={role_alias_name})",
current_role_alias.role_alias,
current_role_alias.arn,
)
new_role_alias = FakeRoleAlias(
role_alias=role_alias_name,
role_arn=role_arn,
account_id=self.account_id,
region_name=self.region_name,
credential_duration_seconds=credential_duration_seconds,
)
self.role_aliases[role_alias_name] = new_role_alias
return new_role_alias

def list_role_aliases(self) -> Iterable[FakeRoleAlias]:
return self.role_aliases.values()

def describe_role_alias(self, role_alias_name: str) -> FakeRoleAlias:
if role_alias_name not in self.role_aliases:
raise ResourceNotFoundException(
f"RoleAlias not found (name= {role_alias_name})",
)
return self.role_aliases[role_alias_name]

def update_role_alias(
self,
role_alias_name: str,
role_arn: Optional[str] = None,
credential_duration_seconds: Optional[int] = None,
) -> FakeRoleAlias:
role_alias = self.describe_role_alias(role_alias_name=role_alias_name)
if role_arn:
role_alias.role_arn = role_arn
if credential_duration_seconds:
role_alias.credential_duration_seconds = credential_duration_seconds
return role_alias

def delete_role_alias(self, role_alias_name: str) -> None:
self.describe_role_alias(role_alias_name=role_alias_name)
del self.role_aliases[role_alias_name]


iot_backends = BackendDict(IoTBackend, "iot")
57 changes: 57 additions & 0 deletions moto/iot/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -769,3 +769,60 @@ def search_index(self) -> str:
query = self._get_param("queryString")
things = self.iot_backend.search_index(query)
return json.dumps({"things": things, "thingGroups": []})

def create_role_alias(self) -> str:
role_alias_name = self._get_param("roleAlias")
role_arn = self._get_param("roleArn")
credential_duration_seconds = self._get_int_param(
"credentialDurationSeconds", 3600
)
created_role_alias = self.iot_backend.create_role_alias(
role_alias_name=role_alias_name,
role_arn=role_arn,
credential_duration_seconds=credential_duration_seconds,
)
return json.dumps(
dict(
roleAlias=created_role_alias.role_alias,
roleAliasArn=created_role_alias.arn,
)
)

def list_role_aliases(self) -> str:
# page_size = self._get_int_param("pageSize")
# marker = self._get_param("marker")
# ascending_order = self._get_param("ascendingOrder")
return json.dumps(
dict(
roleAliases=[_.role_alias for _ in self.iot_backend.list_role_aliases()]
)
)

def describe_role_alias(self) -> str:
role_alias_name = self._get_param("roleAlias")
role_alias = self.iot_backend.describe_role_alias(
role_alias_name=role_alias_name
)
return json.dumps({"roleAliasDescription": role_alias.to_dict()})

def update_role_alias(self) -> str:
role_alias_name = self._get_param("roleAlias")
role_arn = self._get_param("roleArn", None)
credential_duration_seconds = self._get_int_param(
"credentialDurationSeconds", 0
)

role_alias = self.iot_backend.update_role_alias(
role_alias_name=role_alias_name,
role_arn=role_arn,
credential_duration_seconds=credential_duration_seconds,
)

return json.dumps(
dict(roleAlias=role_alias.role_alias, roleAliasArn=role_alias.arn)
)

def delete_role_alias(self) -> str:
role_alias_name = self._get_param("roleAlias")
self.iot_backend.delete_role_alias(role_alias_name=role_alias_name)
return json.dumps({})
Loading

0 comments on commit 092bd10

Please sign in to comment.