Stabilize adaptive rate limit by considering current rate #1027
Conversation
Signed-off-by: Tomoyuki Morita <[email protected]>
I'll address the integ test failure.
Signed-off-by: Tomoyuki Morita <[email protected]>
Signed-off-by: Tomoyuki Morita <[email protected]>
Signed-off-by: Tomoyuki Morita <[email protected]>
Signed-off-by: Tomoyuki Morita <[email protected]>
flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiterImpl.java (review thread resolved)
flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkWrapper.java (review thread resolved)
flint-core/src/main/scala/org/opensearch/flint/core/storage/RequestRateMeter.java (outdated; review thread resolved)
Signed-off-by: Tomoyuki Morita <[email protected]>
@@ -543,6 +543,8 @@ In the index mapping, the `_meta` and `properties` fields store meta and schema info
- `spark.datasource.flint.read.scroll_size`: default value is 100.
- `spark.datasource.flint.read.scroll_duration`: default value is 5 minutes; scroll context keep-alive duration.
- `spark.datasource.flint.retry.max_retries`: max retries on failed HTTP request. Default value is 3. Use 0 to disable retry.
- `spark.datasource.flint.retry.bulk.max_retries`: max retries on failed bulk request. Default value is 10. Use 0 to disable retry.
- `spark.datasource.flint.retry.bulk.initial_backoff`: initial backoff in seconds for bulk request retry. Default is 4.
Any reason to choose 4s as the default value?
It was a fixed value, and I made it configurable. The original intention is to have a higher initial backoff so the rate is reduced quickly.
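For reference, a minimal sketch of how these options might be set on a Spark session. The option keys come from the doc change above; the app name is a hypothetical placeholder, and the values shown are just the documented defaults, not tuned recommendations:

```java
import org.apache.spark.sql.SparkSession;

// Illustrative sketch only: keys are from the doc change above;
// values are the documented defaults, not recommendations.
SparkSession spark = SparkSession.builder()
    .appName("flint-bulk-retry-example")  // hypothetical app name
    .config("spark.datasource.flint.retry.bulk.max_retries", "10")     // 0 disables retry
    .config("spark.datasource.flint.retry.bulk.initial_backoff", "4")  // seconds
    .getOrCreate();
```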
flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala (outdated; review thread resolved)
private Queue<DataPoint> dataPoints = new LinkedList<>();
private long currentSum = 0;

synchronized void addDataPoint(long timestamp, long requestCount) {
nit: could we consider reducing the number of data points maintained by using bucket-based aggregation, which has bounded memory usage?
The number of data points tends to be small, around 1~20, since each batch contains around 5k requests (we add one data point per batch). The memory usage is almost negligible.
Added removeOldDataPoints in addDataPoint.
synchronized void addDataPoint(long timestamp, long requestCount) {
  dataPoints.add(new DataPoint(timestamp, requestCount));
  currentSum += requestCount;
We could call removeOldDataPoint during add to reduce memory usage.
Same as above; the memory usage is almost negligible.
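To make the resolution concrete, here is a minimal sketch of a sliding-window rate meter that prunes old data points on every add, as the review settled on. The fields and `addDataPoint` shape follow the snippets above, but this is not the actual RequestRateMeter implementation: the 3-second window, the `DataPoint` class body, and the `getCurrentEstimatedRate` accessor are assumptions for illustration.

```java
import java.util.LinkedList;
import java.util.Queue;

// Sketch only; window size and rate accessor are assumed, not from the PR.
class SlidingWindowRateMeter {
  private static final long WINDOW_MILLIS = 3_000; // assumed window size

  private static class DataPoint {
    final long timestamp;
    final long requestCount;
    DataPoint(long timestamp, long requestCount) {
      this.timestamp = timestamp;
      this.requestCount = requestCount;
    }
  }

  private final Queue<DataPoint> dataPoints = new LinkedList<>();
  private long currentSum = 0;

  // Pruning on every add keeps the queue bounded by the window size,
  // addressing the memory concern without bucket-based aggregation.
  synchronized void addDataPoint(long timestamp, long requestCount) {
    dataPoints.add(new DataPoint(timestamp, requestCount));
    currentSum += requestCount;
    removeOldDataPoints(timestamp);
  }

  private void removeOldDataPoints(long now) {
    while (!dataPoints.isEmpty() && dataPoints.peek().timestamp < now - WINDOW_MILLIS) {
      currentSum -= dataPoints.remove().requestCount;
    }
  }

  // Requests per second, averaged over the window.
  synchronized long getCurrentEstimatedRate(long now) {
    removeOldDataPoints(now);
    return currentSum * 1000 / WINDOW_MILLIS;
  }
}
```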
Signed-off-by: Tomoyuki Morita <[email protected]>
Thanks for the changes!
Signed-off-by: Tomoyuki Morita <[email protected]>
Signed-off-by: Tomoyuki Morita <[email protected]>
@ykmr1224
Are you familiar with resilience4j? I think it could be an interesting option for better resilience when managing fault tolerance for remote communications. We already use it in the SQL repo.
I checked resilience4j earlier. Do we see any additional benefit over the Failsafe library we are currently using?
Signed-off-by: Tomoyuki Morita <[email protected]>
IMO there is a benefit to using the same library in both repos (SQL/Spark)...
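For comparison, a rough sketch of what an exponential-backoff bulk retry could look like in each library. This is not code from the PR: the policy values mirror the defaults discussed above, the 30s backoff cap is arbitrary, and `sendBulk` is a hypothetical stand-in for the real bulk call.

```java
import java.time.Duration;

import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;

public class BulkRetrySketch {
  // Hypothetical stand-in for the actual bulk request.
  static String sendBulk() {
    return "ok";
  }

  public static void main(String[] args) {
    // Failsafe (currently used): 10 retries, exponential backoff from 4s, capped at 30s.
    RetryPolicy<String> failsafePolicy = RetryPolicy.<String>builder()
        .withMaxRetries(10)
        .withBackoff(Duration.ofSeconds(4), Duration.ofSeconds(30))
        .build();
    String viaFailsafe = Failsafe.with(failsafePolicy).get(BulkRetrySketch::sendBulk);

    // resilience4j equivalent: maxAttempts counts the initial call plus retries.
    RetryConfig config = RetryConfig.custom()
        .maxAttempts(11)
        .intervalFunction(IntervalFunction.ofExponentialBackoff(Duration.ofSeconds(4)))
        .build();
    Retry retry = Retry.of("bulk", config);
    String viaResilience4j = Retry.decorateSupplier(retry, BulkRetrySketch::sendBulk).get();
  }
}
```

Functionally the two are close for this use case, so the argument above is mainly about consistency across the SQL and Spark repos rather than capability.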
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.