Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🚨🚨 Source Mailchimp: Migrate to Low code #35281

Merged
merged 82 commits into from
Apr 1, 2024

Conversation

artem1205
Copy link
Collaborator

@artem1205 artem1205 commented Feb 14, 2024

What

Resolve https://github.com/airbytehq/airbyte-internal-issues/issues/2919

How

migrate to Low-Code

Recommended reading order

  1. airbyte-integrations/connectors/source-mailchimp/source_mailchimp/manifest.yaml
  2. airbyte-integrations/connectors/source-mailchimp/source_mailchimp/components.py

🚨 User Impact 🚨

  • breaking change for nested state (parent child incremental)
  • fix segment_members primary key: ["id", "segment_id"]
  • fix list_members primary key: ["id", "list_id"]

Pre-merge Actions

Updating a connector

Community member or Airbyter

  • Grant edit access to maintainers (instructions)
  • Unit & integration tests added

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • If new credentials are required for use in CI, add them to GSM. Instructions.

Copy link

vercel bot commented Feb 14, 2024

The latest updates on your projects. Learn more about Vercel for Git ↗︎

1 Ignored Deployment
Name Status Preview Comments Updated (UTC)
airbyte-docs ⬜️ Ignored (Inspect) Visit Preview Mar 29, 2024 11:13am

@octavia-squidington-iii octavia-squidington-iii added area/connectors Connector related issues CDK Connector Development Kit labels Feb 14, 2024
Copy link
Contributor

github-actions bot commented Feb 14, 2024

Before Merging a Connector Pull Request

Wow! What a great pull request you have here! 🎉

To merge this PR, ensure the following has been done/considered for each connector added or updated:

  • PR name follows PR naming conventions
  • Breaking changes are considered. If a Breaking Change is being introduced, ensure an Airbyte engineer has created a Breaking Change Plan.
  • Connector version has been incremented in the Dockerfile and metadata.yaml according to our Semantic Versioning for Connectors guidelines
  • You've updated the connector's metadata.yaml file any other relevant changes, including a breakingChanges entry for major version bumps. See metadata.yaml docs
  • Secrets in the connector's spec are annotated with airbyte_secret
  • All documentation files are up to date. (README.md, bootstrap.md, docs.md, etc...)
  • Changelog updated in docs/integrations/<source or destination>/<name>.md with an entry for the new version. See changelog example
  • Migration guide updated in docs/integrations/<source or destination>/<name>-migrations.md with an entry for the new version, if the version is a breaking change. See migration guide example
  • If set, you've ensured the icon is present in the platform-internal repo. (Docs)

If the checklist is complete, but the CI check is failing,

  1. Check for hidden checklists in your PR description

  2. Toggle the github label checklist-action-run on/off to re-run the checklist CI.

Comment on lines 86 to 117
class MailChimpRecordFilter(RecordFilter):
"""
Filter applied on a list of Records.
"""

def filter_records(
self,
records: List[Mapping[str, Any]],
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> List[Mapping[str, Any]]:
current_state = [x for x in stream_state.get("states", []) if x["partition"]["id"] == stream_slice.partition["id"]]
# TODO: REF what to do if no start_date mentioned (see manifest)
# implement the same logic
start_date = self.config.get("start_date", (pendulum.now() - pendulum.duration(days=700)).to_iso8601_string())
if current_state and start_date:
filter_value = max(start_date, current_state[0]["cursor"][self.parameters["cursor_field"]])
return [record for record in records if record[self.parameters["cursor_field"]] > filter_value]
return records


class MailChimpRecordFilterEmailActivity(RecordFilter):
def filter_records(
self,
records: List[Mapping[str, Any]],
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> List[Mapping[str, Any]]:

return [{**record, **activity_item} for record in records for activity_item in record.pop("activity", [])]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question so I learn: looks like this filter takes a dict of record and returns a separate row for each record and each activity in that record.

If there are no activities at all, the record will be filtered out. If there are multiple activities on a single record (if that is possible), that will return multiple row for each found activity item.

How wrong am I?

If that's correct, than this thing both filters records, but also transforms them and generates them, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're generally right:

this code may be written as follows:

data = response_json.get(self.data_field, [])
for item in data:
    for activity_item in item.pop("activity", []):
        yield {**item, **activity_item}

If that's correct, than this thing both filters records, but also transforms them and generates them, right?

Yep.
Standard Dpathextractor is used to replace first for in code above data = response_json.get(self.data_field, [])
and this custom class filters and extracts activities

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this defined as being a filter? Like Natik mentioned, I would expect this to be a custom transformation followed by a filter that checks if there were activities.

Separating the two concerns allows us to extract some logic that can be generalized.

Instead of creating a custom filter, we should introduce a flatten_field transformation which takes a nested object and brings it to the root

The transformation:

{
"key": 123,
"activities": {"nested": "value", "another_nested": "value2"} 
}

to 
{
"key": 123,
"nested:" value",
"another_nested": "value2"
}

would then be described as

transformations:
  type: FlattenField
  field_pointers:
    - activities

Here is a tentative schema definition:

  FlattenFields:
    title: Flatten Fields
    type: object
    required:
      - type
      - field_pointers
    properties:
      type:
        type: string
        enum: [FlattenFields]
      field_pointers:
        title: Field Paths
        type: array
        items:
          items:
            type: string
        examples:
          - ["activities"]

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I can agree that this is more about transformation rather than filtering, but

{
"key": 123,
"activities": [
{"nested": "value"},
 {"another_nested": "value2"}]
}

should be transformed to 2 records as activities is a list :

[
  {
    "key": 123,
    "nested": "value"
  },
  {
    "key": 123,
    "another_nested": "value2"
  }
]

it means that we will have more records than original list, while current implementation does not expect the number of records to change:

def _transform(
self,
records: List[Mapping[str, Any]],
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
) -> None:
for record in records:
for transformation in self.transformations:
# record has type Mapping[str, Any], but Record expected
transformation.transform(record, config=self.config, stream_state=stream_state, stream_slice=stream_slice) # type: ignore

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it! I did misunderstand.

Can you make a proposal for how you'd want to use this feature? When considering new features or custom components, it's useful to have a description, examples, and a desired interface (when known) to help us understand the use case.

Then we can implement and make any required changes to the record transform interface

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I'll get back with all propositions as soon as this PR will be in review, I'm still working on it.



@dataclass
class MailChimpRequester(HttpRequester):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you describe why this is needed? it's difficult to judge whether is should truly be custom behavior without stating the desired behavior and why it cannot be supported natively by the CDK

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, mailchimp declares custom api structure

There are a few ways to find your data center. It’s the first part of the URL you see in the API keys section of your account; if the URL is https://us6.mailchimp.com/account/api/, then the data center subdomain is us6. It’s also appended to your API key in the form key-dc; if your API key is 0123456789abcdef0123456789abcde-us6, then the data center subdomain is us6. And finally, if you’re connecting via OAuth 2, you can find the data center associated with the token via the OAuth Metadata endpoint; for more information, see the OAuth guide.

The only reason for this custom component is to get the data center prefix for Oauth authenticator.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a custom requester instead of something like a config migrator?

],
ids=["API Key Invalid", "Forbidden", "Unknown Error"],
)
def test_check_connection_error(requests_mock, config, data_center, response, expected_message):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have we considered the tradeoffs between deleting the tests and updating the fixture to run them in the new implementation?

