admin管理员组

文章数量:1405580

Problem

I'm trying to ensure that coroutine calls in my Serial class are fully executed in order without interleaving. Despite using a strand, operations from different calls are still interleaving.

Here's my code (https://godbolt.org/z/3cW6qzW4r):

#include <fmt/core.h>
#include <boost/asio.hpp>

namespace asio = boost::asio;
using namespace std::chrono_literals;

// Reproduction case from the question. Every caller of Read() shares the same
// two timers and the same i_ member, and all steps are resumed on ex_.
class Serial {
   public:
    // Stores the executor and constructs both timers on it.
    Serial(asio::any_io_executor ex)
        : ex_(ex), timer_1_{ex}, timer_2_{ex} {};
    // One exchange: hop onto ex_, Send(i), wait 50ms on timer_2_, Receive(),
    // then print the received value. The coroutine suspends at every co_await,
    // which is where another Read() call can start running on ex_.
    asio::awaitable<void> Read(int i) {
        co_await asio::dispatch(bind_executor(ex_, asio::deferred));
        fmt::println("in read {}", i);
        co_await Send(i);
        timer_2_.expires_after(50ms);
        co_await timer_2_.async_wait(bind_executor(ex_, asio::deferred));
        int j = co_await Receive();
        fmt::println("out read {}", j);
        co_return;
    }

   private:
    // Records i in the shared i_ member and waits 50ms on the shared timer_1_.
    asio::awaitable<void> Send(int i) {
        co_await asio::dispatch(bind_executor(ex_, asio::deferred));
        fmt::println("in send {}", i);
        i_ = i;
        // NOTE(review): per the Asio timer documentation, expires_after() cancels
        // waits already pending on the same timer. A second caller reaching this
        // line would abort the first caller's wait below, which is consistent
        // with the posted output never showing "out send 0" - confirm.
        timer_1_.expires_after(50ms);
        co_await timer_1_.async_wait(bind_executor(ex_, asio::deferred));
        fmt::println("out send {}", i);
        co_return;
    }

    // Waits 50ms on timer_1_ and returns whatever i_ holds at that point, i.e.
    // the value written by the most recent Send(), not necessarily this caller's.
    asio::awaitable<int> Receive() {
        co_await asio::dispatch(bind_executor(ex_, asio::deferred));
        fmt::println("in receive {}", i_);
        timer_1_.expires_after(50ms);
        co_await timer_1_.async_wait(bind_executor(ex_, asio::deferred));
        fmt::println("out receive {}", i_);
        co_return i_;
    }

    asio::any_io_executor ex_;    // executor every step is bound to
    asio::steady_timer timer_1_;  // shared by Send() and Receive()
    asio::steady_timer timer_2_;  // used by Read() between Send() and Receive()
    int i_ = 0;                   // last value passed to Send()
};

// Test driver: awaits a single Serial::Read(i) exchange on s.
asio::awaitable<void> TestRead(Serial& s, int i) {
    co_await s.Read(i);
    co_return;
}

// Builds a strand on the io_context, hands it to Serial, and starts two
// independent TestRead coroutines before running the io_context.
int main() {
    asio::io_context io;

    auto strand = asio::make_strand(io);

    Serial s{strand};
    // Both coroutines are spawned on io (not on the strand) and are both
    // pending before io.run() starts executing anything.
    co_spawn(io, TestRead(s, 0), asio::detached);
    co_spawn(io, TestRead(s, 1), asio::detached);

    io.run();
}

Current Output

Program returned: 0
in read 0
in send 0
in read 1
in send 1
out send 1
in receive 1
out receive 1
out read 1

Expected Output

I want operations to fully complete in sequence, with no interleaving:

in read 0
in send 0
out send 0
in receive 0
out receive 0
out read 0
in read 1
in send 1
out send 1
in receive 1
out receive 1
out read 1

What I've Tried

  1. I'm already using a strand to attempt to serialize execution
  2. I've tried using post instead of dispatch
  3. I've tried explicitly binding the executor to each operation

Question

How can I modify my code to ensure that one full call to Read() (including its internal calls to Send() and Receive()) completes before the next call to Read() begins processing?

I want to keep using coroutines if possible, rather than rewriting it to use callbacks or a completely different architecture. I suppose a mutex would fix this, but is there an idiomatic Asio approach to this?

