Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline Runner Implementation and Tests #5435

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.opensearch.dataprepper.core.peerforwarder.PeerForwardingProcessorDecorator;
import org.opensearch.dataprepper.core.pipeline.Pipeline;
import org.opensearch.dataprepper.core.pipeline.PipelineConnector;
import org.opensearch.dataprepper.core.pipeline.PipelineRunnerImpl;
import org.opensearch.dataprepper.core.pipeline.SupportsPipelineRunner;
import org.opensearch.dataprepper.core.pipeline.router.Router;
import org.opensearch.dataprepper.core.pipeline.router.RouterFactory;
import org.opensearch.dataprepper.core.sourcecoordination.SourceCoordinatorFactory;
Expand Down Expand Up @@ -227,6 +229,11 @@ private void buildPipelineFromConfiguration(
eventFactory, acknowledgementSetManager, sourceCoordinatorFactory, processorThreads, readBatchDelay,
dataPrepperConfiguration.getProcessorShutdownTimeout(), dataPrepperConfiguration.getSinkShutdownTimeout(),
getPeerForwarderDrainTimeout(dataPrepperConfiguration));

if (pipelineDefinedBuffer instanceof SupportsPipelineRunner) {
((SupportsPipelineRunner) pipelineDefinedBuffer).setPipelineRunner(new PipelineRunnerImpl(pipeline));
}