data_center = self.config["credentials"]["apikey"].split("-").pop()
else:
data_center = self.get_oauth_data_center(self.config["credentials"]["access_token"])
self.config["data_center"] = data_center
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we modifying the config? As far as I know, this is an implementation detail. Why not remove the url_base field from the manifest?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mainly for visibility and not overriding the get_url_base method from base class

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would agree with @girarda that it would be better to pass this as a parameter of __init__ else we create an object that is stateful. That being said, I don't know how all of this fits in the initialization process of the custom components. The other solution I would propose is to not use custom components and use only interpolation

Copy link
Contributor

@maxi297 maxi297 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for addressing all the comments and making sure there is a little impact as possible for the user!

Args:
- migrated_config (Mapping[str, Any]): The migrated configuration.
"""
cls.message_repository.emit_message(create_connector_config_control_message(migrated_config))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole method can be replaced by print(create_connector_config_control_message(migrated_config).json(exclude_unset=True)) and we can remove the message_repository from this class

@NAjustin
Copy link
Contributor

@artem1205 I found this PR while investigating an issue with the current version of the Mailchimp connector, and noticed that this version replicates the same error, so I wanted to flag it for you while I work on testing a PR for the main connector.

The issue is that the members object currently has primary_key set to id, which is incorrect:

  • In Mailchimp, members are associated with lists. This means there will be one member record per list they are on.
  • members.id` is a hash of the member's email address, which is obviously non-unique across lists
  • This means that if you do an incremental sync with the current connector (or this low-code version), you will only get one record per email.

You can see this issue here in your feature branch.

Either of the following keys would be suitable:

  • contact_id, which is a non-email-specific identifier for the member within the list (so is unique across email+list)
  • web_id is an integer and is also unique to the list+email combination
  • id and list_id could also be used as a compound key with the same effect

Mailchimp also discourages the use of id in their docs (see the members.contact_id response object), stating:

As Mailchimp evolves beyond email, you may eventually have contacts without email addresses. While the id is the MD5 hash of their email address, this contact_id is agnostic of contact’s inclusion of an email address.

This has been a longstanding issue in the Mailchimp source, but only affects accounts with the same contacts on multiple lists which may be why it's flown under the radar.

@artem1205
Copy link
Collaborator Author

@NAjustin, thank you for pointing that out!
composite primary key will be used: ["id", "list_id"]

Signed-off-by: Artem Inzhyyants <[email protected]>
Signed-off-by: Artem Inzhyyants <[email protected]>
Copy link
Contributor

@brianjlai brianjlai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small reminder to re-run poetry lock before merging to use the latest version of airbyte-cdk. Thanks!

…ilchimp-low-code-35064

# Conflicts:
#	airbyte-integrations/connectors/source-mailchimp/metadata.yaml
#	airbyte-integrations/connectors/source-mailchimp/poetry.lock
#	airbyte-integrations/connectors/source-mailchimp/pyproject.toml
#	docs/integrations/sources/mailchimp.md
Signed-off-by: Artem Inzhyyants <[email protected]>
@artem1205 artem1205 merged commit 02add5b into master Apr 1, 2024
29 of 30 checks passed
@artem1205 artem1205 deleted the artem1205/source-mailchimp-low-code-35064 branch April 1, 2024 13:30
@lazebnyi lazebnyi added the low-code-migration This connector has been migrated to the low-code CDK label Apr 3, 2024
nurikk pushed a commit to nurikk/airbyte that referenced this pull request Apr 4, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/connectors Connector related issues area/documentation Improvements or additions to documentation breaking-change Don't merge me unless you are ready. connectors/source/mailchimp low-code-migration This connector has been migrated to the low-code CDK
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants