admin管理员组文章数量:1405580
Problem
I'm trying to ensure that coroutine calls in my Serial
class are fully executed in order without interleaving. Despite using a strand, operations from different calls are still interleaving.
Here's my code ():
#include <fmt/core.h>
#include <boost/asio.hpp>
namespace asio = boost::asio;
using namespace std::chrono_literals;
class Serial {
public:
Serial(asio::any_io_executor ex)
: ex_(ex), timer_1_{ex}, timer_2_{ex} {};
asio::awaitable<void> Read(int i) {
co_await asio::dispatch(bind_executor(ex_, asio::deferred));
fmt::println("in read {}", i);
co_await Send(i);
timer_2_.expires_after(50ms);
co_await timer_2_.async_wait(bind_executor(ex_, asio::deferred));
int j = co_await Receive();
fmt::println("out read {}", j);
co_return;
}
private:
asio::awaitable<void> Send(int i) {
co_await asio::dispatch(bind_executor(ex_, asio::deferred));
fmt::println("in send {}", i);
i_ = i;
timer_1_.expires_after(50ms);
co_await timer_1_.async_wait(bind_executor(ex_, asio::deferred));
fmt::println("out send {}", i);
co_return;
}
asio::awaitable<int> Receive() {
co_await asio::dispatch(bind_executor(ex_, asio::deferred));
fmt::println("in receive {}", i_);
timer_1_.expires_after(50ms);
co_await timer_1_.async_wait(bind_executor(ex_, asio::deferred));
fmt::println("out receive {}", i_);
co_return i_;
}
asio::any_io_executor ex_;
asio::steady_timer timer_1_;
asio::steady_timer timer_2_;
int i_ = 0;
};
asio::awaitable<void> TestRead(Serial& s, int i) {
co_await s.Read(i);
co_return;
}
int main() {
asio::io_context io;
auto strand = asio::make_strand(io);
Serial s{strand};
co_spawn(io, TestRead(s, 0), asio::detached);
co_spawn(io, TestRead(s, 1), asio::detached);
io.run();
}
Current Output
Program returned: 0
in read 0
in send 0
in read 1
in send 1
out send 1
in receive 1
out receive 1
out read 1
Expected Output
I want operations to fully complete in sequence, with no interleaving:
in read 0
in send 0
out send 0
in receive 0
out receive 0
out read 0
in read 1
in send 1
out send 1
in receive 1
out receive 1
out read 1
What I've Tried
- I'm already using a strand to attempt to serialize execution
- I've tried using
post
instead ofdispatch
- I've tried explicitly binding the executor to each operation
Question
How can I modify my code to ensure that one full call to Read()
(including its internal calls to Send()
and Receive()
) completes before the next call to Read()
begins processing?
I want to keep using coroutines if possible, rather than rewriting it to use callbacks or a completely different architecture. I suppose a mutex would fix this, but is there an idiomatic Asio approach to this?
Problem
I'm trying to ensure that coroutine calls in my Serial
class are fully executed in order without interleaving. Despite using a strand, operations from different calls are still interleaving.
Here's my code (https://godbolt./z/3cW6qzW4r):
#include <fmt/core.h>
#include <boost/asio.hpp>
namespace asio = boost::asio;
using namespace std::chrono_literals;
class Serial {
public:
Serial(asio::any_io_executor ex)
: ex_(ex), timer_1_{ex}, timer_2_{ex} {};
asio::awaitable<void> Read(int i) {
co_await asio::dispatch(bind_executor(ex_, asio::deferred));
fmt::println("in read {}", i);
co_await Send(i);
timer_2_.expires_after(50ms);
co_await timer_2_.async_wait(bind_executor(ex_, asio::deferred));
int j = co_await Receive();
fmt::println("out read {}", j);
co_return;
}
private:
asio::awaitable<void> Send(int i) {
co_await asio::dispatch(bind_executor(ex_, asio::deferred));
fmt::println("in send {}", i);
i_ = i;
timer_1_.expires_after(50ms);
co_await timer_1_.async_wait(bind_executor(ex_, asio::deferred));
fmt::println("out send {}", i);
co_return;
}
asio::awaitable<int> Receive() {
co_await asio::dispatch(bind_executor(ex_, asio::deferred));
fmt::println("in receive {}", i_);
timer_1_.expires_after(50ms);
co_await timer_1_.async_wait(bind_executor(ex_, asio::deferred));
fmt::println("out receive {}", i_);
co_return i_;
}
asio::any_io_executor ex_;
asio::steady_timer timer_1_;
asio::steady_timer timer_2_;
int i_ = 0;
};
asio::awaitable<void> TestRead(Serial& s, int i) {
co_await s.Read(i);
co_return;
}
int main() {
asio::io_context io;
auto strand = asio::make_strand(io);
Serial s{strand};
co_spawn(io, TestRead(s, 0), asio::detached);
co_spawn(io, TestRead(s, 1), asio::detached);
io.run();
}
Current Output
Program returned: 0
in read 0
in send 0
in read 1
in send 1
out send 1
in receive 1
out receive 1
out read 1
Expected Output
I want operations to fully complete in sequence, with no interleaving:
in read 0
in send 0
out send 0
in receive 0
out receive 0
out read 0
in read 1
in send 1
out send 1
in receive 1
out receive 1
out read 1
What I've Tried
- I'm already using a strand to attempt to serialize execution
- I've tried using
post
instead ofdispatch
- I've tried explicitly binding the executor to each operation
Question
How can I modify my code to ensure that one full call to Read()
(including its internal calls to Send()
and Receive()
) completes before the next call to Read()
begins processing?
I want to keep using coroutines if possible, rather than rewriting it to use callbacks or a completely different architecture. I suppose a mutex would fix this, but is there an idiomatic Asio approach to this?
Share Improve this question asked Mar 22 at 16:50 abroekhofabroekhof 8161 gold badge11 silver badges20 bronze badges1 Answer
Reset to default 3The strand correctly serializes all the functions invoked from it. But since some of these are asynchronous, the completions of them overlap. This is the entire point of asynchronous operations.
How can I modify my code to ensure that one full call to Read() (including its internal calls to Send() and Receive()) completes before the next call to Read() begins processing?
Don't spawn the requests in parallel. Instead of
co_spawn(io, TestRequest(s, 0), asio::detached);
co_spawn(io, TestRequest(s, 1), asio::detached);
do e.g.
co_spawn(
io,
[&s]() -> asio::awaitable<void> {
co_await TestRequest(s, 2);
co_await TestRequest(s, 3);
},
asio::detached);
What if you actually need to accept requests from multiple threads
It starts with good naming. Your read
doesn't read. It Send
s and Receive
s in coordination. So, call it DoRequest
or similar. Next, make it a composed operation and make sure the requests are queued with their completions. You might hide these in an io_service, but you don't need to.
I'll draw up an example after dinner.
Demo
Creating the demo took longer than anticipated because I ran into a library bug: https://github/chriskohlhoff/asio/issues/1610
Regardless, here's the gist of what I described above:
Live On Coliru
#include <boost/asio.hpp>
#include <deque>
namespace asio = boost::asio;
using namespace std::chrono_literals;
#include <fmt/core.h>
static auto constexpr s_now = std::chrono::steady_clock::now;
static auto const s_start = s_now();
thread_local int const t_id = [] { static std::atomic_int gen = 0; return gen++; }();
std::optional<asio::strand<asio::thread_pool::executor_type>> s_strand;
std::string_view strand_indicator() {
return s_strand ? s_strand->running_in_this_thread() ? "[STRAND]" : "" : "";
}
#define myprint(...) \
fmt::println("tid:{:02} at {:>5} ms {} {}", t_id, (s_now() - s_start) / 1ms, strand_indicator(), \
fmt::format(__VA_ARGS__))
class Serial {
public:
Serial(asio::any_io_executor ex) : ex_(ex) {};
using Sig = void(int);
template <asio::completion_token_for<Sig> Token = asio::deferred_t>
auto async_request(int i, Token&& token = {}) {
auto init = [this](auto handler, int i) {
asio::dispatch(ex_, [this, h = std::move(handler), i]() mutable {
myprint("POST: queue size {}", queue_.size());
queue_.emplace_back(i, std::move(h));
if (queue_.size() == 1)
co_spawn(ex_, RequestLoop(), asio::detached);
});
};
return asio::async_initiate<Token, Sig>(std::move(init), token, i);
}
private:
struct Queued {
int data_;
asio::any_completion_handler<Sig> handler_;
};
asio::any_io_executor ex_;
std::deque<Queued> queue_;
int i_ = 0;
static asio::awaitable<void> delay(auto dur = 50ms) {
co_await asio::steady_timer{co_await asio::this_coro::executor, dur}.async_wait(asio::deferred);
}
asio::awaitable<void> RequestLoop() {
myprint("new RequestLoop ({} remaining)", queue_.size());
for (; !queue_.empty(); queue_.pop_front()) {
myprint("in RequestLoop ({} remaining)", queue_.size());
auto [i, handler] = std::move(queue_.front());
myprint("in Request {}", i);
co_await Send(i);
co_await delay(50ms);
int j = co_await Receive();
myprint("out Request {}", j);
std::move(handler)(j);
}
}
asio::awaitable<void> Send(int i) {
myprint("in send {}", i);
i_ = i;
co_await delay(50ms);
myprint("out send {}", i);
}
asio::awaitable<int> Receive() {
myprint("in receive {}", i_);
co_await delay(50ms);
myprint("out receive {}", i_);
co_return i_;
}
};
using SerialPtr = std::shared_ptr<class Serial>;
asio::awaitable<void> TestRequest(SerialPtr s, int i) {
myprint("in TestRequest {}", i);
myprint("TestRequest completed {}", co_await s->async_request(i));
}
int main() {
myprint("start");
asio::thread_pool io;
s_strand = make_strand(io);
auto s = std::make_shared<Serial>(*s_strand);
for (auto i = 0; i < 3; ++i) {
co_spawn(io, TestRequest(s, i), asio::detached);
// Or directly:
// co_spawn(io, s.async_request(i, asio::use_awaitable), asio::detached);
}
// std::this_thread::sleep_for(1s); // works around Asio 1.34.0 bug
io.join();
s_strand.reset();
myprint("done");
}
With a local live demo:
本文标签: cHow to prevent interleaving of BoostAsio coroutines in a serialized classStack Overflow
版权声明:本文标题:c++ - How to prevent interleaving of Boost.Asio coroutines in a serialized class? - Stack Overflow 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1744307698a2599878.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论