Skip to content

Commit

Permalink
use ipc
Browse files Browse the repository at this point in the history
  • Loading branch information
aniketmaurya committed Jan 9, 2025
1 parent 593af5f commit e75f7b2
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 47 deletions.
6 changes: 3 additions & 3 deletions src/litserve/loops/loops.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def inference_worker(
callback_runner: CallbackRunner,
loop: Union[str, _BaseLoop],
use_zmq: bool,
zmq_port: Optional[int],
zmq_addr: Optional[str],
):
callback_runner.trigger_event(EventTypes.BEFORE_SETUP, lit_api=lit_api)
try:
Expand All @@ -82,8 +82,8 @@ def inference_worker(
if use_zmq:
ctx = zmq.Context()
socket = ctx.socket(zmq.PUB)
logger.debug(f"Inference worker binding to port {zmq_port}")
socket.bind(f"tcp://*:{zmq_port}")
logger.debug(f"Inference worker binding to {zmq_addr}")
socket.bind(zmq_addr)
loop.zmq_context = ctx

loop(
Expand Down
14 changes: 7 additions & 7 deletions src/litserve/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
from litserve.middlewares import MaxSizeMiddleware, RequestCountMiddleware
from litserve.python_client import client_template
from litserve.specs.base import LitSpec
from litserve.utils import LitAPIStatus, WorkerSetupStatus, call_after_stream, get_random_port
from litserve.utils import LitAPIStatus, WorkerSetupStatus, call_after_stream, generate_random_ipc_address

mp.allow_connection_pickling()

Expand Down Expand Up @@ -75,15 +75,15 @@ async def response_queue_to_buffer(
stream: bool,
threadpool: ThreadPoolExecutor,
use_zmq: bool,
port: Optional[int] = None,
addr: Optional[str] = None,
):
loop = asyncio.get_running_loop()
socket = None
if use_zmq:
ctx = zmq.asyncio.Context()
socket = ctx.socket(zmq.SUB)
# TODO: Make the address configurable or select random available port
socket.connect(f"tcp://127.0.0.1:{port}")
socket.connect(addr)
socket.setsockopt_string(zmq.SUBSCRIBE, "")

async def _get_response():
Expand Down Expand Up @@ -246,8 +246,8 @@ def __init__(
self._connector = _Connector(accelerator=accelerator, devices=devices)
self._callback_runner = CallbackRunner(callbacks)
self.use_zmq = use_zmq
self._zmq_port = get_random_port() if use_zmq else None
logger.debug(f"ZMQ port: {self._zmq_port}")
self._zmq_addr = generate_random_ipc_address() if use_zmq else None
logger.debug(f"ZMQ port: {self._zmq_addr}")

specs = spec if spec is not None else []
self._specs = specs if isinstance(specs, Sequence) else [specs]
Expand Down Expand Up @@ -318,7 +318,7 @@ def launch_inference_worker(self, num_uvicorn_servers: int):
self._callback_runner,
self._loop,
self.use_zmq,
self._zmq_port,
self._zmq_addr,
),
)
process.start()
Expand All @@ -339,7 +339,7 @@ async def lifespan(self, app: FastAPI):
response_queue = self.response_queues[app.response_queue_id]
response_executor = ThreadPoolExecutor(max_workers=len(self.inference_workers))
future = response_queue_to_buffer(
response_queue, self.response_buffer, self.stream, response_executor, self.use_zmq, self._zmq_port
response_queue, self.response_buffer, self.stream, response_executor, self.use_zmq, self._zmq_addr
)
task = loop.create_task(future)

Expand Down
41 changes: 13 additions & 28 deletions src/litserve/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,15 @@
# limitations under the License.
import asyncio
import dataclasses
import errno
import logging
import os
import pickle
import random
import sys
import uuid
from contextlib import contextmanager
from typing import TYPE_CHECKING, AsyncIterator

import zmq
from fastapi import HTTPException
from zmq import ZMQBindError, ZMQError

if TYPE_CHECKING:
from litserve.server import LitServer
Expand Down Expand Up @@ -137,27 +135,14 @@ def add_log_handler(handler):
logging.getLogger("litserve").addHandler(handler)


def get_random_port(min_port=49152, max_port=65535, max_tries=100):
    """Get a random available open port on the local machine within a specified range.

    Each attempt picks a random candidate and probes it by binding a temporary
    ZeroMQ ``REP`` socket to ``tcp://localhost:<port>``. The probe socket is
    released before returning, so the port is only known to have been free at
    probe time; another process may still claim it afterwards.

    Args:
        min_port: Lowest candidate port (inclusive).
        max_port: Upper bound of the candidate range (exclusive, as with
            ``random.randrange``).
        max_tries: Number of random candidates to probe before giving up.

    Returns:
        int: A port the probe socket was able to bind to.

    Raises:
        ZMQError: If binding fails for a reason other than the address being in
            use (or access being denied on Windows).
        ZMQBindError: If no free port is found within ``max_tries`` attempts.
    """
    for _ in range(max_tries):
        port = random.randrange(min_port, max_port)
        endpoint = f"tcp://localhost:{port}"
        ctx = zmq.Context()
        try:
            sock = ctx.socket(zmq.REP)
            try:
                sock.bind(endpoint)
            except ZMQError as exception:
                en = exception.errno
                # Port already taken (Windows may report this as EACCES): try another one.
                if en == zmq.EADDRINUSE or (sys.platform == "win32" and en == errno.EACCES):
                    continue
                raise
            else:
                # Only unbind after a successful bind; unbinding an endpoint that was
                # never bound raises its own ZMQError and would abort the retry loop.
                sock.unbind(endpoint)
                return port
            finally:
                sock.close()
        finally:
            ctx.term()
    raise ZMQBindError("Could not find random port.")
def generate_random_ipc_address(temp_dir="/tmp"):
    """Build a fresh ZeroMQ IPC endpoint string located under ``temp_dir``.

    The socket file name embeds a random UUID4 hex string, so separate calls
    produce different addresses. Only the address string is constructed here;
    nothing is created on disk.

    Args:
        temp_dir: Directory under which the IPC socket path is placed.

    Returns:
        str: An address of the form ``ipc://<temp_dir>/zmq-<uuid hex>.ipc``.
    """
    token = uuid.uuid4().hex
    socket_file = os.path.join(temp_dir, "zmq-" + token + ".ipc")
    return "ipc://" + socket_file
4 changes: 2 additions & 2 deletions tests/test_loops.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def test_inference_worker(mock_single_loop, mock_batched_loop):
callback_runner=NOOP_CB_RUNNER,
loop="auto",
use_zmq=False,
zmq_port=None,
zmq_addr=None,
)
mock_batched_loop.assert_called_once()

Expand All @@ -216,7 +216,7 @@ def test_inference_worker(mock_single_loop, mock_batched_loop):
callback_runner=NOOP_CB_RUNNER,
loop="auto",
use_zmq=False,
zmq_port=None,
zmq_addr=None,
)
mock_single_loop.assert_called_once()

Expand Down
8 changes: 1 addition & 7 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pytest
from fastapi import HTTPException

from litserve.utils import call_after_stream, dump_exception, get_random_port
from litserve.utils import call_after_stream, dump_exception


def test_dump_exception():
Expand All @@ -31,9 +31,3 @@ async def test_call_after_stream():
pass
callback.assert_called()
callback.assert_called_with("first_arg", random_arg="second_arg")


def test_get_random_port():
    """Check that get_random_port returns an int in the non-privileged port range."""
    chosen = get_random_port()
    assert 1024 <= chosen <= 65535
    assert isinstance(chosen, int)

0 comments on commit e75f7b2

Please sign in to comment.