From a7f8322bd26a66b7b6de1a8b78c20b8f0408f618 Mon Sep 17 00:00:00 2001 From: Niketan Chandarana Date: Mon, 3 Feb 2025 11:46:21 -0800 Subject: [PATCH] Added 'Entry' class in ListToMapConfig to support multiple entries config format. Modified ListToMapProcessor to convert existing config to entries format and use Entry objects for executing processor task. Signed-off-by: Niketan Chandarana --- .../mutate-event-processors/build.gradle | 1 + .../mutateevent/ListToMapProcessor.java | 150 +++++++++------ .../mutateevent/ListToMapProcessorConfig.java | 172 +++++++++++++++++- .../mutateevent/ListToMapProcessorTest.java | 130 +++++++++++++ 4 files changed, 389 insertions(+), 64 deletions(-) diff --git a/data-prepper-plugins/mutate-event-processors/build.gradle b/data-prepper-plugins/mutate-event-processors/build.gradle index e4b0c63cea..96a6aef38f 100644 --- a/data-prepper-plugins/mutate-event-processors/build.gradle +++ b/data-prepper-plugins/mutate-event-processors/build.gradle @@ -24,4 +24,5 @@ dependencies { implementation 'com.fasterxml.jackson.core:jackson-databind' testImplementation project(':data-prepper-test-event') testImplementation testLibs.slf4j.simple + testImplementation testLibs.spring.test } \ No newline at end of file diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessor.java index 7cc6ba237a..bf0ac9155b 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessor.java @@ -33,6 +33,7 @@ public class ListToMapProcessor extends AbstractProcessor, Record< private final ListToMapProcessorConfig config; private final ExpressionEvaluator expressionEvaluator; + private final List entries; @DataPrepperPluginConstructor public ListToMapProcessor(final PluginMetrics pluginMetrics, final ListToMapProcessorConfig config, final ExpressionEvaluator expressionEvaluator) { @@ -46,95 +47,123 @@ public ListToMapProcessor(final PluginMetrics pluginMetrics, final ListToMapProc String.format("list_to_map_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", config.getListToMapWhen())); } + + if (config.getSource() != null || config.getListToMapWhen() != null) { + ListToMapProcessorConfig.Entry entry = new ListToMapProcessorConfig.Entry( + config.getSource(), + config.getTarget(), + config.getKey(), + config.getValueKey(), + config.getUseSourceKey(), + config.getExtractValue(), + config.getFlatten(), + config.getFlattenedElement(), + config.getTagsOnFailure(), + config.getListToMapWhen() + ); + entries = List.of(entry); + } + else{ + entries = config.getEntries(); + } + + for (ListToMapProcessorConfig.Entry entry : entries) { + if (entry.getListToMapWhen() != null && !expressionEvaluator.isValidExpressionStatement(entry.getListToMapWhen())) { + throw new InvalidPluginConfigurationException( + String.format("list_to_map_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", + entry.getListToMapWhen())); + } + } } @Override public Collection> doExecute(final Collection> records) { for (final Record record : records) { final Event recordEvent = record.getData(); + for (ListToMapProcessorConfig.Entry entry : entries) { + try { - try { - - if (Objects.nonNull(config.getListToMapWhen()) && !expressionEvaluator.evaluateConditional(config.getListToMapWhen(), recordEvent)) { - continue; - } + if (Objects.nonNull(entry.getListToMapWhen()) && !expressionEvaluator.evaluateConditional(entry.getListToMapWhen(), recordEvent)) { + continue; + } - final List> sourceList; - try { - sourceList = recordEvent.get(config.getSource(), List.class); - } catch (final Exception e) { - LOG.warn(EVENT, "Given source path [{}] is not valid on record [{}]", - config.getSource(), recordEvent, e); - recordEvent.getMetadata().addTags(config.getTagsOnFailure()); - continue; - } + final List> sourceList; + try { + sourceList = recordEvent.get(entry.getSource(), List.class); + } catch (final Exception e) { + LOG.warn(EVENT, "Given source path [{}] is not valid on record [{}]", + entry.getSource(), recordEvent, e); + recordEvent.getMetadata().addTags(entry.getTagsOnFailure()); + continue; + } - final Map targetMap; - try { - targetMap = constructTargetMap(sourceList); - } catch (final IllegalArgumentException e) { - LOG.warn(EVENT, "Cannot find a list at the given source path [{}} on record [{}]", - config.getSource(), recordEvent, e); - recordEvent.getMetadata().addTags(config.getTagsOnFailure()); - continue; - } catch (final Exception e) { - LOG.atError() - .addMarker(EVENT) - .addMarker(NOISY) - .setMessage("Error converting source list to map on record [{}]") - .addArgument(recordEvent) - .setCause(e) - .log(); - recordEvent.getMetadata().addTags(config.getTagsOnFailure()); - continue; - } + final Map targetMap; + try { + targetMap = constructTargetMap(sourceList, entry); + } catch (final IllegalArgumentException e) { + LOG.warn(EVENT, "Cannot find a list at the given source path [{}} on record [{}]", + entry.getSource(), recordEvent, e); + recordEvent.getMetadata().addTags(entry.getTagsOnFailure()); + continue; + } catch (final Exception e) { + LOG.atError() + .addMarker(EVENT) + .addMarker(NOISY) + .setMessage("Error converting source list to map on record [{}]") + .addArgument(recordEvent) + .setCause(e) + .log(); + recordEvent.getMetadata().addTags(entry.getTagsOnFailure()); + continue; + } - try { - updateEvent(recordEvent, targetMap); + try { + updateEvent(recordEvent, targetMap, entry.getTarget()); + } catch (final Exception e) { + LOG.atError() + .addMarker(EVENT) + .addMarker(NOISY) + .setMessage("Error updating record [{}] after converting source list to map") + .addArgument(recordEvent) + .setCause(e) + .log(); + recordEvent.getMetadata().addTags(entry.getTagsOnFailure()); + } } catch (final Exception e) { LOG.atError() .addMarker(EVENT) .addMarker(NOISY) - .setMessage("Error updating record [{}] after converting source list to map") + .setMessage("There was an exception while processing Event [{}]") .addArgument(recordEvent) .setCause(e) .log(); - recordEvent.getMetadata().addTags(config.getTagsOnFailure()); + recordEvent.getMetadata().addTags(entry.getTagsOnFailure()); } - } catch (final Exception e) { - LOG.atError() - .addMarker(EVENT) - .addMarker(NOISY) - .setMessage("There was an exception while processing Event [{}]") - .addArgument(recordEvent) - .setCause(e) - .log(); - recordEvent.getMetadata().addTags(config.getTagsOnFailure()); } } return records; } - private Map constructTargetMap(final List> sourceList) { + private Map constructTargetMap(final List> sourceList, ListToMapProcessorConfig.Entry entry) { Map targetMap = new HashMap<>(); for (final Map itemMap : sourceList) { - if (config.getUseSourceKey()) { - if (config.getFlatten()) { + if (entry.getUseSourceKey()) { + if (entry.getFlatten()) { for (final String entryKey : itemMap.keySet()) { - setTargetMapFlattened(targetMap, itemMap, entryKey, entryKey, config.getExtractValue()); + setTargetMapFlattened(targetMap, itemMap, entryKey, entryKey, entry.getExtractValue(), entry.getFlattenedElement()); } } else { for (final String entryKey : itemMap.keySet()) { - setTargetMapUnflattened(targetMap, itemMap, entryKey, entryKey, config.getExtractValue()); + setTargetMapUnflattened(targetMap, itemMap, entryKey, entryKey, entry.getExtractValue()); } } } else { - final String itemKey = (String) itemMap.get(config.getKey()); - if (config.getFlatten()) { - setTargetMapFlattened(targetMap, itemMap, itemKey, config.getValueKey(), config.getValueKey() != null); + final String itemKey = (String) itemMap.get(entry.getKey()); + if (entry.getFlatten()) { + setTargetMapFlattened(targetMap, itemMap, itemKey, entry.getValueKey(), entry.getValueKey() != null, entry.getFlattenedElement()); } else { - setTargetMapUnflattened(targetMap, itemMap, itemKey, config.getValueKey(), config.getValueKey() != null); + setTargetMapUnflattened(targetMap, itemMap, itemKey, entry.getValueKey(), entry.getValueKey() != null); } } } @@ -157,8 +186,9 @@ private void setTargetMapUnflattened( } private void setTargetMapFlattened( - final Map targetMap, final Map itemMap, final String itemKey, final String itemValueKey, final boolean doExtractValue) { - if (!targetMap.containsKey(itemKey) || config.getFlattenedElement() == ListToMapProcessorConfig.FlattenedElement.LAST) { + final Map targetMap, final Map itemMap, final String itemKey, final String itemValueKey, + final boolean doExtractValue, final ListToMapProcessorConfig.FlattenedElement flattenedElement) { + if (!targetMap.containsKey(itemKey) || flattenedElement == ListToMapProcessorConfig.FlattenedElement.LAST) { if (doExtractValue) { targetMap.put(itemKey, itemMap.get(itemValueKey)); } else { @@ -167,14 +197,14 @@ private void setTargetMapFlattened( } } - private void updateEvent(final Event recordEvent, final Map targetMap) { - final boolean doWriteToRoot = Objects.isNull(config.getTarget()); + private void updateEvent(final Event recordEvent, final Map targetMap, final String target) { + final boolean doWriteToRoot = Objects.isNull(target); if (doWriteToRoot) { for (final Map.Entry entry : targetMap.entrySet()) { recordEvent.put(entry.getKey(), entry.getValue()); } } else { - recordEvent.put(config.getTarget(), targetMap); + recordEvent.put(target, targetMap); } } diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessorConfig.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessorConfig.java index 1d7c30b56c..bae4e3bf49 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessorConfig.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessorConfig.java @@ -11,6 +11,8 @@ import com.fasterxml.jackson.annotation.JsonPropertyDescription; import com.fasterxml.jackson.annotation.JsonPropertyOrder; import com.fasterxml.jackson.annotation.JsonValue; +import jakarta.validation.Valid; +import jakarta.validation.constraints.AssertTrue; import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; import org.opensearch.dataprepper.model.annotations.ConditionalRequired; @@ -32,6 +34,14 @@ @IfThenElse( ifFulfilled = {@SchemaProperty(field = "flatten", value = "true")}, thenExpect = {@SchemaProperty(field = "flattened_element")} + ), + @IfThenElse( + ifFulfilled = {@SchemaProperty(field = "entries", value = "null")}, + thenExpect = {@SchemaProperty(field = "source")} + ), + @IfThenElse( + ifFulfilled = {@SchemaProperty(field = "source", value = "null")}, + thenExpect = {@SchemaProperty(field = "entries")} ) }) @JsonPropertyOrder @@ -65,8 +75,145 @@ public String getOptionValue() { } } - @NotEmpty - @NotNull + @JsonPropertyOrder + @ConditionalRequired(value = { + @IfThenElse( + ifFulfilled = {@SchemaProperty(field = "use_source_key", value = "false")}, + thenExpect = {@SchemaProperty(field = "key")} + ), + @IfThenElse( + ifFulfilled = {@SchemaProperty(field = "flatten", value = "true")}, + thenExpect = {@SchemaProperty(field = "flattened_element")} + ) + }) + public static class Entry { + @NotEmpty + @NotNull + @JsonProperty("source") + @JsonPropertyDescription("The list of objects with key fields to be converted into keys for the generated map.") + private String source; + + @JsonProperty("target") + @JsonPropertyDescription("The target for the generated map. When not specified, the generated map will be placed in the root node.") + private String target = null; + + @JsonProperty("use_source_key") + @JsonPropertyDescription("When true, keys in the generated map will use original keys from the source. " + + "Default is false.") + private boolean useSourceKey = false; + + @JsonProperty("key") + @JsonPropertyDescription("The key of the fields to be extracted as keys in the generated mappings. Must be " + + "specified if use_source_key is false.") + private String key; + + @JsonProperty("value_key") + @JsonPropertyDescription("When specified, values given a value_key in objects contained in the source list " + + "will be extracted and converted into the value specified by this option based on the generated map. " + + "When not specified, objects contained in the source list retain their original value when mapped.") + @ExampleValues({ + @Example(value = "value", description = "In each list object, 'value' will be extracted and used as the value in the generated map.") + }) + private String valueKey = null; + + @JsonProperty("extract_value") + @JsonPropertyDescription("When true, object values from the source list will be extracted and added to " + + "the generated map. When false, object values from the source list are added to the generated map " + + "as they appear in the source list. Default is false") + private boolean extractValue = false; + + @NotNull + @JsonProperty(value = "flatten", defaultValue = "false") + @JsonPropertyDescription("When true, values in the generated map output flatten into single items based on " + + "the flattened_element. Otherwise, objects mapped to values from the generated map appear as lists. " + + "Default is false.") + private boolean flatten = false; + + @NotNull + @JsonProperty(value = "flattened_element", defaultValue = "first") + @JsonPropertyDescription("The element to keep, either first or last, when flatten is set to true.") + private FlattenedElement flattenedElement = FlattenedElement.FIRST; + + @JsonProperty("tags_on_failure") + @JsonPropertyDescription("A list of tags to add to the event metadata when the event fails to process.") + private List tagsOnFailure; + + @JsonProperty("list_to_map_when") + @JsonPropertyDescription("A conditional expression, " + + "such as /some-key == \"test\", that will be evaluated to determine whether the processor will be " + + "run on the event. By default, all events will be processed unless otherwise stated.") + @ExampleValues({ + @Example(value = "/some-key == \"test\"", description = "The operation will run when the value of the key is 'test'.") + }) + private String listToMapWhen; + + public String getSource() { + return source; + } + + public String getTarget() { + return target; + } + + public String getKey() { + return key; + } + + public String getValueKey() { + return valueKey; + } + + public boolean getUseSourceKey() { + return useSourceKey; + } + + public boolean getExtractValue() { + return extractValue; + } + + public boolean getFlatten() { + return flatten; + } + + public FlattenedElement getFlattenedElement() { + return flattenedElement; + } + + public List getTagsOnFailure() { + return tagsOnFailure; + } + + public String getListToMapWhen() { + return listToMapWhen; + } + + public Entry(final String source, + final String target, + final String key, + final String valueKey, + final boolean useSourceKey, + final boolean extractValue, + final boolean flatten, + final FlattenedElement flattenedElement, + final List tagsOnFailure, + final String listToMapWhen) { + this.source = source; + this.target = target; + this.key = key; + this.valueKey = valueKey; + this.useSourceKey = useSourceKey; + this.extractValue = extractValue; + this.flatten = flatten; + this.flattenedElement = flattenedElement; + this.tagsOnFailure = tagsOnFailure; + this.listToMapWhen = listToMapWhen; + } + + public Entry() { + + } + } + @JsonProperty("source") @JsonPropertyDescription("The list of objects with key fields to be converted into keys for the generated map.") private String source; @@ -101,14 +248,12 @@ public String getOptionValue() { "as they appear in the source list. Default is false") private boolean extractValue = false; - @NotNull @JsonProperty(value = "flatten", defaultValue = "false") @JsonPropertyDescription("When true, values in the generated map output flatten into single items based on " + "the flattened_element. Otherwise, objects mapped to values from the generated map appear as lists. " + "Default is false.") private boolean flatten = false; - @NotNull @JsonProperty(value = "flattened_element", defaultValue = "first") @JsonPropertyDescription("The element to keep, either first or last, when flatten is set to true.") private FlattenedElement flattenedElement = FlattenedElement.FIRST; @@ -126,6 +271,21 @@ public String getOptionValue() { }) private String listToMapWhen; + @Valid + @JsonProperty("entries") + @JsonPropertyDescription("A list of entries to process list-to-map operations.") + private List entries; + + @AssertTrue(message = "Must use either entries configuration or individual configuration") + boolean isUsingAtLeastOneConfiguration() { + return entries != null || source != null; + } + + @AssertTrue(message = "Cannot use both entries configuration and individual configuration together") + boolean isNotUsingBothConfigurations() { + return entries == null || source == null; + } + public String getSource() { return source; } @@ -163,4 +323,8 @@ public FlattenedElement getFlattenedElement() { public List getTagsOnFailure() { return tagsOnFailure; } + + public List getEntries() { + return entries; + } } diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessorTest.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessorTest.java index a5321b719e..48918a710f 100644 --- a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessorTest.java +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/ListToMapProcessorTest.java @@ -16,7 +16,9 @@ import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.record.Record; +import org.springframework.test.util.ReflectionTestUtils; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -355,6 +357,110 @@ public void testNoValueExtraction_when_the_when_condition_returns_false() { assertThat(resultEvent.toMap(), equalTo(testRecord.getData().toMap())); } + @Test + void invalid_list_to_map_when_with_entries_format_throws_InvalidPluginConfigurationException(){ + final List entries = List.of( + new ListToMapProcessorConfig.Entry( + "mylist", + "target", + "name", + "value", + false, + false, + true, + ListToMapProcessorConfig.FlattenedElement.FIRST, + null, + "invalid_condition" + ) + ); + + when(mockConfig.getEntries()).thenReturn(entries); + when(expressionEvaluator.isValidExpressionStatement("invalid_condition")).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + + @Test + void testMultipleEntriesFormatWithDifferentConditions() { + final List entries = Arrays.asList( + new ListToMapProcessorConfig.Entry( + "mylist", + "target1", + "name", + "value", + false, + false, + true, + ListToMapProcessorConfig.FlattenedElement.FIRST, + null, + "condition1" + ), + new ListToMapProcessorConfig.Entry( + "mylist2", + "target2", + "name2", + "value2", + false, + false, + true, + ListToMapProcessorConfig.FlattenedElement.FIRST, + null, + "condition2" + ) + ); + + when(mockConfig.getEntries()).thenReturn(entries); + when(expressionEvaluator.isValidExpressionStatement("condition1")).thenReturn(true); + when(expressionEvaluator.isValidExpressionStatement("condition2")).thenReturn(true); + + final Record testRecord = createTestRecordWithMultipleLists(); + + when(expressionEvaluator.evaluateConditional("condition1", testRecord.getData())).thenReturn(true); + when(expressionEvaluator.evaluateConditional("condition2", testRecord.getData())).thenReturn(false); + + final ListToMapProcessor processor = createObjectUnderTest(); + final List> resultRecord = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecord.size(), is(1)); + + final Event resultEvent = resultRecord.get(0).getData(); + assertThat(resultEvent.get("target1/a", String.class), is("val-a")); + assertThat(resultEvent.get("target2", Object.class), is(nullValue())); + } + + @Test + public void test_both_configurations_used_together() { + final ListToMapProcessorConfig configObjectUnderTest = new ListToMapProcessorConfig(); + ReflectionTestUtils.setField(configObjectUnderTest, "source", "mylist"); + + final ListToMapProcessorConfig.Entry entry = new ListToMapProcessorConfig.Entry( + "mylist2", + "target2", + "name", + "value", + false, + false, + true, + ListToMapProcessorConfig.FlattenedElement.FIRST, + null, + "condition" + ); + + ReflectionTestUtils.setField(configObjectUnderTest, "entries", List.of(entry)); + + assertThat(configObjectUnderTest.isNotUsingBothConfigurations(), is(false)); + } + + @Test + public void test_no_configuration_used() { + final ListToMapProcessorConfig configObjectUnderTest = new ListToMapProcessorConfig(); + + ReflectionTestUtils.setField(configObjectUnderTest, "source", null); + ReflectionTestUtils.setField(configObjectUnderTest, "entries", null); + + assertThat(configObjectUnderTest.isUsingAtLeastOneConfiguration(), is(false)); + } + private ListToMapProcessor createObjectUnderTest() { return new ListToMapProcessor(pluginMetrics, mockConfig, expressionEvaluator); } @@ -384,4 +490,28 @@ private Record createTestRecordWithInconsistentKeys() { .build(); return new Record<>(event); } + + private Record createTestRecordWithMultipleLists() { + final Map data = Map.of( + "mylist", List.of( + Map.of("name", "a", "value", "val-a"), + Map.of("name", "b", "value", "val-b1"), + Map.of("name", "b", "value", "val-b2"), + Map.of("name", "c", "value", "val-c") + ), + "mylist2", List.of( + Map.of("name2", "x", "value2", "val-x"), + Map.of("name2", "y", "value2", "val-y1"), + Map.of("name2", "y", "value2", "val-y2"), + Map.of("name2", "z", "value2", "val-z") + ), + "nolist", "single-value" + + ); + final Event event = JacksonEvent.builder() + .withData(data) + .withEventType("event") + .build(); + return new Record<>(event); + } }