Fix Java GCP-IO Direct job #34019

Open · wants to merge 8 commits into base: master
Changes from 4 commits
1 change: 1 addition & 0 deletions .github/workflows/beam_PreCommit_Java_GCP_IO_Direct.yml
@@ -109,6 +109,7 @@ jobs:
            :sdks:java:io:google-cloud-platform:expansion-service:build \
            :sdks:java:io:google-cloud-platform:postCommit \
          arguments: |
+           --info \
            -PdisableSpotlessCheck=true \
            -PdisableCheckStyle=true \
            -PenableJacocoReport \
@@ -131,7 +131,15 @@ public static Iterable<Object[]> data() {
  private static final int ORIGINAL_N = 60;
  // for dynamic destination test
  private static final int NUM_DESTINATIONS = 3;
- private static final int TOTAL_NUM_STREAMS = 9;
Contributor: R: @ahmedabu98 who took care of SchemaUpdate tests

Contributor: @akashorabek can we set TOTAL_NUM_STREAMS=6 and SCHEMA_UPDATE_TRIGGER=2? I worry we might be missing some unknown edge cases by restricting it too much.

+ private static final int TOTAL_NUM_STREAMS = 3;
+ // wait up to 60 seconds
+ private static final int SCHEMA_PROPAGATION_TIMEOUT_MS = 60000;
+ // interval between checks
+ private static final int SCHEMA_PROPAGATION_CHECK_INTERVAL_MS = 5000;
+ // wait for streams to recognize schema
+ private static final int STREAM_RECOGNITION_DELAY_MS = 15000;
+ // trigger for updating the schema when the row counter reaches this value
+ private static final int SCHEMA_UPDATE_TRIGGER = 1;

private final Random randomGenerator = new Random();

@@ -218,17 +226,43 @@ public void setup() {
public void processElement(ProcessContext c, @StateId(ROW_COUNTER) ValueState<Integer> counter)
throws Exception {
int current = firstNonNull(counter.read(), 0);
- // We update schema early on to leave a healthy amount of time for StreamWriter to recognize
- // it.
- // We also update halfway through so that some writers are created *after* the schema update
- if (current == TOTAL_NUM_STREAMS / 2) {
+ // We update schema early on to leave a healthy amount of time for the StreamWriter to recognize it,
+ // ensuring that subsequent writers are created with the updated schema.
+ if (current == SCHEMA_UPDATE_TRIGGER) {
for (Map.Entry<String, String> entry : newSchemas.entrySet()) {
bqClient.updateTableSchema(
projectId,
datasetId,
entry.getKey(),
BigQueryHelpers.fromJsonString(entry.getValue(), TableSchema.class));
}

+ // check that schema update propagated fully
+ long startTime = System.currentTimeMillis();
+ long timeoutMillis = SCHEMA_PROPAGATION_TIMEOUT_MS;
+ boolean schemaPropagated = false;
+ while (System.currentTimeMillis() - startTime < timeoutMillis) {
+   schemaPropagated = true;
+   for (Map.Entry<String, String> entry : newSchemas.entrySet()) {
+     TableSchema currentSchema =
+         bqClient.getTableResource(projectId, datasetId, entry.getKey()).getSchema();
+     TableSchema expectedSchema =
+         BigQueryHelpers.fromJsonString(entry.getValue(), TableSchema.class);
+     if (currentSchema.getFields().size() != expectedSchema.getFields().size()) {
+       schemaPropagated = false;
+       break;
+     }
+   }
+   if (schemaPropagated) {
+     break;
+   }
+   Thread.sleep(SCHEMA_PROPAGATION_CHECK_INTERVAL_MS);
+ }
+ if (!schemaPropagated) {
+   LOG.warn("Schema update did not propagate fully within the timeout.");
+ } else {
+   LOG.info(
+       "Schema update propagated fully within the timeout - {}.",
+       System.currentTimeMillis() - startTime);
+   // wait for streams to recognize the new schema
+   Thread.sleep(STREAM_RECOGNITION_DELAY_MS);
+ }
}

counter.write(++current);
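The added check above is a bounded poll: re-evaluate a cheap predicate (here, whether every table's live field count matches the expected schema) until it holds or a deadline passes, sleeping a fixed interval between attempts. A minimal, self-contained sketch of the same pattern, with a hypothetical helper name that is not part of this PR:

```java
import java.util.function.BooleanSupplier;

public final class PollUntil {
  private PollUntil() {}

  /** Returns true if {@code condition} became true before {@code timeoutMs} elapsed. */
  public static boolean poll(BooleanSupplier condition, long timeoutMs, long intervalMs)
      throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
      if (condition.getAsBoolean()) {
        return true;
      }
      Thread.sleep(intervalMs); // back off between checks
    }
    return condition.getAsBoolean(); // one final check at the deadline
  }
}
```

With such a helper, the test's loop would reduce to poll(allSchemasMatch, SCHEMA_PROPAGATION_TIMEOUT_MS, SCHEMA_PROPAGATION_CHECK_INTERVAL_MS); the inlined form in the PR behaves the same way.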
@@ -18,7 +18,6 @@
package org.apache.beam.sdk.io.gcp.datastore;

import static org.junit.Assert.assertThrows;
- import static org.mockito.Mockito.verify;

import java.util.Map;
import java.util.UUID;
@@ -50,7 +49,7 @@ public class RampupThrottlingFnTest {
@Mock private Counter mockCounter;
private final Sleeper mockSleeper =
millis -> {
- verify(mockCounter).inc(millis);
Contributor @Abacn (Feb 19, 2025): Would you mind explaining a little bit how this fixed the flaky test?

Collaborator (author): verify() expected exactly one call, but sometimes the code retries and calls inc(millis) multiple times, causing the test to fail.

+ mockCounter.inc(millis);
throw new RampupDelayException();
};
private DoFnTester<String, String> rampupThrottlingFnTester;
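To illustrate the author's explanation above: Mockito's verify(mock).inc(n) asserts that exactly one matching call has been recorded at the moment it runs, so placing it inside the sleeper lambda fails whenever the throttling code retries and invokes the sleeper again. A minimal, self-contained sketch (hypothetical names, not the actual test):

```java
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

public class RetryVerifySketch {
  interface Counter {
    void inc(long n);
  }

  public static void main(String[] args) {
    Counter mockCounter = mock(Counter.class);

    // Simulate the code under test retrying the sleeper callback twice.
    for (int attempt = 0; attempt < 2; attempt++) {
      // Old pattern: verify(mockCounter).inc(100L) here would pass on the first
      // attempt and throw TooManyActualInvocations on the second.
      mockCounter.inc(100L); // new pattern: only record the interaction
    }

    // Assert once, after the fact; this passes for one attempt or many.
    verify(mockCounter, atLeastOnce()).inc(100L);
  }
}
```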
@@ -70,6 +70,7 @@
public class SpannerReadIT {

private static final int MAX_DB_NAME_LENGTH = 30;
+ private static final int CLEANUP_PROPAGATION_DELAY_MS = 5000;

@Rule public final transient TestPipeline p = TestPipeline.create();
@Rule public transient ExpectedException thrown = ExpectedException.none();
@@ -285,6 +286,12 @@ private CloseTransactionFn(SpannerConfig spannerConfig) {
public Transaction apply(Transaction tx) {
BatchClient batchClient = SpannerAccessor.getOrCreate(spannerConfig).getBatchClient();
batchClient.batchReadOnlyTransaction(tx.transactionId()).cleanup();
+ try {
+   // Wait for cleanup to propagate.
+   Thread.sleep(CLEANUP_PROPAGATION_DELAY_MS);
+ } catch (InterruptedException e) {
+   Thread.currentThread().interrupt();
+ }
return tx;
}
}
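One note on the catch block above: re-asserting the interrupt flag via Thread.currentThread().interrupt() is the standard idiom when a method cannot rethrow InterruptedException, since it keeps the thread's interrupted status visible to callers (for example, a runner cancelling the pipeline). A hedged sketch of the idiom as a standalone helper (hypothetical, not part of this PR):

```java
public final class Sleeps {
  private Sleeps() {}

  /** Sleeps for {@code millis}; if interrupted, restores the interrupt flag and returns early. */
  public static void sleepQuietly(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      // Without this line the interruption would be silently swallowed and
      // invisible to the rest of the call stack.
      Thread.currentThread().interrupt();
    }
  }
}
```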