Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🚨🚨 Source Mailchimp: Migrate to Low code #35281

Merged
merged 82 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
82 commits
Select commit Hold shift + click to select a range
c9d43a6
Airbyte CDK: add CustomRecordFilter
artem1205 Feb 14, 2024
83736f2
Airbyte CDK: add interpolation for RequestOptions
artem1205 Feb 14, 2024
3a96f34
Source Mailchimp: migrate to Low-Code
artem1205 Feb 14, 2024
9d79879
Source Mailchimp: bump base image
artem1205 Feb 14, 2024
200d282
Source Mailchimp: remove unit tests
artem1205 Feb 14, 2024
9b76729
Source Mailchimp: add docstring
artem1205 Feb 14, 2024
b8173f1
Source Mailchimp: add segment_members transformation
artem1205 Feb 14, 2024
568d83c
Source Mailchimp: add tags transformation
artem1205 Feb 14, 2024
7eafce1
Source Mailchimp: use SelectiveAuthenticator
artem1205 Feb 14, 2024
7419188
Airbyte CDK: add filter to RemoveFields
artem1205 Feb 15, 2024
321ea38
Source Mailchimp: add transformation
artem1205 Feb 15, 2024
1b4ca32
Source Mailchimp: remove duplicating test
artem1205 Feb 15, 2024
90b9606
Source Mailchimp: ref MailChimpRecordExtractorEmailActivity
artem1205 Feb 16, 2024
24b1511
Source Mailchimp: add test
artem1205 Feb 16, 2024
0cb2273
Source Mailchimp: add unit tests
artem1205 Feb 16, 2024
2c25d95
Source Mailchimp: add test for components
artem1205 Feb 19, 2024
536c018
Source Mailchimp: add integration tests
artem1205 Feb 19, 2024
ccd2305
Source Mailchimp: ref manifest.yaml
artem1205 Feb 19, 2024
b08c4b7
Source Mailchimp: add unit tests
artem1205 Feb 19, 2024
d8ef2fc
Source Mailchimp: ref
artem1205 Feb 19, 2024
b2ccb38
Source Mailchimp: ref
artem1205 Feb 19, 2024
0d62d00
Source Mailchimp: fix deps
artem1205 Feb 19, 2024
9953738
Merge remote-tracking branch 'origin/master' into artem1205/source-ma…
artem1205 Feb 22, 2024
8cd2760
Merge remote-tracking branch 'origin/master' into artem1205/source-ma…
artem1205 Feb 22, 2024
7534433
Airbyte CDK: fix merge
artem1205 Feb 22, 2024
97a8f96
Merge remote-tracking branch 'origin/master' into artem1205/source-ma…
artem1205 Feb 23, 2024
0e9cdf1
Source Mailchimp: format code
artem1205 Feb 23, 2024
6594d4a
Source Mailchimp: update cdk
artem1205 Feb 23, 2024
55563a8
Source Mailchimp: format response
artem1205 Feb 23, 2024
89098c3
Source Mailchimp: ref component
artem1205 Feb 23, 2024
d020651
Source Mailchimp: add allowed hosts for oauth sync
artem1205 Feb 23, 2024
8641b82
Source Mailchimp: add unit test for source
artem1205 Feb 23, 2024
faf5f79
Source Mailchimp: bump version
artem1205 Feb 23, 2024
7837b76
Source Mailchimp: ref unit tests
artem1205 Feb 23, 2024
e6fc6d9
Source Mailchimp: clean code
artem1205 Feb 26, 2024
0fbd2ae
Merge remote-tracking branch 'origin/master' into artem1205/source-ma…
artem1205 Mar 6, 2024
591e852
Source MailChimp: update incremental
artem1205 Mar 6, 2024
425be4a
Source MailChimp: fix unit test
artem1205 Mar 6, 2024
1e369f9
Source MailChimp: fix unit test [skip ci]
artem1205 Mar 6, 2024
9a71be0
Source MailChimp: update abnormal_state
artem1205 Mar 6, 2024
f4c9591
Source MailChimp: update breaking changes
artem1205 Mar 6, 2024
8f69b55
Source MailChimp: fix formatting
artem1205 Mar 6, 2024
0fbcb0f
Source Mailchimp: refactor
artem1205 Mar 11, 2024
ef3e0ec
Source Mailchimp: rename page size
artem1205 Mar 11, 2024
f8c19f9
Source Mailchimp: change step to 100 years
artem1205 Mar 11, 2024
23d305e
Merge remote-tracking branch 'origin/master' into artem1205/source-ma…
artem1205 Mar 11, 2024
b89ba8d
Source Mailchimp: fix primary key
artem1205 Mar 11, 2024
8831889
Source Mailchimp: update docs
artem1205 Mar 11, 2024
e38201a
Source Mailchimp: update docs
artem1205 Mar 11, 2024
20a83a1
Source Mailchimp: remove step
artem1205 Mar 11, 2024
966f90b
Source Mailchimp: update tag
artem1205 Mar 11, 2024
2aeb0f4
Source Mailchimp: remove cursor_granularity
artem1205 Mar 11, 2024
944666b
Source Mailchimp: update CDK
artem1205 Mar 12, 2024
cfdc451
Source Mailchimp: remove unused campaign_id
artem1205 Mar 12, 2024
c54ea4e
Source MailChimp: refactor requester to config migration
artem1205 Mar 13, 2024
057e519
Source MailChimp: refactor requester to config migration
artem1205 Mar 13, 2024
01b2213
Source MailChimp: fix test config
artem1205 Mar 13, 2024
e1be9a7
Source MailChimp: remove unused todo
artem1205 Mar 13, 2024
b6d84d3
Source MailChimp: add lookback window
artem1205 Mar 13, 2024
7d5eb7f
Source MailChimp: fix formatting
artem1205 Mar 13, 2024
7732a44
Source MailChimp: fix integration tests
artem1205 Mar 13, 2024
77369ee
Source MailChimp: format
artem1205 Mar 13, 2024
bd956e7
Merge branch 'master' into artem1205/source-mailchimp-low-code-35064
artem1205 Mar 14, 2024
0dcc86c
Source MailChimp: bump lock
artem1205 Mar 14, 2024
d992ec3
Source MailChimp: set start date to 1970-01-01
artem1205 Mar 17, 2024
f703e00
Source MailChimp: remove message_repository
artem1205 Mar 18, 2024
82526f7
Source MailChimp: update docs
artem1205 Mar 18, 2024
dbe6cf2
Merge remote-tracking branch 'origin/master' into artem1205/source-ma…
artem1205 Mar 18, 2024
f087be1
Source MailChimp: format
artem1205 Mar 18, 2024
6e92e8e
Source MailChimp: format
artem1205 Mar 18, 2024
cc9299a
Merge remote-tracking branch 'origin/master' into artem1205/source-ma…
artem1205 Mar 26, 2024
0124f5e
Source MailChimp: update primary key
artem1205 Mar 26, 2024
ba51d7f
Source MailChimp: update poetry
artem1205 Mar 26, 2024
bd79818
Source MailChimp: update poetry
artem1205 Mar 26, 2024
2550ba7
Source MailChimp: add LegacyToPerPartitionStateMigration
artem1205 Mar 26, 2024
c6953ab
Source Mailchimp: fix state migration
artem1205 Mar 26, 2024
38dd9a4
Source MailChimp: update acceptance test
artem1205 Mar 27, 2024
08be185
Source MailChimp: update acceptance test
artem1205 Mar 27, 2024
f903771
Source MailChimp: update docs
artem1205 Mar 27, 2024
c556708
Source MailChimp: fix formatting
artem1205 Mar 27, 2024
df2ee5b
Merge remote-tracking branch 'origin/master' into artem1205/source-ma…
artem1205 Mar 29, 2024
ffd2ca3
Source Mailchimp: bump CDK version
artem1205 Mar 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,27 @@ definitions:
$parameters:
type: object
additionalProperties: true
CustomRecordFilter:
title: Custom Record Extractor
description: Record extractor component whose behavior is derived from a custom code implementation of the connector.
type: object
additionalProperties: true
required:
- type
- class_name
properties:
type:
type: string
enum: [CustomRecordFilter]
class_name:
title: Class Name
description: Fully-qualified name of the class that will be implementing the custom record filter strategy. The format is `source_<name>.<package>.<class_name>`.
type: string
examples:
- "source_railz.components.MyCustomCustomRecordFilter"
$parameters:
type: object
additionalProperties: true
CustomRequester:
title: Custom Requester
description: Requester component whose behavior is derived from a custom code implementation of the connector.
Expand Down Expand Up @@ -1819,7 +1840,9 @@ definitions:
record_filter:
title: Record Filter
description: Responsible for filtering records to be emitted by the Source.
"$ref": "#/definitions/RecordFilter"
anyOf:
- "$ref": "#/definitions/CustomRecordFilter"
- "$ref": "#/definitions/RecordFilter"
schema_normalization:
"$ref": "#/definitions/SchemaNormalization"
default: None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,11 @@ def request_kwargs(self) -> Mapping[str, Any]:
def _get_request_options(self, option_type: RequestOptionType, stream_slice: StreamSlice):
options = {}
if self.start_time_option and self.start_time_option.inject_into == option_type:
options[self.start_time_option.field_name] = stream_slice.get(self.partition_field_start.eval(self.config))
options[self.start_time_option.field_name.eval(config=self.config)] = stream_slice.get(
self.partition_field_start.eval(self.config)
)
if self.end_time_option and self.end_time_option.inject_into == option_type:
options[self.end_time_option.field_name] = stream_slice.get(self.partition_field_end.eval(self.config))
options[self.end_time_option.field_name.eval(config=self.config)] = stream_slice.get(self.partition_field_end.eval(self.config))
return options