Problem

I'm trying to ensure that coroutine calls in my Serial class are fully executed in order without interleaving. Despite using a strand, operations from different calls are still interleaving.

Here's my code (https://godbolt.org/z/3cW6qzW4r):

#include <fmt/core.h>
#include <boost/asio.hpp>

namespace asio = boost::asio;
using namespace std::chrono_literals;

// Reproduction case from the question. Every caller of Read() shares the same
// two timers and the same i_ member, and all steps are resumed on ex_.
class Serial {
   public:
    // Stores the executor and constructs both timers on it.
    Serial(asio::any_io_executor ex)
        : ex_(ex), timer_1_{ex}, timer_2_{ex} {};
    // One exchange: hop onto ex_, Send(i), wait 50ms on timer_2_, Receive(),
    // then print the received value. The coroutine suspends at every co_await,
    // which is where another Read() call can start running on ex_.
    asio::awaitable<void> Read(int i) {
        co_await asio::dispatch(bind_executor(ex_, asio::deferred));
        fmt::println("in read {}", i);
        co_await Send(i);
        timer_2_.expires_after(50ms);
        co_await timer_2_.async_wait(bind_executor(ex_, asio::deferred));
        int j = co_await Receive();
        fmt::println("out read {}", j);
        co_return;
    }

   private:
    // Records i in the shared i_ member and waits 50ms on the shared timer_1_.
    asio::awaitable<void> Send(int i) {
        co_await asio::dispatch(bind_executor(ex_, asio::deferred));
        fmt::println("in send {}", i);
        i_ = i;
        // NOTE(review): per the Asio timer documentation, expires_after() cancels
        // waits already pending on the same timer. A second caller reaching this
        // line would abort the first caller's wait below, which is consistent
        // with the posted output never showing "out send 0" - confirm.
        timer_1_.expires_after(50ms);
        co_await timer_1_.async_wait(bind_executor(ex_, asio::deferred));
        fmt::println("out send {}", i);
        co_return;
    }

    // Waits 50ms on timer_1_ and returns whatever i_ holds at that point, i.e.
    // the value written by the most recent Send(), not necessarily this caller's.
    asio::awaitable<int> Receive() {
        co_await asio::dispatch(bind_executor(ex_, asio::deferred));
        fmt::println("in receive {}", i_);
        timer_1_.expires_after(50ms);
        co_await timer_1_.async_wait(bind_executor(ex_, asio::deferred));
        fmt::println("out receive {}", i_);
        co_return i_;
    }

    asio::any_io_executor ex_;    // executor every step is bound to
    asio::steady_timer timer_1_;  // shared by Send() and Receive()
    asio::steady_timer timer_2_;  // used by Read() between Send() and Receive()
    int i_ = 0;                   // last value passed to Send()
};

// Test driver: awaits a single Serial::Read(i) exchange on s.
asio::awaitable<void> TestRead(Serial& s, int i) {
    co_await s.Read(i);
    co_return;
}

// Builds a strand on the io_context, hands it to Serial, and starts two
// independent TestRead coroutines before running the io_context.
int main() {
    asio::io_context io;

    auto strand = asio::make_strand(io);

    Serial s{strand};
    // Both coroutines are spawned on io (not on the strand) and are both
    // pending before io.run() starts executing anything.
    co_spawn(io, TestRead(s, 0), asio::detached);
    co_spawn(io, TestRead(s, 1), asio::detached);

    io.run();
}

Current Output

Program returned: 0
in read 0
in send 0
in read 1
in send 1
out send 1
in receive 1
out receive 1
out read 1

Expected Output

I want operations to fully complete in sequence, with no interleaving:

in read 0
in send 0
out send 0
in receive 0
out receive 0
out read 0
in read 1
in send 1
out send 1
in receive 1
out receive 1
out read 1

What I've Tried

  1. I'm already using a strand to attempt to serialize execution
  2. I've tried using post instead of dispatch
  3. I've tried explicitly binding the executor to each operation

Question

How can I modify my code to ensure that one full call to Read() (including its internal calls to Send() and Receive()) completes before the next call to Read() begins processing?

I want to keep using coroutines if possible, rather than rewriting it to use callbacks or a completely different architecture. I suppose a mutex would fix this, but is there an idiomatic Asio approach to this?

