admin管理员组

文章数量:1406913

I have a type which semantically behaves like a stream but doesn't actually implement the futures_util::Stream trait. Instead, it has a method get_next which repeatedly returns a Future yielding the next item.

I'm trying to implement an adapter around this in order to use it as a stream but I'm running into some trouble. I created the following tow example:

use futures_util::Stream;
use std::{future::Future, pin::Pin, task::Poll};

struct FutureSource {/* omitted */}

impl FutureSource {
    pub fn get_next(&mut self) -> FutureString {
        FutureString {
            source: self,
            important_state: todo!(),
        }
    }
}

struct FutureString<'a> {
    source: &'a mut FutureSource,
    important_state: String,
}

impl<'a> Future for FutureString<'a> {
    type Output = String;

    fn poll(self: Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        unimplemented!()
    }
}

struct StreamAdapter<'a> {
    future_source: &'a mut FutureSource,
    future: Option<FutureString<'a>>,
}

impl<'a> Stream for StreamAdapter<'a> {
    type Item = <FutureString<'a> as Future>::Output;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if self.future.is_none() {
            self.future.insert(self.future_source.get_next());
        }
        let fut = self.future.as_mut().unwrap();

        match Pin::new(fut).poll(cx) {
            Poll::Ready(value) => {
                self.future = None;
                Poll::Ready(Some(value))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

This does not compile and gives the following errors:

error[E0499]: cannot borrow `self` as mutable more than once at a time
  --> src/main.rs:41:32
   |
41 |             self.future.insert(self.future_source.get_next());
   |             ----        ------ ^^^^ second mutable borrow occurs here
   |             |           |
   |             |           first borrow later used by call
   |             first mutable borrow occurs here

error: lifetime may not live long enough
  --> src/main.rs:41:32
   |
33 | impl<'a> Stream for StreamAdapter<'a> {
   |      -- lifetime `'a` defined here
...
37 |         mut self: Pin<&mut Self>,
   |                       - let's call the lifetime of this reference `'1`
...
41 |             self.future.insert(self.future_source.get_next());
   |                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ argument requires that `'1` must outlive `'a`

error[E0499]: cannot borrow `self` as mutable more than once at a time
  --> src/main.rs:43:19
   |
33 | impl<'a> Stream for StreamAdapter<'a> {
   |      -- lifetime `'a` defined here
...
41 |             self.future.insert(self.future_source.get_next());
   |                                -----------------------------
   |                                |
   |                                first mutable borrow occurs here
   |                                argument requires that `self` is borrowed for `'a`
42 |         }
43 |         let fut = self.future.as_mut().unwrap();
   |                   ^^^^ second mutable borrow occurs here

error[E0499]: cannot borrow `self` as mutable more than once at a time
  --> src/main.rs:47:17
   |
33 | impl<'a> Stream for StreamAdapter<'a> {
   |      -- lifetime `'a` defined here
...
41 |             self.future.insert(self.future_source.get_next());
   |                                -----------------------------
   |                                |
   |                                first mutable borrow occurs here
   |                                argument requires that `self` is borrowed for `'a`
...
47 |                 self.future = None;
   |                 ^^^^ second mutable borrow occurs here

My understanding of the errors:

  • cannot borrow `self` as mutable more than once at a time
    The mutable borrow of self in self.future_source.get_next() has lifetime 'a which necessarily means the lifetime of the self borrow must be at least 'a. This lifetime is beyond the scope of the entire function call which means we cannot borrow self again at all.
  • lifetime may not live long enough:
    This one is slightly confusing to me, but here we go: The future borrows self for 'a which means that the lifetime of the reference to self must be at least 'a. While it's certainly possible, the compiler can't enforce that. I could specify that the reference to self has lifetime 'a but that would not align with the trait fn.

Is there a way to do this in safe Rust? Or do I need to go unsafe? Or is what I'm trying to do simply unsound regardless of how I go about it?

I have a type which semantically behaves like a stream but doesn't actually implement the futures_util::Stream trait. Instead, it has a method get_next which repeatedly returns a Future yielding the next item.

I'm trying to implement an adapter around this in order to use it as a stream but I'm running into some trouble. I created the following tow example:

use futures_util::Stream;
use std::{future::Future, pin::Pin, task::Poll};

struct FutureSource {/* omitted */}

impl FutureSource {
    pub fn get_next(&mut self) -> FutureString {
        FutureString {
            source: self,
            important_state: todo!(),
        }
    }
}

struct FutureString<'a> {
    source: &'a mut FutureSource,
    important_state: String,
}

impl<'a> Future for FutureString<'a> {
    type Output = String;

    fn poll(self: Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        unimplemented!()
    }
}

struct StreamAdapter<'a> {
    future_source: &'a mut FutureSource,
    future: Option<FutureString<'a>>,
}

impl<'a> Stream for StreamAdapter<'a> {
    type Item = <FutureString<'a> as Future>::Output;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if self.future.is_none() {
            self.future.insert(self.future_source.get_next());
        }
        let fut = self.future.as_mut().unwrap();

        match Pin::new(fut).poll(cx) {
            Poll::Ready(value) => {
                self.future = None;
                Poll::Ready(Some(value))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

This does not compile and gives the following errors:

error[E0499]: cannot borrow `self` as mutable more than once at a time
  --> src/main.rs:41:32
   |
41 |             self.future.insert(self.future_source.get_next());
   |             ----        ------ ^^^^ second mutable borrow occurs here
   |             |           |
   |             |           first borrow later used by call
   |             first mutable borrow occurs here

error: lifetime may not live long enough
  --> src/main.rs:41:32
   |
33 | impl<'a> Stream for StreamAdapter<'a> {
   |      -- lifetime `'a` defined here
...
37 |         mut self: Pin<&mut Self>,
   |                       - let's call the lifetime of this reference `'1`
...
41 |             self.future.insert(self.future_source.get_next());
   |                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ argument requires that `'1` must outlive `'a`

error[E0499]: cannot borrow `self` as mutable more than once at a time
  --> src/main.rs:43:19
   |
33 | impl<'a> Stream for StreamAdapter<'a> {
   |      -- lifetime `'a` defined here
...
41 |             self.future.insert(self.future_source.get_next());
   |                                -----------------------------
   |                                |
   |                                first mutable borrow occurs here
   |                                argument requires that `self` is borrowed for `'a`
42 |         }
43 |         let fut = self.future.as_mut().unwrap();
   |                   ^^^^ second mutable borrow occurs here

error[E0499]: cannot borrow `self` as mutable more than once at a time
  --> src/main.rs:47:17
   |
33 | impl<'a> Stream for StreamAdapter<'a> {
   |      -- lifetime `'a` defined here
...
41 |             self.future.insert(self.future_source.get_next());
   |                                -----------------------------
   |                                |
   |                                first mutable borrow occurs here
   |                                argument requires that `self` is borrowed for `'a`
...
47 |                 self.future = None;
   |                 ^^^^ second mutable borrow occurs here

My understanding of the errors:

  • cannot borrow `self` as mutable more than once at a time
    The mutable borrow of self in self.future_source.get_next() has lifetime 'a which necessarily means the lifetime of the self borrow must be at least 'a. This lifetime is beyond the scope of the entire function call which means we cannot borrow self again at all.
  • lifetime may not live long enough:
    This one is slightly confusing to me, but here we go: The future borrows self for 'a which means that the lifetime of the reference to self must be at least 'a. While it's certainly possible, the compiler can't enforce that. I could specify that the reference to self has lifetime 'a but that would not align with the trait fn.

Is there a way to do this in safe Rust? Or do I need to go unsafe? Or is what I'm trying to do simply unsound regardless of how I go about it?

Share Improve this question edited Mar 6 at 13:19 Emil Sahlén asked Mar 6 at 8:22 Emil SahlénEmil Sahlén 2,1622 gold badges21 silver badges31 bronze badges 0
Add a comment  | 

1 Answer 1

Reset to default 2

What you a trying to do is known as problematic in Rust (a self referential struct), see Why can't I store a value and a reference to that value in the same struct?.

However, for futures there is a workaround: the trick is to wrap the value in a future that consumes the value and on completion, returns both the result and the value back. For example:

async fn future(mut source: FutureSource) -> (String, FutureSource) {
    let result = source.get_next().await;
    (result, source)
}

struct StreamAdapter {
    future: Pin<Box<dyn Future<Output = (String, FutureSource)>>>,
}

impl StreamAdapter {
    fn new(source: FutureSource) -> Self {
        Self { future: Box::pin(future(source)) }
    }
}

impl Stream for StreamAdapter {
    type Item = String;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let (result, source) = ready!(self.future.as_mut().poll(cx));
        self.future = Box::pin(future(source));
        Poll::Ready(Some(result))
    }
}

If you are concerned about the performance of heap-allocating the future every time, and until Type Alias Impl Trait (type F = impl Future) is ready, you can use something like tokio_util::sync::ReusableBoxFuture, that allows you to allocate only once.

本文标签: asynchronousStoring Future next to source of said FutureStack Overflow