def should_be_synced(self, record: Record) -> bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,20 @@ class Config:
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')


class CustomRecordFilter(BaseModel):
class Config:
extra = Extra.allow

type: Literal['CustomRecordFilter']
class_name: str = Field(
...,
description='Fully-qualified name of the class that will be implementing the custom record filtering. The format is `source_<name>.<package>.<class_name>`.',
examples=['source_railz.components.MyCustomRecordFilter'],
title='Class Name',
)
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')


class CustomRequester(BaseModel):
class Config:
extra = Extra.allow
Expand Down Expand Up @@ -1019,7 +1033,7 @@ class ListPartitionRouter(BaseModel):
class RecordSelector(BaseModel):
type: Literal['RecordSelector']
extractor: Union[CustomRecordExtractor, DpathExtractor]
record_filter: Optional[RecordFilter] = Field(
record_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field(
None,
description='Responsible for filtering records to be emitted by the Source.',
title='Record Filter',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import CustomPaginationStrategy as CustomPaginationStrategyModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import CustomPartitionRouter as CustomPartitionRouterModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import CustomRecordExtractor as CustomRecordExtractorModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import CustomRecordFilter as CustomRecordFilterModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import CustomRequester as CustomRequesterModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import CustomRetriever as CustomRetrieverModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import CustomTransformation as CustomTransformationModel
Expand Down Expand Up @@ -161,6 +162,7 @@ def _init_mappings(self) -> None:
CustomErrorHandlerModel: self.create_custom_component,
CustomIncrementalSyncModel: self.create_custom_component,
CustomRecordExtractorModel: self.create_custom_component,
CustomRecordFilterModel: self.create_custom_component,
CustomRequesterModel: self.create_custom_component,
CustomRetrieverModel: self.create_custom_component,
CustomPaginationStrategyModel: self.create_custom_component,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,9 @@ def _get_request_options(self, option_type: RequestOptionType) -> MutableMapping
and isinstance(self.page_token_option, RequestOption)
and self.page_token_option.inject_into == option_type
):
options[self.page_token_option.field_name] = self._token
options[self.page_token_option.field_name.eval(config=self.config)] = self._token
if self.page_size_option and self.pagination_strategy.get_page_size() and self.page_size_option.inject_into == option_type:
options[self.page_size_option.field_name] = self.pagination_strategy.get_page_size()
options[self.page_size_option.field_name.eval(config=self.config)] = self.pagination_strategy.get_page_size()
return options


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

from dataclasses import InitVar, dataclass
from enum import Enum
from typing import Any, Mapping
from typing import Any, Mapping, Union

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString


class RequestOptionType(Enum):
Expand All @@ -28,6 +30,9 @@ class RequestOption:
inject_into (RequestOptionType): Describes where in the HTTP request to inject the parameter
"""

field_name: str
field_name: Union[InterpolatedString, str]
inject_into: RequestOptionType
parameters: InitVar[Mapping[str, Any]]

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self.field_name = InterpolatedString.create(self.field_name, parameters=parameters)
28 changes: 5 additions & 23 deletions airbyte-integrations/connectors/source-mailchimp/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@ include = "source_mailchimp"

[tool.poetry.dependencies]
python = "^3.9,<3.12"
airbyte-cdk = "==0.58.8"
airbyte-cdk = "^0.62.1"
pytest = "==6.2.5"

[tool.poetry.scripts]
source-mailchimp = "source_mailchimp.run:run"

[tool.poetry.group.dev.dependencies]
pytest-mock = "^3.6.1"
responses = "^0.19.0"
requests-mock = "^1.9.3"
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

from dataclasses import dataclass
from typing import Any, List, Mapping, Optional

import pendulum
import requests
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
from airbyte_cdk.sources.declarative.auth.token import BasicHttpAuthenticator, BearerAuthenticator
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import (
InterpolatedRequestOptionsProvider,
RequestInput,
)
from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState


@dataclass
class MailChimpRequester(HttpRequester):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you describe why this is needed? it's difficult to judge whether is should truly be custom behavior without stating the desired behavior and why it cannot be supported natively by the CDK

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, mailchimp declares custom api structure

There are a few ways to find your data center. It’s the first part of the URL you see in the API keys section of your account; if the URL is https://us6.mailchimp.com/account/api/, then the data center subdomain is us6. It’s also appended to your API key in the form key-dc; if your API key is 0123456789abcdef0123456789abcde-us6, then the data center subdomain is us6. And finally, if you’re connecting via OAuth 2, you can find the data center associated with the token via the OAuth Metadata endpoint; for more information, see the OAuth guide.

The only reason for this custom component is to get the data center prefix for Oauth authenticator.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a custom requester instead of something like a config migrator?


request_body_json: Optional[RequestInput] = None
request_headers: Optional[RequestInput] = None
request_parameters: Optional[RequestInput] = None
request_body_data: Optional[RequestInput] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:

self.request_options_provider = InterpolatedRequestOptionsProvider(
request_body_data=self.request_body_data,
request_body_json=self.request_body_json,
request_headers=self.request_headers,
request_parameters=self.request_parameters,
config=self.config,
parameters=parameters or {},
)
super().__post_init__(parameters)

def get_url_base(self) -> str:
self.get_data_center_location()
return super().get_url_base()

def get_data_center_location(self):
if not self.config.get("data_center"):
if isinstance(self.authenticator, BasicHttpAuthenticator):
data_center = self.config["credentials"]["apikey"].split("-").pop()
else:
data_center = self.get_oauth_data_center(self.config["credentials"]["access_token"])
self.config["data_center"] = data_center
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we modifying the config? As far as I know, this is an implementation detail. Why not remove the url_base field from the manifest?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mainly for visibility and not overriding the get_url_base method from base class

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would agree with @girarda that it would be better to pass this as a parameter of __init__ else we create an object that is stateful. That being said, I don't know how all of this fits in the initialization process of the custom components. The other solution I would propose is to not use custom components and use only interpolation


@staticmethod
def get_oauth_data_center(access_token: str) -> str:
"""
Every Mailchimp API request must be sent to a specific data center.
The data center is already embedded in API keys, but not OAuth access tokens.
This method retrieves the data center for OAuth credentials.
"""
try:
response = requests.get(
"https://login.mailchimp.com/oauth2/metadata", headers={"Authorization": "OAuth {}".format(access_token)}
)

# Requests to this endpoint will return a 200 status code even if the access token is invalid.
error = response.json().get("error")
if error == "invalid_token":
raise ValueError("The access token you provided was invalid. Please check your credentials and try again.")
return response.json()["dc"]

# Handle any other exceptions that may occur.
except Exception as e:
raise Exception(f"An error occured while retrieving the data center for your account. \n {repr(e)}")


class MailChimpAuthenticator(DeclarativeAuthenticator):
artem1205 marked this conversation as resolved.
Show resolved Hide resolved
config: Mapping[str, Any]
bearer: BearerAuthenticator
basic: BasicHttpAuthenticator

def __new__(cls, bearer, basic, config, *args, **kwargs):
if config.get("credentials", {}).get("auth_type") == "oauth2.0":
return bearer
else:
return basic


class MailChimpRecordFilter(RecordFilter):
"""
Filter applied on a list of Records.
"""

def filter_records(
self,
records: List[Mapping[str, Any]],
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> List[Mapping[str, Any]]:
current_state = [x for x in stream_state.get("states", []) if x["partition"]["id"] == stream_slice.partition["id"]]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to @girarda: semi-incremental needs to work with per partition states

# TODO: REF what to do if no start_date mentioned (see manifest)
maxi297 marked this conversation as resolved.
Show resolved Hide resolved
# implement the same logic
start_date = self.config.get("start_date", (pendulum.now() - pendulum.duration(days=700)).to_iso8601_string())
if current_state and start_date:
filter_value = max(start_date, current_state[0]["cursor"][self.parameters["cursor_field"]])
return [record for record in records if record[self.parameters["cursor_field"]] > filter_value]
return records


class MailChimpRecordFilterEmailActivity(RecordFilter):
def filter_records(
self,
records: List[Mapping[str, Any]],
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> List[Mapping[str, Any]]:

return [{**record, **activity_item} for record in records for activity_item in record.pop("activity", [])]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question so I learn: looks like this filter takes a dict of record and returns a separate row for each record and each activity in that record.

If there are no activities at all, the record will be filtered out. If there are multiple activities on a single record (if that is possible), that will return multiple row for each found activity item.

How wrong am I?

If that's correct, than this thing both filters records, but also transforms them and generates them, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're generally right:

this code may be written as follows:

data = response_json.get(self.data_field, [])
for item in data:
    for activity_item in item.pop("activity", []):
        yield {**item, **activity_item}

If that's correct, than this thing both filters records, but also transforms them and generates them, right?

Yep.
Standard Dpathextractor is used to replace first for in code above data = response_json.get(self.data_field, [])
and this custom class filters and extracts activities

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this defined as being a filter? Like Natik mentioned, I would expect this to be a custom transformation followed by a filter that checks if there were activities.

Separating the two concerns allows us to extract some logic that can be generalized.

Instead of creating a custom filter, we should introduce a flatten_field transformation which takes a nested object and brings it to the root

The transformation:

{
"key": 123,
"activities": {"nested": "value", "another_nested": "value2"} 
}

to 
{
"key": 123,
"nested:" value",
"another_nested": "value2"
}

would then be described as

transformations:
  type: FlattenField
  field_pointers:
    - activities

Here is a tentative schema definition:

  FlattenFields:
    title: Flatten Fields
    type: object
    required:
      - type
      - field_pointers
    properties:
      type:
        type: string
        enum: [FlattenFields]
      field_pointers:
        title: Field Paths
        type: array
        items:
          items:
            type: string
        examples:
          - ["activities"]

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I can agree that this is more about transformation rather than filtering, but

{
"key": 123,
"activities": [
{"nested": "value"},
 {"another_nested": "value2"}]
}

should be transformed to 2 records as activities is a list :

[
  {
    "key": 123,
    "nested": "value"
  },
  {
    "key": 123,
    "another_nested": "value2"
  }
]

it means that we will have more records than original list, while current implementation does not expect the number of records to change:

def _transform(
self,
records: List[Mapping[str, Any]],
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
) -> None:
for record in records:
for transformation in self.transformations:
# record has type Mapping[str, Any], but Record expected
transformation.transform(record, config=self.config, stream_state=stream_state, stream_slice=stream_slice) # type: ignore

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it! I did misunderstand.

Can you make a proposal for how you'd want to use this feature? When considering new features or custom components, it's useful to have a description, examples, and a desired interface (when known) to help us understand the use case.

Then we can implement and make any required changes to the record transform interface

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I'll get back with all propositions as soon as this PR will be in review, I'm still working on it.

Loading
Loading