diff --git a/.circleci/config.yml b/.circleci/config.yml index 2cc64bc..6db3009 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -3,12 +3,51 @@ workflows: version: 2 test: jobs: + - build-sonora-images + - build-grpcweb-images - test-py37 - test-py38 - - black-py37 - - interop-chrome-wsgi - - interop-chrome-asgi + - lint + - interop-grpcweb-wsgi: + requires: + - build-sonora-images + # - build-grpcweb-images + - interop-grpcweb-asgi: + requires: + - build-sonora-images + # - build-grpcweb-images jobs: + build-sonora-images: + docker: + - image: cimg/base:stable + steps: + - setup_remote_docker + - checkout + - run: + name: Build + command: | + docker-compose build + docker login --username=public --password=$DOCKER_HUB_PAT + docker tag public/sonora-test-wsgi-server public/sonora-test-wsgi-server:$CIRCLE_SHA1 + docker push public/sonora-test-wsgi-server:$CIRCLE_SHA1 + docker tag public/sonora-test-asgi-server public/sonora-test-asgi-server:$CIRCLE_SHA1 + docker push public/sonora-test-asgi-server:$CIRCLE_SHA1 + build-grpcweb-images: + docker: + - image: cimg/base:stable + steps: + - setup_remote_docker + - run: + name: Build + command: | + git clone https://github.com/grpc/grpc-web.git + cd grpc-web + git reset --hard 35c16a9e4e113b65966e159dc879bc452c00526c + docker pull public/sonora-grpcweb-interop + docker-compose build common prereqs node-interop-server interop-client + docker login --username=public --password=$DOCKER_HUB_PAT + docker tag grpcweb/prereqs public/sonora-grpcweb-interop:$CIRCLE_SHA1 + docker push public/sonora-grpcweb-interop:$CIRCLE_SHA1 test-py37: docker: - image: circleci/python:3.7 @@ -37,56 +76,38 @@ jobs: key: benchmarks-{{ .Environment.CIRCLE_JOB }}-{{ .Branch }}-{{ .Revision }} paths: - .benchmarks - black-py37: + test-py39: docker: - - image: circleci/python:3.7 + - image: circleci/python:3.9 steps: - checkout - run: sudo pip install tox - - run: tox -e black - interop-chrome-wsgi: + - run: sudo apt-get install python3-dev libev-dev + - restore_cache: + key: benchmarks-{{ .Environment.CIRCLE_JOB }}-master + - run: tox -e py39 -- --benchmark-save=${CIRCLE_BRANCH} --benchmark-compare + - save_cache: + key: benchmarks-{{ .Environment.CIRCLE_JOB }}-{{ .Branch }}-{{ .Revision }} + paths: + - .benchmarks + interop-grpcweb-wsgi: docker: - - image: public/grpc-web-frontend-tests:latest - entrypoint: /bin/bash + - image: public/sonora-grpcweb-interop + - image: public/sonora-test-wsgi-server:$CIRCLE_SHA1 steps: - - checkout - - run: sudo pip3 install grpcio-tools - - run: sudo pip3 install . - - run: | - python3 -m grpc.tools.protoc \ - --proto_path=$(pwd) \ - --python_out=. \ - --grpc_python_out=. \ - $(pwd)/echo/echo.proto - - run: - background: true - command: PYTHONPATH=.:$PYTHONPATH python3 echo/wsgi.py - run: - working_directory: /home/node/src - command: "./node_modules/.bin/grunt karma:improbable --grpc-host=http://localhost:8080" + command: /github/grpc-web/scripts/docker-run-interop-tests.sh + interop-grpcweb-asgi: + docker: + - image: public/sonora-grpcweb-interop + - image: public/sonora-test-asgi-server:$CIRCLE_SHA1 + steps: - run: - working_directory: /home/node/src - command: "./node_modules/.bin/grunt karma:grpcWeb --grpc-host=http://localhost:8080" - interop-chrome-asgi: + command: /github/grpc-web/scripts/docker-run-interop-tests.sh + lint: docker: - - image: public/grpc-web-frontend-tests:latest - entrypoint: /bin/bash + - image: circleci/python:3.9 steps: - checkout - - run: sudo pip3 install grpcio-tools daphne - - run: sudo pip3 install . - - run: | - python3 -m grpc.tools.protoc \ - --proto_path="$(pwd)" \ - --python_out=. \ - --grpc_python_out=. \ - "$(pwd)/echo/echo.proto" - - run: - background: true - command: PYTHONPATH=.:$PYTHONPATH python3 echo/asgi.py - - run: - working_directory: /home/node/src - command: "./node_modules/.bin/grunt karma:improbable --grpc-host=http://localhost:8080" - - run: - working_directory: /home/node/src - command: "./node_modules/.bin/grunt karma:grpcWeb --grpc-host=http://localhost:8080" + - run: sudo pip install tox + - run: tox -e black \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..4590a5a --- /dev/null +++ b/Dockerfile @@ -0,0 +1,26 @@ +FROM python:3.8.1-buster AS base + +WORKDIR /usr/src/app + +RUN apt update && \ + apt install -y build-essential libev-dev + +COPY . . + +RUN pip install -e .[tests] + +RUN python -m grpc.tools.protoc \ + --proto_path="$(pwd)/" \ + --python_out=. \ + --grpc_python_out=. \ + "$(pwd)"/test_server/*.proto + +RUN pip install daphne + +FROM base AS wsgi + +CMD python test_server/wsgi.py + +FROM base AS asgi + +CMD daphne -b 0.0.0.0 -p 8080 test_server.asgi:application \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..29d9ddc --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,35 @@ +version: "3.9" +services: + wsgi-server: + build: + context: . + target: wsgi + image: public/sonora-test-wsgi-server + volumes: + - ".:/usr/src/app" + networks: + interop: + aliases: + - wsgi-server + - server + + asgi-server: + build: + context: . + target: asgi + image: public/sonora-test-asgi-server + volumes: + - ".:/usr/src/app" + networks: + interop: + aliases: + - server + + interop-grpcweb: + image: grpcweb/prereqs + entrypoint: /github/grpc-web/scripts/docker-run-interop-tests.sh + networks: + - interop + +networks: + interop: \ No newline at end of file diff --git a/echo/README.md b/echo/README.md deleted file mode 100755 index 10ea475..0000000 --- a/echo/README.md +++ /dev/null @@ -1,6 +0,0 @@ -# Echo protofile - -A nice simple example showing various method types and streaming modes. - -Copied almost entirely from the -[official grpc-web repo](https://github.com/grpc/grpc-web/blob/92aa9f8fc8e7af4aadede52ea075dd5790a63b62/net/grpc/gateway/examples/echo/echo.proto). diff --git a/echo/asgi.py b/echo/asgi.py deleted file mode 100644 index 86e27e5..0000000 --- a/echo/asgi.py +++ /dev/null @@ -1,47 +0,0 @@ -import sys -import time - -from daphne.cli import ASGI3Middleware -import daphne.server - -import grpc -import sonora.asgi - -from echo import echo_pb2 -from echo import echo_pb2_grpc -import asyncio - - -class Echo(echo_pb2_grpc.EchoServiceServicer): - async def Echo(self, request, context): - return echo_pb2.EchoResponse(message=request.message) - - async def EchoAbort(self, request, context): - context.set_code(grpc.StatusCode.ABORTED) - return echo_pb2.EchoResponse(message=request.message) - - async def ServerStreamingEcho(self, request, context): - for _ in range(request.message_count): - yield echo_pb2.EchoResponse(message=request.message) - - await asyncio.sleep(request.message_interval.seconds / 2) - time.sleep(request.message_interval.seconds / 2) - - async def ServerStreamingEchoAbort(self, request, context): - for _ in range(request.message_count // 2): - yield echo_pb2.EchoResponse(message=request.message) - context.set_code(grpc.StatusCode.ABORTED) - - -def main(args): - grpc_asgi_app = sonora.asgi.grpcASGI() - echo_pb2_grpc.add_EchoServiceServicer_to_server(Echo(), grpc_asgi_app) - - server = daphne.server.Server( - ASGI3Middleware(grpc_asgi_app), ["tcp:port=8080:interface=0.0.0.0"] - ) - server.run() - - -if __name__ == "__main__": - sys.exit(main(sys.argv)) diff --git a/echo/echo.proto b/echo/echo.proto deleted file mode 100755 index 25ff28a..0000000 --- a/echo/echo.proto +++ /dev/null @@ -1,105 +0,0 @@ -// Copyright 2018 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -syntax = "proto3"; - -package grpc.gateway.testing; - -import "google/protobuf/duration.proto"; - -message Empty { -} - -message EchoRequest { - string message = 1; -} - -message EchoResponse { - string message = 1; - int32 message_count = 2; -} - -// Request type for server side streaming echo. -message ServerStreamingEchoRequest { - // Message string for server streaming request. - string message = 1; - - // The total number of messages to be generated before the server - // closes the stream; default is 10. - int32 message_count = 2; - - // The interval between two server messages. The server implementation - // may enforce some minimum interval (e.g. 100ms) to avoid message overflow. - google.protobuf.Duration message_interval = 3; -} - -// Response type for server streaming response. -message ServerStreamingEchoResponse { - // Response message. - string message = 1; -} - -// Request type for client side streaming echo. -message ClientStreamingEchoRequest { - // A special value "" indicates that there's no further messages. - string message = 1; -} - -// Response type for client side streaming echo. -message ClientStreamingEchoResponse { - // Total number of client messages that have been received. - int32 message_count = 1; -} - -// A simple echo service. -service EchoService { - // One request followed by one response - // The server returns the client message as-is. - rpc Echo(EchoRequest) returns (EchoResponse); - - // Sends back abort status. - rpc EchoAbort(EchoRequest) returns (EchoResponse) { - }; - - // One empty request, ZERO processing, followed by one empty response - // (minimum effort to do message serialization). - rpc NoOp(Empty) returns (Empty); - - // One request followed by a sequence of responses (streamed download). - // The server will return the same client message repeatedly. - rpc ServerStreamingEcho(ServerStreamingEchoRequest) - returns (stream ServerStreamingEchoResponse); - - // One request followed by a sequence of responses (streamed download). - // The server abort directly. - rpc ServerStreamingEchoAbort(ServerStreamingEchoRequest) - returns (stream ServerStreamingEchoResponse) { - } - - // A sequence of requests followed by one response (streamed upload). - // The server returns the total number of messages as the result. - rpc ClientStreamingEcho(stream ClientStreamingEchoRequest) - returns (ClientStreamingEchoResponse); - - // A sequence of requests with each message echoed by the server immediately. - // The server returns the same client messages in order. - // E.g. this is how the speech API works. - rpc FullDuplexEcho(stream EchoRequest) returns (stream EchoResponse); - - // A sequence of requests followed by a sequence of responses. - // The server buffers all the client messages and then returns the same - // client messages one by one after the client half-closes the stream. - // This is how an image recognition API may work. - rpc HalfDuplexEcho(stream EchoRequest) returns (stream EchoResponse); -} diff --git a/echo/echo/echo_pb2.py b/echo/echo/echo_pb2.py deleted file mode 100644 index 534694f..0000000 --- a/echo/echo/echo_pb2.py +++ /dev/null @@ -1,408 +0,0 @@ -# -*- coding: utf-8 -*- -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: echo/echo.proto - -import sys -_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) -from google.protobuf import descriptor as _descriptor -from google.protobuf import message as _message -from google.protobuf import reflection as _reflection -from google.protobuf import symbol_database as _symbol_database -# @@protoc_insertion_point(imports) - -_sym_db = _symbol_database.Default() - - -from google.protobuf import duration_pb2 as google_dot_protobuf_dot_duration__pb2 - - -DESCRIPTOR = _descriptor.FileDescriptor( - name='echo/echo.proto', - package='grpc.gateway.testing', - syntax='proto3', - serialized_options=None, - serialized_pb=_b('\n\x0f\x65\x63ho/echo.proto\x12\x14grpc.gateway.testing\x1a\x1egoogle/protobuf/duration.proto\"\x07\n\x05\x45mpty\"\x1e\n\x0b\x45\x63hoRequest\x12\x0f\n\x07message\x18\x01 \x01(\t\"6\n\x0c\x45\x63hoResponse\x12\x0f\n\x07message\x18\x01 \x01(\t\x12\x15\n\rmessage_count\x18\x02 \x01(\x05\"y\n\x1aServerStreamingEchoRequest\x12\x0f\n\x07message\x18\x01 \x01(\t\x12\x15\n\rmessage_count\x18\x02 \x01(\x05\x12\x33\n\x10message_interval\x18\x03 \x01(\x0b\x32\x19.google.protobuf.Duration\".\n\x1bServerStreamingEchoResponse\x12\x0f\n\x07message\x18\x01 \x01(\t\"-\n\x1a\x43lientStreamingEchoRequest\x12\x0f\n\x07message\x18\x01 \x01(\t\"4\n\x1b\x43lientStreamingEchoResponse\x12\x15\n\rmessage_count\x18\x01 \x01(\x05\x32\xb0\x06\n\x0b\x45\x63hoService\x12M\n\x04\x45\x63ho\x12!.grpc.gateway.testing.EchoRequest\x1a\".grpc.gateway.testing.EchoResponse\x12T\n\tEchoAbort\x12!.grpc.gateway.testing.EchoRequest\x1a\".grpc.gateway.testing.EchoResponse\"\x00\x12@\n\x04NoOp\x12\x1b.grpc.gateway.testing.Empty\x1a\x1b.grpc.gateway.testing.Empty\x12|\n\x13ServerStreamingEcho\x12\x30.grpc.gateway.testing.ServerStreamingEchoRequest\x1a\x31.grpc.gateway.testing.ServerStreamingEchoResponse0\x01\x12\x83\x01\n\x18ServerStreamingEchoAbort\x12\x30.grpc.gateway.testing.ServerStreamingEchoRequest\x1a\x31.grpc.gateway.testing.ServerStreamingEchoResponse\"\x00\x30\x01\x12|\n\x13\x43lientStreamingEcho\x12\x30.grpc.gateway.testing.ClientStreamingEchoRequest\x1a\x31.grpc.gateway.testing.ClientStreamingEchoResponse(\x01\x12[\n\x0e\x46ullDuplexEcho\x12!.grpc.gateway.testing.EchoRequest\x1a\".grpc.gateway.testing.EchoResponse(\x01\x30\x01\x12[\n\x0eHalfDuplexEcho\x12!.grpc.gateway.testing.EchoRequest\x1a\".grpc.gateway.testing.EchoResponse(\x01\x30\x01\x62\x06proto3') - , - dependencies=[google_dot_protobuf_dot_duration__pb2.DESCRIPTOR,]) - - - - -_EMPTY = _descriptor.Descriptor( - name='Empty', - full_name='grpc.gateway.testing.Empty', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - serialized_options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=73, - serialized_end=80, -) - - -_ECHOREQUEST = _descriptor.Descriptor( - name='EchoRequest', - full_name='grpc.gateway.testing.EchoRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='message', full_name='grpc.gateway.testing.EchoRequest.message', index=0, - number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - serialized_options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=82, - serialized_end=112, -) - - -_ECHORESPONSE = _descriptor.Descriptor( - name='EchoResponse', - full_name='grpc.gateway.testing.EchoResponse', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='message', full_name='grpc.gateway.testing.EchoResponse.message', index=0, - number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), - _descriptor.FieldDescriptor( - name='message_count', full_name='grpc.gateway.testing.EchoResponse.message_count', index=1, - number=2, type=5, cpp_type=1, label=1, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - serialized_options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=114, - serialized_end=168, -) - - -_SERVERSTREAMINGECHOREQUEST = _descriptor.Descriptor( - name='ServerStreamingEchoRequest', - full_name='grpc.gateway.testing.ServerStreamingEchoRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='message', full_name='grpc.gateway.testing.ServerStreamingEchoRequest.message', index=0, - number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), - _descriptor.FieldDescriptor( - name='message_count', full_name='grpc.gateway.testing.ServerStreamingEchoRequest.message_count', index=1, - number=2, type=5, cpp_type=1, label=1, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), - _descriptor.FieldDescriptor( - name='message_interval', full_name='grpc.gateway.testing.ServerStreamingEchoRequest.message_interval', index=2, - number=3, type=11, cpp_type=10, label=1, - has_default_value=False, default_value=None, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - serialized_options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=170, - serialized_end=291, -) - - -_SERVERSTREAMINGECHORESPONSE = _descriptor.Descriptor( - name='ServerStreamingEchoResponse', - full_name='grpc.gateway.testing.ServerStreamingEchoResponse', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='message', full_name='grpc.gateway.testing.ServerStreamingEchoResponse.message', index=0, - number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - serialized_options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=293, - serialized_end=339, -) - - -_CLIENTSTREAMINGECHOREQUEST = _descriptor.Descriptor( - name='ClientStreamingEchoRequest', - full_name='grpc.gateway.testing.ClientStreamingEchoRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='message', full_name='grpc.gateway.testing.ClientStreamingEchoRequest.message', index=0, - number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - serialized_options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=341, - serialized_end=386, -) - - -_CLIENTSTREAMINGECHORESPONSE = _descriptor.Descriptor( - name='ClientStreamingEchoResponse', - full_name='grpc.gateway.testing.ClientStreamingEchoResponse', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='message_count', full_name='grpc.gateway.testing.ClientStreamingEchoResponse.message_count', index=0, - number=1, type=5, cpp_type=1, label=1, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - serialized_options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=388, - serialized_end=440, -) - -_SERVERSTREAMINGECHOREQUEST.fields_by_name['message_interval'].message_type = google_dot_protobuf_dot_duration__pb2._DURATION -DESCRIPTOR.message_types_by_name['Empty'] = _EMPTY -DESCRIPTOR.message_types_by_name['EchoRequest'] = _ECHOREQUEST -DESCRIPTOR.message_types_by_name['EchoResponse'] = _ECHORESPONSE -DESCRIPTOR.message_types_by_name['ServerStreamingEchoRequest'] = _SERVERSTREAMINGECHOREQUEST -DESCRIPTOR.message_types_by_name['ServerStreamingEchoResponse'] = _SERVERSTREAMINGECHORESPONSE -DESCRIPTOR.message_types_by_name['ClientStreamingEchoRequest'] = _CLIENTSTREAMINGECHOREQUEST -DESCRIPTOR.message_types_by_name['ClientStreamingEchoResponse'] = _CLIENTSTREAMINGECHORESPONSE -_sym_db.RegisterFileDescriptor(DESCRIPTOR) - -Empty = _reflection.GeneratedProtocolMessageType('Empty', (_message.Message,), dict( - DESCRIPTOR = _EMPTY, - __module__ = 'echo.echo_pb2' - # @@protoc_insertion_point(class_scope:grpc.gateway.testing.Empty) - )) -_sym_db.RegisterMessage(Empty) - -EchoRequest = _reflection.GeneratedProtocolMessageType('EchoRequest', (_message.Message,), dict( - DESCRIPTOR = _ECHOREQUEST, - __module__ = 'echo.echo_pb2' - # @@protoc_insertion_point(class_scope:grpc.gateway.testing.EchoRequest) - )) -_sym_db.RegisterMessage(EchoRequest) - -EchoResponse = _reflection.GeneratedProtocolMessageType('EchoResponse', (_message.Message,), dict( - DESCRIPTOR = _ECHORESPONSE, - __module__ = 'echo.echo_pb2' - # @@protoc_insertion_point(class_scope:grpc.gateway.testing.EchoResponse) - )) -_sym_db.RegisterMessage(EchoResponse) - -ServerStreamingEchoRequest = _reflection.GeneratedProtocolMessageType('ServerStreamingEchoRequest', (_message.Message,), dict( - DESCRIPTOR = _SERVERSTREAMINGECHOREQUEST, - __module__ = 'echo.echo_pb2' - # @@protoc_insertion_point(class_scope:grpc.gateway.testing.ServerStreamingEchoRequest) - )) -_sym_db.RegisterMessage(ServerStreamingEchoRequest) - -ServerStreamingEchoResponse = _reflection.GeneratedProtocolMessageType('ServerStreamingEchoResponse', (_message.Message,), dict( - DESCRIPTOR = _SERVERSTREAMINGECHORESPONSE, - __module__ = 'echo.echo_pb2' - # @@protoc_insertion_point(class_scope:grpc.gateway.testing.ServerStreamingEchoResponse) - )) -_sym_db.RegisterMessage(ServerStreamingEchoResponse) - -ClientStreamingEchoRequest = _reflection.GeneratedProtocolMessageType('ClientStreamingEchoRequest', (_message.Message,), dict( - DESCRIPTOR = _CLIENTSTREAMINGECHOREQUEST, - __module__ = 'echo.echo_pb2' - # @@protoc_insertion_point(class_scope:grpc.gateway.testing.ClientStreamingEchoRequest) - )) -_sym_db.RegisterMessage(ClientStreamingEchoRequest) - -ClientStreamingEchoResponse = _reflection.GeneratedProtocolMessageType('ClientStreamingEchoResponse', (_message.Message,), dict( - DESCRIPTOR = _CLIENTSTREAMINGECHORESPONSE, - __module__ = 'echo.echo_pb2' - # @@protoc_insertion_point(class_scope:grpc.gateway.testing.ClientStreamingEchoResponse) - )) -_sym_db.RegisterMessage(ClientStreamingEchoResponse) - - - -_ECHOSERVICE = _descriptor.ServiceDescriptor( - name='EchoService', - full_name='grpc.gateway.testing.EchoService', - file=DESCRIPTOR, - index=0, - serialized_options=None, - serialized_start=443, - serialized_end=1259, - methods=[ - _descriptor.MethodDescriptor( - name='Echo', - full_name='grpc.gateway.testing.EchoService.Echo', - index=0, - containing_service=None, - input_type=_ECHOREQUEST, - output_type=_ECHORESPONSE, - serialized_options=None, - ), - _descriptor.MethodDescriptor( - name='EchoAbort', - full_name='grpc.gateway.testing.EchoService.EchoAbort', - index=1, - containing_service=None, - input_type=_ECHOREQUEST, - output_type=_ECHORESPONSE, - serialized_options=None, - ), - _descriptor.MethodDescriptor( - name='NoOp', - full_name='grpc.gateway.testing.EchoService.NoOp', - index=2, - containing_service=None, - input_type=_EMPTY, - output_type=_EMPTY, - serialized_options=None, - ), - _descriptor.MethodDescriptor( - name='ServerStreamingEcho', - full_name='grpc.gateway.testing.EchoService.ServerStreamingEcho', - index=3, - containing_service=None, - input_type=_SERVERSTREAMINGECHOREQUEST, - output_type=_SERVERSTREAMINGECHORESPONSE, - serialized_options=None, - ), - _descriptor.MethodDescriptor( - name='ServerStreamingEchoAbort', - full_name='grpc.gateway.testing.EchoService.ServerStreamingEchoAbort', - index=4, - containing_service=None, - input_type=_SERVERSTREAMINGECHOREQUEST, - output_type=_SERVERSTREAMINGECHORESPONSE, - serialized_options=None, - ), - _descriptor.MethodDescriptor( - name='ClientStreamingEcho', - full_name='grpc.gateway.testing.EchoService.ClientStreamingEcho', - index=5, - containing_service=None, - input_type=_CLIENTSTREAMINGECHOREQUEST, - output_type=_CLIENTSTREAMINGECHORESPONSE, - serialized_options=None, - ), - _descriptor.MethodDescriptor( - name='FullDuplexEcho', - full_name='grpc.gateway.testing.EchoService.FullDuplexEcho', - index=6, - containing_service=None, - input_type=_ECHOREQUEST, - output_type=_ECHORESPONSE, - serialized_options=None, - ), - _descriptor.MethodDescriptor( - name='HalfDuplexEcho', - full_name='grpc.gateway.testing.EchoService.HalfDuplexEcho', - index=7, - containing_service=None, - input_type=_ECHOREQUEST, - output_type=_ECHORESPONSE, - serialized_options=None, - ), -]) -_sym_db.RegisterServiceDescriptor(_ECHOSERVICE) - -DESCRIPTOR.services_by_name['EchoService'] = _ECHOSERVICE - -# @@protoc_insertion_point(module_scope) diff --git a/echo/echo/echo_pb2_grpc.py b/echo/echo/echo_pb2_grpc.py deleted file mode 100644 index 6e4ec81..0000000 --- a/echo/echo/echo_pb2_grpc.py +++ /dev/null @@ -1,175 +0,0 @@ -# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! -import grpc - -from echo import echo_pb2 as echo_dot_echo__pb2 - - -class EchoServiceStub(object): - """A simple echo service. - """ - - def __init__(self, channel): - """Constructor. - - Args: - channel: A grpc.Channel. - """ - self.Echo = channel.unary_unary( - '/grpc.gateway.testing.EchoService/Echo', - request_serializer=echo_dot_echo__pb2.EchoRequest.SerializeToString, - response_deserializer=echo_dot_echo__pb2.EchoResponse.FromString, - ) - self.EchoAbort = channel.unary_unary( - '/grpc.gateway.testing.EchoService/EchoAbort', - request_serializer=echo_dot_echo__pb2.EchoRequest.SerializeToString, - response_deserializer=echo_dot_echo__pb2.EchoResponse.FromString, - ) - self.NoOp = channel.unary_unary( - '/grpc.gateway.testing.EchoService/NoOp', - request_serializer=echo_dot_echo__pb2.Empty.SerializeToString, - response_deserializer=echo_dot_echo__pb2.Empty.FromString, - ) - self.ServerStreamingEcho = channel.unary_stream( - '/grpc.gateway.testing.EchoService/ServerStreamingEcho', - request_serializer=echo_dot_echo__pb2.ServerStreamingEchoRequest.SerializeToString, - response_deserializer=echo_dot_echo__pb2.ServerStreamingEchoResponse.FromString, - ) - self.ServerStreamingEchoAbort = channel.unary_stream( - '/grpc.gateway.testing.EchoService/ServerStreamingEchoAbort', - request_serializer=echo_dot_echo__pb2.ServerStreamingEchoRequest.SerializeToString, - response_deserializer=echo_dot_echo__pb2.ServerStreamingEchoResponse.FromString, - ) - self.ClientStreamingEcho = channel.stream_unary( - '/grpc.gateway.testing.EchoService/ClientStreamingEcho', - request_serializer=echo_dot_echo__pb2.ClientStreamingEchoRequest.SerializeToString, - response_deserializer=echo_dot_echo__pb2.ClientStreamingEchoResponse.FromString, - ) - self.FullDuplexEcho = channel.stream_stream( - '/grpc.gateway.testing.EchoService/FullDuplexEcho', - request_serializer=echo_dot_echo__pb2.EchoRequest.SerializeToString, - response_deserializer=echo_dot_echo__pb2.EchoResponse.FromString, - ) - self.HalfDuplexEcho = channel.stream_stream( - '/grpc.gateway.testing.EchoService/HalfDuplexEcho', - request_serializer=echo_dot_echo__pb2.EchoRequest.SerializeToString, - response_deserializer=echo_dot_echo__pb2.EchoResponse.FromString, - ) - - -class EchoServiceServicer(object): - """A simple echo service. - """ - - def Echo(self, request, context): - """One request followed by one response - The server returns the client message as-is. - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def EchoAbort(self, request, context): - """Sends back abort status. - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def NoOp(self, request, context): - """One empty request, ZERO processing, followed by one empty response - (minimum effort to do message serialization). - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def ServerStreamingEcho(self, request, context): - """One request followed by a sequence of responses (streamed download). - The server will return the same client message repeatedly. - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def ServerStreamingEchoAbort(self, request, context): - """One request followed by a sequence of responses (streamed download). - The server abort directly. - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def ClientStreamingEcho(self, request_iterator, context): - """A sequence of requests followed by one response (streamed upload). - The server returns the total number of messages as the result. - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def FullDuplexEcho(self, request_iterator, context): - """A sequence of requests with each message echoed by the server immediately. - The server returns the same client messages in order. - E.g. this is how the speech API works. - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def HalfDuplexEcho(self, request_iterator, context): - """A sequence of requests followed by a sequence of responses. - The server buffers all the client messages and then returns the same - client messages one by one after the client half-closes the stream. - This is how an image recognition API may work. - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - -def add_EchoServiceServicer_to_server(servicer, server): - rpc_method_handlers = { - 'Echo': grpc.unary_unary_rpc_method_handler( - servicer.Echo, - request_deserializer=echo_dot_echo__pb2.EchoRequest.FromString, - response_serializer=echo_dot_echo__pb2.EchoResponse.SerializeToString, - ), - 'EchoAbort': grpc.unary_unary_rpc_method_handler( - servicer.EchoAbort, - request_deserializer=echo_dot_echo__pb2.EchoRequest.FromString, - response_serializer=echo_dot_echo__pb2.EchoResponse.SerializeToString, - ), - 'NoOp': grpc.unary_unary_rpc_method_handler( - servicer.NoOp, - request_deserializer=echo_dot_echo__pb2.Empty.FromString, - response_serializer=echo_dot_echo__pb2.Empty.SerializeToString, - ), - 'ServerStreamingEcho': grpc.unary_stream_rpc_method_handler( - servicer.ServerStreamingEcho, - request_deserializer=echo_dot_echo__pb2.ServerStreamingEchoRequest.FromString, - response_serializer=echo_dot_echo__pb2.ServerStreamingEchoResponse.SerializeToString, - ), - 'ServerStreamingEchoAbort': grpc.unary_stream_rpc_method_handler( - servicer.ServerStreamingEchoAbort, - request_deserializer=echo_dot_echo__pb2.ServerStreamingEchoRequest.FromString, - response_serializer=echo_dot_echo__pb2.ServerStreamingEchoResponse.SerializeToString, - ), - 'ClientStreamingEcho': grpc.stream_unary_rpc_method_handler( - servicer.ClientStreamingEcho, - request_deserializer=echo_dot_echo__pb2.ClientStreamingEchoRequest.FromString, - response_serializer=echo_dot_echo__pb2.ClientStreamingEchoResponse.SerializeToString, - ), - 'FullDuplexEcho': grpc.stream_stream_rpc_method_handler( - servicer.FullDuplexEcho, - request_deserializer=echo_dot_echo__pb2.EchoRequest.FromString, - response_serializer=echo_dot_echo__pb2.EchoResponse.SerializeToString, - ), - 'HalfDuplexEcho': grpc.stream_stream_rpc_method_handler( - servicer.HalfDuplexEcho, - request_deserializer=echo_dot_echo__pb2.EchoRequest.FromString, - response_serializer=echo_dot_echo__pb2.EchoResponse.SerializeToString, - ), - } - generic_handler = grpc.method_handlers_generic_handler( - 'grpc.gateway.testing.EchoService', rpc_method_handlers) - server.add_generic_rpc_handlers((generic_handler,)) diff --git a/echo/echo_pb2.py b/echo/echo_pb2.py deleted file mode 100644 index 534694f..0000000 --- a/echo/echo_pb2.py +++ /dev/null @@ -1,408 +0,0 @@ -# -*- coding: utf-8 -*- -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: echo/echo.proto - -import sys -_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) -from google.protobuf import descriptor as _descriptor -from google.protobuf import message as _message -from google.protobuf import reflection as _reflection -from google.protobuf import symbol_database as _symbol_database -# @@protoc_insertion_point(imports) - -_sym_db = _symbol_database.Default() - - -from google.protobuf import duration_pb2 as google_dot_protobuf_dot_duration__pb2 - - -DESCRIPTOR = _descriptor.FileDescriptor( - name='echo/echo.proto', - package='grpc.gateway.testing', - syntax='proto3', - serialized_options=None, - serialized_pb=_b('\n\x0f\x65\x63ho/echo.proto\x12\x14grpc.gateway.testing\x1a\x1egoogle/protobuf/duration.proto\"\x07\n\x05\x45mpty\"\x1e\n\x0b\x45\x63hoRequest\x12\x0f\n\x07message\x18\x01 \x01(\t\"6\n\x0c\x45\x63hoResponse\x12\x0f\n\x07message\x18\x01 \x01(\t\x12\x15\n\rmessage_count\x18\x02 \x01(\x05\"y\n\x1aServerStreamingEchoRequest\x12\x0f\n\x07message\x18\x01 \x01(\t\x12\x15\n\rmessage_count\x18\x02 \x01(\x05\x12\x33\n\x10message_interval\x18\x03 \x01(\x0b\x32\x19.google.protobuf.Duration\".\n\x1bServerStreamingEchoResponse\x12\x0f\n\x07message\x18\x01 \x01(\t\"-\n\x1a\x43lientStreamingEchoRequest\x12\x0f\n\x07message\x18\x01 \x01(\t\"4\n\x1b\x43lientStreamingEchoResponse\x12\x15\n\rmessage_count\x18\x01 \x01(\x05\x32\xb0\x06\n\x0b\x45\x63hoService\x12M\n\x04\x45\x63ho\x12!.grpc.gateway.testing.EchoRequest\x1a\".grpc.gateway.testing.EchoResponse\x12T\n\tEchoAbort\x12!.grpc.gateway.testing.EchoRequest\x1a\".grpc.gateway.testing.EchoResponse\"\x00\x12@\n\x04NoOp\x12\x1b.grpc.gateway.testing.Empty\x1a\x1b.grpc.gateway.testing.Empty\x12|\n\x13ServerStreamingEcho\x12\x30.grpc.gateway.testing.ServerStreamingEchoRequest\x1a\x31.grpc.gateway.testing.ServerStreamingEchoResponse0\x01\x12\x83\x01\n\x18ServerStreamingEchoAbort\x12\x30.grpc.gateway.testing.ServerStreamingEchoRequest\x1a\x31.grpc.gateway.testing.ServerStreamingEchoResponse\"\x00\x30\x01\x12|\n\x13\x43lientStreamingEcho\x12\x30.grpc.gateway.testing.ClientStreamingEchoRequest\x1a\x31.grpc.gateway.testing.ClientStreamingEchoResponse(\x01\x12[\n\x0e\x46ullDuplexEcho\x12!.grpc.gateway.testing.EchoRequest\x1a\".grpc.gateway.testing.EchoResponse(\x01\x30\x01\x12[\n\x0eHalfDuplexEcho\x12!.grpc.gateway.testing.EchoRequest\x1a\".grpc.gateway.testing.EchoResponse(\x01\x30\x01\x62\x06proto3') - , - dependencies=[google_dot_protobuf_dot_duration__pb2.DESCRIPTOR,]) - - - - -_EMPTY = _descriptor.Descriptor( - name='Empty', - full_name='grpc.gateway.testing.Empty', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - serialized_options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=73, - serialized_end=80, -) - - -_ECHOREQUEST = _descriptor.Descriptor( - name='EchoRequest', - full_name='grpc.gateway.testing.EchoRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='message', full_name='grpc.gateway.testing.EchoRequest.message', index=0, - number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - serialized_options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=82, - serialized_end=112, -) - - -_ECHORESPONSE = _descriptor.Descriptor( - name='EchoResponse', - full_name='grpc.gateway.testing.EchoResponse', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='message', full_name='grpc.gateway.testing.EchoResponse.message', index=0, - number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), - _descriptor.FieldDescriptor( - name='message_count', full_name='grpc.gateway.testing.EchoResponse.message_count', index=1, - number=2, type=5, cpp_type=1, label=1, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - serialized_options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=114, - serialized_end=168, -) - - -_SERVERSTREAMINGECHOREQUEST = _descriptor.Descriptor( - name='ServerStreamingEchoRequest', - full_name='grpc.gateway.testing.ServerStreamingEchoRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='message', full_name='grpc.gateway.testing.ServerStreamingEchoRequest.message', index=0, - number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), - _descriptor.FieldDescriptor( - name='message_count', full_name='grpc.gateway.testing.ServerStreamingEchoRequest.message_count', index=1, - number=2, type=5, cpp_type=1, label=1, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), - _descriptor.FieldDescriptor( - name='message_interval', full_name='grpc.gateway.testing.ServerStreamingEchoRequest.message_interval', index=2, - number=3, type=11, cpp_type=10, label=1, - has_default_value=False, default_value=None, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - serialized_options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=170, - serialized_end=291, -) - - -_SERVERSTREAMINGECHORESPONSE = _descriptor.Descriptor( - name='ServerStreamingEchoResponse', - full_name='grpc.gateway.testing.ServerStreamingEchoResponse', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='message', full_name='grpc.gateway.testing.ServerStreamingEchoResponse.message', index=0, - number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - serialized_options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=293, - serialized_end=339, -) - - -_CLIENTSTREAMINGECHOREQUEST = _descriptor.Descriptor( - name='ClientStreamingEchoRequest', - full_name='grpc.gateway.testing.ClientStreamingEchoRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='message', full_name='grpc.gateway.testing.ClientStreamingEchoRequest.message', index=0, - number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - serialized_options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=341, - serialized_end=386, -) - - -_CLIENTSTREAMINGECHORESPONSE = _descriptor.Descriptor( - name='ClientStreamingEchoResponse', - full_name='grpc.gateway.testing.ClientStreamingEchoResponse', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='message_count', full_name='grpc.gateway.testing.ClientStreamingEchoResponse.message_count', index=0, - number=1, type=5, cpp_type=1, label=1, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - serialized_options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=388, - serialized_end=440, -) - -_SERVERSTREAMINGECHOREQUEST.fields_by_name['message_interval'].message_type = google_dot_protobuf_dot_duration__pb2._DURATION -DESCRIPTOR.message_types_by_name['Empty'] = _EMPTY -DESCRIPTOR.message_types_by_name['EchoRequest'] = _ECHOREQUEST -DESCRIPTOR.message_types_by_name['EchoResponse'] = _ECHORESPONSE -DESCRIPTOR.message_types_by_name['ServerStreamingEchoRequest'] = _SERVERSTREAMINGECHOREQUEST -DESCRIPTOR.message_types_by_name['ServerStreamingEchoResponse'] = _SERVERSTREAMINGECHORESPONSE -DESCRIPTOR.message_types_by_name['ClientStreamingEchoRequest'] = _CLIENTSTREAMINGECHOREQUEST -DESCRIPTOR.message_types_by_name['ClientStreamingEchoResponse'] = _CLIENTSTREAMINGECHORESPONSE -_sym_db.RegisterFileDescriptor(DESCRIPTOR) - -Empty = _reflection.GeneratedProtocolMessageType('Empty', (_message.Message,), dict( - DESCRIPTOR = _EMPTY, - __module__ = 'echo.echo_pb2' - # @@protoc_insertion_point(class_scope:grpc.gateway.testing.Empty) - )) -_sym_db.RegisterMessage(Empty) - -EchoRequest = _reflection.GeneratedProtocolMessageType('EchoRequest', (_message.Message,), dict( - DESCRIPTOR = _ECHOREQUEST, - __module__ = 'echo.echo_pb2' - # @@protoc_insertion_point(class_scope:grpc.gateway.testing.EchoRequest) - )) -_sym_db.RegisterMessage(EchoRequest) - -EchoResponse = _reflection.GeneratedProtocolMessageType('EchoResponse', (_message.Message,), dict( - DESCRIPTOR = _ECHORESPONSE, - __module__ = 'echo.echo_pb2' - # @@protoc_insertion_point(class_scope:grpc.gateway.testing.EchoResponse) - )) -_sym_db.RegisterMessage(EchoResponse) - -ServerStreamingEchoRequest = _reflection.GeneratedProtocolMessageType('ServerStreamingEchoRequest', (_message.Message,), dict( - DESCRIPTOR = _SERVERSTREAMINGECHOREQUEST, - __module__ = 'echo.echo_pb2' - # @@protoc_insertion_point(class_scope:grpc.gateway.testing.ServerStreamingEchoRequest) - )) -_sym_db.RegisterMessage(ServerStreamingEchoRequest) - -ServerStreamingEchoResponse = _reflection.GeneratedProtocolMessageType('ServerStreamingEchoResponse', (_message.Message,), dict( - DESCRIPTOR = _SERVERSTREAMINGECHORESPONSE, - __module__ = 'echo.echo_pb2' - # @@protoc_insertion_point(class_scope:grpc.gateway.testing.ServerStreamingEchoResponse) - )) -_sym_db.RegisterMessage(ServerStreamingEchoResponse) - -ClientStreamingEchoRequest = _reflection.GeneratedProtocolMessageType('ClientStreamingEchoRequest', (_message.Message,), dict( - DESCRIPTOR = _CLIENTSTREAMINGECHOREQUEST, - __module__ = 'echo.echo_pb2' - # @@protoc_insertion_point(class_scope:grpc.gateway.testing.ClientStreamingEchoRequest) - )) -_sym_db.RegisterMessage(ClientStreamingEchoRequest) - -ClientStreamingEchoResponse = _reflection.GeneratedProtocolMessageType('ClientStreamingEchoResponse', (_message.Message,), dict( - DESCRIPTOR = _CLIENTSTREAMINGECHORESPONSE, - __module__ = 'echo.echo_pb2' - # @@protoc_insertion_point(class_scope:grpc.gateway.testing.ClientStreamingEchoResponse) - )) -_sym_db.RegisterMessage(ClientStreamingEchoResponse) - - - -_ECHOSERVICE = _descriptor.ServiceDescriptor( - name='EchoService', - full_name='grpc.gateway.testing.EchoService', - file=DESCRIPTOR, - index=0, - serialized_options=None, - serialized_start=443, - serialized_end=1259, - methods=[ - _descriptor.MethodDescriptor( - name='Echo', - full_name='grpc.gateway.testing.EchoService.Echo', - index=0, - containing_service=None, - input_type=_ECHOREQUEST, - output_type=_ECHORESPONSE, - serialized_options=None, - ), - _descriptor.MethodDescriptor( - name='EchoAbort', - full_name='grpc.gateway.testing.EchoService.EchoAbort', - index=1, - containing_service=None, - input_type=_ECHOREQUEST, - output_type=_ECHORESPONSE, - serialized_options=None, - ), - _descriptor.MethodDescriptor( - name='NoOp', - full_name='grpc.gateway.testing.EchoService.NoOp', - index=2, - containing_service=None, - input_type=_EMPTY, - output_type=_EMPTY, - serialized_options=None, - ), - _descriptor.MethodDescriptor( - name='ServerStreamingEcho', - full_name='grpc.gateway.testing.EchoService.ServerStreamingEcho', - index=3, - containing_service=None, - input_type=_SERVERSTREAMINGECHOREQUEST, - output_type=_SERVERSTREAMINGECHORESPONSE, - serialized_options=None, - ), - _descriptor.MethodDescriptor( - name='ServerStreamingEchoAbort', - full_name='grpc.gateway.testing.EchoService.ServerStreamingEchoAbort', - index=4, - containing_service=None, - input_type=_SERVERSTREAMINGECHOREQUEST, - output_type=_SERVERSTREAMINGECHORESPONSE, - serialized_options=None, - ), - _descriptor.MethodDescriptor( - name='ClientStreamingEcho', - full_name='grpc.gateway.testing.EchoService.ClientStreamingEcho', - index=5, - containing_service=None, - input_type=_CLIENTSTREAMINGECHOREQUEST, - output_type=_CLIENTSTREAMINGECHORESPONSE, - serialized_options=None, - ), - _descriptor.MethodDescriptor( - name='FullDuplexEcho', - full_name='grpc.gateway.testing.EchoService.FullDuplexEcho', - index=6, - containing_service=None, - input_type=_ECHOREQUEST, - output_type=_ECHORESPONSE, - serialized_options=None, - ), - _descriptor.MethodDescriptor( - name='HalfDuplexEcho', - full_name='grpc.gateway.testing.EchoService.HalfDuplexEcho', - index=7, - containing_service=None, - input_type=_ECHOREQUEST, - output_type=_ECHORESPONSE, - serialized_options=None, - ), -]) -_sym_db.RegisterServiceDescriptor(_ECHOSERVICE) - -DESCRIPTOR.services_by_name['EchoService'] = _ECHOSERVICE - -# @@protoc_insertion_point(module_scope) diff --git a/echo/echo_pb2_grpc.py b/echo/echo_pb2_grpc.py deleted file mode 100644 index 6e4ec81..0000000 --- a/echo/echo_pb2_grpc.py +++ /dev/null @@ -1,175 +0,0 @@ -# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! -import grpc - -from echo import echo_pb2 as echo_dot_echo__pb2 - - -class EchoServiceStub(object): - """A simple echo service. - """ - - def __init__(self, channel): - """Constructor. - - Args: - channel: A grpc.Channel. - """ - self.Echo = channel.unary_unary( - '/grpc.gateway.testing.EchoService/Echo', - request_serializer=echo_dot_echo__pb2.EchoRequest.SerializeToString, - response_deserializer=echo_dot_echo__pb2.EchoResponse.FromString, - ) - self.EchoAbort = channel.unary_unary( - '/grpc.gateway.testing.EchoService/EchoAbort', - request_serializer=echo_dot_echo__pb2.EchoRequest.SerializeToString, - response_deserializer=echo_dot_echo__pb2.EchoResponse.FromString, - ) - self.NoOp = channel.unary_unary( - '/grpc.gateway.testing.EchoService/NoOp', - request_serializer=echo_dot_echo__pb2.Empty.SerializeToString, - response_deserializer=echo_dot_echo__pb2.Empty.FromString, - ) - self.ServerStreamingEcho = channel.unary_stream( - '/grpc.gateway.testing.EchoService/ServerStreamingEcho', - request_serializer=echo_dot_echo__pb2.ServerStreamingEchoRequest.SerializeToString, - response_deserializer=echo_dot_echo__pb2.ServerStreamingEchoResponse.FromString, - ) - self.ServerStreamingEchoAbort = channel.unary_stream( - '/grpc.gateway.testing.EchoService/ServerStreamingEchoAbort', - request_serializer=echo_dot_echo__pb2.ServerStreamingEchoRequest.SerializeToString, - response_deserializer=echo_dot_echo__pb2.ServerStreamingEchoResponse.FromString, - ) - self.ClientStreamingEcho = channel.stream_unary( - '/grpc.gateway.testing.EchoService/ClientStreamingEcho', - request_serializer=echo_dot_echo__pb2.ClientStreamingEchoRequest.SerializeToString, - response_deserializer=echo_dot_echo__pb2.ClientStreamingEchoResponse.FromString, - ) - self.FullDuplexEcho = channel.stream_stream( - '/grpc.gateway.testing.EchoService/FullDuplexEcho', - request_serializer=echo_dot_echo__pb2.EchoRequest.SerializeToString, - response_deserializer=echo_dot_echo__pb2.EchoResponse.FromString, - ) - self.HalfDuplexEcho = channel.stream_stream( - '/grpc.gateway.testing.EchoService/HalfDuplexEcho', - request_serializer=echo_dot_echo__pb2.EchoRequest.SerializeToString, - response_deserializer=echo_dot_echo__pb2.EchoResponse.FromString, - ) - - -class EchoServiceServicer(object): - """A simple echo service. - """ - - def Echo(self, request, context): - """One request followed by one response - The server returns the client message as-is. - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def EchoAbort(self, request, context): - """Sends back abort status. - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def NoOp(self, request, context): - """One empty request, ZERO processing, followed by one empty response - (minimum effort to do message serialization). - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def ServerStreamingEcho(self, request, context): - """One request followed by a sequence of responses (streamed download). - The server will return the same client message repeatedly. - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def ServerStreamingEchoAbort(self, request, context): - """One request followed by a sequence of responses (streamed download). - The server abort directly. - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def ClientStreamingEcho(self, request_iterator, context): - """A sequence of requests followed by one response (streamed upload). - The server returns the total number of messages as the result. - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def FullDuplexEcho(self, request_iterator, context): - """A sequence of requests with each message echoed by the server immediately. - The server returns the same client messages in order. - E.g. this is how the speech API works. - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def HalfDuplexEcho(self, request_iterator, context): - """A sequence of requests followed by a sequence of responses. - The server buffers all the client messages and then returns the same - client messages one by one after the client half-closes the stream. - This is how an image recognition API may work. - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - -def add_EchoServiceServicer_to_server(servicer, server): - rpc_method_handlers = { - 'Echo': grpc.unary_unary_rpc_method_handler( - servicer.Echo, - request_deserializer=echo_dot_echo__pb2.EchoRequest.FromString, - response_serializer=echo_dot_echo__pb2.EchoResponse.SerializeToString, - ), - 'EchoAbort': grpc.unary_unary_rpc_method_handler( - servicer.EchoAbort, - request_deserializer=echo_dot_echo__pb2.EchoRequest.FromString, - response_serializer=echo_dot_echo__pb2.EchoResponse.SerializeToString, - ), - 'NoOp': grpc.unary_unary_rpc_method_handler( - servicer.NoOp, - request_deserializer=echo_dot_echo__pb2.Empty.FromString, - response_serializer=echo_dot_echo__pb2.Empty.SerializeToString, - ), - 'ServerStreamingEcho': grpc.unary_stream_rpc_method_handler( - servicer.ServerStreamingEcho, - request_deserializer=echo_dot_echo__pb2.ServerStreamingEchoRequest.FromString, - response_serializer=echo_dot_echo__pb2.ServerStreamingEchoResponse.SerializeToString, - ), - 'ServerStreamingEchoAbort': grpc.unary_stream_rpc_method_handler( - servicer.ServerStreamingEchoAbort, - request_deserializer=echo_dot_echo__pb2.ServerStreamingEchoRequest.FromString, - response_serializer=echo_dot_echo__pb2.ServerStreamingEchoResponse.SerializeToString, - ), - 'ClientStreamingEcho': grpc.stream_unary_rpc_method_handler( - servicer.ClientStreamingEcho, - request_deserializer=echo_dot_echo__pb2.ClientStreamingEchoRequest.FromString, - response_serializer=echo_dot_echo__pb2.ClientStreamingEchoResponse.SerializeToString, - ), - 'FullDuplexEcho': grpc.stream_stream_rpc_method_handler( - servicer.FullDuplexEcho, - request_deserializer=echo_dot_echo__pb2.EchoRequest.FromString, - response_serializer=echo_dot_echo__pb2.EchoResponse.SerializeToString, - ), - 'HalfDuplexEcho': grpc.stream_stream_rpc_method_handler( - servicer.HalfDuplexEcho, - request_deserializer=echo_dot_echo__pb2.EchoRequest.FromString, - response_serializer=echo_dot_echo__pb2.EchoResponse.SerializeToString, - ), - } - generic_handler = grpc.method_handlers_generic_handler( - 'grpc.gateway.testing.EchoService', rpc_method_handlers) - server.add_generic_rpc_handlers((generic_handler,)) diff --git a/echo/wsgi.py b/echo/wsgi.py deleted file mode 100644 index 0a41548..0000000 --- a/echo/wsgi.py +++ /dev/null @@ -1,38 +0,0 @@ -import sys -from wsgiref.simple_server import make_server -import sonora.wsgi - -from echo import echo_pb2 -from echo import echo_pb2_grpc - -import grpc - - -class Echo(echo_pb2_grpc.EchoServiceServicer): - def Echo(self, request, context): - return echo_pb2.EchoResponse(message=request.message) - - def EchoAbort(self, request, context): - context.set_code(grpc.StatusCode.ABORTED) - return echo_pb2.EchoResponse(message=request.message) - - def ServerStreamingEcho(self, request, context): - for _ in range(request.message_count): - yield echo_pb2.EchoResponse(message=request.message) - - def ServerStreamingEchoAbort(self, request, context): - for _ in range(request.message_count // 2): - yield echo_pb2.EchoResponse(message=request.message) - context.set_code(grpc.StatusCode.ABORTED) - - -def main(args): - grpc_wsgi_app = sonora.wsgi.grpcWSGI(None) - - with make_server("", 8080, grpc_wsgi_app) as httpd: - echo_pb2_grpc.add_EchoServiceServicer_to_server(Echo(), grpc_wsgi_app) - httpd.serve_forever() - - -if __name__ == "__main__": - sys.exit(main(sys.argv)) diff --git a/sonora/asgi.py b/sonora/asgi.py index 3a3e3a0..93012cc 100644 --- a/sonora/asgi.py +++ b/sonora/asgi.py @@ -35,13 +35,14 @@ async def __call__(self, scope, receive, send): if rpc_method: if request_method == "POST": context = self._create_context(scope) + try: async with timeout(context.time_remaining()): await self._do_grpc_request(rpc_method, context, receive, send) except asyncio.TimeoutError: context.code = grpc.StatusCode.DEADLINE_EXCEEDED context.details = "request timed out at the server" - await self._do_grpc_error(context, send) + await self._do_grpc_error(send, context) elif request_method == "OPTIONS": await self._do_cors_preflight(scope, receive, send) @@ -87,10 +88,9 @@ def _create_context(self, scope): return ServicerContext(timeout, metadata) async def _do_grpc_request(self, rpc_method, context, receive, send): - request_proto_iterator = ( - rpc_method.request_deserializer(message) - async for _, _, message in protocol.unwrap_message_asgi(receive) - ) + headers = context._response_headers + wrap_message = context._wrap_message + unwrap_message = context._unwrap_message if not rpc_method.request_streaming and not rpc_method.response_streaming: method = rpc_method.unary_unary @@ -103,40 +103,48 @@ async def _do_grpc_request(self, rpc_method, context, receive, send): else: raise NotImplementedError - if rpc_method.request_streaming: - coroutine = method(request_proto_iterator, context) - else: - request_proto = await anext(request_proto_iterator) - coroutine = method(request_proto, context) + request_proto_iterator = ( + rpc_method.request_deserializer(message) + async for _, _, message in unwrap_message(receive) + ) - headers = [ - (b"Content-Type", b"application/grpc-web+proto"), - (b"Access-Control-Allow-Origin", b"*"), - (b"Access-Control-Expose-Headers", b"*"), - ] + try: + if rpc_method.request_streaming: + coroutine = method(request_proto_iterator, context) + else: + request_proto = await anext( + request_proto_iterator, None + ) or rpc_method.request_deserializer(b"") + coroutine = method(request_proto, context) + except NotImplementedError: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + coroutine = None try: if rpc_method.response_streaming: await self._do_streaming_response( - rpc_method, receive, send, context, headers, coroutine + rpc_method, receive, send, wrap_message, context, coroutine ) else: await self._do_unary_response( - rpc_method, receive, send, context, headers, coroutine + rpc_method, receive, send, wrap_message, context, coroutine ) except grpc.RpcError: - await self._do_grpc_error(context, send) + await self._do_grpc_error(send, context) async def _do_streaming_response( - self, rpc_method, receive, send, context, headers, coroutine + self, rpc_method, receive, send, wrap_message, context, coroutine ): - message = await anext(coroutine) + headers = context._response_headers - status = protocol.grpc_status_to_http_status(context.code) + if coroutine: + message = await anext(coroutine) + else: + message = b"" - body = protocol.wrap_message( - False, False, rpc_method.response_serializer(message) - ) + status = 200 + + body = wrap_message(False, False, rpc_method.response_serializer(message)) if context._initial_metadata: headers.extend(context._initial_metadata) @@ -148,9 +156,7 @@ async def _do_streaming_response( await send({"type": "http.response.body", "body": body, "more_body": True}) async for message in coroutine: - body = protocol.wrap_message( - False, False, rpc_method.response_serializer(message) - ) + body = wrap_message(False, False, rpc_method.response_serializer(message)) send_task = asyncio.create_task( send({"type": "http.response.body", "body": body, "more_body": True}) @@ -178,24 +184,32 @@ async def _do_streaming_response( trailers.extend(context._trailing_metadata) trailer_message = protocol.pack_trailers(trailers) - body = protocol.wrap_message(True, False, trailer_message) + body = wrap_message(True, False, trailer_message) await send({"type": "http.response.body", "body": body, "more_body": False}) async def _do_unary_response( - self, rpc_method, receive, send, context, headers, coroutine + self, rpc_method, receive, send, wrap_message, context, coroutine ): - message = await coroutine + headers = context._response_headers + + if coroutine is None: + message = None + else: + message = await coroutine + + status = 200 - status = protocol.grpc_status_to_http_status(context.code) headers.append((b"grpc-status", str(context.code.value[0]).encode())) if context.details: - headers.append((b"grpc-message", quote(context.details))) + headers.append( + (b"grpc-message", quote(context.details.encode("utf8")).encode("ascii")) + ) if context._initial_metadata: headers.extend(context._initial_metadata) if message is not None: - message_data = protocol.wrap_message( + message_data = wrap_message( False, False, rpc_method.response_serializer(message) ) else: @@ -203,8 +217,9 @@ async def _do_unary_response( if context._trailing_metadata: trailers = context._trailing_metadata + trailer_message = protocol.pack_trailers(trailers) - trailer_data = protocol.wrap_message(True, False, trailer_message) + trailer_data = wrap_message(True, False, trailer_message) else: trailer_data = b"" @@ -222,18 +237,15 @@ async def _do_unary_response( {"type": "http.response.body", "body": trailer_data, "more_body": False} ) - async def _do_grpc_error(self, context, send): - headers = [ - (b"Content-Type", b"application/grpc-web+proto"), - (b"Access-Control-Allow-Origin", b"*"), - (b"Access-Control-Expose-Headers", b"*"), - ] - - status = protocol.grpc_status_to_http_status(context.code) + async def _do_grpc_error(self, send, context): + status = 200 + headers = context._response_headers headers.append((b"grpc-status", str(context.code.value[0]).encode())) if context.details: - headers.append((b"grpc-message", quote(context.details).encode())) + headers.append( + (b"grpc-message", quote(context.details.encode("utf8")).encode("ascii")) + ) await send( {"type": "http.response.start", "status": status, "headers": headers} @@ -241,6 +253,11 @@ async def _do_grpc_error(self, context, send): await send({"type": "http.response.body", "body": b"", "more_body": False}) async def _do_cors_preflight(self, scope, receive, send): + origin = next( + (value for header, value in scope["headers"] if header == "host"), + scope["server"][0], + ) + await send( { "type": "http.response.start", @@ -250,7 +267,7 @@ async def _do_cors_preflight(self, scope, receive, send): (b"Content-Length", b"0"), (b"Access-Control-Allow-Methods", b"POST, OPTIONS"), (b"Access-Control-Allow-Headers", b"*"), - (b"Access-Control-Allow-Origin", b"*"), + (b"Access-Control-Allow-Origin", origin), (b"Access-Control-Allow-Credentials", b"true"), (b"Access-Control-Expose-Headers", b"*"), ], @@ -290,8 +307,46 @@ def __init__(self, timeout=None, metadata=None): self._initial_metadata = None self._trailing_metadata = None + response_content_type = "application/grpc-web+proto" + + self._wrap_message = protocol.wrap_message + self._unwrap_message = protocol.unwrap_message_asgi + origin = None + + for header, value in metadata: + if header == "content-type": + if value == "application/grpc-web-text": + self._wrap_message = protocol.b64_wrap_message + self._unwrap_message = protocol.b64_unwrap_message_asgi + elif header == "accept": + response_content_type = value.split(",")[0].strip() + elif header == "host": + origin = value + + if not origin: + raise ValueError("Request is missing the host header") + + self._response_headers = [ + (b"Content-Type", response_content_type.encode("ascii")), + (b"Access-Control-Allow-Origin", origin.encode("ascii")), + (b"Access-Control-Expose-Headers", b"*"), + ] + def set_code(self, code): - self.code = code + if isinstance(code, grpc.StatusCode): + self.code = code + + elif isinstance(code, int): + for status_code in grpc.StatusCode: + if status_code.value[0] == code: + self.code = status_code + break + else: + raise ValueError(f"Unknown StatusCode: {code}") + else: + raise NotImplementedError( + f"Unsupported status code type: {type(code)} with value {code}" + ) def set_details(self, details): self.details = details @@ -315,11 +370,11 @@ async def abort_with_status(self, status): async def send_initial_metadata(self, initial_metadata): self._initial_metadata = [ - (key.encode("ascii"), value.encode("ascii")) + (key.encode("ascii"), value.encode("utf8")) for key, value in protocol.encode_headers(initial_metadata) ] - async def set_trailing_metadata(self, trailing_metadata): + def set_trailing_metadata(self, trailing_metadata): self._trailing_metadata = protocol.encode_headers(trailing_metadata) def invocation_metadata(self): diff --git a/sonora/client.py b/sonora/client.py index 4cbc27f..fc0b52c 100644 --- a/sonora/client.py +++ b/sonora/client.py @@ -54,7 +54,10 @@ def __init__(self, session, url, path, request_serializer, request_deserializer) self._path = path self._rpc_url = urljoin(url, path) - self._metadata = [("x-user-agent", "grpc-web-python/0.1")] + self._metadata = [ + ("x-user-agent", "grpc-web-python/0.1"), + ("content-type", "application/grpc-web+proto"), + ] self._serializer = request_serializer self._deserializer = request_deserializer diff --git a/sonora/protocol.py b/sonora/protocol.py index a1c9d9f..d745dd1 100644 --- a/sonora/protocol.py +++ b/sonora/protocol.py @@ -1,4 +1,5 @@ import base64 +import functools import struct from urllib.parse import unquote @@ -29,6 +30,10 @@ def wrap_message(trailers, compressed, message): ) +def b64_wrap_message(trailers, compressed, message): + return base64.b64encode(wrap_message(trailers, compressed, message)) + + def unwrap_message(message): flags, length = struct.unpack(_HEADER_FORMAT, message[:_HEADER_LENGTH]) data = message[_HEADER_LENGTH : _HEADER_LENGTH + length] @@ -41,6 +46,10 @@ def unwrap_message(message): return trailers, compressed, data +def b64_unwrap_message(message): + return unwrap_message(base64.b64decode(message)) + + def unwrap_message_stream(stream): data = stream.read(_HEADER_LENGTH) @@ -71,7 +80,7 @@ async def unwrap_message_stream_async(stream): data = await stream.readexactly(_HEADER_LENGTH) -async def unwrap_message_asgi(receive): +async def unwrap_message_asgi(receive, decoder=None): buffer = bytearray() waiting = False flags = None @@ -81,7 +90,12 @@ async def unwrap_message_asgi(receive): event = await receive() assert event["type"].startswith("http.") - buffer += event["body"] + if decoder: + chunk = decoder(event["body"]) + else: + chunk = event["body"] + + buffer += chunk if len(buffer) >= _HEADER_LENGTH: if not waiting: @@ -91,6 +105,7 @@ async def unwrap_message_asgi(receive): waiting = False data = buffer[_HEADER_LENGTH : _HEADER_LENGTH + length] trailers, compressed = _unpack_header_flags(flags) + yield trailers, compressed, data buffer = buffer[_HEADER_LENGTH + length :] else: @@ -100,6 +115,11 @@ async def unwrap_message_asgi(receive): break +b64_unwrap_message_asgi = functools.partial( + unwrap_message_asgi, decoder=base64.b64decode +) + + def pack_trailers(trailers): message = [] for k, v in trailers: @@ -132,27 +152,6 @@ def encode_headers(metadata): yield header, value -def grpc_status_to_http_status(code): - if code == grpc.StatusCode.OK: - return 200 - elif code is None: - return 200 - elif code == grpc.StatusCode.UNKNOWN: - return 500 - elif code == grpc.StatusCode.INTERNAL: - return 500 - elif code == grpc.StatusCode.UNAVAILABLE: - return 503 - elif code == grpc.StatusCode.INVALID_ARGUMENT: - return 400 - elif code == grpc.StatusCode.UNIMPLEMENTED: - return 404 - elif code == grpc.StatusCode.PERMISSION_DENIED: - return 403 - else: - return 500 - - class WebRpcError(grpc.RpcError): _code_to_enum = {code.value[0]: code for code in grpc.StatusCode} diff --git a/sonora/wsgi.py b/sonora/wsgi.py index 09249f7..099da3d 100644 --- a/sonora/wsgi.py +++ b/sonora/wsgi.py @@ -76,7 +76,11 @@ def _do_grpc_request(self, rpc_method, environ, start_response): context = self._create_context(environ) - _, _, message = protocol.unwrap_message(request_data) + if environ["CONTENT_TYPE"] == "application/grpc-web-text": + _, _, message = protocol.b64_unwrap_message(request_data) + else: + _, _, message = protocol.unwrap_message(request_data) + request_proto = rpc_method.request_deserializer(message) resp = None @@ -92,27 +96,42 @@ def _do_grpc_request(self, rpc_method, environ, start_response): raise NotImplementedError() except grpc.RpcError: pass + except NotImplementedError: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + + response_content_type = ( + environ.get("HTTP_ACCEPT", "application/grpc-web+proto") + .split(",")[0] + .strip() + ) headers = [ - ("Content-Type", "application/grpc-web+proto"), - ("Access-Control-Allow-Origin", "*"), + ("Content-Type", response_content_type), + ( + "Access-Control-Allow-Origin", + environ.get("HTTP_HOST") or environ["SERVER_NAME"], + ), ("Access-Control-Expose-Headers", "*"), ] + if response_content_type == "application/grpc-web-text": + wrap_message = protocol.b64_wrap_message + else: + wrap_message = protocol.wrap_message + if rpc_method.response_streaming: yield from self._do_streaming_response( - rpc_method, start_response, context, headers, resp + rpc_method, start_response, wrap_message, context, headers, resp ) else: yield from self._do_unary_response( - rpc_method, start_response, context, headers, resp + rpc_method, start_response, wrap_message, context, headers, resp ) def _do_streaming_response( - self, rpc_method, start_response, context, headers, resp + self, rpc_method, start_response, wrap_message, context, headers, resp ): - try: first_message = next(resp) except grpc.RpcError: @@ -121,15 +140,13 @@ def _do_streaming_response( if context._initial_metadata: headers.extend(protocol.encode_headers(context._initial_metadata)) - start_response(_grpc_status_to_wsgi_status(context.code), headers) + start_response("200 OK", headers) - yield protocol.wrap_message( - False, False, rpc_method.response_serializer(first_message) - ) + yield wrap_message(False, False, rpc_method.response_serializer(first_message)) try: for message in resp: - yield protocol.wrap_message( + yield wrap_message( False, False, rpc_method.response_serializer(message) ) except grpc.RpcError: @@ -138,18 +155,20 @@ def _do_streaming_response( trailers = [("grpc-status", str(context.code.value[0]))] if context.details: - trailers.append(("grpc-message", quote(context.details))) + trailers.append(("grpc-message", quote(context.details.encode("utf8")))) if context._trailing_metadata: trailers.extend(protocol.encode_headers(context._trailing_metadata)) trailer_message = protocol.pack_trailers(trailers) - yield protocol.wrap_message(True, False, trailer_message) + yield wrap_message(True, False, trailer_message) - def _do_unary_response(self, rpc_method, start_response, context, headers, resp): + def _do_unary_response( + self, rpc_method, start_response, wrap_message, context, headers, resp + ): if resp: - message_data = protocol.wrap_message( + message_data = wrap_message( False, False, rpc_method.response_serializer(resp) ) else: @@ -158,7 +177,7 @@ def _do_unary_response(self, rpc_method, start_response, context, headers, resp) if context._trailing_metadata: trailers = protocol.encode_headers(context._trailing_metadata) trailer_message = protocol.pack_trailers(trailers) - trailer_data = protocol.wrap_message(True, False, trailer_message) + trailer_data = wrap_message(True, False, trailer_message) else: trailer_data = b"" @@ -169,12 +188,13 @@ def _do_unary_response(self, rpc_method, start_response, context, headers, resp) headers.append(("grpc-status", str(context.code.value[0]))) if context.details: - headers.append(("grpc-message", quote(context.details))) + headers.append(("grpc-message", quote(context.details.encode("utf8")))) if context._initial_metadata: headers.extend(protocol.encode_headers(context._initial_metadata)) - start_response(_grpc_status_to_wsgi_status(context.code), headers) + start_response("200 OK", headers) + yield message_data yield trailer_data @@ -186,7 +206,10 @@ def _do_cors_preflight(self, environ, start_response): ("Content-Length", "0"), ("Access-Control-Allow-Methods", "POST, OPTIONS"), ("Access-Control-Allow-Headers", "*"), - ("Access-Control-Allow-Origin", "*"), + ( + "Access-Control-Allow-Origin", + environ.get("HTTP_HOST") or environ["SERVER_NAME"], + ), ("Access-Control-Allow-Credentials", "true"), ("Access-Control-Expose-Headers", "*"), ], @@ -274,7 +297,20 @@ def __init__(self, timeout=None, metadata=None): self._trailing_metadata = None def set_code(self, code): - self.code = code + if isinstance(code, grpc.StatusCode): + self.code = code + + elif isinstance(code, int): + for status_code in grpc.StatusCode: + if status_code.value[0] == code: + self.code = status_code + break + else: + raise ValueError(f"Unknown StatusCode: {code}") + else: + raise NotImplementedError( + f"Unsupported status code type: {type(code)} with value {code}" + ) def set_details(self, details): self.details = details @@ -333,27 +369,6 @@ def is_active(self): raise NotImplementedError() -def _grpc_status_to_wsgi_status(code): - if code == grpc.StatusCode.OK: - return "200 OK" - elif code is None: - return "200 OK" - elif code == grpc.StatusCode.UNKNOWN: - return "500 Internal Server Error" - elif code == grpc.StatusCode.INTERNAL: - return "500 Internal Server Error" - elif code == grpc.StatusCode.UNAVAILABLE: - return "503 Service Unavailable" - elif code == grpc.StatusCode.INVALID_ARGUMENT: - return "400 Bad Request" - elif code == grpc.StatusCode.UNIMPLEMENTED: - return "404 Not Found" - elif code == grpc.StatusCode.PERMISSION_DENIED: - return "403 Forbidden" - else: - return "500 Internal Server Error" - - def _timeout_generator(context, gen): while 1: if context.time_remaining() > 0: diff --git a/echo/__init__.py b/test_server/__init__.py similarity index 100% rename from echo/__init__.py rename to test_server/__init__.py diff --git a/test_server/asgi.py b/test_server/asgi.py new file mode 100644 index 0000000..dffd26b --- /dev/null +++ b/test_server/asgi.py @@ -0,0 +1,102 @@ +import asyncio +import datetime +import sys +from wsgiref.simple_server import make_server + +import sonora.asgi + +import daphne.server + +from test_server import empty_pb2, messages_pb2, test_pb2_grpc + +_INITIAL_METADATA_KEY = "x-grpc-test-echo-initial" +_TRAILING_METADATA_KEY = "x-grpc-test-echo-trailing-bin" +_US_IN_A_SECOND = 1000 * 1000 +UNARY_CALL_WITH_SLEEP_VALUE = 0.2 + +async def _maybe_echo_metadata(servicer_context): + """Copies metadata from request to response if it is present.""" + invocation_metadata = dict(servicer_context.invocation_metadata()) + if _INITIAL_METADATA_KEY in invocation_metadata: + initial_metadatum = (_INITIAL_METADATA_KEY, + invocation_metadata[_INITIAL_METADATA_KEY]) + await servicer_context.send_initial_metadata((initial_metadatum,)) + + if _TRAILING_METADATA_KEY in invocation_metadata: + trailing_metadatum = (_TRAILING_METADATA_KEY, + invocation_metadata[_TRAILING_METADATA_KEY]) + servicer_context.set_trailing_metadata((trailing_metadatum,)) + + +async def _maybe_echo_status(request: messages_pb2.SimpleRequest, + servicer_context): + """Echos the RPC status if demanded by the request.""" + if request.HasField('response_status'): + await servicer_context.abort(request.response_status.code, + request.response_status.message) + + +class TestServiceServicer(test_pb2_grpc.TestServiceServicer): + + async def UnaryCall(self, request, context): + await _maybe_echo_metadata(context) + await _maybe_echo_status(request, context) + + return messages_pb2.SimpleResponse( + payload=messages_pb2.Payload(type=messages_pb2.COMPRESSABLE, + body=b'\x00' * request.response_size)) + + async def EmptyCall(self, request, context): + return empty_pb2.Empty() + + async def StreamingOutputCall( + self, request: messages_pb2.StreamingOutputCallRequest, + unused_context): + for response_parameters in request.response_parameters: + if response_parameters.interval_us != 0: + await asyncio.sleep( + datetime.timedelta(microseconds=response_parameters. + interval_us).total_seconds()) + if response_parameters.size != 0: + yield messages_pb2.StreamingOutputCallResponse( + payload=messages_pb2.Payload(type=request.response_type, + body=b'\x00' * + response_parameters.size)) + else: + yield messages_pb2.StreamingOutputCallResponse() + + # Next methods are extra ones that are registred programatically + # when the sever is instantiated. They are not being provided by + # the proto file. + async def UnaryCallWithSleep(self, unused_request, unused_context): + await asyncio.sleep(UNARY_CALL_WITH_SLEEP_VALUE) + return messages_pb2.SimpleResponse() + + async def StreamingInputCall(self, request_async_iterator, unused_context): + aggregate_size = 0 + async for request in request_async_iterator: + if request.payload is not None and request.payload.body: + aggregate_size += len(request.payload.body) + return messages_pb2.StreamingInputCallResponse( + aggregated_payload_size=aggregate_size) + + async def FullDuplexCall(self, request_async_iterator, context): + await _maybe_echo_metadata(context) + async for request in request_async_iterator: + await _maybe_echo_status(request, context) + for response_parameters in request.response_parameters: + if response_parameters.interval_us != 0: + await asyncio.sleep( + datetime.timedelta(microseconds=response_parameters. + interval_us).total_seconds()) + if response_parameters.size != 0: + yield messages_pb2.StreamingOutputCallResponse( + payload=messages_pb2.Payload(type=request.payload.type, + body=b'\x00' * + response_parameters.size)) + else: + yield messages_pb2.StreamingOutputCallResponse() + + +application = sonora.asgi.grpcASGI() +test_pb2_grpc.add_TestServiceServicer_to_server(TestServiceServicer(), application) diff --git a/test_server/empty.proto b/test_server/empty.proto new file mode 100644 index 0000000..6a0aa88 --- /dev/null +++ b/test_server/empty.proto @@ -0,0 +1,28 @@ + +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package grpc.testing; + +// An empty message that you can re-use to avoid defining duplicated empty +// messages in your project. A typical example is to use it as argument or the +// return value of a service API. For instance: +// +// service Foo { +// rpc Bar (grpc.testing.Empty) returns (grpc.testing.Empty) { }; +// }; +// +message Empty {} diff --git a/test_server/messages.proto b/test_server/messages.proto new file mode 100644 index 0000000..5993bc6 --- /dev/null +++ b/test_server/messages.proto @@ -0,0 +1,209 @@ + +// Copyright 2015-2016 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Message definitions to be used by integration test service definitions. + +syntax = "proto3"; + +package grpc.testing; + +// TODO(dgq): Go back to using well-known types once +// https://github.com/grpc/grpc/issues/6980 has been fixed. +// import "google/protobuf/wrappers.proto"; +message BoolValue { + // The bool value. + bool value = 1; +} + +// The type of payload that should be returned. +enum PayloadType { + // Compressable text format. + COMPRESSABLE = 0; +} + +// A block of data, to simply increase gRPC message size. +message Payload { + // The type of data in body. + PayloadType type = 1; + // Primary contents of payload. + bytes body = 2; +} + +// A protobuf representation for grpc status. This is used by test +// clients to specify a status that the server should attempt to return. +message EchoStatus { + int32 code = 1; + string message = 2; +} + +// The type of route that a client took to reach a server w.r.t. gRPCLB. +// The server must fill in "fallback" if it detects that the RPC reached +// the server via the "gRPCLB fallback" path, and "backend" if it detects +// that the RPC reached the server via "gRPCLB backend" path (i.e. if it got +// the address of this server from the gRPCLB server BalanceLoad RPC). Exactly +// how this detection is done is context and server dependent. +enum GrpclbRouteType { + // Server didn't detect the route that a client took to reach it. + GRPCLB_ROUTE_TYPE_UNKNOWN = 0; + // Indicates that a client reached a server via gRPCLB fallback. + GRPCLB_ROUTE_TYPE_FALLBACK = 1; + // Indicates that a client reached a server as a gRPCLB-given backend. + GRPCLB_ROUTE_TYPE_BACKEND = 2; +} + +// Unary request. +message SimpleRequest { + // Desired payload type in the response from the server. + // If response_type is RANDOM, server randomly chooses one from other formats. + PayloadType response_type = 1; + + // Desired payload size in the response from the server. + int32 response_size = 2; + + // Optional input payload sent along with the request. + Payload payload = 3; + + // Whether SimpleResponse should include username. + bool fill_username = 4; + + // Whether SimpleResponse should include OAuth scope. + bool fill_oauth_scope = 5; + + // Whether to request the server to compress the response. This field is + // "nullable" in order to interoperate seamlessly with clients not able to + // implement the full compression tests by introspecting the call to verify + // the response's compression status. + BoolValue response_compressed = 6; + + // Whether server should return a given status + EchoStatus response_status = 7; + + // Whether the server should expect this request to be compressed. + BoolValue expect_compressed = 8; + + // Whether SimpleResponse should include server_id. + bool fill_server_id = 9; + + // Whether SimpleResponse should include grpclb_route_type. + bool fill_grpclb_route_type = 10; +} + +// Unary response, as configured by the request. +message SimpleResponse { + // Payload to increase message size. + Payload payload = 1; + // The user the request came from, for verifying authentication was + // successful when the client expected it. + string username = 2; + // OAuth scope. + string oauth_scope = 3; + + // Server ID. This must be unique among different server instances, + // but the same across all RPC's made to a particular server instance. + string server_id = 4; + // gRPCLB Path. + GrpclbRouteType grpclb_route_type = 5; + + // Server hostname. + string hostname = 6; +} + +// Client-streaming request. +message StreamingInputCallRequest { + // Optional input payload sent along with the request. + Payload payload = 1; + + // Whether the server should expect this request to be compressed. This field + // is "nullable" in order to interoperate seamlessly with servers not able to + // implement the full compression tests by introspecting the call to verify + // the request's compression status. + BoolValue expect_compressed = 2; + + // Not expecting any payload from the response. +} + +// Client-streaming response. +message StreamingInputCallResponse { + // Aggregated size of payloads received from the client. + int32 aggregated_payload_size = 1; +} + +// Configuration for a particular response. +message ResponseParameters { + // Desired payload sizes in responses from the server. + int32 size = 1; + + // Desired interval between consecutive responses in the response stream in + // microseconds. + int32 interval_us = 2; + + // Whether to request the server to compress the response. This field is + // "nullable" in order to interoperate seamlessly with clients not able to + // implement the full compression tests by introspecting the call to verify + // the response's compression status. + BoolValue compressed = 3; +} + +// Server-streaming request. +message StreamingOutputCallRequest { + // Desired payload type in the response from the server. + // If response_type is RANDOM, the payload from each response in the stream + // might be of different types. This is to simulate a mixed type of payload + // stream. + PayloadType response_type = 1; + + // Configuration for each expected response message. + repeated ResponseParameters response_parameters = 2; + + // Optional input payload sent along with the request. + Payload payload = 3; + + // Whether server should return a given status + EchoStatus response_status = 7; +} + +// Server-streaming response, as configured by the request and parameters. +message StreamingOutputCallResponse { + // Payload to increase response size. + Payload payload = 1; +} + +// For reconnect interop test only. +// Client tells server what reconnection parameters it used. +message ReconnectParams { + int32 max_reconnect_backoff_ms = 1; +} + +// For reconnect interop test only. +// Server tells client whether its reconnects are following the spec and the +// reconnect backoffs it saw. +message ReconnectInfo { + bool passed = 1; + repeated int32 backoff_ms = 2; +} + +message LoadBalancerStatsRequest { + // Request stats for the next num_rpcs sent by client. + int32 num_rpcs = 1; + // If num_rpcs have not completed within timeout_sec, return partial results. + int32 timeout_sec = 2; +} + +message LoadBalancerStatsResponse { + // The number of completed RPCs for each peer. + map rpcs_by_peer = 1; + // The number of RPCs that failed to record a remote peer. + int32 num_failures = 2; +} diff --git a/test_server/test.proto b/test_server/test.proto new file mode 100644 index 0000000..9e92654 --- /dev/null +++ b/test_server/test.proto @@ -0,0 +1,86 @@ + +// Copyright 2015-2016 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// An integration test service that covers all the method signature permutations +// of unary/streaming requests/responses. + +syntax = "proto3"; + +import "test_server/empty.proto"; +import "test_server/messages.proto"; + +package grpc.testing; + +// A simple service to test the various types of RPCs and experiment with +// performance with various types of payload. +service TestService { + // One empty request followed by one empty response. + rpc EmptyCall(grpc.testing.Empty) returns (grpc.testing.Empty); + + // One request followed by one response. + rpc UnaryCall(SimpleRequest) returns (SimpleResponse); + + // One request followed by one response. Response has cache control + // headers set such that a caching HTTP proxy (such as GFE) can + // satisfy subsequent requests. + rpc CacheableUnaryCall(SimpleRequest) returns (SimpleResponse); + + // One request followed by a sequence of responses (streamed download). + // The server returns the payload with client desired type and sizes. + rpc StreamingOutputCall(StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); + + // A sequence of requests followed by one response (streamed upload). + // The server returns the aggregated size of client payload as the result. + rpc StreamingInputCall(stream StreamingInputCallRequest) + returns (StreamingInputCallResponse); + + // A sequence of requests with each request served by the server immediately. + // As one request could lead to multiple responses, this interface + // demonstrates the idea of full duplexing. + rpc FullDuplexCall(stream StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); + + // A sequence of requests followed by a sequence of responses. + // The server buffers all the client requests and then serves them in order. A + // stream of responses are returned to the client when the server starts with + // first request. + rpc HalfDuplexCall(stream StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); + + // The test server will not implement this method. It will be used + // to test the behavior when clients call unimplemented methods. + rpc UnimplementedCall(grpc.testing.Empty) returns (grpc.testing.Empty); +} + +// A simple service NOT implemented at servers so clients can test for +// that case. +service UnimplementedService { + // A call that no server should implement + rpc UnimplementedCall(grpc.testing.Empty) returns (grpc.testing.Empty); +} + +// A service used to control reconnect server. +service ReconnectService { + rpc Start(grpc.testing.ReconnectParams) returns (grpc.testing.Empty); + rpc Stop(grpc.testing.Empty) returns (grpc.testing.ReconnectInfo); +} + +// A service used to obtain stats for verifying LB behavior. +service LoadBalancerStatsService { + // Gets the backend distribution for RPCs sent by a test client. + rpc GetClientStats(LoadBalancerStatsRequest) + returns (LoadBalancerStatsResponse) {} +} diff --git a/test_server/wsgi.py b/test_server/wsgi.py new file mode 100644 index 0000000..aefa4fe --- /dev/null +++ b/test_server/wsgi.py @@ -0,0 +1,92 @@ +import sys +import time +from wsgiref.simple_server import make_server + +import sonora.wsgi + +from test_server import empty_pb2, messages_pb2, test_pb2_grpc + +_INITIAL_METADATA_KEY = "x-grpc-test-echo-initial" +_TRAILING_METADATA_KEY = "x-grpc-test-echo-trailing-bin" +_US_IN_A_SECOND = 1000 * 1000 + + +def _maybe_echo_metadata(servicer_context): + """Copies metadata from request to response if it is present.""" + invocation_metadata = dict(servicer_context.invocation_metadata()) + if _INITIAL_METADATA_KEY in invocation_metadata: + initial_metadatum = (_INITIAL_METADATA_KEY, + invocation_metadata[_INITIAL_METADATA_KEY]) + servicer_context.send_initial_metadata((initial_metadatum,)) + if _TRAILING_METADATA_KEY in invocation_metadata: + trailing_metadatum = (_TRAILING_METADATA_KEY, + invocation_metadata[_TRAILING_METADATA_KEY]) + servicer_context.set_trailing_metadata((trailing_metadatum,)) + + +def _maybe_echo_status_and_message(request, servicer_context): + """Sets the response context code and details if the request asks for them""" + if request.HasField('response_status'): + servicer_context.set_code(request.response_status.code) + servicer_context.set_details(request.response_status.message) + + +class TestServiceServicer(test_pb2_grpc.TestServiceServicer): + def EmptyCall(self, request, context): + _maybe_echo_metadata(context) + return empty_pb2.Empty() + + def UnaryCall(self, request, context): + _maybe_echo_metadata(context) + _maybe_echo_status_and_message(request, context) + return messages_pb2.SimpleResponse( + payload=messages_pb2.Payload(type=messages_pb2.COMPRESSABLE, + body=b'\x00' * request.response_size)) + + def StreamingOutputCall(self, request, context): + _maybe_echo_status_and_message(request, context) + for response_parameters in request.response_parameters: + if response_parameters.interval_us != 0: + time.sleep(response_parameters.interval_us / _US_IN_A_SECOND) + yield messages_pb2.StreamingOutputCallResponse( + payload=messages_pb2.Payload(type=request.response_type, + body=b'\x00' * + response_parameters.size)) + + def StreamingInputCall(self, request_iterator, context): + aggregate_size = 0 + for request in request_iterator: + if request.payload is not None and request.payload.body: + aggregate_size += len(request.payload.body) + return messages_pb2.StreamingInputCallResponse( + aggregated_payload_size=aggregate_size) + + def FullDuplexCall(self, request_iterator, context): + _maybe_echo_metadata(context) + for request in request_iterator: + _maybe_echo_status_and_message(request, context) + for response_parameters in request.response_parameters: + if response_parameters.interval_us != 0: + time.sleep(response_parameters.interval_us / + _US_IN_A_SECOND) + yield messages_pb2.StreamingOutputCallResponse( + payload=messages_pb2.Payload(type=request.payload.type, + body=b'\x00' * + response_parameters.size)) + + # NOTE(nathaniel): Apparently this is the same as the full-duplex call? + # NOTE(atash): It isn't even called in the interop spec (Oct 22 2015)... + def HalfDuplexCall(self, request_iterator, context): + return self.FullDuplexCall(request_iterator, context) + +def main(args): + grpc_wsgi_app = sonora.wsgi.grpcWSGI(None) + + with make_server("", 8080, grpc_wsgi_app) as httpd: + test_pb2_grpc.add_TestServiceServicer_to_server(TestServiceServicer(), grpc_wsgi_app) + print("Server up on 0.0.0.0:8080") + httpd.serve_forever() + + +if __name__ == "__main__": + sys.exit(main(sys.argv)) diff --git a/tests/conftest.py b/tests/conftest.py index dca1459..2f29874 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -117,7 +117,7 @@ async def HelloMetadata(self, request, context): for key, value in context.invocation_metadata() ) - await context.set_trailing_metadata( + context.set_trailing_metadata( (f"trailing-{key}", repr(value)) for key, value in context.invocation_metadata() ) @@ -139,7 +139,7 @@ async def HelloStreamMetadata(self, request, context): for c in value: yield helloworld_pb2.HelloReply(message=c) - await context.set_trailing_metadata( + context.set_trailing_metadata( (f"trailing-{key}", repr(value)) for key, value in context.invocation_metadata() )