diff --git a/model/interactive/src/main/proto/org/apache/beam/model/interactive/v1/beam_interactive_api.proto b/model/interactive/src/main/proto/org/apache/beam/model/interactive/v1/beam_interactive_api.proto index 87e0685a4237..6351b2472579 100644 --- a/model/interactive/src/main/proto/org/apache/beam/model/interactive/v1/beam_interactive_api.proto +++ b/model/interactive/src/main/proto/org/apache/beam/model/interactive/v1/beam_interactive_api.proto @@ -49,3 +49,16 @@ message TestStreamFileRecord { // The recorded event from an element stream. org.apache.beam.model.pipeline.v1.TestStreamPayload.Event recorded_event = 1; } + +service TestStreamService { + // A TestStream will request for events using this RPC. + rpc Events(EventsRequest) returns (stream org.apache.beam.model.pipeline.v1.TestStreamPayload.Event) {} +} + +message EventsRequest { + // The set of PCollections to read from. These are the PTransform outputs + // local names. These are a subset of the TestStream's outputs. This allows + // Interactive Beam to cache many PCollections from a pipeline then replay a + // subset of them. + repeated string output_ids = 1; +} diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto index ae6903fe6902..9dfcb76ca310 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto @@ -722,19 +722,6 @@ message TestStreamPayload { } } -service TestStreamService { - // A TestStream will request for events using this RPC. - rpc Events(EventsRequest) returns (stream TestStreamPayload.Event) {} -} - -message EventsRequest { - // The set of PCollections to read from. These are the PTransform outputs - // local names. These are a subset of the TestStream's outputs. This allows - // Interactive Beam to cache many PCollections from a pipeline then replay a - // subset of them. - repeated string output_ids = 1; -} - // The payload for the special-but-not-primitive WriteFiles transform. message WriteFilesPayload { @@ -750,7 +737,7 @@ message WriteFilesPayload { map side_inputs = 5; - // This is different from runner based sharding. This is done by the runner backend, where as runner_determined_sharding + // This is different from runner based sharding. This is done by the runner backend, where as runner_determined_sharding // is by the runner translator bool auto_sharded = 6; } @@ -968,7 +955,7 @@ message StandardCoders { // 01 - on time // 10 - late // 11 - unknown - // * bit 6 is 1 if this is the last pane, 0 otherwise. + // * bit 6 is 1 if this is the last pane, 0 otherwise. // Commonly set with `byte |= 0x02` // * bit 7 is 1 if this is the first pane, 0 otherwise. // Commonly set with `byte |= 0x01` diff --git a/sdks/python/apache_beam/runners/direct/test_stream_impl.py b/sdks/python/apache_beam/runners/direct/test_stream_impl.py index c720418b05ed..7b086367fd96 100644 --- a/sdks/python/apache_beam/runners/direct/test_stream_impl.py +++ b/sdks/python/apache_beam/runners/direct/test_stream_impl.py @@ -37,8 +37,8 @@ from apache_beam import ParDo from apache_beam import coders from apache_beam import pvalue -from apache_beam.portability.api import beam_runner_api_pb2 -from apache_beam.portability.api import beam_runner_api_pb2_grpc +from apache_beam.portability.api import beam_interactive_api_pb2 +from apache_beam.portability.api import beam_interactive_api_pb2_grpc from apache_beam.testing.test_stream import ElementEvent from apache_beam.testing.test_stream import ProcessingTimeEvent from apache_beam.testing.test_stream import WatermarkEvent @@ -267,10 +267,10 @@ def _stream_events_from_rpc(endpoint, output_tags, coder, channel, is_alive): is placed on the channel to signify a successful end. """ stub_channel = grpc.insecure_channel(endpoint) - stub = beam_runner_api_pb2_grpc.TestStreamServiceStub(stub_channel) + stub = beam_interactive_api_pb2_grpc.TestStreamServiceStub(stub_channel) # Request the PCollections that we are looking for from the service. - event_request = beam_runner_api_pb2.EventsRequest( + event_request = beam_interactive_api_pb2.EventsRequest( output_ids=[str(tag) for tag in output_tags]) event_stream = stub.Events(event_request) diff --git a/sdks/python/apache_beam/testing/test_stream_service.py b/sdks/python/apache_beam/testing/test_stream_service.py index 1f63cbf7274f..4b1f20694a8d 100644 --- a/sdks/python/apache_beam/testing/test_stream_service.py +++ b/sdks/python/apache_beam/testing/test_stream_service.py @@ -21,11 +21,11 @@ import grpc -from apache_beam.portability.api import beam_runner_api_pb2_grpc +from apache_beam.portability.api import beam_interactive_api_pb2_grpc class TestStreamServiceController( - beam_runner_api_pb2_grpc.TestStreamServiceServicer): + beam_interactive_api_pb2_grpc.TestStreamServiceServicer): """A server that streams TestStreamPayload.Events from a single EventRequest. This server is used as a way for TestStreams to receive events from file. @@ -42,7 +42,7 @@ def __init__(self, reader, endpoint=None, exception_handler=None): port = self._server.add_insecure_port('localhost:0') self.endpoint = 'localhost:{}'.format(port) - beam_runner_api_pb2_grpc.add_TestStreamServiceServicer_to_server( + beam_interactive_api_pb2_grpc.add_TestStreamServiceServicer_to_server( self, self._server) self._reader = reader self._exception_handler = exception_handler diff --git a/sdks/python/apache_beam/testing/test_stream_service_test.py b/sdks/python/apache_beam/testing/test_stream_service_test.py index a04fa2303d08..fb3708fc7ad0 100644 --- a/sdks/python/apache_beam/testing/test_stream_service_test.py +++ b/sdks/python/apache_beam/testing/test_stream_service_test.py @@ -23,8 +23,8 @@ import grpc from apache_beam.portability.api import beam_interactive_api_pb2 +from apache_beam.portability.api import beam_interactive_api_pb2_grpc from apache_beam.portability.api import beam_runner_api_pb2 -from apache_beam.portability.api import beam_runner_api_pb2_grpc from apache_beam.testing.test_stream_service import TestStreamServiceController # Nose automatically detects tests if they match a regex. Here, it mistakens @@ -63,14 +63,14 @@ def setUp(self): self.controller.start() channel = grpc.insecure_channel(self.controller.endpoint) - self.stub = beam_runner_api_pb2_grpc.TestStreamServiceStub(channel) + self.stub = beam_interactive_api_pb2_grpc.TestStreamServiceStub(channel) def tearDown(self): self.controller.stop() def test_normal_run(self): r = self.stub.Events( - beam_runner_api_pb2.EventsRequest(output_ids=EXPECTED_KEYS)) + beam_interactive_api_pb2.EventsRequest(output_ids=EXPECTED_KEYS)) events = [e for e in r] expected_events = [ e for e in EventsReader( @@ -81,9 +81,9 @@ def test_normal_run(self): def test_multiple_sessions(self): resp_a = self.stub.Events( - beam_runner_api_pb2.EventsRequest(output_ids=EXPECTED_KEYS)) + beam_interactive_api_pb2.EventsRequest(output_ids=EXPECTED_KEYS)) resp_b = self.stub.Events( - beam_runner_api_pb2.EventsRequest(output_ids=EXPECTED_KEYS)) + beam_interactive_api_pb2.EventsRequest(output_ids=EXPECTED_KEYS)) events_a = [] events_b = []