Share Improve this question asked Mar 22 at 16:50 abroekhofabroekhof 8161 gold badge11 silver badges20 bronze badges
Add a comment  | 

1 Answer 1

Reset to default 3

The strand correctly serializes all the functions invoked from it. But since some of these are asynchronous, the completions of them overlap. This is the entire point of asynchronous operations.

How can I modify my code to ensure that one full call to Read() (including its internal calls to Send() and Receive()) completes before the next call to Read() begins processing?

Don't spawn the requests in parallel. Instead of

// Two independent coroutines (the function is named TestRead in the question's
// code): each one is free to run whenever the other is suspended.
co_spawn(io, TestRequest(s, 0), asio::detached);
co_spawn(io, TestRequest(s, 1), asio::detached);

do e.g.

// A single coroutine awaits the requests one after the other, so the second
// request is not started until the first one has completed.
co_spawn(
    io,
    [&s]() -> asio::awaitable<void> {
        co_await TestRequest(s, 2);
        co_await TestRequest(s, 3);
    },
    asio::detached);

What if you actually need to accept requests from multiple threads

It starts with good naming. Your read doesn't read. It Sends and Receives in coordination. So, call it DoRequest or similar. Next, make it a composed operation and make sure the requests are queued with their completions. You might hide these in an io_service, but you don't need to.

I'll draw up an example after dinner.

Demo

Creating the demo took longer than anticipated because I ran into a library bug: https://github.com/chriskohlhoff/asio/issues/1610

Regardless, here's the gist of what I described above:

Live On Coliru

#include <boost/asio.hpp>
#include <deque>
namespace asio = boost::asio;
using namespace std::chrono_literals;

#include <fmt/core.h>
// Clock function used for every log timestamp.
static auto constexpr s_now    = std::chrono::steady_clock::now;
// Reference point taken during static initialization; log timestamps are
// printed relative to it.
static auto const      s_start = s_now();
// Small per-thread id for log output, drawn from a shared atomic counter when
// the thread's t_id is initialized.
thread_local int const t_id    = [] { static std::atomic_int gen = 0; return gen++; }();
// Strand used by the demo: assigned and reset in main(), read by strand_indicator().
std::optional<asio::strand<asio::thread_pool::executor_type>> s_strand;

// Returns "[STRAND]" when s_strand is set and the calling thread is currently
// executing inside that strand; returns an empty view otherwise.
std::string_view strand_indicator() {
    if (s_strand && s_strand->running_in_this_thread()) {
        return "[STRAND]";
    }
    return "";
}
// Logging helper: prints the thread id, the milliseconds elapsed since s_start
// and the strand indicator, followed by the fmt-formatted message. The message
// arguments are evaluated inside the same call expression as those prefix values.
#define myprint(...)                                                                                         \
    fmt::println("tid:{:02} at {:>5} ms {} {}", t_id, (s_now() - s_start) / 1ms, strand_indicator(),         \
                 fmt::format(__VA_ARGS__))

/// Runs request/response exchanges strictly one at a time.
///
/// Requests submitted through async_request() are appended to a FIFO queue and
/// processed by a single RequestLoop() coroutine, so one exchange (Send, delay,
/// Receive) finishes before the next one starts. All queue access happens on
/// `ex_` and there is no other synchronization, so `ex_` has to be a serializing
/// executor (the demo passes a strand) when callers run on several threads.
///
/// Lifetime: queued work captures `this`, so the object must outlive every
/// outstanding request.
class Serial {
  public:
    /// @param ex Executor on which the queue is modified and the request loop runs.
    Serial(asio::any_io_executor ex) : ex_(std::move(ex)) {}

    /// Completion signature of async_request(): the handler receives the value
    /// returned by Receive().
    using Sig = void(int);

