Skip to content

Commit

Permalink
Official interop test servers
Browse files Browse the repository at this point in the history
  • Loading branch information
public committed Apr 26, 2021
1 parent 6c781e9 commit 0d662ec
Show file tree
Hide file tree
Showing 6 changed files with 517 additions and 0 deletions.
Empty file added test_server/__init__.py
Empty file.
102 changes: 102 additions & 0 deletions test_server/asgi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import asyncio
import datetime
import sys
from wsgiref.simple_server import make_server

import sonora.asgi

import daphne.server

from test_server import empty_pb2, messages_pb2, test_pb2_grpc

_INITIAL_METADATA_KEY = "x-grpc-test-echo-initial"
_TRAILING_METADATA_KEY = "x-grpc-test-echo-trailing-bin"
_US_IN_A_SECOND = 1000 * 1000
UNARY_CALL_WITH_SLEEP_VALUE = 0.2

async def _maybe_echo_metadata(servicer_context):
"""Copies metadata from request to response if it is present."""
invocation_metadata = dict(servicer_context.invocation_metadata())
if _INITIAL_METADATA_KEY in invocation_metadata:
initial_metadatum = (_INITIAL_METADATA_KEY,
invocation_metadata[_INITIAL_METADATA_KEY])
await servicer_context.send_initial_metadata((initial_metadatum,))

if _TRAILING_METADATA_KEY in invocation_metadata:
trailing_metadatum = (_TRAILING_METADATA_KEY,
invocation_metadata[_TRAILING_METADATA_KEY])
servicer_context.set_trailing_metadata((trailing_metadatum,))


async def _maybe_echo_status(request: messages_pb2.SimpleRequest,
servicer_context):
"""Echos the RPC status if demanded by the request."""
if request.HasField('response_status'):
await servicer_context.abort(request.response_status.code,
request.response_status.message)


class TestServiceServicer(test_pb2_grpc.TestServiceServicer):

async def UnaryCall(self, request, context):
await _maybe_echo_metadata(context)
await _maybe_echo_status(request, context)

return messages_pb2.SimpleResponse(
payload=messages_pb2.Payload(type=messages_pb2.COMPRESSABLE,
body=b'\x00' * request.response_size))

async def EmptyCall(self, request, context):
return empty_pb2.Empty()

async def StreamingOutputCall(
self, request: messages_pb2.StreamingOutputCallRequest,
unused_context):
for response_parameters in request.response_parameters:
if response_parameters.interval_us != 0:
await asyncio.sleep(
datetime.timedelta(microseconds=response_parameters.
interval_us).total_seconds())
if response_parameters.size != 0:
yield messages_pb2.StreamingOutputCallResponse(
payload=messages_pb2.Payload(type=request.response_type,
body=b'\x00' *
response_parameters.size))
else:
yield messages_pb2.StreamingOutputCallResponse()

# Next methods are extra ones that are registred programatically
# when the sever is instantiated. They are not being provided by
# the proto file.
async def UnaryCallWithSleep(self, unused_request, unused_context):
await asyncio.sleep(UNARY_CALL_WITH_SLEEP_VALUE)
return messages_pb2.SimpleResponse()

async def StreamingInputCall(self, request_async_iterator, unused_context):
aggregate_size = 0
async for request in request_async_iterator:
if request.payload is not None and request.payload.body:
aggregate_size += len(request.payload.body)
return messages_pb2.StreamingInputCallResponse(
aggregated_payload_size=aggregate_size)

async def FullDuplexCall(self, request_async_iterator, context):
await _maybe_echo_metadata(context)
async for request in request_async_iterator:
await _maybe_echo_status(request, context)
for response_parameters in request.response_parameters:
if response_parameters.interval_us != 0:
await asyncio.sleep(
datetime.timedelta(microseconds=response_parameters.
interval_us).total_seconds())
if response_parameters.size != 0:
yield messages_pb2.StreamingOutputCallResponse(
payload=messages_pb2.Payload(type=request.payload.type,
body=b'\x00' *
response_parameters.size))
else:
yield messages_pb2.StreamingOutputCallResponse()


application = sonora.asgi.grpcASGI()
test_pb2_grpc.add_TestServiceServicer_to_server(TestServiceServicer(), application)
28 changes: 28 additions & 0 deletions test_server/empty.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@

// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

package grpc.testing;

// An empty message that you can re-use to avoid defining duplicated empty
// messages in your project. A typical example is to use it as argument or the
// return value of a service API. For instance:
//
// service Foo {
// rpc Bar (grpc.testing.Empty) returns (grpc.testing.Empty) { };
// };
//
message Empty {}
209 changes: 209 additions & 0 deletions test_server/messages.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@

// Copyright 2015-2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Message definitions to be used by integration test service definitions.

syntax = "proto3";

package grpc.testing;

// TODO(dgq): Go back to using well-known types once
// https://github.com/grpc/grpc/issues/6980 has been fixed.
// import "google/protobuf/wrappers.proto";
message BoolValue {
// The bool value.
bool value = 1;
}

// The type of payload that should be returned.
enum PayloadType {
// Compressable text format.
COMPRESSABLE = 0;
}

