Skip to content

Commit

Permalink
Added prototype of proxy function.
Browse files Browse the repository at this point in the history
Related to #127
  • Loading branch information
Alexei-Kornienko committed Jan 25, 2021
1 parent 36df6ba commit 729f4dd
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 28 deletions.
31 changes: 3 additions & 28 deletions examples/message_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ mod async_helpers;
use std::error::Error;
use zeromq::prelude::*;

use futures::{select, FutureExt};

#[async_helpers::main]
async fn main() -> Result<(), Box<dyn Error>> {
let mut frontend = zeromq::RouterSocket::new();
Expand All @@ -18,30 +16,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
.bind("tcp://127.0.0.1:5560")
.await
.expect("Failed to bind");
loop {
select! {
router_mess = frontend.recv().fuse() => {
dbg!(&router_mess);
match router_mess {
Ok(message) => {
backend.send(message).await?;
}
Err(_) => {
todo!()
}
}
},
dealer_mess = backend.recv().fuse() => {
dbg!(&dealer_mess);
match dealer_mess {
Ok(message) => {
frontend.send(message).await?;
}
Err(_) => {
todo!()
}
}
}
};
}

zeromq::proxy(frontend, backend, None).await?;
Ok(())
}
38 changes: 38 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ extern crate enum_primitive_derive;

use async_trait::async_trait;
use futures::channel::mpsc;
use futures::FutureExt;
use futures_codec::FramedWrite;
use num_traits::ToPrimitive;
use parking_lot::Mutex;
Expand Down Expand Up @@ -285,6 +286,43 @@ pub trait Socket: Sized + Send {
}
}

pub async fn proxy<Frontend: SocketSend + SocketRecv, Backend: SocketSend + SocketRecv>(
mut frontend: Frontend,
mut backend: Backend,
mut capture: Option<Box<dyn SocketSend>>,
) -> ZmqResult<()> {
loop {
futures::select! {
frontend_mess = frontend.recv().fuse() => {
match frontend_mess {
Ok(message) => {
if let Some(capture) = &mut capture {
capture.send(message.clone()).await?;
}
backend.send(message).await?;
}
Err(_) => {
todo!()
}
}
},
backend_mess = backend.recv().fuse() => {
match backend_mess {
Ok(message) => {
if let Some(capture) = &mut capture {
capture.send(message.clone()).await?;
}
frontend.send(message).await?;
}
Err(_) => {
todo!()
}
}
}
};
}
}

pub mod prelude {
//! Re-exports important traits. Consider glob-importing.
Expand Down

0 comments on commit 729f4dd

Please sign in to comment.