diff --git a/test_server/__init__.py b/test_server/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test_server/asgi.py b/test_server/asgi.py new file mode 100644 index 0000000..dffd26b --- /dev/null +++ b/test_server/asgi.py @@ -0,0 +1,102 @@ +import asyncio +import datetime +import sys +from wsgiref.simple_server import make_server + +import sonora.asgi + +import daphne.server + +from test_server import empty_pb2, messages_pb2, test_pb2_grpc + +_INITIAL_METADATA_KEY = "x-grpc-test-echo-initial" +_TRAILING_METADATA_KEY = "x-grpc-test-echo-trailing-bin" +_US_IN_A_SECOND = 1000 * 1000 +UNARY_CALL_WITH_SLEEP_VALUE = 0.2 + +async def _maybe_echo_metadata(servicer_context): + """Copies metadata from request to response if it is present.""" + invocation_metadata = dict(servicer_context.invocation_metadata()) + if _INITIAL_METADATA_KEY in invocation_metadata: + initial_metadatum = (_INITIAL_METADATA_KEY, + invocation_metadata[_INITIAL_METADATA_KEY]) + await servicer_context.send_initial_metadata((initial_metadatum,)) + + if _TRAILING_METADATA_KEY in invocation_metadata: + trailing_metadatum = (_TRAILING_METADATA_KEY, + invocation_metadata[_TRAILING_METADATA_KEY]) + servicer_context.set_trailing_metadata((trailing_metadatum,)) + + +async def _maybe_echo_status(request: messages_pb2.SimpleRequest, + servicer_context): + """Echos the RPC status if demanded by the request.""" + if request.HasField('response_status'): + await servicer_context.abort(request.response_status.code, + request.response_status.message) + + +class TestServiceServicer(test_pb2_grpc.TestServiceServicer): + + async def UnaryCall(self, request, context): + await _maybe_echo_metadata(context) + await _maybe_echo_status(request, context) + + return messages_pb2.SimpleResponse( + payload=messages_pb2.Payload(type=messages_pb2.COMPRESSABLE, + body=b'\x00' * request.response_size)) + + async def EmptyCall(self, request, context): + return empty_pb2.Empty() + + async def StreamingOutputCall( + self, request: messages_pb2.StreamingOutputCallRequest, + unused_context): + for response_parameters in request.response_parameters: + if response_parameters.interval_us != 0: + await asyncio.sleep( + datetime.timedelta(microseconds=response_parameters. + interval_us).total_seconds()) + if response_parameters.size != 0: + yield messages_pb2.StreamingOutputCallResponse( + payload=messages_pb2.Payload(type=request.response_type, + body=b'\x00' * + response_parameters.size)) + else: + yield messages_pb2.StreamingOutputCallResponse() + + # Next methods are extra ones that are registred programatically + # when the sever is instantiated. They are not being provided by + # the proto file. + async def UnaryCallWithSleep(self, unused_request, unused_context): + await asyncio.sleep(UNARY_CALL_WITH_SLEEP_VALUE) + return messages_pb2.SimpleResponse() + + async def StreamingInputCall(self, request_async_iterator, unused_context): + aggregate_size = 0 + async for request in request_async_iterator: + if request.payload is not None and request.payload.body: + aggregate_size += len(request.payload.body) + return messages_pb2.StreamingInputCallResponse( + aggregated_payload_size=aggregate_size) + + async def FullDuplexCall(self, request_async_iterator, context): + await _maybe_echo_metadata(context) + async for request in request_async_iterator: + await _maybe_echo_status(request, context) + for response_parameters in request.response_parameters: + if response_parameters.interval_us != 0: + await asyncio.sleep( + datetime.timedelta(microseconds=response_parameters. + interval_us).total_seconds()) + if response_parameters.size != 0: + yield messages_pb2.StreamingOutputCallResponse( + payload=messages_pb2.Payload(type=request.payload.type, + body=b'\x00' * + response_parameters.size)) + else: + yield messages_pb2.StreamingOutputCallResponse() + + +application = sonora.asgi.grpcASGI() +test_pb2_grpc.add_TestServiceServicer_to_server(TestServiceServicer(), application) diff --git a/test_server/empty.proto b/test_server/empty.proto new file mode 100644 index 0000000..6a0aa88 --- /dev/null +++ b/test_server/empty.proto @@ -0,0 +1,28 @@ + +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package grpc.testing; + +// An empty message that you can re-use to avoid defining duplicated empty +// messages in your project. A typical example is to use it as argument or the +// return value of a service API. For instance: +// +// service Foo { +// rpc Bar (grpc.testing.Empty) returns (grpc.testing.Empty) { }; +// }; +// +message Empty {} diff --git a/test_server/messages.proto b/test_server/messages.proto new file mode 100644 index 0000000..5993bc6 --- /dev/null +++ b/test_server/messages.proto @@ -0,0 +1,209 @@ + +// Copyright 2015-2016 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Message definitions to be used by integration test service definitions. + +syntax = "proto3"; + +package grpc.testing; + +// TODO(dgq): Go back to using well-known types once +// https://github.com/grpc/grpc/issues/6980 has been fixed. +// import "google/protobuf/wrappers.proto"; +message BoolValue { + // The bool value. + bool value = 1; +} + +// The type of payload that should be returned. +enum PayloadType { + // Compressable text format. + COMPRESSABLE = 0; +} + +// A block of data, to simply increase gRPC message size. +message Payload { + // The type of data in body. + PayloadType type = 1; + // Primary contents of payload. + bytes body = 2; +} + +// A protobuf representation for grpc status. This is used by test +// clients to specify a status that the server should attempt to return. +message EchoStatus { + int32 code = 1; + string message = 2; +} + +// The type of route that a client took to reach a server w.r.t. gRPCLB. +// The server must fill in "fallback" if it detects that the RPC reached +// the server via the "gRPCLB fallback" path, and "backend" if it detects +// that the RPC reached the server via "gRPCLB backend" path (i.e. if it got +// the address of this server from the gRPCLB server BalanceLoad RPC). Exactly +// how this detection is done is context and server dependent. +enum GrpclbRouteType { + // Server didn't detect the route that a client took to reach it. + GRPCLB_ROUTE_TYPE_UNKNOWN = 0; + // Indicates that a client reached a server via gRPCLB fallback. + GRPCLB_ROUTE_TYPE_FALLBACK = 1; + // Indicates that a client reached a server as a gRPCLB-given backend. + GRPCLB_ROUTE_TYPE_BACKEND = 2; +} + +// Unary request. +message SimpleRequest { + // Desired payload type in the response from the server. + // If response_type is RANDOM, server randomly chooses one from other formats. + PayloadType response_type = 1; + + // Desired payload size in the response from the server. + int32 response_size = 2; + + // Optional input payload sent along with the request. + Payload payload = 3; + + // Whether SimpleResponse should include username. + bool fill_username = 4; + + // Whether SimpleResponse should include OAuth scope. + bool fill_oauth_scope = 5; + + // Whether to request the server to compress the response. This field is + // "nullable" in order to interoperate seamlessly with clients not able to + // implement the full compression tests by introspecting the call to verify + // the response's compression status. + BoolValue response_compressed = 6; + + // Whether server should return a given status + EchoStatus response_status = 7; + + // Whether the server should expect this request to be compressed. + BoolValue expect_compressed = 8; + + // Whether SimpleResponse should include server_id. + bool fill_server_id = 9; + + // Whether SimpleResponse should include grpclb_route_type. + bool fill_grpclb_route_type = 10; +} + +// Unary response, as configured by the request. +message SimpleResponse { + // Payload to increase message size. + Payload payload = 1; + // The user the request came from, for verifying authentication was + // successful when the client expected it. + string username = 2; + // OAuth scope. + string oauth_scope = 3; + + // Server ID. This must be unique among different server instances, + // but the same across all RPC's made to a particular server instance. + string server_id = 4; + // gRPCLB Path. + GrpclbRouteType grpclb_route_type = 5; + + // Server hostname. + string hostname = 6; +} + +// Client-streaming request. +message StreamingInputCallRequest { + // Optional input payload sent along with the request. + Payload payload = 1; + + // Whether the server should expect this request to be compressed. This field + // is "nullable" in order to interoperate seamlessly with servers not able to + // implement the full compression tests by introspecting the call to verify + // the request's compression status. + BoolValue expect_compressed = 2; + + // Not expecting any payload from the response. +} + +// Client-streaming response. +message StreamingInputCallResponse { + // Aggregated size of payloads received from the client. + int32 aggregated_payload_size = 1; +} + +// Configuration for a particular response. +message ResponseParameters { + // Desired payload sizes in responses from the server. + int32 size = 1; + + // Desired interval between consecutive responses in the response stream in + // microseconds. + int32 interval_us = 2; + + // Whether to request the server to compress the response. This field is + // "nullable" in order to interoperate seamlessly with clients not able to + // implement the full compression tests by introspecting the call to verify + // the response's compression status. + BoolValue compressed = 3; +} + +// Server-streaming request. +message StreamingOutputCallRequest { + // Desired payload type in the response from the server. + // If response_type is RANDOM, the payload from each response in the stream + // might be of different types. This is to simulate a mixed type of payload + // stream. + PayloadType response_type = 1; + + // Configuration for each expected response message. + repeated ResponseParameters response_parameters = 2; + + // Optional input payload sent along with the request. + Payload payload = 3; + + // Whether server should return a given status + EchoStatus response_status = 7; +} + +// Server-streaming response, as configured by the request and parameters. +message StreamingOutputCallResponse { + // Payload to increase response size. + Payload payload = 1; +} + +// For reconnect interop test only. +// Client tells server what reconnection parameters it used. +message ReconnectParams { + int32 max_reconnect_backoff_ms = 1; +} + +// For reconnect interop test only. +// Server tells client whether its reconnects are following the spec and the +// reconnect backoffs it saw. +message ReconnectInfo { + bool passed = 1; + repeated int32 backoff_ms = 2; +} + +message LoadBalancerStatsRequest { + // Request stats for the next num_rpcs sent by client. + int32 num_rpcs = 1; + // If num_rpcs have not completed within timeout_sec, return partial results. + int32 timeout_sec = 2; +} + +message LoadBalancerStatsResponse { + // The number of completed RPCs for each peer. + map rpcs_by_peer = 1; + // The number of RPCs that failed to record a remote peer. + int32 num_failures = 2; +} diff --git a/test_server/test.proto b/test_server/test.proto new file mode 100644 index 0000000..9e92654 --- /dev/null +++ b/test_server/test.proto @@ -0,0 +1,86 @@ + +// Copyright 2015-2016 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// An integration test service that covers all the method signature permutations +// of unary/streaming requests/responses. + +syntax = "proto3"; + +import "test_server/empty.proto"; +import "test_server/messages.proto"; + +package grpc.testing; + +// A simple service to test the various types of RPCs and experiment with +// performance with various types of payload. +service TestService { + // One empty request followed by one empty response. + rpc EmptyCall(grpc.testing.Empty) returns (grpc.testing.Empty); + + // One request followed by one response. + rpc UnaryCall(SimpleRequest) returns (SimpleResponse); + + // One request followed by one response. Response has cache control + // headers set such that a caching HTTP proxy (such as GFE) can + // satisfy subsequent requests. + rpc CacheableUnaryCall(SimpleRequest) returns (SimpleResponse); + + // One request followed by a sequence of responses (streamed download). + // The server returns the payload with client desired type and sizes. + rpc StreamingOutputCall(StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); + + // A sequence of requests followed by one response (streamed upload). + // The server returns the aggregated size of client payload as the result. + rpc StreamingInputCall(stream StreamingInputCallRequest) + returns (StreamingInputCallResponse); + + // A sequence of requests with each request served by the server immediately. + // As one request could lead to multiple responses, this interface + // demonstrates the idea of full duplexing. + rpc FullDuplexCall(stream StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); + + // A sequence of requests followed by a sequence of responses. + // The server buffers all the client requests and then serves them in order. A + // stream of responses are returned to the client when the server starts with + // first request. + rpc HalfDuplexCall(stream StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); + + // The test server will not implement this method. It will be used + // to test the behavior when clients call unimplemented methods. + rpc UnimplementedCall(grpc.testing.Empty) returns (grpc.testing.Empty); +} + +// A simple service NOT implemented at servers so clients can test for +// that case. +service UnimplementedService { + // A call that no server should implement + rpc UnimplementedCall(grpc.testing.Empty) returns (grpc.testing.Empty); +} + +// A service used to control reconnect server. +service ReconnectService { + rpc Start(grpc.testing.ReconnectParams) returns (grpc.testing.Empty); + rpc Stop(grpc.testing.Empty) returns (grpc.testing.ReconnectInfo); +} + +// A service used to obtain stats for verifying LB behavior. +service LoadBalancerStatsService { + // Gets the backend distribution for RPCs sent by a test client. + rpc GetClientStats(LoadBalancerStatsRequest) + returns (LoadBalancerStatsResponse) {} +} diff --git a/test_server/wsgi.py b/test_server/wsgi.py new file mode 100644 index 0000000..aefa4fe --- /dev/null +++ b/test_server/wsgi.py @@ -0,0 +1,92 @@ +import sys +import time +from wsgiref.simple_server import make_server + +import sonora.wsgi + +from test_server import empty_pb2, messages_pb2, test_pb2_grpc + +_INITIAL_METADATA_KEY = "x-grpc-test-echo-initial" +_TRAILING_METADATA_KEY = "x-grpc-test-echo-trailing-bin" +_US_IN_A_SECOND = 1000 * 1000 + + +def _maybe_echo_metadata(servicer_context): + """Copies metadata from request to response if it is present.""" + invocation_metadata = dict(servicer_context.invocation_metadata()) + if _INITIAL_METADATA_KEY in invocation_metadata: + initial_metadatum = (_INITIAL_METADATA_KEY, + invocation_metadata[_INITIAL_METADATA_KEY]) + servicer_context.send_initial_metadata((initial_metadatum,)) + if _TRAILING_METADATA_KEY in invocation_metadata: + trailing_metadatum = (_TRAILING_METADATA_KEY, + invocation_metadata[_TRAILING_METADATA_KEY]) + servicer_context.set_trailing_metadata((trailing_metadatum,)) + + +def _maybe_echo_status_and_message(request, servicer_context): + """Sets the response context code and details if the request asks for them""" + if request.HasField('response_status'): + servicer_context.set_code(request.response_status.code) + servicer_context.set_details(request.response_status.message) + + +class TestServiceServicer(test_pb2_grpc.TestServiceServicer): + def EmptyCall(self, request, context): + _maybe_echo_metadata(context) + return empty_pb2.Empty() + + def UnaryCall(self, request, context): + _maybe_echo_metadata(context) + _maybe_echo_status_and_message(request, context) + return messages_pb2.SimpleResponse( + payload=messages_pb2.Payload(type=messages_pb2.COMPRESSABLE, + body=b'\x00' * request.response_size)) + + def StreamingOutputCall(self, request, context): + _maybe_echo_status_and_message(request, context) + for response_parameters in request.response_parameters: + if response_parameters.interval_us != 0: + time.sleep(response_parameters.interval_us / _US_IN_A_SECOND) + yield messages_pb2.StreamingOutputCallResponse( + payload=messages_pb2.Payload(type=request.response_type, + body=b'\x00' * + response_parameters.size)) + + def StreamingInputCall(self, request_iterator, context): + aggregate_size = 0 + for request in request_iterator: + if request.payload is not None and request.payload.body: + aggregate_size += len(request.payload.body) + return messages_pb2.StreamingInputCallResponse( + aggregated_payload_size=aggregate_size) + + def FullDuplexCall(self, request_iterator, context): + _maybe_echo_metadata(context) + for request in request_iterator: + _maybe_echo_status_and_message(request, context) + for response_parameters in request.response_parameters: + if response_parameters.interval_us != 0: + time.sleep(response_parameters.interval_us / + _US_IN_A_SECOND) + yield messages_pb2.StreamingOutputCallResponse( + payload=messages_pb2.Payload(type=request.payload.type, + body=b'\x00' * + response_parameters.size)) + + # NOTE(nathaniel): Apparently this is the same as the full-duplex call? + # NOTE(atash): It isn't even called in the interop spec (Oct 22 2015)... + def HalfDuplexCall(self, request_iterator, context): + return self.FullDuplexCall(request_iterator, context) + +def main(args): + grpc_wsgi_app = sonora.wsgi.grpcWSGI(None) + + with make_server("", 8080, grpc_wsgi_app) as httpd: + test_pb2_grpc.add_TestServiceServicer_to_server(TestServiceServicer(), grpc_wsgi_app) + print("Server up on 0.0.0.0:8080") + httpd.serve_forever() + + +if __name__ == "__main__": + sys.exit(main(sys.argv))