Ecarton/cumulus 3751 pg update #3916

Open · wants to merge 44 commits into base: ecarton/cumulus-3751-s3-task
Changes from all commits
44 commits
863f114  Merge branch 'ecarton/cumulus-3751-s3-task' into ecarton/CUMULUS-3751… (etcart, Jan 28, 2025)
a192620  first cpypasta (etcart, Jan 28, 2025)
ab17fb1  Merge branch 'ecarton/cumulus-3751-s3-task' into ecarton/CUMULUS-3751… (etcart, Jan 28, 2025)
65ca137  Merge branch 'ecarton/cumulus-3751-s3-task' into ecarton/CUMULUS-3751… (etcart, Jan 28, 2025)
0b6ef26  WIP hevaily pared down index (etcart, Jan 28, 2025)
4e3d597  Merge branch 'ecarton/cumulus-3751-s3-task' into ecarton/CUMULUS-3751… (etcart, Jan 29, 2025)
8f86897  more cleanup (etcart, Jan 29, 2025)
666f483  Merge branch 'ecarton/cumulus-3751-s3-task' into ecarton/CUMULUS-3751… (etcart, Jan 29, 2025)
68275e1  WIP testing (etcart, Jan 29, 2025)
cd70353  Merge branch 'ecarton/cumulus-3751-s3-task' into ecarton/CUMULUS-3751… (etcart, Jan 29, 2025)
45244d7  wip more test fixes (etcart, Jan 29, 2025)
0a2a90b  updating tests (etcart, Jan 29, 2025)
03288aa  WIP adding tf config (etcart, Jan 29, 2025)
1546f8d  Merge branch 'ecarton/cumulus-3751-s3-task' into ecarton/CUMULUS-3751… (etcart, Jan 29, 2025)
9dd637f  replciating s3 form to new pg (etcart, Jan 29, 2025)
30ce16b  WIP (etcart, Jan 29, 2025)
5eca38e  Merge branch 'ecarton/cumulus-3751-s3-task' into ecarton/CUMULUS-3751… (etcart, Jan 29, 2025)
3e115bb  Merge branch 'ecarton/cumulus-3751-s3-task' into ecarton/CUMULUS-3751… (etcart, Jan 30, 2025)
6814085  cleanup of task code (etcart, Jan 30, 2025)
c8f6250  pg task in tf (etcart, Jan 30, 2025)
59819e6  Merge branch 'ecarton/cumulus-3751-s3-task' into ecarton/CUMULUS-3751… (etcart, Jan 30, 2025)
e68ad55  Merge branch 'ecarton/cumulus-3751-s3-task' into ecarton/CUMULUS-3751… (etcart, Jan 30, 2025)
a2c49e6  Merge branch 'ecarton/cumulus-3751-s3-task' into ecarton/CUMULUS-3751… (etcart, Feb 5, 2025)
3039a01  small updates with the latest logic from upstream (etcart, Feb 5, 2025)
c6d7f18  Merge branch 'ecarton/cumulus-3751-s3-task' into ecarton/CUMULUS-3751… (etcart, Feb 5, 2025)
6a1b316  pg int tests in new paradigm (etcart, Feb 5, 2025)
027000f  fix input ocnfig for pg change (etcart, Feb 5, 2025)
71c4d7e  need to use collection instead of sourceCollection (etcart, Feb 5, 2025)
2bc3343  correct version config (etcart, Feb 6, 2025)
da803d1  fleshing out workflow with pg (etcart, Feb 6, 2025)
6cdce57  Merge branch 'ecarton/cumulus-3751-s3-task' into ecarton/CUMULUS-3751… (etcart, Feb 6, 2025)
1f10f45  Merge branch 'ecarton/cumulus-3751-s3-task' into ecarton/CUMULUS-3751… (etcart, Feb 6, 2025)
d874a90  some more fixes to dependencies (etcart, Feb 6, 2025)
ab9a5db  Merge branch 'ecarton/cumulus-3751-s3-task' into ecarton/CUMULUS-3751… (etcart, Feb 6, 2025)
7670181  proper routing in workflow (etcart, Feb 6, 2025)
c0221e4  removing recursive cmr data search relying instead on spec (etcart, Feb 6, 2025)
4dfe06e  Merge branch 'ecarton/cumulus-3751-s3-task' into ecarton/CUMULUS-3751… (etcart, Feb 7, 2025)
805c9d4  linting (etcart, Feb 7, 2025)
56a2ec5  Merge branch 'ecarton/cumulus-3751-s3-task' into ecarton/CUMULUS-3751… (etcart, Feb 7, 2025)
63c7abe  wip better workflow test (etcart, Feb 7, 2025)
07b5ab4  Merge branch 'ecarton/cumulus-3751-s3-task' into ecarton/CUMULUS-3751… (etcart, Feb 7, 2025)
af513ab  more direct integration test of workflow (etcart, Feb 7, 2025)
4635fb1  linting (etcart, Feb 7, 2025)
fd3d550  linting fixes (etcart, Feb 7, 2025)
@@ -0,0 +1,209 @@
'use strict';

const { InvokeCommand } = require('@aws-sdk/client-lambda');
const { lambda } = require('@cumulus/aws-client/services');
const fs = require('fs');
const {
  deleteS3Object,
} = require('@cumulus/aws-client/S3');
const { waitForListObjectsV2ResultCount, addCollections, addProviders } = require('@cumulus/integration-tests');

const { getGranule, deleteGranule, removePublishedGranule } = require('@cumulus/api-client/granules');
const { constructCollectionId } = require('@cumulus/message/Collections');
const { deleteExecution } = require('@cumulus/api-client/executions');
const { setupTestGranuleForIngest } = require('../../helpers/granuleUtils');
const { waitForApiStatus } = require('../../helpers/apiUtils');
const { buildAndStartWorkflow } = require('../../helpers/workflowUtils');
const { loadConfig, createTimestampedTestId, createTestSuffix, createTestDataPath, uploadTestDataToBucket } = require('../../helpers/testUtils');

describe('when ChangeGranuleCollectionPG is called', () => {
  let stackName;
  let config;
  let inputPayload;
  let provider;
  let testDataFolder;
  let granuleId;
  let startingFiles;
  let finalFiles;
  let collection;
  let targetCollection;
  let ingestExecutionArn;
  let cleanupCollectionId;
  afterAll(async () => {
    try {
      await removePublishedGranule({
        prefix: config.stackName,
        granuleId,
        collectionId: cleanupCollectionId,
      });
      let cleanup = finalFiles.map((fileObj) => deleteS3Object(
        fileObj.bucket,
        fileObj.key
      ));
      cleanup = cleanup.concat(startingFiles.map((fileObj) => deleteS3Object(
        fileObj.bucket,
        fileObj.key
      )));
      cleanup = cleanup.concat([
        deleteExecution({ prefix: config.stackName, executionArn: ingestExecutionArn }),
        deleteGranule({ prefix: config.stackName, granuleId: granuleId }),
      ]);

      await Promise.all(cleanup);
    } catch (error) {
      console.log('cleanup failed with error', error);
    }
  });
  beforeAll(async () => {
    const inputPayloadFilename = './spec/parallel/ingestGranule/IngestGranule.input.payload.json';
    const providersDir = './data/providers/s3/';
    const s3data = [
      '@cumulus/test-data/granules/MOD09GQ.A2016358.h13v04.006.2016360104606.hdf.met',
      '@cumulus/test-data/granules/MOD09GQ.A2016358.h13v04.006.2016360104606.hdf',
      '@cumulus/test-data/granules/MOD09GQ.A2016358.h13v04.006.2016360104606_ndvi.jpg',
    ];

    const collectionsDir = './data/collections/s3_MOD09GQ_006_full_ingest';
    const targetCollectionsDir = './data/collections/s3_MOD09GQ_007_full_ingest_move';
    const granuleRegex = '^MOD09GQ\\.A[\\d]{7}\\.[\\w]{6}\\.006\\.[\\d]{13}$';
    config = await loadConfig();
    stackName = config.stackName;
    const testId = createTimestampedTestId(stackName, 'IngestGranuleSuccess');
    const testSuffix = createTestSuffix(testId);
    testDataFolder = createTestDataPath(testId);

    collection = { name: `MOD09GQ${testSuffix}`, version: '006' };
    targetCollection = { name: `MOD09GQ${testSuffix}`, version: '007' };
    provider = { id: `s3_provider${testSuffix}` };

    // populate collections, providers and test data
    await Promise.all([
      uploadTestDataToBucket(config.bucket, s3data, testDataFolder),
      addCollections(stackName, config.bucket, collectionsDir, testSuffix, testId),
      addCollections(stackName, config.bucket, targetCollectionsDir, testSuffix, testId),
      addProviders(stackName, config.bucket, providersDir, config.bucket, testSuffix),
    ]);

    const inputPayloadJson = fs.readFileSync(inputPayloadFilename, 'utf8');
    // update test data filepaths
    inputPayload = await setupTestGranuleForIngest(
      config.bucket,
      JSON.stringify({ ...JSON.parse(inputPayloadJson), pdr: undefined }),
      granuleRegex,
      testSuffix,
      testDataFolder
    );
    granuleId = inputPayload.granules[0].granuleId;

    ingestExecutionArn = await buildAndStartWorkflow(
      stackName,
      config.bucket,
      'IngestAndPublishGranuleWithOrca',
      collection,
      provider,
      inputPayload
    );

    await waitForApiStatus(
      getGranule,
      {
        prefix: stackName,
        granuleId: inputPayload.granules[0].granuleId,
        collectionId: constructCollectionId(collection.name, collection.version),
      },
      'completed'
    );
  });

  describe('under normal circumstances', () => {
    let beforeAllFailed = false;
    beforeAll(async () => {
      startingFiles = (await getGranule({
        prefix: stackName,
        granuleId: granuleId,
      })).files;
      // move the granule's files to the target collection via the S3 task
      try {
        const { $metadata, Payload } = await lambda().send(new InvokeCommand({
          FunctionName: `${stackName}-ChangeGranuleCollectionS3`,
          InvocationType: 'RequestResponse',
          Payload: JSON.stringify({
            cma: {
              meta: {
                targetCollection,
                collection,
                buckets: config.buckets,
              },
              task_config: {
                buckets: '{$.meta.buckets}',
                collection: '{$.meta.collection}',
                targetCollection: '{$.meta.targetCollection}',
              },
              event: {
                payload: { granuleIds: [granuleId] },
              },
            },
          }),
        }));
        if ($metadata.httpStatusCode >= 400) {
          console.log(`lambda invocation to set up failed, code ${$metadata.httpStatusCode}`);
        }
        const outputGranule = JSON.parse(new TextDecoder('utf-8').decode(Payload)).payload.granules[0];
        finalFiles = outputGranule.files;

        const { $metadata: PGmetadata } = await lambda().send(new InvokeCommand({
          FunctionName: `${stackName}-ChangeGranuleCollectionPG`,
          InvocationType: 'RequestResponse',
          Payload: JSON.stringify({
            cma: {
              meta: {
                targetCollection,
                collection,
                buckets: config.buckets,
              },
              task_config: {
                buckets: '{$.meta.buckets}',
                collection: '{$.meta.collection}',
                targetCollection: '{$.meta.targetCollection}',
              },
              event: {
                payload: { granules: [outputGranule] },
              },
            },
          }),
        }));
        if (PGmetadata.httpStatusCode >= 400) {
          console.log(`lambda invocation to set up failed, code ${PGmetadata.httpStatusCode}`);
        }
        await Promise.all(finalFiles.map((file) => expectAsync(
          waitForListObjectsV2ResultCount({
            bucket: file.bucket,
            prefix: file.key,
            desiredCount: 1,
            interval: 5 * 1000,
            timeout: 60 * 1000,
          })
        ).toBeResolved()));
      } catch (error) {
        console.log(`files do not appear to have been moved: error: ${error}`);
        beforeAllFailed = true;
      }
    });
    it('updates the granule data in pg', async () => {
      if (beforeAllFailed) fail('beforeAllFailed');
      const granuleRecord = await getGranule({
        prefix: config.stackName,
        granuleId,
      });
      expect(granuleRecord.collectionId).toEqual(constructCollectionId(targetCollection.name, targetCollection.version));
      const finalKeys = finalFiles.map((file) => file.key);
      const finalBuckets = finalFiles.map((file) => file.bucket);

      granuleRecord.files.forEach((file) => {
        expect(finalBuckets.includes(file.bucket)).toBe(true);
        expect(finalKeys.includes(file.key)).toBe(true);
      });
    });
  });
});
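
For orientation: the spec above exercises both tasks by invoking their Lambdas directly with a Cumulus Message Adapter (CMA) envelope rather than through a workflow execution. A minimal sketch of that pattern, factored into a helper, is below. The helper name invokeCmaTask is illustrative only and not part of this PR; the envelope shape simply mirrors what the spec sends.

'use strict';

const { InvokeCommand } = require('@aws-sdk/client-lambda');
const { lambda } = require('@cumulus/aws-client/services');

// Synchronously invoke a CMA-wrapped Cumulus task Lambda and return its
// parsed output message. The `cma` envelope shape mirrors the spec above:
// `meta` carries workflow state, `task_config` maps JSONPath templates into
// the task's config, and `event.payload` is the task input.
async function invokeCmaTask(functionName, meta, taskConfig, payload) {
  const { $metadata, Payload } = await lambda().send(new InvokeCommand({
    FunctionName: functionName,
    InvocationType: 'RequestResponse',
    Payload: JSON.stringify({
      cma: {
        meta,
        task_config: taskConfig,
        event: { payload },
      },
    }),
  }));
  if ($metadata.httpStatusCode >= 400) {
    throw new Error(`invocation of ${functionName} failed with code ${$metadata.httpStatusCode}`);
  }
  return JSON.parse(new TextDecoder('utf-8').decode(Payload));
}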
@@ -1,13 +1,13 @@
const { deleteExecution } = require('@cumulus/api-client/executions');
const fs = require('fs');
const { waitForListObjectsV2ResultCount, addCollections, addProviders } = require('@cumulus/integration-tests');
const { addCollections, addProviders } = require('@cumulus/integration-tests');
const {
  deleteS3Object,
  s3ObjectExists,
} = require('@cumulus/aws-client/S3');
const { constructCollectionId } = require('@cumulus/message/Collections');
const { deleteGranule, getGranule, removePublishedGranule } = require('@cumulus/api-client/granules');
const { buildAndStartWorkflow } = require('../../helpers/workflowUtils');
const { buildAndStartWorkflow, buildAndExecuteWorkflow } = require('../../helpers/workflowUtils');
const { loadConfig, createTestSuffix, createTimestampedTestId, uploadTestDataToBucket, createTestDataPath } = require('../../helpers/testUtils');
const { waitForApiStatus } = require('../../helpers/apiUtils');
const { setupTestGranuleForIngest } = require('../../helpers/granuleUtils');
@@ -24,8 +24,7 @@ const s3data = [
const providersDir = './data/providers/s3/';
const collectionsDir = './data/collections/s3_MOD09GQ_006_full_ingest';
const targetCollectionsDir = './data/collections/s3_MOD09GQ_007_full_ingest_move';
let collection;
let targetCollection;

const inputPayloadFilename = './spec/parallel/ingestGranule/IngestGranule.input.payload.json';

describe('The MoveGranuleCollections workflow', () => {
@@ -40,27 +39,8 @@ describe('The MoveGranuleCollections workflow', () => {
  let ingestExecutionArn;
  let cleanupCollectionId;
  let moveExecutionArn;
  afterAll(async () => {
    try {
      await removePublishedGranule({
        prefix: config.stackName,
        granuleId,
        collectionId: cleanupCollectionId,
      });
      let cleanup = finalFiles.map((fileObj) => deleteS3Object(
        fileObj.bucket,
        fileObj.key
      ));
      cleanup = cleanup.concat([
        deleteExecution({ prefix: config.stackName, executionArn: ingestExecutionArn }),
        deleteExecution({ prefix: config.stackName, executionArn: moveExecutionArn }),
        deleteGranule({ prefix: config.stackName, granuleId: granuleId }),
      ]);
      await Promise.all(cleanup);
    } catch (error) {
      console.log('cleanup failed with error', error);
    }
  });
  let collection;
  let targetCollection;
  beforeAll(async () => {
    config = await loadConfig();
    stackName = config.stackName;
@@ -104,8 +84,16 @@ describe('The MoveGranuleCollections workflow', () => {
      },
      'completed'
    );
    const startingGranule = await getGranule({
      prefix: stackName,
      granuleId,
    });
    finalFiles = startingGranule.files.map((file) => ({
      ...file,
      key: `changedCollectionPath/MOD09GQ${testSuffix}___007/${testId}/${file.fileName}`,
    }));
    try {
      moveExecutionArn = await buildAndStartWorkflow(
      await buildAndExecuteWorkflow(
        stackName,
        config.bucket,
        'MoveGranuleCollectionsWorkflow',
@@ -118,34 +106,52 @@
          targetCollection,
        }
      );
    const startingGranule = await getGranule({
    } catch (error) {
      console.log(`files do not appear to have been moved: error: ${error}`);
      beforeAllFailed = true;
    }
  });
  afterAll(async () => {
    try {
      await removePublishedGranule({
        prefix: stackName,
        granuleId,
        collectionId: cleanupCollectionId,
      });
      finalFiles = startingGranule.files.map((file) => ({
        ...file,
        key: `changedCollectionPath/MOD09GQ${testSuffix}___007/${testId}/${file.fileName}`,
      }));

      await Promise.all(finalFiles.map((file) => expectAsync(
        waitForListObjectsV2ResultCount({
          bucket: file.bucket,
          prefix: file.key,
          desiredCount: 1,
          interval: 5 * 1000,
          timeout: 60 * 1000,
        })
      ).toBeResolved()));
      let cleanup = finalFiles.map((fileObj) => deleteS3Object(
        fileObj.bucket,
        fileObj.key
      ));
      cleanup = cleanup.concat([
        deleteExecution({ prefix: stackName, executionArn: ingestExecutionArn }),
        deleteExecution({ prefix: stackName, executionArn: moveExecutionArn }),
        deleteGranule({ prefix: stackName, granuleId: granuleId }),
      ]);
      await Promise.all(cleanup);
    } catch (error) {
      console.log(`files do not appear to have been moved: error: ${error}`);
      beforeAllFailed = true;
      // eslint-disable-line no-empty
    }
  });

  it('updates the granule data in s3', async () => {
    if (beforeAllFailed) fail('beforeAllFailed');
    await Promise.all(finalFiles.map(async (file) => {
      expect(await s3ObjectExists({ Bucket: file.bucket, Key: file.key })).toEqual(true);
    }));
  });
  it('updates the granule data in pg', async () => {
    if (beforeAllFailed) fail('beforeAllFailed');
    const pgGranule = await getGranule({
      prefix: stackName,
      granuleId,
    });
    expect(pgGranule.collectionId).toEqual(
      constructCollectionId(targetCollection.name, targetCollection.version)
    );
    const finalKeys = finalFiles.map((file) => file.key);
    const finalBuckets = finalFiles.map((file) => file.bucket);
    pgGranule.files.forEach((file) => {
      expect(finalKeys.includes(file.key)).toBeTrue();
      expect(finalBuckets.includes(file.bucket)).toBeTrue();
    });
  });
});
15 changes: 15 additions & 0 deletions tasks/change-granule-collection-pg/.babelrc
@@ -0,0 +1,15 @@
{
  "comments": false,
  "sourceMaps": "both",
  "sourceType": "unambiguous",
  "presets": [
    ["@babel/preset-env", {
      "targets": {
        "node": "20.12.2"
      }
    }]
  ],
  "plugins": [
    "source-map-support"
  ]
}
2 changes: 2 additions & 0 deletions tasks/change-granule-collection-pg/.npmignore
@@ -0,0 +1,2 @@
/nyc.config.js
/tests/
10 changes: 10 additions & 0 deletions tasks/change-granule-collection-pg/.nycrc.json
@@ -0,0 +1,10 @@
{
  "extends": "../../nyc.config.js",
  "include": [
    "*.js"
  ],
  "statements": 92.0,
  "functions": 92.0,
  "branches": 78.0,
  "lines": 91.0
}
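
This .nycrc.json extends the repository-wide nyc.config.js and sets per-task coverage thresholds; assuming the shared config enables check-coverage (not shown in this diff), nyc will fail the task's unit-test run when statement, function, branch, or line coverage falls below these percentages.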