admin管理员组文章数量:1406913
I have a type which semantically behaves like a stream but doesn't actually implement the futures_util::Stream
trait. Instead, it has a method get_next
which repeatedly returns a Future
yielding the next item.
I'm trying to implement an adapter around this in order to use it as a stream but I'm running into some trouble. I created the following tow example:
use futures_util::Stream;
use std::{future::Future, pin::Pin, task::Poll};
struct FutureSource {/* omitted */}
impl FutureSource {
pub fn get_next(&mut self) -> FutureString {
FutureString {
source: self,
important_state: todo!(),
}
}
}
struct FutureString<'a> {
source: &'a mut FutureSource,
important_state: String,
}
impl<'a> Future for FutureString<'a> {
type Output = String;
fn poll(self: Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
unimplemented!()
}
}
struct StreamAdapter<'a> {
future_source: &'a mut FutureSource,
future: Option<FutureString<'a>>,
}
impl<'a> Stream for StreamAdapter<'a> {
type Item = <FutureString<'a> as Future>::Output;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
if self.future.is_none() {
self.future.insert(self.future_source.get_next());
}
let fut = self.future.as_mut().unwrap();
match Pin::new(fut).poll(cx) {
Poll::Ready(value) => {
self.future = None;
Poll::Ready(Some(value))
}
Poll::Pending => Poll::Pending,
}
}
}
This does not compile and gives the following errors:
error[E0499]: cannot borrow `self` as mutable more than once at a time
--> src/main.rs:41:32
|
41 | self.future.insert(self.future_source.get_next());
| ---- ------ ^^^^ second mutable borrow occurs here
| | |
| | first borrow later used by call
| first mutable borrow occurs here
error: lifetime may not live long enough
--> src/main.rs:41:32
|
33 | impl<'a> Stream for StreamAdapter<'a> {
| -- lifetime `'a` defined here
...
37 | mut self: Pin<&mut Self>,
| - let's call the lifetime of this reference `'1`
...
41 | self.future.insert(self.future_source.get_next());
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ argument requires that `'1` must outlive `'a`
error[E0499]: cannot borrow `self` as mutable more than once at a time
--> src/main.rs:43:19
|
33 | impl<'a> Stream for StreamAdapter<'a> {
| -- lifetime `'a` defined here
...
41 | self.future.insert(self.future_source.get_next());
| -----------------------------
| |
| first mutable borrow occurs here
| argument requires that `self` is borrowed for `'a`
42 | }
43 | let fut = self.future.as_mut().unwrap();
| ^^^^ second mutable borrow occurs here
error[E0499]: cannot borrow `self` as mutable more than once at a time
--> src/main.rs:47:17
|
33 | impl<'a> Stream for StreamAdapter<'a> {
| -- lifetime `'a` defined here
...
41 | self.future.insert(self.future_source.get_next());
| -----------------------------
| |
| first mutable borrow occurs here
| argument requires that `self` is borrowed for `'a`
...
47 | self.future = None;
| ^^^^ second mutable borrow occurs here
My understanding of the errors:
cannot borrow `self` as mutable more than once at a time
The mutable borrow ofself
inself.future_source.get_next()
has lifetime'a
which necessarily means the lifetime of theself
borrow must be at least'a
. This lifetime is beyond the scope of the entire function call which means we cannot borrowself
again at all.lifetime may not live long enough
:
This one is slightly confusing to me, but here we go: The future borrowsself
for'a
which means that the lifetime of the reference toself
must be at least'a
. While it's certainly possible, the compiler can't enforce that. I could specify that the reference toself
has lifetime'a
but that would not align with the trait fn.
Is there a way to do this in safe Rust? Or do I need to go unsafe? Or is what I'm trying to do simply unsound regardless of how I go about it?
I have a type which semantically behaves like a stream but doesn't actually implement the futures_util::Stream
trait. Instead, it has a method get_next
which repeatedly returns a Future
yielding the next item.
I'm trying to implement an adapter around this in order to use it as a stream but I'm running into some trouble. I created the following tow example:
use futures_util::Stream;
use std::{future::Future, pin::Pin, task::Poll};
struct FutureSource {/* omitted */}
impl FutureSource {
pub fn get_next(&mut self) -> FutureString {
FutureString {
source: self,
important_state: todo!(),
}
}
}
struct FutureString<'a> {
source: &'a mut FutureSource,
important_state: String,
}
impl<'a> Future for FutureString<'a> {
type Output = String;
fn poll(self: Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
unimplemented!()
}
}
struct StreamAdapter<'a> {
future_source: &'a mut FutureSource,
future: Option<FutureString<'a>>,
}
impl<'a> Stream for StreamAdapter<'a> {
type Item = <FutureString<'a> as Future>::Output;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
if self.future.is_none() {
self.future.insert(self.future_source.get_next());
}
let fut = self.future.as_mut().unwrap();
match Pin::new(fut).poll(cx) {
Poll::Ready(value) => {
self.future = None;
Poll::Ready(Some(value))
}
Poll::Pending => Poll::Pending,
}
}
}
This does not compile and gives the following errors:
error[E0499]: cannot borrow `self` as mutable more than once at a time
--> src/main.rs:41:32
|
41 | self.future.insert(self.future_source.get_next());
| ---- ------ ^^^^ second mutable borrow occurs here
| | |
| | first borrow later used by call
| first mutable borrow occurs here
error: lifetime may not live long enough
--> src/main.rs:41:32
|
33 | impl<'a> Stream for StreamAdapter<'a> {
| -- lifetime `'a` defined here
...
37 | mut self: Pin<&mut Self>,
| - let's call the lifetime of this reference `'1`
...
41 | self.future.insert(self.future_source.get_next());
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ argument requires that `'1` must outlive `'a`
error[E0499]: cannot borrow `self` as mutable more than once at a time
--> src/main.rs:43:19
|
33 | impl<'a> Stream for StreamAdapter<'a> {
| -- lifetime `'a` defined here
...
41 | self.future.insert(self.future_source.get_next());
| -----------------------------
| |
| first mutable borrow occurs here
| argument requires that `self` is borrowed for `'a`
42 | }
43 | let fut = self.future.as_mut().unwrap();
| ^^^^ second mutable borrow occurs here
error[E0499]: cannot borrow `self` as mutable more than once at a time
--> src/main.rs:47:17
|
33 | impl<'a> Stream for StreamAdapter<'a> {
| -- lifetime `'a` defined here
...
41 | self.future.insert(self.future_source.get_next());
| -----------------------------
| |
| first mutable borrow occurs here
| argument requires that `self` is borrowed for `'a`
...
47 | self.future = None;
| ^^^^ second mutable borrow occurs here
My understanding of the errors:
cannot borrow `self` as mutable more than once at a time
The mutable borrow ofself
inself.future_source.get_next()
has lifetime'a
which necessarily means the lifetime of theself
borrow must be at least'a
. This lifetime is beyond the scope of the entire function call which means we cannot borrowself
again at all.lifetime may not live long enough
:
This one is slightly confusing to me, but here we go: The future borrowsself
for'a
which means that the lifetime of the reference toself
must be at least'a
. While it's certainly possible, the compiler can't enforce that. I could specify that the reference toself
has lifetime'a
but that would not align with the trait fn.
Is there a way to do this in safe Rust? Or do I need to go unsafe? Or is what I'm trying to do simply unsound regardless of how I go about it?
Share Improve this question edited Mar 6 at 13:19 Emil Sahlén asked Mar 6 at 8:22 Emil SahlénEmil Sahlén 2,1622 gold badges21 silver badges31 bronze badges 01 Answer
Reset to default 2What you a trying to do is known as problematic in Rust (a self referential struct), see Why can't I store a value and a reference to that value in the same struct?.
However, for futures there is a workaround: the trick is to wrap the value in a future that consumes the value and on completion, returns both the result and the value back. For example:
async fn future(mut source: FutureSource) -> (String, FutureSource) {
let result = source.get_next().await;
(result, source)
}
struct StreamAdapter {
future: Pin<Box<dyn Future<Output = (String, FutureSource)>>>,
}
impl StreamAdapter {
fn new(source: FutureSource) -> Self {
Self { future: Box::pin(future(source)) }
}
}
impl Stream for StreamAdapter {
type Item = String;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let (result, source) = ready!(self.future.as_mut().poll(cx));
self.future = Box::pin(future(source));
Poll::Ready(Some(result))
}
}
If you are concerned about the performance of heap-allocating the future every time, and until Type Alias Impl Trait (type F = impl Future
) is ready, you can use something like tokio_util::sync::ReusableBoxFuture
, that allows you to allocate only once.
本文标签: asynchronousStoring Future next to source of said FutureStack Overflow
版权声明:本文标题:asynchronous - Storing Future next to source of said Future - Stack Overflow 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1744989957a2636322.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论