Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Non-blocking client API #158

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open

Non-blocking client API #158

wants to merge 14 commits into from

Conversation

tohtana
Copy link
Contributor

@tohtana tohtana commented Mar 12, 2023

This PR implements a non-blocking client API.
As pointed out in #152, non-blocking calls increase the flexibility in some use cases. The PR simplifies the proposal in #152.

The new API query_async allows users to call MII service in an asynchronous manner.
The return value is a future-like object. The user can get the result by calling result().

future = generator.query_async(
            {"query": query}, do_sample=False, max_new_tokens=30)
print(future.result())

@lcw99
Copy link

lcw99 commented Mar 12, 2023

This PR implements a non-blocking client API. As pointed out in #152, non-blocking calls increase the flexibility in some use cases. The PR simplifies the proposal in #152.

The new API query_async allows users to call MII service in an asynchronous manner. The return value is a future-like object. The user can get the result by calling `result()'.

future = generator.query_async(
            {"query": query}, do_sample=False, max_new_tokens=30)
print(future.result())

query_async works well, but if query_async is called again while the event loop is running(previous call is on progress), the following error occurs.

File "/home/chang/AI/llm/DeepSpeed-MII/mii/client.py", line 60, in result
return self.asyncio_loop.run_until_complete(self.coro)
File "uvloop/loop.pyx", line 1511, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1504, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1377, in uvloop.loop.Loop.run_forever
File "uvloop/loop.pyx", line 518, in uvloop.loop.Loop._run
RuntimeError: this event loop is already running.

@tohtana
Copy link
Contributor Author

tohtana commented Mar 12, 2023

@lcw99 Thank you for trying this PR!

Looks like you are using uvloop, but we may not be able to support custom event loops.
Can you try the built-in python event loop?
I tested the non-blocking calls with this code.

generator = mii.mii_query_handle("bloom560m_deployment")
future_list = [generator.query_async(
    {"query": query}, do_sample=False, max_new_tokens=30) for _ in range(5)]
for future in future_list:
    print(future.result())

@lcw99
Copy link

lcw99 commented Mar 13, 2023

@lcw99 Thank you for trying this PR!

Looks like you are using uvloop, but we may not be able to support custom event loops. Can you try the built-in python event loop? I tested the non-blocking calls with this code.

generator = mii.mii_query_handle("bloom560m_deployment")
future_list = [generator.query_async(
    {"query": query}, do_sample=False, max_new_tokens=30) for _ in range(5)]
for future in future_list:
    print(future.result())

My situation is to call a generator in a telegram bot, but the bot by its nature calls the generator in a different thread when a message comes in. To solve this problem, I think the generator should respond to the multi-threaded call situation.

@tohtana
Copy link
Contributor Author

tohtana commented Mar 20, 2023

@lcw99 I implemented a different approach using multithreads. Can you try this PR?

The following is an example of API calls from multiple threads. Note that a client must be created in the main thread because grpc uses the event loop in the current thread.

import threading
import mii

def _run_parallel(client, i):
    print(f"thread {i} started.")

    query = ["Seattle is", "Bellevue is"]
    for _ in range(5):
        result = client.query(
            {"query": query}, do_sample=False, max_new_tokens=30)
        print(result)
    print(f"thread {i} finished.")

def main():
    deployment_name = "bloom560m_deployment"
    client = mii.mii_query_handle(deployment_name)
    threads = [threading.Thread(target=_run_parallel, args=(client, i)) for i in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

if __name__ == "__main__":
    main()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants