Skip to content

Commit

Permalink
Merge pull request #4 from negz/pythonic
Browse files Browse the repository at this point in the history
Apply more best practices to the Python function
  • Loading branch information
negz authored Nov 8, 2024
2 parents 7b5c863 + c5d02fb commit b06f958
Showing 1 changed file with 58 additions and 50 deletions.
108 changes: 58 additions & 50 deletions functions/compose-bucket-python/main.py
Original file line number Diff line number Diff line change
@@ -1,62 +1,71 @@
from crossplane.function import resource
from crossplane.function.proto.v1 import run_function_pb2 as fnv1
from model.io.k8s.apimachinery.pkg.apis.meta import v1 as metav1
from model.com.example.platform.xstoragebucket import v1alpha1
from model.io.upbound.aws.s3.bucket import v1beta1 as bucketv1beta1
from model.io.upbound.aws.s3.bucketacl import v1beta1 as aclv1beta1
from model.io.upbound.aws.s3.bucketversioning import v1beta1 as verv1beta1
from model.io.upbound.aws.s3.bucketserversideencryptionconfiguration import v1beta1 as ssev1beta1

from .model.io.k8s.apimachinery.pkg.apis.meta import v1 as metav1
from .model.com.example.platform.xstoragebucket import v1alpha1
from .model.io.upbound.aws.s3.bucket import v1beta1 as bucketv1beta1
from .model.io.upbound.aws.s3.bucketacl import v1beta1 as aclv1beta1
from .model.io.upbound.aws.s3.bucketversioning import v1beta1 as verv1beta1
from .model.io.upbound.aws.s3.bucketserversideencryptionconfiguration import (
v1beta1 as ssev1beta1,
)


def compose(req: fnv1.RunFunctionRequest, rsp: fnv1.RunFunctionResponse):
observed_xr = v1alpha1.XStorageBucket(**req.observed.composite.resource)
xr_name = observed_xr.metadata.name
bucket_name = xr_name + "-bucket"
params = observed_xr.spec.parameters

bucket = bucketv1beta1.Bucket(
desired_bucket = bucketv1beta1.Bucket(
apiVersion="s3.aws.upbound.io/v1beta1",
kind="Bucket",
metadata=metav1.ObjectMeta(
name=bucket_name,
),
spec=bucketv1beta1.Spec(
forProvider=bucketv1beta1.ForProvider(
region=params.region,
),
),
)
resource.update(rsp.desired.resources[bucket.metadata.name], bucket)
resource.update(rsp.desired.resources["bucket"], desired_bucket)

# Return early if Crossplane hasn't observed the bucket yet. This means it
# hasn't been created yet. This function will be called again after it is.
if "bucket" not in req.observed.resources:
return

observed_bucket = bucketv1beta1.Bucket(**req.observed.resources["bucket"].resource)

acl = aclv1beta1.BucketACL(
# The desired ACL, encryption, and versioning resources all need to refer to
# the bucket by its external name, which is stored in its external name
# annotation. Return early if the Bucket's external-name annotation isn't
# set yet.
if observed_bucket.metadata is None or observed_bucket.metadata.annotations is None:
return
if "crossplane.io/external-name" not in observed_bucket.metadata.annotations:
return

bucket_external_name = observed_bucket.metadata.annotations[
"crossplane.io/external-name"
]

desired_acl = aclv1beta1.BucketACL(
apiVersion="s3.aws.upbound.io/v1beta1",
kind="BucketACL",
metadata=metav1.ObjectMeta(
name=xr_name + "-acl",
),
spec=aclv1beta1.Spec(
forProvider=aclv1beta1.ForProvider(
region=params.region,
bucketRef=aclv1beta1.BucketRef(
name = bucket_name,
),
bucket=bucket_external_name,
acl=params.acl,
),
),
)
resource.update(rsp.desired.resources[acl.metadata.name], acl)
resource.update(rsp.desired.resources["acl"], desired_acl)

sse = ssev1beta1.BucketServerSideEncryptionConfiguration(
desired_sse = ssev1beta1.BucketServerSideEncryptionConfiguration(
apiVersion="s3.aws.upbound.io/v1beta1",
kind="BucketServerSideEncryptionConfiguration",
metadata=metav1.ObjectMeta(
name=xr_name + "-encryption",
),
spec=ssev1beta1.Spec(
forProvider=ssev1beta1.ForProvider(
region=params.region,
bucketRef=ssev1beta1.BucketRef(
name=bucket_name,
),
bucket=bucket_external_name,
rule=[
ssev1beta1.RuleItem(
applyServerSideEncryptionByDefault=[
Expand All @@ -70,27 +79,26 @@ def compose(req: fnv1.RunFunctionRequest, rsp: fnv1.RunFunctionResponse):
),
),
)
resource.update(rsp.desired.resources[sse.metadata.name], sse)
resource.update(rsp.desired.resources["sse"], desired_sse)

if params.versioning:
versioning = verv1beta1.BucketVersioning(
apiVersion="s3.aws.upbound.io/v1beta1",
kind="BucketVersioning",
metadata=metav1.ObjectMeta(
name=xr_name + "-versioning",
),
spec=verv1beta1.Spec(
forProvider=verv1beta1.ForProvider(
region=params.region,
bucketRef=verv1beta1.BucketRef(
name=bucket_name,
# Return early without composing a BucketVersioning MR if the XR doesn't
# have versioning enabled.
if not params.versioning:
return

desired_versioning = verv1beta1.BucketVersioning(
apiVersion="s3.aws.upbound.io/v1beta1",
kind="BucketVersioning",
spec=verv1beta1.Spec(
forProvider=verv1beta1.ForProvider(
region=params.region,
bucket=bucket_external_name,
versioningConfiguration=[
verv1beta1.VersioningConfigurationItem(
status="Enabled",
),
versioningConfiguration=[
verv1beta1.VersioningConfigurationItem(
status="Enabled",
),
],
),
)
)
resource.update(rsp.desired.resources[versioning.metadata.name], versioning)
],
),
),
)
resource.update(rsp.desired.resources["versioning"], desired_versioning)

0 comments on commit b06f958

Please sign in to comment.