-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
🎉 Source Stripe: Implement StripePartition #31696
🎉 Source Stripe: Implement StripePartition #31696
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎ 1 Ignored Deployment
|
Before Merging a Connector Pull RequestWow! What a great pull request you have here! 🎉 To merge this PR, ensure the following has been done/considered for each connector added or updated:
If the checklist is complete, but the CI check is failing,
|
source-stripe test report (commit
|
Step | Result |
---|---|
Build source-stripe docker image for platform(s) linux/amd64 | ✅ |
Unit tests | ✅ |
Acceptance tests | ❌ |
Check our base image is used | ❌ |
Code format checks | ✅ |
Validate metadata for source-stripe | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ✅ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-stripe test
source-stripe test report (commit
|
Step | Result |
---|---|
Build source-stripe docker image for platform(s) linux/amd64 | ✅ |
Unit tests | ✅ |
Acceptance tests | ❌ |
Check our base image is used | ❌ |
Code format checks | ✅ |
Validate metadata for source-stripe | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ✅ |
QA checks | ✅ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-stripe test
source-stripe test report (commit
|
Step | Result |
---|---|
Build source-stripe docker image for platform(s) linux/amd64 | ✅ |
Unit tests | ✅ |
Acceptance tests | ❌ |
Check our base image is used | ❌ |
Code format checks | ✅ |
Validate metadata for source-stripe | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ✅ |
QA checks | ✅ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-stripe test
56ed099
to
64b5167
Compare
source-stripe test report (commit
|
Step | Result |
---|---|
Build source-stripe docker image for platform(s) linux/amd64 | ✅ |
Unit tests | ✅ |
Acceptance tests | ❌ |
Check our base image is used | ❌ |
Code format checks | ✅ |
Validate metadata for source-stripe | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ✅ |
QA checks | ✅ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-stripe test
64b5167
to
1c091df
Compare
source-stripe test report (commit
|
Step | Result |
---|---|
Build source-stripe docker image for platform(s) linux/amd64 | ✅ |
Unit tests | ✅ |
Acceptance tests | ❌ |
Check our base image is used | ❌ |
Code format checks | ✅ |
Validate metadata for source-stripe | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ✅ |
QA checks | ✅ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-stripe test
source-stripe test report (commit
|
Step | Result |
---|---|
Build source-stripe docker image for platform(s) linux/amd64 | ✅ |
Unit tests | ✅ |
Acceptance tests | ❌ |
Check our base image is used | ❌ |
Code format checks | ✅ |
Validate metadata for source-stripe | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ✅ |
QA checks | ✅ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-stripe test
@@ -168,7 +168,10 @@ def as_airbyte_stream(self) -> AirbyteStream: | |||
|
|||
keys = self._primary_key | |||
if keys and len(keys) > 0: | |||
stream.source_defined_primary_key = [keys] | |||
if isinstance(keys, str): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
did you encounter a case where keys
was a string and not a list? This wouldn't respect the interface
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, agree, will fix
@@ -0,0 +1,130 @@ | |||
# |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you unit test these classes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -158,10 +220,203 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: | |||
], | |||
**args, | |||
) | |||
return [ | |||
|
|||
concurrent_streams = [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should only create concurrent streams when running in full refresh. You can tell the source will run in full refresh mode by checking that there's no input state file. Here's an example
cursor_value="{{ last_records[-1]['id'] if last_records else None }}", | ||
config={}, | ||
parameters={}, | ||
page_size=1000, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's make this a constant
concurrent_stream = StreamFacade( | ||
ThreadBasedConcurrentStream( | ||
partition_generator=StripePartitionGenerator(base_stream, self.message_repository, paginated_requester), | ||
max_workers=1, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's make this a constant
self._requester = requester | ||
|
||
@property | ||
def request_parameters(self) -> Optional[Mapping[str, Any]]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we shouldn't expose these properties publicly for arbitrary source because not all sources are HTTP API sources
@property | ||
def request_parameters(self) -> Mapping[str, Any]: | ||
params = { | ||
"limit": 100, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this parameter should be set by the paginator
class SourcePartition(Partition): | ||
def __init__(self, _slice: Mapping[str, Any], message_repository: MessageRepository, requester: PaginatedRequester): | ||
self._slice = _slice | ||
self._message_repository = message_repository |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is the message repository needed? it doesn't seem used
def __init__(self, requester: Requester, record_selector: RecordSelector, paginator: DefaultPaginator): | ||
self._requester = requester | ||
self._record_selector = record_selector | ||
self._page_token_option = paginator.page_token_option |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why create a new field instead of always referring to paginator.page_token_option
?
…gy and <Source>PaginationStrategy
SessionTokenAuthenticator, | ||
LegacySessionTokenAuthenticator, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@girarda It seems that when was changed the SessionTokenAuthenticator
to LegacySessionTokenAuthenticator
, wasn't changed this file, so mypy
complained about this one. Let me know if this is not the case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
interesting. would you mind fixing this in a separate PR? no reason to delay this fix with the stripe partition work
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, I've created separate PR for this
@@ -56,7 +56,7 @@ | |||
""" | |||
CLASS_TYPES_REGISTRY contains a mapping of developer-friendly string -> class to abstract the specific class referred to | |||
""" | |||
CLASS_TYPES_REGISTRY: Mapping[str, Type] = { | |||
CLASS_TYPES_REGISTRY: Mapping[str, type] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When I make changes to this file, mypy
finds the following error:
airbyte_cdk/sources/declarative/parsers/class_types_registry.py:59: error: Missing type parameters for generic type "Type" [type-arg]
I've created a separate PR to fix the SessionTokenAuthorization
, and you can see there that mypy
is failing with this error: https://github.com/airbytehq/airbyte/actions/runs/6719622494/job/18261600772?pr=32040
It seems that when using a Type
(imported from the typing
), we also need to specify a parameter in the annotation (Type[<param>]
).
Ours doesn't specify a parameter, so I replaced it with a generic type
to get around the mypy
error.
from airbyte_cdk.sources.declarative.types import Config | ||
from airbyte_cdk.sources.declarative.types import Config, Record | ||
|
||
DECODED_RESPONSE_TYPE = Union[Mapping[str, Any], List[dict[str, Any]]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is needed because the decoder can return either a mapping or a list, is this correct? If so, can you update the decoder to return a value of this type to document why this is needed
return token if token else None | ||
|
||
def reset(self): | ||
@abstractmethod | ||
def stop(self, response: DECODED_RESPONSE_TYPE, headers: Mapping[str, Any], last_records: List[Record]) -> bool: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need both a stop_condition and a stop method? Maybe we can remove the stop_condition
property and only keep the private field in the LowCodeCursorPaginationStrategy
…/source-stripe/implement-partition
…package' of github.com:airbytehq/airbyte into ykurochkin/source-stripe/implement-partition # Conflicts: # airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/__init__.py # airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py
1ff33ee
to
1872989
Compare
…package' of github.com:airbytehq/airbyte into ykurochkin/source-stripe/implement-partition # Conflicts: # airbyte-integrations/connectors/source-stripe/source_stripe/source.py # docs/integrations/sources/stripe.md
…package' of github.com:airbytehq/airbyte into ykurochkin/source-stripe/implement-partition
…package' of github.com:airbytehq/airbyte into ykurochkin/source-stripe/implement-partition
@@ -5,7 +5,7 @@ | |||
|
|||
from setuptools import find_packages, setup | |||
|
|||
MAIN_REQUIREMENTS = ["airbyte-cdk==0.52.8", "stripe==2.56.0", "pendulum==2.1.2"] | |||
MAIN_REQUIREMENTS = ["airbyte-cdk>=0.52.8", "stripe==2.56.0", "pendulum==2.1.2"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's pin the version before releasing to avoid the connector accidentally breaking if we make a breaking change
return None | ||
|
||
|
||
class StripePaginationStrategy(CursorPaginationStrategy): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's write unit tests for this class
|
||
|
||
class StripePaginator(DefaultPaginator): | ||
def path(self) -> Optional[str]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's write unit tests for this method
…package' of github.com:airbytehq/airbyte into ykurochkin/source-stripe/implement-partition
Hey @yevhenii-ldv |
What
Resolves airbytehq/airbyte-internal-issues#2147.
How
Describe the solution
Recommended reading order
airbyte-integrations/connectors/source-stripe/source_stripe/partition.py
airbyte-integrations/connectors/source-stripe/source_stripe/source.py
Pre-merge Actions
Community member or Airbyter
Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.