pipelineMap.put(pipelineName, pipeline);
} catch (Exception ex) {
//If pipeline construction errors out, we will skip that pipeline and proceed
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package org.opensearch.dataprepper.core.pipeline;

/**
 * Encapsulates a single unit of pipeline work: reading a batch of records from
 * the buffer, executing all processors on that batch, and publishing the
 * results to the sinks. Placing this behind an interface allows both
 * synchronous and asynchronous mechanisms for running a pipeline.
 */
public interface PipelineRunner {
    /**
     * Executes one read-process-publish cycle for the pipeline.
     */
    void runAllProcessorsAndPublishToSinks();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package org.opensearch.dataprepper.core.pipeline;

import com.google.common.annotations.VisibleForTesting;
import org.opensearch.dataprepper.core.pipeline.common.FutureHelper;
import org.opensearch.dataprepper.core.pipeline.common.FutureHelperResult;
import org.opensearch.dataprepper.core.pipeline.exceptions.InvalidEventHandleException;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.InternalEventHandle;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class PipelineRunnerImpl implements PipelineRunner {
private static final Logger LOG = LoggerFactory.getLogger(PipelineRunnerImpl.class);
private final boolean acknowledgementsEnabled;
private boolean isEmptyRecordsLogged = false;
private final Buffer readBuffer;
private final List<Processor> processors;
private final Pipeline pipeline;

public PipelineRunnerImpl(Pipeline pipeline) {
List<Processor> processors = pipeline.getProcessorSets().stream().flatMap(Collection::stream).collect(Collectors.toList());
this.readBuffer = pipeline.getBuffer();
this.processors = processors;
this.pipeline = pipeline;
this.acknowledgementsEnabled = pipeline.getSource().areAcknowledgementsEnabled() || readBuffer.areAcknowledgementsEnabled();
}

@Override
public void runAllProcessorsAndPublishToSinks() {
final Map.Entry<Collection, CheckpointState> recordsReadFromBuffer = readFromBuffer(getBuffer(), getPipeline());
Collection records = recordsReadFromBuffer.getKey();
final CheckpointState checkpointState = recordsReadFromBuffer.getValue();
records = runProcessorsAndProcessAcknowledgements(getProcessors(), records);
postToSink(getPipeline(), records);
// Checkpoint the current batch read from the buffer after being processed by processors and sinks.
getBuffer().checkpoint(checkpointState);
}

@VisibleForTesting
Map.Entry<Collection, CheckpointState> readFromBuffer(Buffer buffer, Pipeline pipeline) {
final Map.Entry<Collection, CheckpointState> readResult = buffer.read(pipeline.getReadBatchTimeoutInMillis());
Collection records = readResult.getKey();
//TODO Hacky way to avoid logging continuously - Will be removed as part of metrics implementation
if (records.isEmpty()) {
if(!isEmptyRecordsLogged) {
LOG.debug(" {} Worker: No records received from buffer", pipeline.getName());
isEmptyRecordsLogged = true;
}
} else {
LOG.debug(" {} Worker: Processing {} records from buffer", pipeline.getName(), records.size());
}
return readResult;
}

@VisibleForTesting
void processAcknowledgements(List<Event> inputEvents, Collection<Record<Event>> outputRecords) {
Set<Event> outputEventsSet = outputRecords.stream().map(Record::getData).collect(Collectors.toSet());
// For each event in the input events list that is not present in the output events, send positive acknowledgement, if acknowledgements are enabled for it
inputEvents.forEach(event -> {
EventHandle eventHandle = event.getEventHandle();
if (eventHandle != null && eventHandle instanceof DefaultEventHandle) {
InternalEventHandle internalEventHandle = (InternalEventHandle)(DefaultEventHandle)eventHandle;
if (!outputEventsSet.contains(event)) {
eventHandle.release(true);
}
} else if (eventHandle != null) {
throw new InvalidEventHandleException("Unexpected EventHandle");
}
});
}

@VisibleForTesting
Collection runProcessorsAndProcessAcknowledgements(List<Processor> processors, Collection records) {
//Should Empty list from buffer should be sent to the processors? For now sending as the Stateful processors expects it.
for (final Processor processor : processors) {

List<Event> inputEvents = null;
if (acknowledgementsEnabled) {
inputEvents = ((List<Record<Event>>) records).stream().map(Record::getData).collect(Collectors.toList());
}

try {
records = processor.execute(records);
// acknowledge missing events only if the processor is not holding events
if (!processor.holdsEvents() && inputEvents != null) {
processAcknowledgements(inputEvents, records);
}
} catch (final Exception e) {
LOG.error("A processor threw an exception. This batch of Events will be dropped, and their EventHandles will be released: ", e);
if (inputEvents != null) {
processAcknowledgements(inputEvents, Collections.emptyList());
}

records = Collections.emptyList();
break;
}
}
return records;
}

/**
* TODO Add isolator pattern - Fail if one of the Sink fails [isolator Pattern]
* Uses the pipeline method to publish to sinks, waits for each of the sink result to be true before attempting to
* process more records from buffer.
*/

@VisibleForTesting
boolean postToSink(final Pipeline pipeline, final Collection<Record> records) {
LOG.debug("Pipeline Worker: Submitting {} processed records to sinks", records.size());
final List<Future<Void>> sinkFutures = pipeline.publishToSinks(records);
final FutureHelperResult<Void> futureResults = FutureHelper.awaitFuturesIndefinitely(sinkFutures);
return futureResults.getFailedReasons().size() == 0;
}

@VisibleForTesting
List<Processor> getProcessors() {
return processors;
}

@VisibleForTesting
Buffer getBuffer() {
return readBuffer;
}

@VisibleForTesting
Pipeline getPipeline() {
return pipeline;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,51 +6,45 @@
package org.opensearch.dataprepper.core.pipeline;

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.core.pipeline.common.FutureHelper;
import org.opensearch.dataprepper.core.pipeline.common.FutureHelperResult;
import org.opensearch.dataprepper.core.pipeline.exceptions.InvalidEventHandleException;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.InternalEventHandle;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

@SuppressWarnings({"rawtypes", "unchecked"})
public class ProcessWorker implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ProcessWorker.class);

private static final String INVALID_EVENT_HANDLES = "invalidEventHandles";
private final Buffer readBuffer;
private final List<Processor> processors;
private final Pipeline pipeline;
private boolean isEmptyRecordsLogged = false;
private PluginMetrics pluginMetrics;
private final Counter invalidEventHandlesCounter;
private boolean acknowledgementsEnabled;
private final PipelineRunner pipelineRunner;

/**
 * Creates a ProcessWorker that drives the given pipeline with the default
 * {@link PipelineRunnerImpl} built from that pipeline.
 *
 * @param readBuffer the buffer this worker reads from
 * @param processors the processors to run, in order
 * @param pipeline   the pipeline owning the buffer, processors, and sinks
 */
public ProcessWorker(
        final Buffer readBuffer,
        final List<Processor> processors,
        final Pipeline pipeline) {
    this(readBuffer, processors, pipeline, new PipelineRunnerImpl(pipeline));
}

/**
 * Creates a ProcessWorker with an explicit {@link PipelineRunner}, allowing a
 * custom (e.g. test) runner to be injected. Also wires up the
 * invalid-event-handle counter metric and resolves whether acknowledgements
 * are enabled from the source and buffer.
 */
public ProcessWorker(
        final Buffer readBuffer,
        final List<Processor> processors,
        final Pipeline pipeline,
        final PipelineRunner pipelineRunner) {
    this.readBuffer = readBuffer;
    this.processors = processors;
    this.pipeline = pipeline;
    this.pluginMetrics = PluginMetrics.fromNames("ProcessWorker", pipeline.getName());
    this.invalidEventHandlesCounter = pluginMetrics.counter(INVALID_EVENT_HANDLES);
    this.acknowledgementsEnabled = pipeline.getSource().areAcknowledgementsEnabled() || readBuffer.areAcknowledgementsEnabled();
    this.pipelineRunner = pipelineRunner;
}

@Override
Expand Down Expand Up @@ -97,64 +91,12 @@ private void executeShutdownProcess() {
LOG.info("Processor shutdown phase 5 complete.");
}

/**
 * Positively acknowledges (releases) every input event that no longer appears in
 * the processor output, when acknowledgements are enabled for it. Counts and
 * rejects events whose non-null handle is of an unexpected type.
 *
 * @throws RuntimeException if an event carries a non-null handle that is not a DefaultEventHandle
 */
private void processAcknowledgements(List<Event> inputEvents, Collection<Record<Event>> outputRecords) {
    Set<Event> outputEventsSet = outputRecords.stream().map(Record::getData).collect(Collectors.toSet());
    // For each event in the input events list that is not present in the output events,
    // send positive acknowledgement, if acknowledgements are enabled for it.
    inputEvents.forEach(event -> {
        EventHandle eventHandle = event.getEventHandle();
        // instanceof is null-safe, so the previous explicit null check was redundant;
        // the InternalEventHandle local assigned here before was never used and is removed.
        if (eventHandle instanceof DefaultEventHandle) {
            if (!outputEventsSet.contains(event)) {
                eventHandle.release(true);
            }
        } else if (eventHandle != null) {
            invalidEventHandlesCounter.increment();
            throw new RuntimeException("Unexpected EventHandle");
        }
    });
}

private void doRun() {
final Map.Entry<Collection, CheckpointState> readResult = readBuffer.read(pipeline.getReadBatchTimeoutInMillis());
Collection records = readResult.getKey();
final CheckpointState checkpointState = readResult.getValue();
//TODO Hacky way to avoid logging continuously - Will be removed as part of metrics implementation
if (records.isEmpty()) {
if(!isEmptyRecordsLogged) {
LOG.debug(" {} Worker: No records received from buffer", pipeline.getName());
isEmptyRecordsLogged = true;
}
} else {
LOG.debug(" {} Worker: Processing {} records from buffer", pipeline.getName(), records.size());
}
//Should Empty list from buffer should be sent to the processors? For now sending as the Stateful processors expects it.
for (final Processor processor : processors) {

List<Event> inputEvents = null;
if (acknowledgementsEnabled) {
inputEvents = ((List<Record<Event>>) records).stream().map(Record::getData).collect(Collectors.toList());
}

try {
records = processor.execute(records);
// acknowledge missing events only if the processor is not holding events
if (!processor.holdsEvents() && inputEvents != null) {
processAcknowledgements(inputEvents, records);
}
} catch (final Exception e) {
LOG.error("A processor threw an exception. This batch of Events will be dropped, and their EventHandles will be released: ", e);
if (inputEvents != null) {
processAcknowledgements(inputEvents, Collections.emptyList());
}

records = Collections.emptyList();
break;
}
try {
pipelineRunner.runAllProcessorsAndPublishToSinks();
} catch (InvalidEventHandleException exception) {
invalidEventHandlesCounter.increment();
}

postToSink(records);
// Checkpoint the current batch read from the buffer after being processed by processors and sinks.
readBuffer.checkpoint(checkpointState);
}

private boolean areComponentsReadyForShutdown() {
Expand All @@ -170,16 +112,4 @@ private boolean isBufferReadyForShutdown() {
LOG.debug("isBufferReadyForShutdown={}, isBufferEmpty={}, forceStopReadingBuffers={}", isBufferReadyForShutdown, isBufferEmpty, forceStopReadingBuffers);
return isBufferReadyForShutdown;
}

/**
 * Hands the processed records to the pipeline's sinks and blocks until every
 * sink future has completed, returning {@code true} only when no sink reported
 * a failure.
 *
 * TODO Add isolator pattern - fail if one of the sinks fails [isolator pattern].
 */
private boolean postToSink(final Collection<Record> records) {
    LOG.debug("Pipeline Worker: Submitting {} processed records to sinks", records.size());
    final List<Future<Void>> publishFutures = pipeline.publishToSinks(records);
    final FutureHelperResult<Void> publishResults = FutureHelper.awaitFuturesIndefinitely(publishFutures);
    return publishResults.getFailedReasons().isEmpty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.opensearch.dataprepper.core.pipeline.exceptions;

/**
 * Thrown when a pipeline component encounters an event whose non-null
 * {@code EventHandle} is of an unexpected type, so the event cannot be
 * acknowledged.
 */
public class InvalidEventHandleException extends RuntimeException {
    /**
     * @param message description of the invalid handle encountered
     */
    public InvalidEventHandleException(final String message) {
        super(message);
    }

    /**
     * @param message description of the invalid handle encountered
     * @param cause   underlying failure, preserved per exception-chaining best practice
     */
    public InvalidEventHandleException(final String message, final Throwable cause) {
        super(message, cause);
    }
}
Loading
Loading