diff --git a/README.md b/README.md index 512fa26c..d1f96c82 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ A thread-per-core Rust runtime with io_uring/epoll/kqueue. [zh-readme-url]: README-zh.md ## Design Goal -Monoio is a pure io_uring/epoll/kqueue Rust async runtime. Part of the design has been borrowed from Tokio and Tokio-uring. However, unlike Tokio-uring, Monoio does not run on top of another runtime, rendering it more efficient. +Monoio is a pure io_uring/epoll/kqueue Rust async runtime. Part of the design has been borrowed from Tokio and Tokio-uring. However, unlike Tokio-uring, Monoio does not run on top of another runtime, rendering it more efficient. Moreover, Monoio is designed with a thread-per-core model in mind. Users do not need to worry about tasks being `Send` or `Sync`, as thread local storage can be used safely. In other words, the data does not escape the thread on await points, unlike on work-stealing runtimes such as Tokio. This is because for some use cases, specifically those targeted by this runtime, it is not necessary to make task schedulable between threads. For example, if we were to write a load balancer like NGINX, we would write it in a thread-per-core way. The thread local data does not need to be shared between threads, so the `Sync` and `Send` do not need to be implemented in the first place. diff --git a/docs/en/io-cancel.md b/docs/en/io-cancel.md index 9e2bc903..84ddcbfc 100644 --- a/docs/en/io-cancel.md +++ b/docs/en/io-cancel.md @@ -1,6 +1,7 @@ --- title: Cancel IO date: 2023-02-27 15:09:00 +updated: 2023-11-06 16:49:00 author: ihciah --- @@ -14,28 +15,17 @@ Taking reads as an example, the following code defines a cancelable asynchronous ```rust /// CancelableAsyncReadRent: async read with an ownership of a buffer and ability to cancel io. pub trait CancelableAsyncReadRent: AsyncReadRent { - /// The future of read Result - type CancelableReadFuture<'a, T>: Future> - where - Self: 'a, - T: IoBufMut + 'a; - /// The future of readv Result - type CancelableReadvFuture<'a, T>: Future> - where - Self: 'a, - T: IoVecBufMut + 'a; - fn cancelable_read( &mut self, buf: T, c: CancelHandle, - ) -> Self::CancelableReadFuture<'_, T>; + ) -> impl Future>; fn cancelable_readv( &mut self, buf: T, c: CancelHandle, - ) -> Self::CancelableReadvFuture<'_, T>; + ) -> impl Future>; } ``` diff --git a/docs/en/why-GAT.md b/docs/en/why-GAT.md index edb27886..e9f144a6 100644 --- a/docs/en/why-GAT.md +++ b/docs/en/why-GAT.md @@ -1,6 +1,7 @@ --- title: Why GAT date: 2021-11-24 20:00:00 +updated: 2023-11-06 16:49:00 author: ihciah --- @@ -39,3 +40,17 @@ trait AsyncReadRent { ``` The only problem here is, if you use GAT style, you should always use it. Providing `poll` style based on GAT is not easy. As an example, `monoio-compat` implement tokio `AsyncRead` and `AsyncWrite` based on GAT style future with some unsafe hack(and also with a `Box` cost). + +## async_fn_in_trait +`async_fn_in_trait` and `return_position_impl_trait_in_trait` is stable now in rust and can be used to replace GAT usage here(related [issue](https://github.com/rust-lang/rust/issues/91611)). + +Now we can define and impl async trait easier: +```rust +trait AsyncReadRent { + fn read(&mut self, buf: T) -> impl Future>; +} + +impl AsyncReadRent for Demo { + async fn read(&mut self, buf: T) -> BufResult { ... } +} +``` diff --git a/docs/zh/io-cancel.md b/docs/zh/io-cancel.md index 8337c90e..ebd62131 100644 --- a/docs/zh/io-cancel.md +++ b/docs/zh/io-cancel.md @@ -1,6 +1,7 @@ --- title: 取消 IO date: 2023-02-27 15:09:00 +updated: 2023-11-06 16:49:00 author: ihciah --- @@ -14,28 +15,17 @@ author: ihciah ```rust /// CancelableAsyncReadRent: async read with a ownership of a buffer and ability to cancel io. pub trait CancelableAsyncReadRent: AsyncReadRent { - /// The future of read Result - type CancelableReadFuture<'a, T>: Future> - where - Self: 'a, - T: IoBufMut + 'a; - /// The future of readv Result - type CancelableReadvFuture<'a, T>: Future> - where - Self: 'a, - T: IoVecBufMut + 'a; - fn cancelable_read( &mut self, buf: T, c: CancelHandle, - ) -> Self::CancelableReadFuture<'_, T>; + ) -> impl Future>; fn cancelable_readv( &mut self, buf: T, c: CancelHandle, - ) -> Self::CancelableReadvFuture<'_, T>; + ) -> impl Future>; } ``` diff --git a/docs/zh/why-GAT.md b/docs/zh/why-GAT.md index 1c0eecc4..92a81814 100644 --- a/docs/zh/why-GAT.md +++ b/docs/zh/why-GAT.md @@ -1,6 +1,7 @@ --- title: 为什么使用 GAT date: 2021-11-24 20:00:00 +updated: 2023-11-06 16:49:00 author: ihciah --- @@ -42,3 +43,17 @@ trait AsyncReadRent { ``` 这是银弹吗?不是。唯一的问题在于,如果使用了 GAT 这一套模式,就要总是使用它。如果你在 `poll` 形式和 GAT 形式之间反复横跳,那你会十分痛苦。基于 `poll` 形式接口自行维护状态,确实可以实现 Future(最简单的实现如 `poll_fn`);但反过来就很难受了:你很难存储一个带生命周期的 Future。虽然使用一些 unsafe 的 hack 可以做(也有 cost)这件事,但是仍旧,限制很多且并不推荐这么做。`monoio-compat` 基于 GAT 的 future 实现了 Tokio 的 `AsyncRead` 和 `AsyncWrite`,如果你非要试一试,可以参考它。 + +## async_fn_in_trait +Rust 已经稳定了 `async_fn_in_trait`,结合 `return_position_impl_trait_in_trait` 可以替代这里的 GAT(相关 [issue](https://github.com/rust-lang/rust/issues/91611))。 + +现在我们可以更简单地定义并实现 trait: +```rust +trait AsyncReadRent { + fn read(&mut self, buf: T) -> impl Future>; +} + +impl AsyncReadRent for Demo { + async fn read(&mut self, buf: T) -> BufResult { ... } +} +``` \ No newline at end of file diff --git a/monoio-compat/Cargo.toml b/monoio-compat/Cargo.toml index bee107c6..c116efa0 100644 --- a/monoio-compat/Cargo.toml +++ b/monoio-compat/Cargo.toml @@ -8,12 +8,12 @@ license = "MIT/Apache-2.0" name = "monoio-compat" readme = "README.md" repository = "https://github.com/bytedance/monoio" -version = "0.1.2" +version = "0.2.0" [dependencies] -monoio = {version = "0.1.0", path = "../monoio", default-features = false} +monoio = {version = "0.2.0", path = "../monoio", default-features = false} reusable-box-future = "0.2" tokio = {version = "1", default-features = false, features = ["io-util"]} [dev-dependencies] -monoio = {version = "0.1.0", path = "../monoio", features = ["async-cancel", "macros"]} +monoio = {version = "0.2.0", path = "../monoio", features = ["async-cancel", "macros"]} diff --git a/monoio/Cargo.toml b/monoio/Cargo.toml index d523cb64..28ee83d7 100644 --- a/monoio/Cargo.toml +++ b/monoio/Cargo.toml @@ -8,7 +8,7 @@ license = "MIT/Apache-2.0" name = "monoio" readme = "../README.md" repository = "https://github.com/bytedance/monoio" -version = "0.1.9" +version = "0.2.0" # common dependencies [dependencies] diff --git a/monoio/src/io/async_buf_read.rs b/monoio/src/io/async_buf_read.rs index a6a52291..149544df 100644 --- a/monoio/src/io/async_buf_read.rs +++ b/monoio/src/io/async_buf_read.rs @@ -4,13 +4,8 @@ use crate::io::AsyncReadRent; /// AsyncBufRead: async read with buffered content pub trait AsyncBufRead: AsyncReadRent { - /// The returned future of fill_buf - type FillBufFuture<'a>: Future> - where - Self: 'a; - /// Try read data and get a reference to the internal buffer - fn fill_buf(&mut self) -> Self::FillBufFuture<'_>; + fn fill_buf(&mut self) -> impl Future>; /// Mark how much data is read fn consume(&mut self, amt: usize); } diff --git a/monoio/src/io/async_buf_read_ext.rs b/monoio/src/io/async_buf_read_ext.rs index 20d9b91c..102b861a 100644 --- a/monoio/src/io/async_buf_read_ext.rs +++ b/monoio/src/io/async_buf_read_ext.rs @@ -55,11 +55,6 @@ where /// AsyncBufReadExt pub trait AsyncBufReadExt { - /// The future of read until Result - type ReadUntilFuture<'a>: Future> - where - Self: 'a; - /// This function will read bytes from the underlying stream until the delimiter or EOF is /// found. Once found, all bytes up to, and including, the delimiter (if found) will be appended /// to buf. @@ -69,12 +64,11 @@ pub trait AsyncBufReadExt { /// # Errors /// This function will ignore all instances of ErrorKind::Interrupted and will otherwise return /// any errors returned by fill_buf. - fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec) -> Self::ReadUntilFuture<'a>; - - /// The future of read line Result - type ReadLineFuture<'a>: Future> - where - Self: 'a; + fn read_until<'a>( + &'a mut self, + byte: u8, + buf: &'a mut Vec, + ) -> impl Future>; /// This function will read bytes from the underlying stream until the newline delimiter (the /// 0xA byte) or EOF is found. Once found, all bytes up to, and including, the delimiter (if @@ -88,41 +82,39 @@ pub trait AsyncBufReadExt { /// This function has the same error semantics as read_until and will also return an error if /// the read bytes are not valid UTF-8. If an I/O error is encountered then buf may contain some /// bytes already read in the event that all data read so far was valid UTF-8. - fn read_line<'a>(&'a mut self, buf: &'a mut String) -> Self::ReadLineFuture<'a>; + fn read_line<'a>(&'a mut self, buf: &'a mut String) -> impl Future>; } impl AsyncBufReadExt for A where A: AsyncBufRead + ?Sized, { - type ReadUntilFuture<'a> = impl Future> + 'a where Self: 'a; - - fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec) -> Self::ReadUntilFuture<'a> { + fn read_until<'a>( + &'a mut self, + byte: u8, + buf: &'a mut Vec, + ) -> impl Future> { read_until(self, byte, buf) } - type ReadLineFuture<'a> = impl Future> + 'a where Self: 'a; - - fn read_line<'a>(&'a mut self, buf: &'a mut String) -> Self::ReadLineFuture<'a> { - async { - unsafe { - let mut g = Guard { - len: buf.len(), - buf: buf.as_mut_vec(), - }; + async fn read_line<'a>(&'a mut self, buf: &'a mut String) -> Result { + unsafe { + let mut g = Guard { + len: buf.len(), + buf: buf.as_mut_vec(), + }; - let ret = read_until(self, b'\n', g.buf).await; - if from_utf8(&g.buf[g.len..]).is_err() { - ret.and_then(|_| { - Err(Error::new( - ErrorKind::InvalidData, - "stream did not contain valid UTF-8", - )) - }) - } else { - g.len = g.buf.len(); - ret - } + let ret = read_until(self, b'\n', g.buf).await; + if from_utf8(&g.buf[g.len..]).is_err() { + ret.and_then(|_| { + Err(Error::new( + ErrorKind::InvalidData, + "stream did not contain valid UTF-8", + )) + }) + } else { + g.len = g.buf.len(); + ret } } } diff --git a/monoio/src/io/async_read_rent.rs b/monoio/src/io/async_read_rent.rs index ba0a1d28..e61fd628 100644 --- a/monoio/src/io/async_read_rent.rs +++ b/monoio/src/io/async_read_rent.rs @@ -7,64 +7,36 @@ use crate::{ /// AsyncReadRent: async read with a ownership of a buffer pub trait AsyncReadRent { - /// The future of read Result - type ReadFuture<'a, T>: Future> - where - Self: 'a, - T: IoBufMut + 'a; - /// The future of readv Result - type ReadvFuture<'a, T>: Future> - where - Self: 'a, - T: IoVecBufMut + 'a; - /// Same as read(2) - fn read(&mut self, buf: T) -> Self::ReadFuture<'_, T>; + fn read(&mut self, buf: T) -> impl Future>; /// Same as readv(2) - fn readv(&mut self, buf: T) -> Self::ReadvFuture<'_, T>; + fn readv(&mut self, buf: T) -> impl Future>; } /// AsyncReadRentAt: async read with a ownership of a buffer and a position pub trait AsyncReadRentAt { - /// The future of Result - type Future<'a, T>: Future> - where - Self: 'a, - T: 'a; - - /// Same as Read(2) - fn read_at(&mut self, buf: T, pos: usize) -> Self::Future<'_, T>; + /// Same as pread(2) + fn read_at( + &mut self, + buf: T, + pos: usize, + ) -> impl Future>; } impl AsyncReadRent for &mut A { - type ReadFuture<'a, T> = A::ReadFuture<'a, T> - where - Self: 'a, - T: IoBufMut + 'a; - - type ReadvFuture<'a, T> = A::ReadvFuture<'a, T> - where - Self: 'a, - T: IoVecBufMut + 'a; - #[inline] - fn read(&mut self, buf: T) -> Self::ReadFuture<'_, T> { + fn read(&mut self, buf: T) -> impl Future> { (**self).read(buf) } #[inline] - fn readv(&mut self, buf: T) -> Self::ReadvFuture<'_, T> { + fn readv(&mut self, buf: T) -> impl Future> { (**self).readv(buf) } } impl AsyncReadRent for &[u8] { - type ReadFuture<'a, B> = impl std::future::Future> where - B: IoBufMut + 'a, Self: 'a; - type ReadvFuture<'a, B> = impl std::future::Future> where - B: IoVecBufMut + 'a, Self: 'a; - - fn read(&mut self, mut buf: T) -> Self::ReadFuture<'_, T> { + fn read(&mut self, mut buf: T) -> impl Future> { let amt = std::cmp::min(self.len(), buf.bytes_total()); let (a, b) = self.split_at(amt); unsafe { @@ -75,7 +47,7 @@ impl AsyncReadRent for &[u8] { async move { (Ok(amt), buf) } } - fn readv(&mut self, mut buf: T) -> Self::ReadvFuture<'_, T> { + fn readv(&mut self, mut buf: T) -> impl Future> { // # Safety // We do it in pure sync way. let n = match unsafe { RawBuf::new_from_iovec_mut(&mut buf) } { diff --git a/monoio/src/io/async_read_rent_ext.rs b/monoio/src/io/async_read_rent_ext.rs index c5ddd9b2..d3e98ccb 100644 --- a/monoio/src/io/async_read_rent_ext.rs +++ b/monoio/src/io/async_read_rent_ext.rs @@ -8,73 +8,50 @@ use crate::{ macro_rules! reader_trait { ($future: ident, $n_ty: ty, $f: ident) => { - /// Read number result - type $future<'a>: Future> - where - Self: 'a; - /// Read number in async way - fn $f(&mut self) -> Self::$future<'_>; + fn $f(&mut self) -> impl Future>; }; } macro_rules! reader_be_impl { ($future: ident, $n_ty: ty, $f: ident) => { - type $future<'a> = impl Future> + 'a where A: 'a; - - fn $f(&mut self) -> Self::$future<'_> { - async { - let (res, buf) = self - .read_exact(std::boxed::Box::new([0; std::mem::size_of::<$n_ty>()])) - .await; - res?; - use crate::utils::box_into_inner::IntoInner; - Ok(<$n_ty>::from_be_bytes(Box::consume(buf))) - } + async fn $f(&mut self) -> std::io::Result<$n_ty> { + let (res, buf) = self + .read_exact(std::boxed::Box::new([0; std::mem::size_of::<$n_ty>()])) + .await; + res?; + use crate::utils::box_into_inner::IntoInner; + Ok(<$n_ty>::from_be_bytes(Box::consume(buf))) } }; } macro_rules! reader_le_impl { ($future: ident, $n_ty: ty, $f: ident) => { - type $future<'a> = impl Future> + 'a where A: 'a; - - fn $f(&mut self) -> Self::$future<'_> { - async { - let (res, buf) = self - .read_exact(std::boxed::Box::new([0; std::mem::size_of::<$n_ty>()])) - .await; - res?; - use crate::utils::box_into_inner::IntoInner; - Ok(<$n_ty>::from_le_bytes(Box::consume(buf))) - } + async fn $f(&mut self) -> std::io::Result<$n_ty> { + let (res, buf) = self + .read_exact(std::boxed::Box::new([0; std::mem::size_of::<$n_ty>()])) + .await; + res?; + use crate::utils::box_into_inner::IntoInner; + Ok(<$n_ty>::from_le_bytes(Box::consume(buf))) } }; } /// AsyncReadRentExt pub trait AsyncReadRentExt { - /// The future of Result - type ReadExactFuture<'a, T>: Future> - where - Self: 'a, - T: IoBufMut + 'a; - /// Read until buf capacity is fulfilled - fn read_exact(&mut self, buf: T) -> Self::ReadExactFuture<'_, T> - where - T: 'static + IoBufMut; - - /// The future of Result - type ReadVectoredExactFuture<'a, T>: Future> - where - Self: 'a, - T: IoVecBufMut + 'a; + fn read_exact( + &mut self, + buf: T, + ) -> impl Future>; /// Readv until buf capacity is fulfilled - fn read_vectored_exact(&mut self, buf: T) -> Self::ReadVectoredExactFuture<'_, T> - where - T: 'static + IoVecBufMut; + fn read_vectored_exact( + &mut self, + buf: T, + ) -> impl Future>; reader_trait!(ReadU8Future, u8, read_u8); reader_trait!(ReadU16Future, u16, read_u16); @@ -107,74 +84,61 @@ impl AsyncReadRentExt for A where A: AsyncReadRent + ?Sized, { - type ReadExactFuture<'a, T> = impl Future> + 'a where A: 'a, T: IoBufMut + 'a; - - fn read_exact(&mut self, mut buf: T) -> Self::ReadExactFuture<'_, T> - where - T: 'static + IoBufMut, - { - async move { - let len = buf.bytes_total(); - let mut read = 0; - while read < len { - let buf_slice = unsafe { SliceMut::new_unchecked(buf, read, len) }; - let (result, buf_slice) = self.read(buf_slice).await; - buf = buf_slice.into_inner(); - match result { - Ok(0) => { - return ( - Err(std::io::Error::new( - std::io::ErrorKind::UnexpectedEof, - "failed to fill whole buffer", - )), - buf, - ) - } - Ok(n) => { - read += n; - unsafe { buf.set_init(read) }; - } - Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return (Err(e), buf), + async fn read_exact(&mut self, mut buf: T) -> BufResult { + let len = buf.bytes_total(); + let mut read = 0; + while read < len { + let buf_slice = unsafe { SliceMut::new_unchecked(buf, read, len) }; + let (result, buf_slice) = self.read(buf_slice).await; + buf = buf_slice.into_inner(); + match result { + Ok(0) => { + return ( + Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "failed to fill whole buffer", + )), + buf, + ) } + Ok(n) => { + read += n; + unsafe { buf.set_init(read) }; + } + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return (Err(e), buf), } - (Ok(read), buf) } + (Ok(read), buf) } - type ReadVectoredExactFuture<'a, T> = impl Future> + 'a where A: 'a, T: IoVecBufMut + 'a; - - fn read_vectored_exact( + async fn read_vectored_exact( &mut self, mut buf: T, - ) -> Self::ReadVectoredExactFuture<'_, T> - where - T: 'static + IoVecBufMut, - { + ) -> BufResult { let mut meta = crate::buf::write_vec_meta(&mut buf); let len = meta.len(); let mut read = 0; - async move { - while read < len { - let (res, meta_) = self.readv(meta).await; - meta = meta_; - match res { - Ok(0) => { - return ( - Err(std::io::Error::new( - std::io::ErrorKind::UnexpectedEof, - "failed to fill whole buffer", - )), - buf, - ) - } - Ok(n) => read += n, - Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return (Err(e), buf), + + while read < len { + let (res, meta_) = self.readv(meta).await; + meta = meta_; + match res { + Ok(0) => { + return ( + Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "failed to fill whole buffer", + )), + buf, + ) } + Ok(n) => read += n, + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return (Err(e), buf), } - (Ok(read), buf) } + (Ok(read), buf) } reader_be_impl!(ReadU8Future, u8, read_u8); diff --git a/monoio/src/io/async_rent_cancelable.rs b/monoio/src/io/async_rent_cancelable.rs index 4ba3c905..1dd447a2 100644 --- a/monoio/src/io/async_rent_cancelable.rs +++ b/monoio/src/io/async_rent_cancelable.rs @@ -8,48 +8,27 @@ use crate::{ /// CancelableAsyncReadRent: async read with a ownership of a buffer and ability to cancel io. pub trait CancelableAsyncReadRent: AsyncReadRent { - /// The future of read Result - type CancelableReadFuture<'a, T>: Future> - where - Self: 'a, - T: IoBufMut + 'a; - /// The future of readv Result - type CancelableReadvFuture<'a, T>: Future> - where - Self: 'a, - T: IoVecBufMut + 'a; - /// Same as read(2) fn cancelable_read( &mut self, buf: T, c: CancelHandle, - ) -> Self::CancelableReadFuture<'_, T>; + ) -> impl Future>; /// Same as readv(2) fn cancelable_readv( &mut self, buf: T, c: CancelHandle, - ) -> Self::CancelableReadvFuture<'_, T>; + ) -> impl Future>; } impl CancelableAsyncReadRent for &mut A { - type CancelableReadFuture<'a, T> = A::CancelableReadFuture<'a, T> - where - Self: 'a, - T: IoBufMut + 'a; - - type CancelableReadvFuture<'a, T> = A::CancelableReadvFuture<'a, T> - where - Self: 'a, - T: IoVecBufMut + 'a; - #[inline] fn cancelable_read( &mut self, buf: T, c: CancelHandle, - ) -> Self::CancelableReadFuture<'_, T> { + ) -> impl Future> { (**self).cancelable_read(buf, c) } @@ -58,80 +37,42 @@ impl CancelableAsyncReadRent for &mut A { &mut self, buf: T, c: CancelHandle, - ) -> Self::CancelableReadvFuture<'_, T> { + ) -> impl Future> { (**self).cancelable_readv(buf, c) } } /// CancelableAsyncWriteRent: async write with a ownership of a buffer and ability to cancel io. pub trait CancelableAsyncWriteRent: AsyncWriteRent { - /// The future of write Result - type CancelableWriteFuture<'a, T>: Future> - where - Self: 'a, - T: IoBuf + 'a; - /// The future of writev Result - type CancelableWritevFuture<'a, T>: Future> - where - Self: 'a, - T: IoVecBuf + 'a; - - /// The future of flush - type CancelableFlushFuture<'a>: Future> - where - Self: 'a; - - /// The future of shutdown - type CancelableShutdownFuture<'a>: Future> - where - Self: 'a; - /// Same as write(2) fn cancelable_write( &mut self, buf: T, c: CancelHandle, - ) -> Self::CancelableWriteFuture<'_, T>; + ) -> impl Future>; /// Same as writev(2) fn cancelable_writev( &mut self, buf_vec: T, c: CancelHandle, - ) -> Self::CancelableWritevFuture<'_, T>; + ) -> impl Future>; /// Flush buffered data if needed - fn cancelable_flush(&mut self, c: CancelHandle) -> Self::CancelableFlushFuture<'_>; + fn cancelable_flush(&mut self, c: CancelHandle) -> impl Future>; /// Same as shutdown - fn cancelable_shutdown(&mut self, c: CancelHandle) -> Self::CancelableShutdownFuture<'_>; + fn cancelable_shutdown(&mut self, c: CancelHandle) + -> impl Future>; } impl CancelableAsyncWriteRent for &mut A { - type CancelableWriteFuture<'a, T> = A::CancelableWriteFuture<'a, T> - where - Self: 'a, - T: IoBuf + 'a; - - type CancelableWritevFuture<'a, T> = A::CancelableWritevFuture<'a, T> - where - Self: 'a, - T: IoVecBuf + 'a; - - type CancelableFlushFuture<'a> = A::CancelableFlushFuture<'a> - where - Self: 'a; - - type CancelableShutdownFuture<'a> = A::CancelableShutdownFuture<'a> - where - Self: 'a; - #[inline] fn cancelable_write( &mut self, buf: T, c: CancelHandle, - ) -> Self::CancelableWriteFuture<'_, T> { + ) -> impl Future> { (**self).cancelable_write(buf, c) } @@ -140,17 +81,20 @@ impl CancelableAsyncWriteRent for &mut A { &mut self, buf_vec: T, c: CancelHandle, - ) -> Self::CancelableWritevFuture<'_, T> { + ) -> impl Future> { (**self).cancelable_writev(buf_vec, c) } #[inline] - fn cancelable_flush(&mut self, c: CancelHandle) -> Self::CancelableFlushFuture<'_> { + fn cancelable_flush(&mut self, c: CancelHandle) -> impl Future> { (**self).cancelable_flush(c) } #[inline] - fn cancelable_shutdown(&mut self, c: CancelHandle) -> Self::CancelableShutdownFuture<'_> { + fn cancelable_shutdown( + &mut self, + c: CancelHandle, + ) -> impl Future> { (**self).cancelable_shutdown(c) } } diff --git a/monoio/src/io/async_rent_cancelable_ext.rs b/monoio/src/io/async_rent_cancelable_ext.rs index 523200a5..4b3efc0b 100644 --- a/monoio/src/io/async_rent_cancelable_ext.rs +++ b/monoio/src/io/async_rent_cancelable_ext.rs @@ -8,87 +8,52 @@ use crate::{ macro_rules! reader_trait { ($future: ident, $n_ty: ty, $f: ident) => { - /// Read number result - type $future<'a>: Future> - where - Self: 'a; - /// Read number in async way - fn $f(&mut self, c: CancelHandle) -> Self::$future<'_>; + fn $f(&mut self, c: CancelHandle) -> impl Future>; }; } macro_rules! reader_be_impl { ($future: ident, $n_ty: ty, $f: ident) => { - type $future<'a> = impl Future> + 'a where A: 'a; - - fn $f(&mut self, c: CancelHandle) -> Self::$future<'_> { - async { - let (res, buf) = self - .cancelable_read_exact( - std::boxed::Box::new([0; std::mem::size_of::<$n_ty>()]), - c, - ) - .await; - res?; - use crate::utils::box_into_inner::IntoInner; - Ok(<$n_ty>::from_be_bytes(Box::consume(buf))) - } + async fn $f(&mut self, c: CancelHandle) -> std::io::Result<$n_ty> { + let (res, buf) = self + .cancelable_read_exact(std::boxed::Box::new([0; std::mem::size_of::<$n_ty>()]), c) + .await; + res?; + use crate::utils::box_into_inner::IntoInner; + Ok(<$n_ty>::from_be_bytes(Box::consume(buf))) } }; } macro_rules! reader_le_impl { ($future: ident, $n_ty: ty, $f: ident) => { - type $future<'a> = impl Future> + 'a where A: 'a; - - fn $f(&mut self, c: CancelHandle) -> Self::$future<'_> { - async { - let (res, buf) = self - .cancelable_read_exact( - std::boxed::Box::new([0; std::mem::size_of::<$n_ty>()]), - c, - ) - .await; - res?; - use crate::utils::box_into_inner::IntoInner; - Ok(<$n_ty>::from_le_bytes(Box::consume(buf))) - } + async fn $f(&mut self, c: CancelHandle) -> std::io::Result<$n_ty> { + let (res, buf) = self + .cancelable_read_exact(std::boxed::Box::new([0; std::mem::size_of::<$n_ty>()]), c) + .await; + res?; + use crate::utils::box_into_inner::IntoInner; + Ok(<$n_ty>::from_le_bytes(Box::consume(buf))) } }; } /// CancelableAsyncReadRentExt pub trait CancelableAsyncReadRentExt { - /// The future of Result - type CancelableReadExactFuture<'a, T>: Future> - where - Self: 'a, - T: IoBufMut + 'a; - /// Read until buf capacity is fulfilled - fn cancelable_read_exact( + fn cancelable_read_exact( &mut self, buf: T, c: CancelHandle, - ) -> Self::CancelableReadExactFuture<'_, T> - where - T: 'static + IoBufMut; - - /// The future of Result - type CancelableReadVectoredExactFuture<'a, T>: Future> - where - Self: 'a, - T: IoVecBufMut + 'a; + ) -> impl Future>; /// Readv until buf capacity is fulfilled - fn cancelable_read_vectored_exact( + fn cancelable_read_vectored_exact( &mut self, buf: T, c: CancelHandle, - ) -> Self::CancelableReadVectoredExactFuture<'_, T> - where - T: 'static + IoVecBufMut; + ) -> impl Future>; reader_trait!(ReadU8Future, u8, cancelable_read_u8); reader_trait!(ReadU16Future, u16, cancelable_read_u16); @@ -121,79 +86,66 @@ impl CancelableAsyncReadRentExt for A where A: CancelableAsyncReadRent + ?Sized, { - type CancelableReadExactFuture<'a, T> = impl Future> + 'a where A: 'a, T: IoBufMut + 'a; - - fn cancelable_read_exact( + async fn cancelable_read_exact( &mut self, mut buf: T, c: CancelHandle, - ) -> Self::CancelableReadExactFuture<'_, T> - where - T: 'static + IoBufMut, - { - async move { - let len = buf.bytes_total(); - let mut read = 0; - while read < len { - let buf_slice = unsafe { SliceMut::new_unchecked(buf, read, len) }; - let (result, buf_slice) = self.cancelable_read(buf_slice, c.clone()).await; - buf = buf_slice.into_inner(); - match result { - Ok(0) => { - return ( - Err(std::io::Error::new( - std::io::ErrorKind::UnexpectedEof, - "failed to fill whole buffer", - )), - buf, - ) - } - Ok(n) => { - read += n; - unsafe { buf.set_init(read) }; - } - Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return (Err(e), buf), + ) -> BufResult { + let len = buf.bytes_total(); + let mut read = 0; + while read < len { + let buf_slice = unsafe { SliceMut::new_unchecked(buf, read, len) }; + let (result, buf_slice) = self.cancelable_read(buf_slice, c.clone()).await; + buf = buf_slice.into_inner(); + match result { + Ok(0) => { + return ( + Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "failed to fill whole buffer", + )), + buf, + ) } + Ok(n) => { + read += n; + unsafe { buf.set_init(read) }; + } + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return (Err(e), buf), } - (Ok(read), buf) } + (Ok(read), buf) } - type CancelableReadVectoredExactFuture<'a, T> = impl Future> + 'a where A: 'a, T: IoVecBufMut + 'a; - - fn cancelable_read_vectored_exact( + async fn cancelable_read_vectored_exact( &mut self, mut buf: T, c: CancelHandle, - ) -> Self::CancelableReadVectoredExactFuture<'_, T> - where - T: 'static + IoVecBufMut, - { + ) -> BufResult { let mut meta = crate::buf::write_vec_meta(&mut buf); let len = meta.len(); let mut read = 0; - async move { - while read < len { - let (res, meta_) = self.cancelable_readv(meta, c.clone()).await; - meta = meta_; - match res { - Ok(0) => { - return ( - Err(std::io::Error::new( - std::io::ErrorKind::UnexpectedEof, - "failed to fill whole buffer", - )), - buf, - ) - } - Ok(n) => read += n, - Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return (Err(e), buf), + + while read < len { + let (res, meta_) = self.cancelable_readv(meta, c.clone()).await; + meta = meta_; + match res { + Ok(0) => { + return ( + Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "failed to fill whole buffer", + )), + buf, + ) } + Ok(n) => read += n, + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return (Err(e), buf), } - (Ok(read), buf) } + (Ok(read), buf) } reader_be_impl!(ReadU8Future, u8, cancelable_read_u8); @@ -225,106 +177,84 @@ where /// CancelableAsyncWriteRentExt pub trait CancelableAsyncWriteRentExt { - /// The future of Result - type WriteExactFuture<'a, T>: Future> - where - Self: 'a, - T: IoBuf + 'a; - /// Write all - fn write_all(&mut self, buf: T, c: CancelHandle) -> Self::WriteExactFuture<'_, T> - where - T: 'static + IoBuf; - - /// The future of Result - type WriteVectoredExactFuture<'a, T>: Future> - where - Self: 'a, - T: IoVecBuf + 'a; + fn write_all( + &mut self, + buf: T, + c: CancelHandle, + ) -> impl Future>; /// Write all - fn write_vectored_all( + fn write_vectored_all( &mut self, buf: T, c: CancelHandle, - ) -> Self::WriteVectoredExactFuture<'_, T> - where - T: 'static + IoVecBuf; + ) -> impl Future>; } impl CancelableAsyncWriteRentExt for A where A: CancelableAsyncWriteRent + ?Sized, { - type WriteExactFuture<'a, T> = impl Future> + 'a where A: 'a, T: IoBuf + 'a; - - fn write_all(&mut self, mut buf: T, c: CancelHandle) -> Self::WriteExactFuture<'_, T> - where - T: 'static + IoBuf, - { - async move { - let len = buf.bytes_init(); - let mut written = 0; - while written < len { - let buf_slice = unsafe { Slice::new_unchecked(buf, written, len) }; - let (result, buf_slice) = self.cancelable_write(buf_slice, c.clone()).await; - buf = buf_slice.into_inner(); - match result { - Ok(0) => { - return ( - Err(std::io::Error::new( - std::io::ErrorKind::WriteZero, - "failed to write whole buffer", - )), - buf, - ) - } - Ok(n) => written += n, - Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return (Err(e), buf), + async fn write_all( + &mut self, + mut buf: T, + c: CancelHandle, + ) -> BufResult { + let len = buf.bytes_init(); + let mut written = 0; + while written < len { + let buf_slice = unsafe { Slice::new_unchecked(buf, written, len) }; + let (result, buf_slice) = self.cancelable_write(buf_slice, c.clone()).await; + buf = buf_slice.into_inner(); + match result { + Ok(0) => { + return ( + Err(std::io::Error::new( + std::io::ErrorKind::WriteZero, + "failed to write whole buffer", + )), + buf, + ) } + Ok(n) => written += n, + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return (Err(e), buf), } - (Ok(written), buf) } + (Ok(written), buf) } - type WriteVectoredExactFuture<'a, T> = impl Future> + 'a where A: 'a, T: IoVecBuf + 'a; - - fn write_vectored_all( + async fn write_vectored_all( &mut self, buf: T, c: CancelHandle, - ) -> Self::WriteVectoredExactFuture<'_, T> - where - T: 'static + IoVecBuf, - { + ) -> BufResult { let mut meta = crate::buf::read_vec_meta(&buf); let len = meta.len(); let mut written = 0; - async move { - while written < len { - let (res, meta_) = self.cancelable_writev(meta, c.clone()).await; - meta = meta_; - match res { - Ok(0) => { - return ( - Err(std::io::Error::new( - std::io::ErrorKind::WriteZero, - "failed to write whole buffer", - )), - buf, - ) - } - Ok(n) => { - written += n; - meta.consume(n); - } - Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return (Err(e), buf), + while written < len { + let (res, meta_) = self.cancelable_writev(meta, c.clone()).await; + meta = meta_; + match res { + Ok(0) => { + return ( + Err(std::io::Error::new( + std::io::ErrorKind::WriteZero, + "failed to write whole buffer", + )), + buf, + ) + } + Ok(n) => { + written += n; + meta.consume(n); } + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return (Err(e), buf), } - (Ok(written), buf) } + (Ok(written), buf) } } diff --git a/monoio/src/io/async_write_rent.rs b/monoio/src/io/async_write_rent.rs index 1b474a27..3c8c70bb 100644 --- a/monoio/src/io/async_write_rent.rs +++ b/monoio/src/io/async_write_rent.rs @@ -7,88 +7,47 @@ use crate::{ /// AsyncWriteRent: async write with a ownership of a buffer pub trait AsyncWriteRent { - /// The future of write Result - type WriteFuture<'a, T>: Future> - where - Self: 'a, - T: IoBuf + 'a; - /// The future of writev Result - type WritevFuture<'a, T>: Future> - where - Self: 'a, - T: IoVecBuf + 'a; - - /// The future of flush - type FlushFuture<'a>: Future> - where - Self: 'a; - - /// The future of shutdown - type ShutdownFuture<'a>: Future> - where - Self: 'a; - /// Same as write(2) - fn write(&mut self, buf: T) -> Self::WriteFuture<'_, T>; + fn write(&mut self, buf: T) -> impl Future>; /// Same as writev(2) - fn writev(&mut self, buf_vec: T) -> Self::WritevFuture<'_, T>; + fn writev(&mut self, buf_vec: T) -> impl Future>; /// Flush buffered data if needed - fn flush(&mut self) -> Self::FlushFuture<'_>; + fn flush(&mut self) -> impl Future>; /// Same as shutdown - fn shutdown(&mut self) -> Self::ShutdownFuture<'_>; + fn shutdown(&mut self) -> impl Future>; } /// AsyncWriteRentAt: async write with a ownership of a buffer and a position pub trait AsyncWriteRentAt { - /// The future of Result - type Future<'a, T>: Future> - where - Self: 'a, - T: 'a; - /// Write buf at given offset - fn write_at(&self, buf: T, pos: usize) -> Self::Future<'_, T>; + fn write_at( + &self, + buf: T, + pos: usize, + ) -> impl Future>; } impl AsyncWriteRent for &mut A { - type WriteFuture<'a, T> = A::WriteFuture<'a, T> - where - Self: 'a, - T: IoBuf + 'a; - - type WritevFuture<'a, T> = A::WritevFuture<'a, T> - where - Self: 'a, - T: IoVecBuf + 'a; - - type FlushFuture<'a> = A::FlushFuture<'a> - where - Self: 'a; - - type ShutdownFuture<'a> = A::ShutdownFuture<'a> - where - Self: 'a; - #[inline] - fn write(&mut self, buf: T) -> Self::WriteFuture<'_, T> { + fn write(&mut self, buf: T) -> impl Future> { (**self).write(buf) } #[inline] - fn writev(&mut self, buf_vec: T) -> Self::WritevFuture<'_, T> { + fn writev(&mut self, buf_vec: T) -> impl Future> { (**self).writev(buf_vec) } #[inline] - fn flush(&mut self) -> Self::FlushFuture<'_> { + fn flush(&mut self) -> impl Future> { (**self).flush() } #[inline] - fn shutdown(&mut self) -> Self::ShutdownFuture<'_> { + fn shutdown(&mut self) -> impl Future> { (**self).shutdown() } } diff --git a/monoio/src/io/async_write_rent_ext.rs b/monoio/src/io/async_write_rent_ext.rs index 03c3adff..7c270446 100644 --- a/monoio/src/io/async_write_rent_ext.rs +++ b/monoio/src/io/async_write_rent_ext.rs @@ -8,98 +8,74 @@ use crate::{ /// AsyncWriteRentExt pub trait AsyncWriteRentExt { - /// The future of Result - type WriteExactFuture<'a, T>: Future> - where - Self: 'a, - T: IoBuf + 'a; - /// Write all - fn write_all(&mut self, buf: T) -> Self::WriteExactFuture<'_, T> - where - T: 'static + IoBuf; - - /// The future of Result - type WriteVectoredExactFuture<'a, T>: Future> - where - Self: 'a, - T: IoVecBuf + 'a; + fn write_all( + &mut self, + buf: T, + ) -> impl Future>; - /// Write all - fn write_vectored_all(&mut self, buf: T) -> Self::WriteVectoredExactFuture<'_, T> - where - T: 'static + IoVecBuf; + /// Write vectored all + fn write_vectored_all( + &mut self, + buf: T, + ) -> impl Future>; } impl AsyncWriteRentExt for A where A: AsyncWriteRent + ?Sized, { - type WriteExactFuture<'a, T> = impl Future> + 'a where A: 'a, T: IoBuf + 'a; - - fn write_all(&mut self, mut buf: T) -> Self::WriteExactFuture<'_, T> - where - T: 'static + IoBuf, - { - async move { - let len = buf.bytes_init(); - let mut written = 0; - while written < len { - let buf_slice = unsafe { Slice::new_unchecked(buf, written, len) }; - let (result, buf_slice) = self.write(buf_slice).await; - buf = buf_slice.into_inner(); - match result { - Ok(0) => { - return ( - Err(std::io::Error::new( - std::io::ErrorKind::WriteZero, - "failed to write whole buffer", - )), - buf, - ) - } - Ok(n) => written += n, - Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return (Err(e), buf), + async fn write_all(&mut self, mut buf: T) -> BufResult { + let len = buf.bytes_init(); + let mut written = 0; + while written < len { + let buf_slice = unsafe { Slice::new_unchecked(buf, written, len) }; + let (result, buf_slice) = self.write(buf_slice).await; + buf = buf_slice.into_inner(); + match result { + Ok(0) => { + return ( + Err(std::io::Error::new( + std::io::ErrorKind::WriteZero, + "failed to write whole buffer", + )), + buf, + ) } + Ok(n) => written += n, + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return (Err(e), buf), } - (Ok(written), buf) } + (Ok(written), buf) } - type WriteVectoredExactFuture<'a, T> = impl Future> + 'a where A: 'a, T: IoVecBuf + 'a; - - fn write_vectored_all(&mut self, buf: T) -> Self::WriteVectoredExactFuture<'_, T> - where - T: 'static + IoVecBuf, - { + async fn write_vectored_all(&mut self, buf: T) -> BufResult { let mut meta = crate::buf::read_vec_meta(&buf); let len = meta.len(); let mut written = 0; - async move { - while written < len { - let (res, meta_) = self.writev(meta).await; - meta = meta_; - match res { - Ok(0) => { - return ( - Err(std::io::Error::new( - std::io::ErrorKind::WriteZero, - "failed to write whole buffer", - )), - buf, - ) - } - Ok(n) => { - written += n; - meta.consume(n); - } - Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return (Err(e), buf), + while written < len { + let (res, meta_) = self.writev(meta).await; + meta = meta_; + match res { + Ok(0) => { + return ( + Err(std::io::Error::new( + std::io::ErrorKind::WriteZero, + "failed to write whole buffer", + )), + buf, + ) + } + Ok(n) => { + written += n; + meta.consume(n); } + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return (Err(e), buf), } - (Ok(written), buf) } + (Ok(written), buf) } } diff --git a/monoio/src/io/sink/mod.rs b/monoio/src/io/sink/mod.rs index f9bfc7a8..73439847 100644 --- a/monoio/src/io/sink/mod.rs +++ b/monoio/src/io/sink/mod.rs @@ -1,6 +1,8 @@ //! Sink trait in GAT style. mod sink_ext; +use std::future::Future; + pub use sink_ext::SinkExt; /// A `Sink` is a value into which other values can be sent in pure async/await. @@ -9,61 +11,28 @@ pub trait Sink { /// The type of value produced by the sink when an error occurs. type Error; - /// Future representing the send result. - type SendFuture<'a>: std::future::Future> - where - Self: 'a, - Item: 'a; - - /// Future representing the flush result. - type FlushFuture<'a>: std::future::Future> - where - Self: 'a; - - /// Future representing the close result. - type CloseFuture<'a>: std::future::Future> - where - Self: 'a; - /// Send item. - fn send<'a>(&'a mut self, item: Item) -> Self::SendFuture<'a> - where - Item: 'a; + fn send(&mut self, item: Item) -> impl Future>; /// Flush any remaining output from this sink. - fn flush(&mut self) -> Self::FlushFuture<'_>; + fn flush(&mut self) -> impl Future>; /// Flush any remaining output and close this sink, if necessary. - fn close(&mut self) -> Self::CloseFuture<'_>; + fn close(&mut self) -> impl Future>; } impl> Sink for &mut S { type Error = S::Error; - type SendFuture<'a> = S::SendFuture<'a> - where - Self: 'a, T: 'a; - - type FlushFuture<'a> = S::FlushFuture<'a> - where - Self: 'a; - - type CloseFuture<'a> = S::CloseFuture<'a> - where - Self: 'a; - - fn send<'a>(&'a mut self, item: T) -> Self::SendFuture<'a> - where - T: 'a, - { + fn send(&mut self, item: T) -> impl Future> { (**self).send(item) } - fn flush(&mut self) -> Self::FlushFuture<'_> { + fn flush(&mut self) -> impl Future> { (**self).flush() } - fn close(&mut self) -> Self::CloseFuture<'_> { + fn close(&mut self) -> impl Future> { (**self).close() } } diff --git a/monoio/src/io/sink/sink_ext.rs b/monoio/src/io/sink/sink_ext.rs index 2b10bd8c..271218fc 100644 --- a/monoio/src/io/sink/sink_ext.rs +++ b/monoio/src/io/sink/sink_ext.rs @@ -1,33 +1,19 @@ +use std::future::Future; + use super::Sink; /// Sink extensions. pub trait SinkExt: Sink { - /// SendFlushFuture. - type SendFlushFuture<'a>: std::future::Future> + 'a - where - Self: 'a, - T: 'a; - /// Send and flush. - fn send_and_flush<'a>(&'a mut self, item: T) -> Self::SendFlushFuture<'a> - where - T: 'a; + fn send_and_flush(&mut self, item: T) -> impl Future>; } impl SinkExt for A where A: Sink, { - type SendFlushFuture<'a> = impl std::future::Future> + 'a where - A: 'a, T: 'a; - - fn send_and_flush<'a>(&'a mut self, item: T) -> Self::SendFlushFuture<'a> - where - T: 'a, - { - async move { - Sink::::send(self, item).await?; - Sink::::flush(self).await - } + async fn send_and_flush(&mut self, item: T) -> Result<(), Self::Error> { + Sink::::send(self, item).await?; + Sink::::flush(self).await } } diff --git a/monoio/src/io/splice.rs b/monoio/src/io/splice.rs index c20ae434..1b868796 100644 --- a/monoio/src/io/splice.rs +++ b/monoio/src/io/splice.rs @@ -7,46 +7,46 @@ use crate::{driver::op::Op, net::Pipe}; /// Splice data from self to pipe. pub trait SpliceSource { - /// The returned future of splice_to_pipe - type SpliceFuture<'a>: Future> - where - Self: 'a; /// Splice data from self to pipe. - fn splice_to_pipe<'a>(&'a mut self, pipe: &'a mut Pipe, len: u32) -> Self::SpliceFuture<'_>; + fn splice_to_pipe<'a>( + &'a mut self, + pipe: &'a mut Pipe, + len: u32, + ) -> impl Future>; } /// Splice data from self from pipe. pub trait SpliceDestination { - /// The returned future of splice_from_pipe - type SpliceFuture<'a>: Future> - where - Self: 'a; /// Splice data from self from pipe. - fn splice_from_pipe<'a>(&'a mut self, pipe: &'a mut Pipe, len: u32) -> Self::SpliceFuture<'_>; + fn splice_from_pipe<'a>( + &'a mut self, + pipe: &'a mut Pipe, + len: u32, + ) -> impl Future>; } impl SpliceSource for T { - type SpliceFuture<'a> = impl Future> + 'a where Self: 'a; - #[inline] - fn splice_to_pipe<'a>(&'a mut self, pipe: &'a mut Pipe, len: u32) -> Self::SpliceFuture<'_> { - async move { - Op::splice_to_pipe(self.as_reader_fd().as_ref(), &pipe.fd, len)? - .splice() - .await - } + async fn splice_to_pipe<'a>( + &'a mut self, + pipe: &'a mut Pipe, + len: u32, + ) -> std::io::Result { + Op::splice_to_pipe(self.as_reader_fd().as_ref(), &pipe.fd, len)? + .splice() + .await } } impl SpliceDestination for T { - type SpliceFuture<'a> = impl Future> + 'a where Self: 'a; - #[inline] - fn splice_from_pipe<'a>(&'a mut self, pipe: &'a mut Pipe, len: u32) -> Self::SpliceFuture<'_> { - async move { - Op::splice_from_pipe(&pipe.fd, self.as_writer_fd().as_ref(), len)? - .splice() - .await - } + async fn splice_from_pipe<'a>( + &'a mut self, + pipe: &'a mut Pipe, + len: u32, + ) -> std::io::Result { + Op::splice_from_pipe(&pipe.fd, self.as_writer_fd().as_ref(), len)? + .splice() + .await } } diff --git a/monoio/src/io/stream/iter.rs b/monoio/src/io/stream/iter.rs index 7717c482..0bd98227 100644 --- a/monoio/src/io/stream/iter.rs +++ b/monoio/src/io/stream/iter.rs @@ -1,5 +1,3 @@ -use std::future::Future; - use super::{assert_stream, Stream}; /// Stream for the [`iter`] function. @@ -26,11 +24,8 @@ where { type Item = I::Item; - type NextFuture<'a> = impl Future> + 'a where - I: 'a; - - fn next(&mut self) -> Self::NextFuture<'_> { - async move { self.iter.next() } + async fn next(&mut self) -> Option { + self.iter.next() } fn size_hint(&self) -> (usize, Option) { diff --git a/monoio/src/io/stream/mod.rs b/monoio/src/io/stream/mod.rs index 7dc375b0..3a70d3e2 100644 --- a/monoio/src/io/stream/mod.rs +++ b/monoio/src/io/stream/mod.rs @@ -3,8 +3,10 @@ mod iter; mod stream_ext; +use std::future::Future; + pub use iter::{iter, Iter}; -pub use stream_ext::{ForEachFut, StreamExt}; +pub use stream_ext::StreamExt; /// A stream of values produced asynchronously in pure async/await. #[must_use = "streams do nothing unless polled"] @@ -12,15 +14,10 @@ pub trait Stream { /// Values yielded by the stream. type Item; - /// Future representing the next value of the stream. - type NextFuture<'a>: std::future::Future> - where - Self: 'a; - /// Attempt to pull out the next value of this stream, registering the /// current task for wakeup if the value is not yet available, and returning /// `None` if the stream is exhausted. - fn next(&mut self) -> Self::NextFuture<'_>; + fn next(&mut self) -> impl Future>; /// Returns the bounds on the remaining length of the stream. /// @@ -58,11 +55,7 @@ pub trait Stream { impl Stream for &mut S { type Item = S::Item; - type NextFuture<'a> = S::NextFuture<'a> - where - Self: 'a; - - fn next(&mut self) -> Self::NextFuture<'_> { + fn next(&mut self) -> impl Future> { (**self).next() } } diff --git a/monoio/src/io/stream/stream_ext.rs b/monoio/src/io/stream/stream_ext.rs index 5e4f1bde..207bfe1f 100644 --- a/monoio/src/io/stream/stream_ext.rs +++ b/monoio/src/io/stream/stream_ext.rs @@ -26,7 +26,7 @@ pub trait StreamExt: Stream { /// Runs this stream to completion, executing the provided asynchronous /// closure for each element on the stream. - fn for_each(mut self, mut f: F) -> ForEachFut + fn for_each(mut self, mut f: F) -> impl Future where F: FnMut(Self::Item) -> Fut, Fut: Future, @@ -42,10 +42,6 @@ pub trait StreamExt: Stream { impl StreamExt for T where T: Stream {} -/// A ForEachFut is a Future generated by Stream's for_each. -pub type ForEachFut, F: FnMut(::Item) -> Fut> = - impl Future; - #[must_use = "streams do nothing unless polled"] pub struct Map { stream: St, @@ -65,11 +61,8 @@ where { type Item = Item; - type NextFuture<'a> = impl Future> + 'a where - F: 'a, St: 'a; - - fn next(&mut self) -> Self::NextFuture<'_> { - async move { self.stream.next().await.map(&mut self.f) } + async fn next(&mut self) -> Option { + self.stream.next().await.map(&mut self.f) } } @@ -96,13 +89,8 @@ where { type Item = Fut::Output; - type NextFuture<'a> = impl Future> + 'a where - F: 'a, St: 'a,; - - fn next(&mut self) -> Self::NextFuture<'_> { - async move { - let item = self.stream.next().await?; - Some((self.f)(item).await) - } + async fn next(&mut self) -> Option { + let item = self.stream.next().await?; + Some((self.f)(item).await) } } diff --git a/monoio/src/io/util/buf_reader.rs b/monoio/src/io/util/buf_reader.rs index ef60ff4b..f081749d 100644 --- a/monoio/src/io/util/buf_reader.rs +++ b/monoio/src/io/util/buf_reader.rs @@ -3,6 +3,7 @@ use std::future::Future; use crate::{ buf::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut, IoVecWrapperMut}, io::{AsyncBufRead, AsyncReadRent, AsyncWriteRent}, + BufResult, }; /// BufReader is a struct with a buffer. BufReader implements AsyncBufRead @@ -70,89 +71,75 @@ impl BufReader { } impl AsyncReadRent for BufReader { - type ReadFuture<'a, T> = impl Future> + 'a where - T: IoBufMut + 'a, R: 'a; - - type ReadvFuture<'a, T> = impl Future> + 'a where - T: IoVecBufMut + 'a, R: 'a; - - fn read(&mut self, mut buf: T) -> Self::ReadFuture<'_, T> { - async move { - // If we don't have any buffered data and we're doing a massive read - // (larger than our internal buffer), bypass our internal buffer - // entirely. - let owned_buf = self.buf.as_ref().unwrap(); - if self.pos == self.cap && buf.bytes_total() >= owned_buf.len() { - self.discard_buffer(); - return self.inner.read(buf).await; - } + async fn read(&mut self, mut buf: T) -> BufResult { + // If we don't have any buffered data and we're doing a massive read + // (larger than our internal buffer), bypass our internal buffer + // entirely. + let owned_buf = self.buf.as_ref().unwrap(); + if self.pos == self.cap && buf.bytes_total() >= owned_buf.len() { + self.discard_buffer(); + return self.inner.read(buf).await; + } - let rem = match self.fill_buf().await { - Ok(slice) => slice, - Err(e) => { - return (Err(e), buf); - } - }; - let amt = std::cmp::min(rem.len(), buf.bytes_total()); - unsafe { - buf.write_ptr().copy_from_nonoverlapping(rem.as_ptr(), amt); - buf.set_init(amt); + let rem = match self.fill_buf().await { + Ok(slice) => slice, + Err(e) => { + return (Err(e), buf); } - self.consume(amt); - (Ok(amt), buf) + }; + let amt = std::cmp::min(rem.len(), buf.bytes_total()); + unsafe { + buf.write_ptr().copy_from_nonoverlapping(rem.as_ptr(), amt); + buf.set_init(amt); } + self.consume(amt); + (Ok(amt), buf) } - fn readv(&mut self, mut buf: T) -> Self::ReadvFuture<'_, T> { - async move { - let slice = match IoVecWrapperMut::new(buf) { - Ok(slice) => slice, - Err(buf) => return (Ok(0), buf), - }; - - let (result, slice) = self.read(slice).await; - buf = slice.into_inner(); - if let Ok(n) = result { - unsafe { buf.set_init(n) }; - } - (result, buf) + async fn readv(&mut self, mut buf: T) -> BufResult { + let slice = match IoVecWrapperMut::new(buf) { + Ok(slice) => slice, + Err(buf) => return (Ok(0), buf), + }; + + let (result, slice) = self.read(slice).await; + buf = slice.into_inner(); + if let Ok(n) = result { + unsafe { buf.set_init(n) }; } + (result, buf) } } impl AsyncBufRead for BufReader { - type FillBufFuture<'a> = impl Future> where Self: 'a; - - fn fill_buf(&mut self) -> Self::FillBufFuture<'_> { - async { - if self.pos == self.cap { - // there's no buffered data - let buf = self - .buf - .take() - .expect("no buffer available, generated future must be awaited"); - let (res, buf_) = self.inner.read(buf).await; - self.buf = Some(buf_); - match res { - Ok(n) => { - self.pos = 0; - self.cap = n; - return Ok(unsafe { - // We just put the buf into Option, so it must be Some. - &(self.buf.as_ref().unwrap_unchecked().as_ref())[self.pos..self.cap] - }); - } - Err(e) => { - return Err(e); - } + async fn fill_buf(&mut self) -> std::io::Result<&[u8]> { + if self.pos == self.cap { + // there's no buffered data + let buf = self + .buf + .take() + .expect("no buffer available, generated future must be awaited"); + let (res, buf_) = self.inner.read(buf).await; + self.buf = Some(buf_); + match res { + Ok(n) => { + self.pos = 0; + self.cap = n; + return Ok(unsafe { + // We just put the buf into Option, so it must be Some. + &(self.buf.as_ref().unwrap_unchecked().as_ref())[self.pos..self.cap] + }); + } + Err(e) => { + return Err(e); } } - Ok(&(self - .buf - .as_ref() - .expect("no buffer available, generated future must be awaited") - .as_ref())[self.pos..self.cap]) } + Ok(&(self + .buf + .as_ref() + .expect("no buffer available, generated future must be awaited") + .as_ref())[self.pos..self.cap]) } fn consume(&mut self, amt: usize) { @@ -161,33 +148,23 @@ impl AsyncBufRead for BufReader { } impl AsyncWriteRent for BufReader { - type WriteFuture<'a, T> = R::WriteFuture<'a, T> where - T: IoBuf + 'a, R: 'a; - - type WritevFuture<'a, T>= R::WritevFuture<'a, T> where - T: IoVecBuf + 'a, R: 'a; - - type FlushFuture<'a> = R::FlushFuture<'a> where R: 'a; - - type ShutdownFuture<'a> = R::ShutdownFuture<'a> where R: 'a; - #[inline] - fn write(&mut self, buf: T) -> Self::WriteFuture<'_, T> { + fn write(&mut self, buf: T) -> impl Future> { self.inner.write(buf) } #[inline] - fn writev(&mut self, buf_vec: T) -> Self::WritevFuture<'_, T> { + fn writev(&mut self, buf_vec: T) -> impl Future> { self.inner.writev(buf_vec) } #[inline] - fn flush(&mut self) -> Self::FlushFuture<'_> { + fn flush(&mut self) -> impl Future> { self.inner.flush() } #[inline] - fn shutdown(&mut self) -> Self::ShutdownFuture<'_> { + fn shutdown(&mut self) -> impl Future> { self.inner.shutdown() } } diff --git a/monoio/src/io/util/buf_writer.rs b/monoio/src/io/util/buf_writer.rs index faf9d757..3eb4b28e 100644 --- a/monoio/src/io/util/buf_writer.rs +++ b/monoio/src/io/util/buf_writer.rs @@ -3,6 +3,7 @@ use std::{future::Future, io}; use crate::{ buf::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut, IoVecWrapper, Slice}, io::{AsyncBufRead, AsyncReadRent, AsyncWriteRent, AsyncWriteRentExt}, + BufResult, }; /// BufWriter is a struct with a buffer. BufWriter implements AsyncWriteRent, @@ -89,108 +90,80 @@ impl BufWriter { } impl AsyncWriteRent for BufWriter { - type WriteFuture<'a, T> = impl Future> + 'a where - T: IoBuf + 'a, W: 'a; - - type WritevFuture<'a, T> = impl Future> + 'a where - T: IoVecBuf + 'a, W: 'a; - - type FlushFuture<'a> = impl Future> + 'a where - W: 'a; - - type ShutdownFuture<'a> = impl Future> + 'a where - W: 'a; - - fn write(&mut self, buf: T) -> Self::WriteFuture<'_, T> { - async move { - let owned_buf = self.buf.as_ref().unwrap(); - let owned_len = owned_buf.len(); - let amt = buf.bytes_init(); - - if self.pos + amt > owned_len { - // Buf can not be copied directly into OwnedBuf, - // we must flush OwnedBuf first. - match self.flush_buf().await { - Ok(_) => (), - Err(e) => { - return (Err(e), buf); - } + async fn write(&mut self, buf: T) -> BufResult { + let owned_buf = self.buf.as_ref().unwrap(); + let owned_len = owned_buf.len(); + let amt = buf.bytes_init(); + + if self.pos + amt > owned_len { + // Buf can not be copied directly into OwnedBuf, + // we must flush OwnedBuf first. + match self.flush_buf().await { + Ok(_) => (), + Err(e) => { + return (Err(e), buf); } } + } - // Now there are two situations here: - // 1. OwnedBuf has data, and self.pos + amt <= owned_len, - // which means the data can be copied into OwnedBuf. - // 2. OwnedBuf is empty. If we can copy buf into OwnedBuf, - // we will copy it, otherwise we will send it directly(in - // this situation, the OwnedBuf must be already empty). - if amt > owned_len { - self.inner.write(buf).await - } else { - unsafe { - let owned_buf = self.buf.as_mut().unwrap(); - owned_buf - .as_mut_ptr() - .add(self.pos) - .copy_from_nonoverlapping(buf.read_ptr(), amt); - } - self.cap += amt; - (Ok(amt), buf) + // Now there are two situations here: + // 1. OwnedBuf has data, and self.pos + amt <= owned_len, + // which means the data can be copied into OwnedBuf. + // 2. OwnedBuf is empty. If we can copy buf into OwnedBuf, + // we will copy it, otherwise we will send it directly(in + // this situation, the OwnedBuf must be already empty). + if amt > owned_len { + self.inner.write(buf).await + } else { + unsafe { + let owned_buf = self.buf.as_mut().unwrap(); + owned_buf + .as_mut_ptr() + .add(self.pos) + .copy_from_nonoverlapping(buf.read_ptr(), amt); } + self.cap += amt; + (Ok(amt), buf) } } // TODO: implement it as real io_vec - fn writev(&mut self, buf: T) -> Self::WritevFuture<'_, T> { - async move { - let slice = match IoVecWrapper::new(buf) { - Ok(slice) => slice, - Err(buf) => return (Ok(0), buf), - }; - - let (result, slice) = self.write(slice).await; - (result, slice.into_inner()) - } + async fn writev(&mut self, buf: T) -> BufResult { + let slice = match IoVecWrapper::new(buf) { + Ok(slice) => slice, + Err(buf) => return (Ok(0), buf), + }; + + let (result, slice) = self.write(slice).await; + (result, slice.into_inner()) } - fn flush(&mut self) -> Self::FlushFuture<'_> { - async move { - self.flush_buf().await?; - self.inner.flush().await - } + async fn flush(&mut self) -> std::io::Result<()> { + self.flush_buf().await?; + self.inner.flush().await } - fn shutdown(&mut self) -> Self::ShutdownFuture<'_> { - async move { - self.flush_buf().await?; - self.inner.shutdown().await - } + async fn shutdown(&mut self) -> std::io::Result<()> { + self.flush_buf().await?; + self.inner.shutdown().await } } impl AsyncReadRent for BufWriter { - type ReadFuture<'a, T> = W::ReadFuture<'a, T> where - T: IoBufMut + 'a, W: 'a; - - type ReadvFuture<'a, T> = W::ReadvFuture<'a, T> where - T: IoVecBufMut + 'a, W: 'a; - #[inline] - fn read(&mut self, buf: T) -> Self::ReadFuture<'_, T> { + fn read(&mut self, buf: T) -> impl Future> { self.inner.read(buf) } #[inline] - fn readv(&mut self, buf: T) -> Self::ReadvFuture<'_, T> { + fn readv(&mut self, buf: T) -> impl Future> { self.inner.readv(buf) } } impl AsyncBufRead for BufWriter { - type FillBufFuture<'a> = W::FillBufFuture<'a> where W: 'a; - #[inline] - fn fill_buf(&mut self) -> Self::FillBufFuture<'_> { + fn fill_buf(&mut self) -> impl Future> { self.inner.fill_buf() } diff --git a/monoio/src/io/util/prefixed_io.rs b/monoio/src/io/util/prefixed_io.rs index f523a4ea..f5c2d29e 100644 --- a/monoio/src/io/util/prefixed_io.rs +++ b/monoio/src/io/util/prefixed_io.rs @@ -1,7 +1,10 @@ +use std::future::Future; + use super::{split::Split, CancelHandle}; use crate::{ buf::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut, IoVecWrapperMut}, io::{AsyncReadRent, AsyncWriteRent, CancelableAsyncReadRent, CancelableAsyncWriteRent}, + BufResult, }; /// PrefixedReadIO facilitates the addition of a prefix to an IO stream, @@ -57,172 +60,128 @@ impl PrefixedReadIo { } impl AsyncReadRent for PrefixedReadIo { - type ReadFuture<'a, T> = impl std::future::Future> + 'a - where - T: IoBufMut + 'a, Self: 'a; - - type ReadvFuture<'a, T> = impl std::future::Future> + 'a - where - T: IoVecBufMut + 'a, Self: 'a; - - fn read(&mut self, mut buf: T) -> Self::ReadFuture<'_, T> { - async move { - if buf.bytes_total() == 0 { - return (Ok(0), buf); - } - if !self.prefix_finished { - let slice = unsafe { - &mut *std::ptr::slice_from_raw_parts_mut(buf.write_ptr(), buf.bytes_total()) - }; - match self.prefix.read(slice) { - Ok(0) => { - // prefix finished - self.prefix_finished = true; - } - Ok(n) => { - unsafe { buf.set_init(n) }; - return (Ok(n), buf); - } - Err(e) => { - return (Err(e), buf); - } + async fn read(&mut self, mut buf: T) -> BufResult { + if buf.bytes_total() == 0 { + return (Ok(0), buf); + } + if !self.prefix_finished { + let slice = unsafe { + &mut *std::ptr::slice_from_raw_parts_mut(buf.write_ptr(), buf.bytes_total()) + }; + match self.prefix.read(slice) { + Ok(0) => { + // prefix finished + self.prefix_finished = true; + } + Ok(n) => { + unsafe { buf.set_init(n) }; + return (Ok(n), buf); + } + Err(e) => { + return (Err(e), buf); } } - // prefix eof now, read io directly - self.io.read(buf).await } + // prefix eof now, read io directly + self.io.read(buf).await } - fn readv(&mut self, mut buf: T) -> Self::ReadvFuture<'_, T> { - async move { - let slice = match IoVecWrapperMut::new(buf) { - Ok(slice) => slice, - Err(buf) => return (Ok(0), buf), - }; + async fn readv(&mut self, mut buf: T) -> BufResult { + let slice = match IoVecWrapperMut::new(buf) { + Ok(slice) => slice, + Err(buf) => return (Ok(0), buf), + }; - let (result, slice) = self.read(slice).await; - buf = slice.into_inner(); - if let Ok(n) = result { - unsafe { buf.set_init(n) }; - } - (result, buf) + let (result, slice) = self.read(slice).await; + buf = slice.into_inner(); + if let Ok(n) = result { + unsafe { buf.set_init(n) }; } + (result, buf) } } impl CancelableAsyncReadRent for PrefixedReadIo { - type CancelableReadFuture<'a, T> = impl std::future::Future> + 'a - where - T: IoBufMut + 'a, Self: 'a; - - type CancelableReadvFuture<'a, T> = impl std::future::Future> + 'a - where - T: IoVecBufMut + 'a, Self: 'a; - - fn cancelable_read( + async fn cancelable_read( &mut self, mut buf: T, c: CancelHandle, - ) -> Self::CancelableReadFuture<'_, T> { - async move { - if buf.bytes_total() == 0 { - return (Ok(0), buf); - } - if !self.prefix_finished { - let slice = unsafe { - &mut *std::ptr::slice_from_raw_parts_mut(buf.write_ptr(), buf.bytes_total()) - }; - match self.prefix.read(slice) { - Ok(0) => { - // prefix finished - self.prefix_finished = true; - } - Ok(n) => { - unsafe { buf.set_init(n) }; - return (Ok(n), buf); - } - Err(e) => { - return (Err(e), buf); - } + ) -> crate::BufResult { + if buf.bytes_total() == 0 { + return (Ok(0), buf); + } + if !self.prefix_finished { + let slice = unsafe { + &mut *std::ptr::slice_from_raw_parts_mut(buf.write_ptr(), buf.bytes_total()) + }; + match self.prefix.read(slice) { + Ok(0) => { + // prefix finished + self.prefix_finished = true; + } + Ok(n) => { + unsafe { buf.set_init(n) }; + return (Ok(n), buf); + } + Err(e) => { + return (Err(e), buf); } } - // prefix eof now, read io directly - self.io.cancelable_read(buf, c).await } + // prefix eof now, read io directly + self.io.cancelable_read(buf, c).await } - fn cancelable_readv( + async fn cancelable_readv( &mut self, mut buf: T, c: CancelHandle, - ) -> Self::CancelableReadvFuture<'_, T> { - async move { - let slice = match IoVecWrapperMut::new(buf) { - Ok(slice) => slice, - Err(buf) => return (Ok(0), buf), - }; - - let (result, slice) = self.cancelable_read(slice, c).await; - buf = slice.into_inner(); - if let Ok(n) = result { - unsafe { buf.set_init(n) }; - } - (result, buf) + ) -> crate::BufResult { + let slice = match IoVecWrapperMut::new(buf) { + Ok(slice) => slice, + Err(buf) => return (Ok(0), buf), + }; + + let (result, slice) = self.cancelable_read(slice, c).await; + buf = slice.into_inner(); + if let Ok(n) = result { + unsafe { buf.set_init(n) }; } + (result, buf) } } impl AsyncWriteRent for PrefixedReadIo { - type WriteFuture<'a, T> = I::WriteFuture<'a, T> where - T: IoBuf + 'a, Self: 'a; - - type WritevFuture<'a, T>= I::WritevFuture<'a, T> where - T: IoVecBuf + 'a, Self: 'a; - - type FlushFuture<'a> = I::FlushFuture<'a> where Self: 'a; - - type ShutdownFuture<'a> = I::ShutdownFuture<'a> where Self: 'a; - #[inline] - fn write(&mut self, buf: T) -> Self::WriteFuture<'_, T> { + fn write(&mut self, buf: T) -> impl Future> { self.io.write(buf) } #[inline] - fn writev(&mut self, buf_vec: T) -> Self::WritevFuture<'_, T> { + fn writev(&mut self, buf_vec: T) -> impl Future> { self.io.writev(buf_vec) } #[inline] - fn flush(&mut self) -> Self::FlushFuture<'_> { + fn flush(&mut self) -> impl Future> { self.io.flush() } #[inline] - fn shutdown(&mut self) -> Self::ShutdownFuture<'_> { + fn shutdown(&mut self) -> impl Future> { self.io.shutdown() } } impl CancelableAsyncWriteRent for PrefixedReadIo { - type CancelableWriteFuture<'a, T> = I::CancelableWriteFuture<'a, T> where - T: IoBuf + 'a, Self: 'a; - - type CancelableWritevFuture<'a, T>= I::CancelableWritevFuture<'a, T> where - T: IoVecBuf + 'a, Self: 'a; - - type CancelableFlushFuture<'a> = I::CancelableFlushFuture<'a> where Self: 'a; - - type CancelableShutdownFuture<'a> = I::CancelableShutdownFuture<'a> where Self: 'a; - #[inline] fn cancelable_write( &mut self, buf: T, c: CancelHandle, - ) -> Self::CancelableWriteFuture<'_, T> { + ) -> impl Future> { self.io.cancelable_write(buf, c) } @@ -231,17 +190,20 @@ impl CancelableAsyncWriteRent for PrefixedReadIo &mut self, buf_vec: T, c: CancelHandle, - ) -> Self::CancelableWritevFuture<'_, T> { + ) -> impl Future> { self.io.cancelable_writev(buf_vec, c) } #[inline] - fn cancelable_flush(&mut self, c: CancelHandle) -> Self::CancelableFlushFuture<'_> { + fn cancelable_flush(&mut self, c: CancelHandle) -> impl Future> { self.io.cancelable_flush(c) } #[inline] - fn cancelable_shutdown(&mut self, c: CancelHandle) -> Self::CancelableShutdownFuture<'_> { + fn cancelable_shutdown( + &mut self, + c: CancelHandle, + ) -> impl Future> { self.io.cancelable_shutdown(c) } } diff --git a/monoio/src/io/util/split.rs b/monoio/src/io/util/split.rs index fcf36394..44f394e6 100644 --- a/monoio/src/io/util/split.rs +++ b/monoio/src/io/util/split.rs @@ -7,7 +7,10 @@ use std::{ }; use super::CancelHandle; -use crate::io::{AsyncReadRent, AsyncWriteRent, CancelableAsyncReadRent, CancelableAsyncWriteRent}; +use crate::{ + io::{AsyncReadRent, AsyncWriteRent, CancelableAsyncReadRent, CancelableAsyncWriteRent}, + BufResult, +}; /// Owned Read Half Part #[derive(Debug)] @@ -60,24 +63,20 @@ impl AsyncReadRent for OwnedReadHalf where Inner: AsyncReadRent, { - type ReadFuture<'a, T> = impl std::future::Future> + 'a - where - Self: 'a, - T: crate::buf::IoBufMut + 'a; - - type ReadvFuture<'a, T> = impl std::future::Future> + 'a - where - Self: 'a, - T: crate::buf::IoVecBufMut + 'a; - #[inline] - fn read(&mut self, buf: T) -> Self::ReadFuture<'_, T> { + fn read( + &mut self, + buf: T, + ) -> impl Future> { let stream = unsafe { &mut *self.0.get() }; stream.read(buf) } #[inline] - fn readv(&mut self, buf: T) -> Self::ReadvFuture<'_, T> { + fn readv( + &mut self, + buf: T, + ) -> impl Future> { let stream = unsafe { &mut *self.0.get() }; stream.readv(buf) } @@ -87,22 +86,12 @@ impl CancelableAsyncReadRent for OwnedReadHalf where Inner: CancelableAsyncReadRent, { - type CancelableReadFuture<'a, T> = impl std::future::Future> + 'a - where - Self: 'a, - T: crate::buf::IoBufMut + 'a; - - type CancelableReadvFuture<'a, T> = impl std::future::Future> + 'a - where - Self: 'a, - T: crate::buf::IoVecBufMut + 'a; - #[inline] fn cancelable_read( &mut self, buf: T, c: CancelHandle, - ) -> Self::CancelableReadFuture<'_, T> { + ) -> impl Future> { let stream = unsafe { &mut *self.0.get() }; stream.cancelable_read(buf, c) } @@ -112,7 +101,7 @@ where &mut self, buf: T, c: CancelHandle, - ) -> Self::CancelableReadvFuture<'_, T> { + ) -> impl Future> { let stream = unsafe { &mut *self.0.get() }; stream.cancelable_readv(buf, c) } @@ -122,44 +111,29 @@ impl AsyncWriteRent for OwnedWriteHalf where Inner: AsyncWriteRent, { - type WriteFuture<'a, T> = impl Future> + 'a - where - Self: 'a, - T: crate::buf::IoBuf + 'a; - - type WritevFuture<'a, T> = impl Future> + 'a - where - Self: 'a, - T: crate::buf::IoVecBuf + 'a; - - type FlushFuture<'a> = impl Future> + 'a - where - Self: 'a; - - type ShutdownFuture<'a> = impl Future> + 'a - where - Self: 'a; - #[inline] - fn write(&mut self, buf: T) -> Self::WriteFuture<'_, T> { + fn write(&mut self, buf: T) -> impl Future> { let stream = unsafe { &mut *self.0.get() }; stream.write(buf) } #[inline] - fn writev(&mut self, buf_vec: T) -> Self::WritevFuture<'_, T> { + fn writev( + &mut self, + buf_vec: T, + ) -> impl Future> { let stream = unsafe { &mut *self.0.get() }; stream.writev(buf_vec) } #[inline] - fn flush(&mut self) -> Self::FlushFuture<'_> { + fn flush(&mut self) -> impl Future> { let stream = unsafe { &mut *self.0.get() }; stream.flush() } #[inline] - fn shutdown(&mut self) -> Self::ShutdownFuture<'_> { + fn shutdown(&mut self) -> impl Future> { let stream = unsafe { &mut *self.0.get() }; stream.shutdown() } @@ -169,30 +143,12 @@ impl CancelableAsyncWriteRent for OwnedWriteHalf where Inner: CancelableAsyncWriteRent, { - type CancelableWriteFuture<'a, T> = impl Future> + 'a - where - Self: 'a, - T: crate::buf::IoBuf + 'a; - - type CancelableWritevFuture<'a, T> = impl Future> + 'a - where - Self: 'a, - T: crate::buf::IoVecBuf + 'a; - - type CancelableFlushFuture<'a> = impl Future> + 'a - where - Self: 'a; - - type CancelableShutdownFuture<'a> = impl Future> + 'a - where - Self: 'a; - #[inline] fn cancelable_write( &mut self, buf: T, c: CancelHandle, - ) -> Self::CancelableWriteFuture<'_, T> { + ) -> impl Future> { let stream = unsafe { &mut *self.0.get() }; stream.cancelable_write(buf, c) } @@ -202,19 +158,22 @@ where &mut self, buf_vec: T, c: CancelHandle, - ) -> Self::CancelableWritevFuture<'_, T> { + ) -> impl Future> { let stream = unsafe { &mut *self.0.get() }; stream.cancelable_writev(buf_vec, c) } #[inline] - fn cancelable_flush(&mut self, c: CancelHandle) -> Self::CancelableFlushFuture<'_> { + fn cancelable_flush(&mut self, c: CancelHandle) -> impl Future> { let stream = unsafe { &mut *self.0.get() }; stream.cancelable_flush(c) } #[inline] - fn cancelable_shutdown(&mut self, c: CancelHandle) -> Self::CancelableShutdownFuture<'_> { + fn cancelable_shutdown( + &mut self, + c: CancelHandle, + ) -> impl Future> { let stream = unsafe { &mut *self.0.get() }; stream.cancelable_shutdown(c) } diff --git a/monoio/src/lib.rs b/monoio/src/lib.rs index 85afd918..099d748d 100644 --- a/monoio/src/lib.rs +++ b/monoio/src/lib.rs @@ -1,8 +1,6 @@ #![doc = include_str!("../../README.md")] #![warn(missing_docs, unreachable_pub)] #![allow(stable_features)] -#![feature(type_alias_impl_trait)] -#![feature(impl_trait_in_assoc_type)] #![feature(io_error_more)] #![feature(lazy_cell)] #![feature(slice_internals)] diff --git a/monoio/src/net/tcp/listener.rs b/monoio/src/net/tcp/listener.rs index 9cc81d18..4695ab39 100644 --- a/monoio/src/net/tcp/listener.rs +++ b/monoio/src/net/tcp/listener.rs @@ -4,7 +4,6 @@ use std::os::unix::prelude::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use std::os::windows::prelude::{AsRawHandle, FromRawSocket, RawHandle}; use std::{ cell::UnsafeCell, - future::Future, io, net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs}, }; @@ -261,11 +260,9 @@ impl TcpListener { impl Stream for TcpListener { type Item = io::Result<(TcpStream, SocketAddr)>; - type NextFuture<'a> = impl Future> + 'a; - #[inline] - fn next(&mut self) -> Self::NextFuture<'_> { - async move { Some(self.accept().await) } + async fn next(&mut self) -> Option { + Some(self.accept().await) } } diff --git a/monoio/src/net/tcp/stream.rs b/monoio/src/net/tcp/stream.rs index af3fe262..b43ae199 100644 --- a/monoio/src/net/tcp/stream.rs +++ b/monoio/src/net/tcp/stream.rs @@ -18,6 +18,7 @@ use crate::{ operation_canceled, AsyncReadRent, AsyncWriteRent, CancelHandle, CancelableAsyncReadRent, CancelableAsyncWriteRent, Split, }, + BufResult, }; /// Custom tcp connect options @@ -298,34 +299,27 @@ impl std::fmt::Debug for TcpStream { } impl AsyncWriteRent for TcpStream { - type WriteFuture<'a, B> = impl Future> where - B: IoBuf + 'a; - type WritevFuture<'a, B> = impl Future> where - B: IoVecBuf + 'a; - type FlushFuture<'a> = impl Future>; - type ShutdownFuture<'a> = impl Future>; - #[inline] - fn write(&mut self, buf: T) -> Self::WriteFuture<'_, T> { + fn write(&mut self, buf: T) -> impl Future> { // Submit the write operation let op = Op::send(self.fd.clone(), buf).unwrap(); op.write() } #[inline] - fn writev(&mut self, buf_vec: T) -> Self::WritevFuture<'_, T> { + fn writev(&mut self, buf_vec: T) -> impl Future> { let op = Op::writev(&self.fd, buf_vec).unwrap(); op.write() } #[inline] - fn flush(&mut self) -> Self::FlushFuture<'_> { + async fn flush(&mut self) -> std::io::Result<()> { // Tcp stream does not need flush. - async move { Ok(()) } + Ok(()) } #[cfg(unix)] - fn shutdown(&mut self) -> Self::ShutdownFuture<'_> { + fn shutdown(&mut self) -> impl Future> { // We could use shutdown op here, which requires kernel 5.11+. // However, for simplicity, we just close the socket using direct syscall. let fd = self.as_raw_fd(); @@ -337,63 +331,54 @@ impl AsyncWriteRent for TcpStream { } #[cfg(windows)] - fn shutdown(&mut self) -> Self::ShutdownFuture<'_> { + fn shutdown(&mut self) -> impl Future> { async { unimplemented!() } } } impl CancelableAsyncWriteRent for TcpStream { - type CancelableWriteFuture<'a, B> = impl Future> where - B: IoBuf + 'a; - type CancelableWritevFuture<'a, B> = impl Future> where - B: IoVecBuf + 'a; - type CancelableFlushFuture<'a> = impl Future>; - type CancelableShutdownFuture<'a> = impl Future>; - #[inline] - fn cancelable_write( + async fn cancelable_write( &mut self, buf: T, c: CancelHandle, - ) -> Self::CancelableWriteFuture<'_, T> { + ) -> crate::BufResult { let fd = self.fd.clone(); - async move { - if c.canceled() { - return (Err(operation_canceled()), buf); - } - let op = Op::send(fd, buf).unwrap(); - let _guard = c.associate_op(op.op_canceller()); - op.write().await + if c.canceled() { + return (Err(operation_canceled()), buf); } + + let op = Op::send(fd, buf).unwrap(); + let _guard = c.associate_op(op.op_canceller()); + op.write().await } #[inline] - fn cancelable_writev( + async fn cancelable_writev( &mut self, buf_vec: T, c: CancelHandle, - ) -> Self::CancelableWritevFuture<'_, T> { + ) -> crate::BufResult { let fd = self.fd.clone(); - async move { - if c.canceled() { - return (Err(operation_canceled()), buf_vec); - } - let op = Op::writev(&fd, buf_vec).unwrap(); - let _guard = c.associate_op(op.op_canceller()); - op.write().await + if c.canceled() { + return (Err(operation_canceled()), buf_vec); } + + let op = Op::writev(&fd, buf_vec).unwrap(); + let _guard = c.associate_op(op.op_canceller()); + op.write().await } #[inline] - fn cancelable_flush(&mut self, _c: CancelHandle) -> Self::CancelableFlushFuture<'_> { + async fn cancelable_flush(&mut self, _c: CancelHandle) -> io::Result<()> { // Tcp stream does not need flush. - async move { Ok(()) } + Ok(()) } #[cfg(unix)] - fn cancelable_shutdown(&mut self, _c: CancelHandle) -> Self::CancelableShutdownFuture<'_> { + fn cancelable_shutdown(&mut self, _c: CancelHandle) -> impl Future> { // We could use shutdown op here, which requires kernel 5.11+. // However, for simplicity, we just close the socket using direct syscall. let fd = self.as_raw_fd(); @@ -405,26 +390,21 @@ impl CancelableAsyncWriteRent for TcpStream { } #[cfg(windows)] - fn cancelable_shutdown(&mut self, _c: CancelHandle) -> Self::CancelableShutdownFuture<'_> { + fn cancelable_shutdown(&mut self, _c: CancelHandle) -> impl Future> { async { unimplemented!() } } } impl AsyncReadRent for TcpStream { - type ReadFuture<'a, B> = impl std::future::Future> where - B: IoBufMut + 'a; - type ReadvFuture<'a, B> = impl std::future::Future> where - B: IoVecBufMut + 'a; - #[inline] - fn read(&mut self, buf: T) -> Self::ReadFuture<'_, T> { + fn read(&mut self, buf: T) -> impl Future> { // Submit the read operation let op = Op::recv(self.fd.clone(), buf).unwrap(); op.read() } #[inline] - fn readv(&mut self, buf: T) -> Self::ReadvFuture<'_, T> { + fn readv(&mut self, buf: T) -> impl Future> { // Submit the read operation let op = Op::readv(self.fd.clone(), buf).unwrap(); op.read() @@ -432,45 +412,38 @@ impl AsyncReadRent for TcpStream { } impl CancelableAsyncReadRent for TcpStream { - type CancelableReadFuture<'a, B> = impl std::future::Future> where - B: IoBufMut + 'a; - type CancelableReadvFuture<'a, B> = impl std::future::Future> where - B: IoVecBufMut + 'a; - #[inline] - fn cancelable_read( + async fn cancelable_read( &mut self, buf: T, c: CancelHandle, - ) -> Self::CancelableReadFuture<'_, T> { + ) -> crate::BufResult { let fd = self.fd.clone(); - async move { - if c.canceled() { - return (Err(operation_canceled()), buf); - } - let op = Op::recv(fd, buf).unwrap(); - let _guard = c.associate_op(op.op_canceller()); - op.read().await + if c.canceled() { + return (Err(operation_canceled()), buf); } + + let op = Op::recv(fd, buf).unwrap(); + let _guard = c.associate_op(op.op_canceller()); + op.read().await } #[inline] - fn cancelable_readv( + async fn cancelable_readv( &mut self, buf: T, c: CancelHandle, - ) -> Self::CancelableReadvFuture<'_, T> { + ) -> crate::BufResult { let fd = self.fd.clone(); - async move { - if c.canceled() { - return (Err(operation_canceled()), buf); - } - let op = Op::readv(fd, buf).unwrap(); - let _guard = c.associate_op(op.op_canceller()); - op.read().await + if c.canceled() { + return (Err(operation_canceled()), buf); } + + let op = Op::readv(fd, buf).unwrap(); + let _guard = c.associate_op(op.op_canceller()); + op.read().await } } diff --git a/monoio/src/net/unix/listener.rs b/monoio/src/net/unix/listener.rs index 75b22e9a..3f4b47d1 100644 --- a/monoio/src/net/unix/listener.rs +++ b/monoio/src/net/unix/listener.rs @@ -1,5 +1,4 @@ use std::{ - future::Future, io, mem::{ManuallyDrop, MaybeUninit}, os::unix::prelude::{AsRawFd, FromRawFd, IntoRawFd, RawFd}, @@ -149,11 +148,9 @@ impl UnixListener { impl Stream for UnixListener { type Item = io::Result<(UnixStream, SocketAddr)>; - type NextFuture<'a> = impl Future> + 'a; - #[inline] - fn next(&mut self) -> Self::NextFuture<'_> { - async move { Some(self.accept().await) } + async fn next(&mut self) -> Option { + Some(self.accept().await) } } diff --git a/monoio/src/net/unix/seq_packet/listener.rs b/monoio/src/net/unix/seq_packet/listener.rs index 28e34121..72feabcd 100644 --- a/monoio/src/net/unix/seq_packet/listener.rs +++ b/monoio/src/net/unix/seq_packet/listener.rs @@ -1,5 +1,4 @@ use std::{ - future::Future, io, os::fd::{AsRawFd, RawFd}, path::Path, @@ -83,10 +82,8 @@ impl std::fmt::Debug for UnixSeqpacketListener { impl Stream for UnixSeqpacketListener { type Item = io::Result<(UnixSeqpacket, SocketAddr)>; - type NextFuture<'a> = impl Future> + 'a; - #[inline] - fn next(&mut self) -> Self::NextFuture<'_> { - async move { Some(self.accept().await) } + async fn next(&mut self) -> Option { + Some(self.accept().await) } } diff --git a/monoio/src/net/unix/stream.rs b/monoio/src/net/unix/stream.rs index 0135937d..a4ac01a5 100644 --- a/monoio/src/net/unix/stream.rs +++ b/monoio/src/net/unix/stream.rs @@ -18,6 +18,7 @@ use crate::{ CancelableAsyncWriteRent, Split, }, net::new_socket, + BufResult, }; /// UnixStream @@ -172,33 +173,26 @@ impl std::fmt::Debug for UnixStream { } impl AsyncWriteRent for UnixStream { - type WriteFuture<'a, B> = impl Future> where - B: IoBuf + 'a; - type WritevFuture<'a, B> = impl Future> where - B: IoVecBuf + 'a; - type FlushFuture<'a> = impl Future>; - type ShutdownFuture<'a> = impl Future>; - #[inline] - fn write(&mut self, buf: T) -> Self::WriteFuture<'_, T> { + fn write(&mut self, buf: T) -> impl Future> { // Submit the write operation let op = Op::send(self.fd.clone(), buf).unwrap(); op.write() } #[inline] - fn writev(&mut self, buf_vec: T) -> Self::WritevFuture<'_, T> { + fn writev(&mut self, buf_vec: T) -> impl Future> { let op = Op::writev(&self.fd, buf_vec).unwrap(); op.write() } #[inline] - fn flush(&mut self) -> Self::FlushFuture<'_> { + async fn flush(&mut self) -> std::io::Result<()> { // Unix stream does not need flush. - async move { Ok(()) } + Ok(()) } - fn shutdown(&mut self) -> Self::ShutdownFuture<'_> { + fn shutdown(&mut self) -> impl Future> { // We could use shutdown op here, which requires kernel 5.11+. // However, for simplicity, we just close the socket using direct syscall. let fd = self.as_raw_fd(); @@ -212,83 +206,67 @@ impl AsyncWriteRent for UnixStream { } impl CancelableAsyncWriteRent for UnixStream { - type CancelableWriteFuture<'a, B> = impl Future> where - B: IoBuf + 'a; - type CancelableWritevFuture<'a, B> = impl Future> where - B: IoVecBuf + 'a; - type CancelableFlushFuture<'a> = impl Future>; - type CancelableShutdownFuture<'a> = impl Future>; - #[inline] - fn cancelable_write( + async fn cancelable_write( &mut self, buf: T, c: CancelHandle, - ) -> Self::CancelableWriteFuture<'_, T> { + ) -> crate::BufResult { let fd = self.fd.clone(); - async move { - if c.canceled() { - return (Err(operation_canceled()), buf); - } - let op = Op::send(fd, buf).unwrap(); - let _guard = c.associate_op(op.op_canceller()); - op.write().await + if c.canceled() { + return (Err(operation_canceled()), buf); } + + let op = Op::send(fd, buf).unwrap(); + let _guard = c.associate_op(op.op_canceller()); + op.write().await } #[inline] - fn cancelable_writev( + async fn cancelable_writev( &mut self, buf_vec: T, c: CancelHandle, - ) -> Self::CancelableWritevFuture<'_, T> { + ) -> crate::BufResult { let fd = self.fd.clone(); - async move { - if c.canceled() { - return (Err(operation_canceled()), buf_vec); - } - let op = Op::writev(&fd, buf_vec).unwrap(); - let _guard = c.associate_op(op.op_canceller()); - op.write().await + if c.canceled() { + return (Err(operation_canceled()), buf_vec); } + + let op = Op::writev(&fd, buf_vec).unwrap(); + let _guard = c.associate_op(op.op_canceller()); + op.write().await } #[inline] - fn cancelable_flush(&mut self, _c: CancelHandle) -> Self::CancelableFlushFuture<'_> { + async fn cancelable_flush(&mut self, _c: CancelHandle) -> io::Result<()> { // Unix stream does not need flush. - async move { Ok(()) } + Ok(()) } - fn cancelable_shutdown(&mut self, _c: CancelHandle) -> Self::CancelableShutdownFuture<'_> { + async fn cancelable_shutdown(&mut self, _c: CancelHandle) -> io::Result<()> { // We could use shutdown op here, which requires kernel 5.11+. // However, for simplicity, we just close the socket using direct syscall. let fd = self.as_raw_fd(); - async move { - match unsafe { libc::shutdown(fd, libc::SHUT_WR) } { - -1 => Err(io::Error::last_os_error()), - _ => Ok(()), - } + match unsafe { libc::shutdown(fd, libc::SHUT_WR) } { + -1 => Err(io::Error::last_os_error()), + _ => Ok(()), } } } impl AsyncReadRent for UnixStream { - type ReadFuture<'a, B> = impl std::future::Future> where - B: IoBufMut + 'a; - type ReadvFuture<'a, B> = impl std::future::Future> where - B: IoVecBufMut + 'a; - #[inline] - fn read(&mut self, buf: T) -> Self::ReadFuture<'_, T> { + fn read(&mut self, buf: T) -> impl Future> { // Submit the read operation let op = Op::recv(self.fd.clone(), buf).unwrap(); op.read() } #[inline] - fn readv(&mut self, buf: T) -> Self::ReadvFuture<'_, T> { + fn readv(&mut self, buf: T) -> impl Future> { // Submit the read operation let op = Op::readv(self.fd.clone(), buf).unwrap(); op.read() @@ -296,45 +274,38 @@ impl AsyncReadRent for UnixStream { } impl CancelableAsyncReadRent for UnixStream { - type CancelableReadFuture<'a, B> = impl std::future::Future> where - B: IoBufMut + 'a; - type CancelableReadvFuture<'a, B> = impl std::future::Future> where - B: IoVecBufMut + 'a; - #[inline] - fn cancelable_read( + async fn cancelable_read( &mut self, buf: T, c: CancelHandle, - ) -> Self::CancelableReadFuture<'_, T> { + ) -> crate::BufResult { let fd = self.fd.clone(); - async move { - if c.canceled() { - return (Err(operation_canceled()), buf); - } - let op = Op::recv(fd, buf).unwrap(); - let _guard = c.associate_op(op.op_canceller()); - op.read().await + if c.canceled() { + return (Err(operation_canceled()), buf); } + + let op = Op::recv(fd, buf).unwrap(); + let _guard = c.associate_op(op.op_canceller()); + op.read().await } #[inline] - fn cancelable_readv( + async fn cancelable_readv( &mut self, buf: T, c: CancelHandle, - ) -> Self::CancelableReadvFuture<'_, T> { + ) -> crate::BufResult { let fd = self.fd.clone(); - async move { - if c.canceled() { - return (Err(operation_canceled()), buf); - } - let op = Op::readv(fd, buf).unwrap(); - let _guard = c.associate_op(op.op_canceller()); - op.read().await + if c.canceled() { + return (Err(operation_canceled()), buf); } + + let op = Op::readv(fd, buf).unwrap(); + let _guard = c.associate_op(op.op_canceller()); + op.read().await } } diff --git a/monoio/src/utils/rand.rs b/monoio/src/utils/rand.rs index 7af9e1d4..4ef2d67e 100644 --- a/monoio/src/utils/rand.rs +++ b/monoio/src/utils/rand.rs @@ -64,7 +64,7 @@ pub fn thread_rng_n(n: u32) -> u32 { use std::{ collections::hash_map::RandomState, - hash::{BuildHasher, Hash, Hasher}, + hash::BuildHasher, sync::atomic::{AtomicU32, Ordering::Relaxed}, }; @@ -72,14 +72,7 @@ static COUNTER: AtomicU32 = AtomicU32::new(1); fn seed() -> u64 { let rand_state = RandomState::new(); - - let mut hasher = rand_state.build_hasher(); - - // Hash some unique-ish data to generate some new state - COUNTER.fetch_add(1, Relaxed).hash(&mut hasher); - - // Get the seed - hasher.finish() + rand_state.hash_one(COUNTER.fetch_add(1, Relaxed)) } #[cfg(test)]