    /// Queues request `i` and completes `token` with the received value once the
    /// exchange for this request has run to completion.
    ///
    /// @param i     Request payload handed to Send().
    /// @param token Completion token; defaults to asio::deferred.
    /// @return Whatever the completion token's initiation yields.
    template <asio::completion_token_for<Sig> Token = asio::deferred_t>
    auto async_request(int i, Token&& token = {}) {
        auto init = [this](auto handler, int i) {
            // Touch the queue only from ex_.
            asio::dispatch(ex_, [this, h = std::move(handler), i]() mutable {
                myprint("POST: queue size {}", queue_.size());
                queue_.emplace_back(i, std::move(h));
                // The entry in flight stays at the front until its iteration
                // ends, so a size of exactly 1 means no loop is running yet.
                if (queue_.size() == 1)
                    co_spawn(ex_, RequestLoop(), asio::detached);
            });
        };
        return asio::async_initiate<Token, Sig>(std::move(init), token, i);
    }

  private:
    /// One pending request: its payload and the type-erased completion handler.
    struct Queued {
        int                               data_;
        asio::any_completion_handler<Sig> handler_;
    };

    asio::any_io_executor ex_;    // executor serializing all access to the members below
    std::deque<Queued>    queue_; // pending requests, oldest first
    int                   i_ = 0; // value recorded by the most recent Send()

    /// Suspends the calling coroutine for `dur`.
    ///
    /// Each call owns a temporary timer on the coroutine's own executor, so the
    /// waits of different steps never share a timer. The parameter has a
    /// concrete duration type so that the default argument is actually usable
    /// (a deduced `auto` parameter cannot be deduced from its default).
    static asio::awaitable<void> delay(std::chrono::steady_clock::duration dur = 50ms) {
        co_await asio::steady_timer{co_await asio::this_coro::executor, dur}.async_wait(asio::deferred);
    }

    /// Drains the queue, running one full exchange per entry before moving on.
    asio::awaitable<void> RequestLoop() {
        myprint("new RequestLoop ({} remaining)", queue_.size());
        // pop_front() runs only after the exchange is finished; async_request()
        // relies on the front entry staying in place while it is being served.
        for (; !queue_.empty(); queue_.pop_front()) {
            myprint("in RequestLoop ({} remaining)", queue_.size());
            auto [i, handler] = std::move(queue_.front());

            myprint("in Request {}", i);
            co_await Send(i);
            co_await delay(50ms);
            int j = co_await Receive();

            myprint("out Request {}", j);
            // Complete through the handler's associated executor rather than
            // calling it directly from this loop's executor.
            asio::dispatch(asio::append(std::move(handler), j));
        }
    }

    /// Records `i` as the current value, then waits 50ms.
    asio::awaitable<void> Send(int i) {
        myprint("in send {}", i);
        i_ = i;
        co_await delay(50ms);
        myprint("out send {}", i);
    }

    /// Waits 50ms, then returns the value recorded by the last Send().
    asio::awaitable<int> Receive() {
        myprint("in receive {}", i_);
        co_await delay(50ms);
        myprint("out receive {}", i_);
        co_return i_;
    }
};

using SerialPtr = std::shared_ptr<class Serial>;

/// Issues request `i` on `s` and logs its start and its result.
///
/// @param s Shared owner of the Serial instance; held for the whole coroutine.
/// @param i Request payload.
asio::awaitable<void> TestRequest(SerialPtr s, int i) {
    myprint("in TestRequest {}", i);
    // Await first, then log. With the co_await inside the myprint argument list,
    // the thread id, timestamp and strand indicator could be evaluated before
    // the suspension (argument evaluation order is unspecified) and describe
    // the wrong moment and thread.
    int const result = co_await s->async_request(i);
    myprint("TestRequest completed {}", result);
}

// Demo driver: creates a thread pool and a strand on it, shares one Serial
// between three concurrently spawned TestRequest coroutines, and waits for the
// pool to finish.
int main() {
    myprint("start");
    asio::thread_pool io;
    s_strand = make_strand(io);
    auto s   = std::make_shared<Serial>(*s_strand);

    for (auto i = 0; i < 3; ++i) {
        // Spawned on the pool itself, not on the strand.
        co_spawn(io, TestRequest(s, i), asio::detached);
        // Or directly:
        // co_spawn(io, s->async_request(i, asio::use_awaitable), asio::detached);
    }
    // std::this_thread::sleep_for(1s); // works around Asio 1.34.0 bug
    io.join();
    // Clear the global strand before the final log line, so strand_indicator()
    // no longer consults it.
    s_strand.reset();
    myprint("done");
}

With a local live demo:

本文标签: cHow to prevent interleaving of BoostAsio coroutines in a serialized classStack Overflow