is using push/pull <-> router a valid socket combination? #4722

Open
John-Trager opened this issue Aug 10, 2024 · 2 comments

Comments

Please use this template for reporting suspected bugs or requests for help.

Issue description

I am writing an async client and server. The client needs to send and receive messages at the same time, i.e. one thread receives and processes messages while another thread sends messages to the server. Multiple clients will connect to the server at the same time, which is why I went with a ROUTER socket on the server side.

However, I can't share a single DEALER socket on the client, since the two threads would send and receive on it at the same time. Could I instead use PUSH and PULL sockets on the client to send to and receive from the server's ROUTER?

Environment

  • libzmq version (commit hash if unreleased):
  • OS:

Minimal test code / Steps to reproduce the issue

N/A

What's the actual result? (include assertion message & call stack if applicable)

N/A

What's the expected result?

Use PUSH/PULL on the client and ROUTER on the server, and be able to set an identity on the PUSH/PULL sockets.

John-Trager commented Aug 10, 2024

This discussion over on jeromq seems to suggest that it is a valid option, but I put together a simple example in Python that does not seem to work:

client:

import zmq
import threading

class ZmqClient:
    def __init__(self, server_address):
        self.context = zmq.Context()
        self.pull_socket = self.context.socket(zmq.PULL)
        self.push_socket = self.context.socket(zmq.PUSH)

        # same identity so server can send messages back to the client pull sock
        self.pull_socket.setsockopt(zmq.IDENTITY, b'client')
        self.push_socket.setsockopt(zmq.IDENTITY, b'client')

        self.pull_socket.connect(server_address)
        self.push_socket.connect(server_address)

    def start(self):
        threading.Thread(target=self.receive_messages, daemon=True).start()

    def receive_messages(self):
        while True:
            message = self.pull_socket.recv_string()
            print(f"Received message: {message}")

    def send_message(self, message):
        self.push_socket.send_string(message)

if __name__ == "__main__":
    server_address = "tcp://localhost:5555"
    client = ZmqClient(server_address)
    client.start()

    while True:
        message = input("Enter a message to send: ")
        client.send_message(message)

server:

import zmq

class Server:
    def __init__(self, address):
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.ROUTER)
        self.socket.bind(address)

    def run(self):
        while True:
            identity, _, message = self.socket.recv_multipart()
            print(f"Received message: {message} from {identity}")
            self.socket.send_multipart([identity, b"", b"Hello, " + message])

if __name__ == "__main__":
    server = Server("tcp://localhost:5555")
    server.run()

The result is the server never even receives the message sent by the client.
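
For reference, the zmq_socket(3) man page lists the compatible peer types for each socket type: PUSH and PULL only pair with each other, while ROUTER pairs with DEALER, REQ, and ROUTER. That would likely explain the silence here, since mismatched peers are not expected to exchange messages at all. Below is a minimal sketch of that table as a lookup; `COMPATIBLE_PEERS` and `is_valid_pair` are illustrative names, not part of pyzmq or libzmq.

```python
# Compatible peer types as listed in the zmq_socket(3) man page.
# COMPATIBLE_PEERS and is_valid_pair are illustrative names, not a pyzmq API.
COMPATIBLE_PEERS = {
    "REQ": {"REP", "ROUTER"},
    "REP": {"REQ", "DEALER"},
    "DEALER": {"ROUTER", "REP", "DEALER"},
    "ROUTER": {"DEALER", "REQ", "ROUTER"},
    "PUB": {"SUB", "XSUB"},
    "SUB": {"PUB", "XPUB"},
    "XPUB": {"SUB", "XSUB"},
    "XSUB": {"PUB", "XPUB"},
    "PUSH": {"PULL"},
    "PULL": {"PUSH"},
    "PAIR": {"PAIR"},
}


def is_valid_pair(local: str, remote: str) -> bool:
    """Return True if `remote` is a documented peer type for `local`."""
    return remote.upper() in COMPATIBLE_PEERS.get(local.upper(), set())


if __name__ == "__main__":
    # The combinations discussed in this issue.
    for local, remote in [("PUSH", "ROUTER"), ("PULL", "ROUTER"), ("DEALER", "ROUTER")]:
        print(f"{local} <-> {remote}: {is_valid_pair(local, remote)}")
```

By this table, the client would need a DEALER (or REQ) socket to talk to the ROUTER, regardless of the identity set on the PUSH/PULL sockets.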

Alternatively, I can use an agent model (a PAIR socket and a poller) to get the desired result without PUSH/PULL, but it is still much less clean than just integrating PUSH/PULL <-> ROUTER.

client code (a little long):

import zmq
import threading

class ThreadedDealerClient:
    def __init__(self, server_address):
        self.server_address = server_address
        self.dealer_id = b"client"
        self.context = zmq.Context()

        # create pair socket for thread comms:
        # sender sends messages to forwarder, which
        # then uses the dealer to send them to the server
        self.sender = self.context.socket(zmq.PAIR)
        self.sender.bind("inproc://sender")


    def start(self):
        threading.Thread(target=self.agent, daemon=True).start()

    def agent(self):
        # create dealer for comms with server
        dealer = self.context.socket(zmq.DEALER)
        dealer.setsockopt(zmq.IDENTITY, self.dealer_id)
        dealer.connect(self.server_address)

        # receiver pair for message from main thread
        forwarder = self.context.socket(zmq.PAIR)
        forwarder.connect("inproc://sender")

        poller = zmq.Poller()
        poller.register(dealer, zmq.POLLIN)
        poller.register(forwarder, zmq.POLLIN)

        while True:
            try:
                socks = dict(poller.poll())
            except KeyboardInterrupt:
                break

            if forwarder in socks:
                message = forwarder.recv()
                # forward message to server
                # single quotes inside the f-string keep this valid before Python 3.12
                print(f"Forwarding message: {message.decode('utf-8')}")
                dealer.send(message)

            if dealer in socks:
                message = dealer.recv_string()
                # process message from server
                # ... do some work ...
                print(f"Received message: {message}")

    def send_message(self, message):
        self.sender.send_string(message)

    def close(self):
        self.sender.close()
        self.context.term()

if __name__ == "__main__":
    client = ThreadedDealerClient("tcp://localhost:5555")
    client.start()
    while True:
        message = input("Enter message: ")
        client.send_message(message)
    client.close() 
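
One thing to watch when pairing this client with the server shown earlier: a DEALER does not add the empty delimiter frame that a REQ socket does, so `dealer.send(message)` should arrive at the ROUTER as two frames (identity, payload) rather than the three the server unpacks. Here is a small sketch of a helper that accepts both layouts; `split_router_frames` is a hypothetical name, not something pyzmq provides.

```python
from typing import List, Tuple


def split_router_frames(frames: List[bytes]) -> Tuple[bytes, bytes]:
    """Return (identity, payload) from frames received on a ROUTER socket.

    Accepts [identity, payload] (bare DEALER send) and
    [identity, b"", payload] (REQ-style envelope with an empty delimiter).
    Hypothetical helper for illustration only.
    """
    if len(frames) == 2:
        identity, payload = frames
    elif len(frames) == 3 and frames[1] == b"":
        identity, _, payload = frames
    else:
        raise ValueError(f"unexpected frame layout: {frames!r}")
    return identity, payload


if __name__ == "__main__":
    # Frame lists as recv_multipart() would return them on the ROUTER side.
    print(split_router_frames([b"client", b"hello"]))
    print(split_router_frames([b"client", b"", b"hello"]))
```

On the server this could be used as `identity, message = split_router_frames(self.socket.recv_multipart())`. Replying with `[identity, payload]` (no empty frame) would presumably let the client's single `dealer.recv_string()` see the reply text directly.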