// A block of data, to simply increase gRPC message size.
message Payload {
// The type of data in body.
PayloadType type = 1;
// Primary contents of payload.
bytes body = 2;
}

// A protobuf representation for grpc status. This is used by test
// clients to specify a status that the server should attempt to return.
message EchoStatus {
int32 code = 1;
string message = 2;
}

// The type of route that a client took to reach a server w.r.t. gRPCLB.
// The server must fill in "fallback" if it detects that the RPC reached
// the server via the "gRPCLB fallback" path, and "backend" if it detects
// that the RPC reached the server via "gRPCLB backend" path (i.e. if it got
// the address of this server from the gRPCLB server BalanceLoad RPC). Exactly
// how this detection is done is context and server dependent.
enum GrpclbRouteType {
// Server didn't detect the route that a client took to reach it.
GRPCLB_ROUTE_TYPE_UNKNOWN = 0;
// Indicates that a client reached a server via gRPCLB fallback.
GRPCLB_ROUTE_TYPE_FALLBACK = 1;
// Indicates that a client reached a server as a gRPCLB-given backend.
GRPCLB_ROUTE_TYPE_BACKEND = 2;
}

// Unary request.
message SimpleRequest {
// Desired payload type in the response from the server.
// If response_type is RANDOM, server randomly chooses one from other formats.
PayloadType response_type = 1;

// Desired payload size in the response from the server.
int32 response_size = 2;

// Optional input payload sent along with the request.
Payload payload = 3;

// Whether SimpleResponse should include username.
bool fill_username = 4;

// Whether SimpleResponse should include OAuth scope.
bool fill_oauth_scope = 5;

// Whether to request the server to compress the response. This field is
// "nullable" in order to interoperate seamlessly with clients not able to
// implement the full compression tests by introspecting the call to verify
// the response's compression status.
BoolValue response_compressed = 6;

// Whether server should return a given status
EchoStatus response_status = 7;

// Whether the server should expect this request to be compressed.
BoolValue expect_compressed = 8;

// Whether SimpleResponse should include server_id.
bool fill_server_id = 9;

// Whether SimpleResponse should include grpclb_route_type.
bool fill_grpclb_route_type = 10;
}

// Unary response, as configured by the request.
message SimpleResponse {
// Payload to increase message size.
Payload payload = 1;
// The user the request came from, for verifying authentication was
// successful when the client expected it.
string username = 2;
// OAuth scope.
string oauth_scope = 3;

// Server ID. This must be unique among different server instances,
// but the same across all RPC's made to a particular server instance.
string server_id = 4;
// gRPCLB Path.
GrpclbRouteType grpclb_route_type = 5;

// Server hostname.
string hostname = 6;
}

// Client-streaming request.
message StreamingInputCallRequest {
// Optional input payload sent along with the request.
Payload payload = 1;

// Whether the server should expect this request to be compressed. This field
// is "nullable" in order to interoperate seamlessly with servers not able to
// implement the full compression tests by introspecting the call to verify
// the request's compression status.
BoolValue expect_compressed = 2;

// Not expecting any payload from the response.
}

// Client-streaming response.
message StreamingInputCallResponse {
// Aggregated size of payloads received from the client.
int32 aggregated_payload_size = 1;
}

// Configuration for a particular response.
message ResponseParameters {
// Desired payload sizes in responses from the server.
int32 size = 1;

// Desired interval between consecutive responses in the response stream in
// microseconds.
int32 interval_us = 2;

// Whether to request the server to compress the response. This field is
// "nullable" in order to interoperate seamlessly with clients not able to
// implement the full compression tests by introspecting the call to verify
// the response's compression status.
BoolValue compressed = 3;
}

// Server-streaming request.
message StreamingOutputCallRequest {
// Desired payload type in the response from the server.
// If response_type is RANDOM, the payload from each response in the stream
// might be of different types. This is to simulate a mixed type of payload
// stream.
PayloadType response_type = 1;

// Configuration for each expected response message.
repeated ResponseParameters response_parameters = 2;

// Optional input payload sent along with the request.
Payload payload = 3;

// Whether server should return a given status
EchoStatus response_status = 7;
}

// Server-streaming response, as configured by the request and parameters.
message StreamingOutputCallResponse {
// Payload to increase response size.
Payload payload = 1;
}

// For reconnect interop test only.
// Client tells server what reconnection parameters it used.
message ReconnectParams {
int32 max_reconnect_backoff_ms = 1;
}

// For reconnect interop test only.
// Server tells client whether its reconnects are following the spec and the
// reconnect backoffs it saw.
message ReconnectInfo {
bool passed = 1;
repeated int32 backoff_ms = 2;
}

message LoadBalancerStatsRequest {
// Request stats for the next num_rpcs sent by client.
int32 num_rpcs = 1;
// If num_rpcs have not completed within timeout_sec, return partial results.
int32 timeout_sec = 2;
}

message LoadBalancerStatsResponse {
// The number of completed RPCs for each peer.
map<string, int32> rpcs_by_peer = 1;
// The number of RPCs that failed to record a remote peer.
int32 num_failures = 2;
}
Loading

0 comments on commit 0d662ec

Please sign in to comment.