I am trying to build a basic multithreaded application in which read and write requests are sent in random order from the main thread, and a read thread and a write thread each pick up their requests and process them.
The code runs, but unless I add a 500 ms sleep after the end of the match in process_result, the requests (even though they are generated and dispatched by the main thread in random order) execute as read, read, read and then write, write, write, or vice versa.
Adding that sleep fixes it. I need help understanding this behaviour. My understanding is that unbounded channels have no processing order, so maybe that is why it behaves this way.
    use crossbeam_channel::unbounded; // Import from crossbeam_channel
    use rand::prelude::*;
    use std::sync::{mpsc, Arc, Mutex};
    use std::thread::scope;
    use std::time::Duration;

    #[derive(Debug, Default)]
    enum OpType {
        #[default]
        Read,
        Write(u8),
    }

    impl OpType {
        fn rand_write() -> Self {
            let mut rng = rand::thread_rng();
            OpType::Write(rng.gen_range(0..100))
        }
    }

    pub fn test_thread3() {
        let data = Arc::new(Mutex::new(0u8));
        let (read_tx, read_rx) = unbounded::<OpType>();
        let (write_tx, write_rx) = unbounded::<OpType>();
        let (conn_tx, conn_rx) = mpsc::channel::<OpType>();

        // Read thread process
        let process_read = || {
            println!("Starting Read Thread");
            let data = data.clone();
            while let Ok(OpType::Read) = read_rx.recv_timeout(Duration::from_millis(100)) {
                if let Ok(guard) = data.lock() {
                    println!("Data = {:?}", *guard);
                }
            }
        };

        // Write thread process
        let process_write = || {
            let data = data.clone();
            println!("Starting Write Thread");
            while let Ok(OpType::Write(b)) = write_rx.recv_timeout(Duration::from_millis(100)) {
                if let Ok(mut guard) = data.lock() {
                    println!("Writing {:?}", b);
                    *guard = b;
                }
            }
        };

        // Main thread process
        let process_result = move || {
            println!("Main Thread Started");
            while let Ok(req) = conn_rx.recv_timeout(Duration::from_millis(50)) {
                match req {
                    OpType::Read => {
                        let res = read_tx.send(req);
                        if res.is_err() {
                            println!("Panicked when reading");
                        }
                    }
                    OpType::Write(_) => {
                        let res = write_tx.send(req);
                        if res.is_err() {
                            println!("Panicked when writing");
                        }
                    }
                }
            }
        };

        let mut user_request: Vec<OpType> = Vec::new();
        for _ in 0..=2 {
            user_request.push(OpType::default());
            user_request.push(OpType::rand_write());
        }
        user_request.shuffle(&mut rand::thread_rng());
        println!("{:?}", &user_request);

        for connection_request in user_request {
            conn_tx
                .send(connection_request)
                .expect("Error Sending Request To The ConnectionPool");
        }

        _ = scope(|s| {
            s.spawn(process_result);
            s.spawn(process_read);
            s.spawn(process_write);
        });
    }

    fn main() {
        test_thread3();
    }
Example Result
The current result handles all write requests first, then the read requests.
The expected result should follow the order of the queue.
[Write(92), Read, Write(52), Write(47), Read, Read]
Main Thread Started
Starting Write Thread
Starting Read Thread
Writing 92
Writing 52
Writing 47
Data = 47
Data = 47
Data = 47
asked Feb 12 at 4:31 by Apurba Pokharel; edited Feb 12 at 8:29 by cafce25
2 Answers
"expected result should be follow the order of the queue."
Why? You're using unbounded channels, so the read and write workers depend on the main thread, but that's it. Aside from that, any interleaving depends on the scheduling decisions of the OS. Literally the one thing the main worker does is remove any possible dependency between the read and write queues.
So the main thread can go through the entire conn queue, filling the read and write queues in one shot. Then the write worker can be scheduled and process its entire queue without waiting, followed by the read worker doing the same, or the reverse, or the two can interleave.
"I do get that unbounded channel have no order of processing"
Channels absolutely have an order of processing: they're strictly FIFO. You're the one decorrelating reads and writes by splitting the original queue into a write queue and a read queue with no synchronisation between the two.
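The FIFO claim can be checked in isolation. The following is a minimal sketch using only std::sync::mpsc rather than the crossbeam channels from the question; the function name fifo_roundtrip is made up for illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: sends `0..n` from a producer thread and returns the
/// values in the order the consumer received them.
fn fifo_roundtrip(n: u32) -> Vec<u32> {
    let (tx, rx) = mpsc::channel::<u32>();

    let producer = thread::spawn(move || {
        for i in 0..n {
            tx.send(i).expect("receiver is still alive");
        }
        // `tx` is dropped when this closure returns, which ends `rx.iter()`.
    });

    // Blocks until every sender has been dropped, collecting in arrival order.
    let received: Vec<u32> = rx.iter().collect();
    producer.join().expect("producer thread panicked");
    received
}
```

Within a single channel the values come out in the order they were sent. The ordering is only lost across two independent channels.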
And note that even if the read and write queues were rendezvous channels (bounded channels with size 0), you could still get "unexpected" interleavings, because a rendezvous channel only synchronises on the channel interaction. So given the queue [read, write]:
- main thread sends a read
- read thread retrieves read
- read thread is suspended
- main thread sends a write
- write thread retrieves write
- write thread acquires lock
is a perfectly valid interleaving, and it still ends up with the write being performed before the read. A rendezvous channel only restricts some interleavings.
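One way to rule out such interleavings is to have the dispatcher wait until a worker reports that the operation has finished before sending the next one. The sketch below is an illustration of that idea, not code from the question: the Op enum, the run_in_order function, and the done channel are all hypothetical names, and it uses std::sync::mpsc instead of crossbeam.

```rust
use std::sync::{mpsc, Mutex};
use std::thread;

#[derive(Debug, Clone, Copy)]
enum Op {
    Read,
    Write(u8),
}

/// Hypothetical helper: dispatches `ops` to a read worker and a write worker,
/// waiting for an acknowledgement after each one. Returns a log of what ran.
fn run_in_order(ops: &[Op]) -> Vec<String> {
    let data = Mutex::new(0u8);
    let log = Mutex::new(Vec::new());
    let (data_ref, log_ref) = (&data, &log);

    let (read_tx, read_rx) = mpsc::channel::<()>();
    let (write_tx, write_rx) = mpsc::channel::<u8>();
    // Workers report completion here (an assumption of this sketch).
    let (done_tx, done_rx) = mpsc::channel::<()>();
    let done_tx_w = done_tx.clone();

    thread::scope(|s| {
        // Read worker: runs until `read_tx` is dropped.
        s.spawn(move || {
            while read_rx.recv().is_ok() {
                let value = *data_ref.lock().unwrap();
                log_ref.lock().unwrap().push(format!("read {}", value));
                done_tx.send(()).unwrap();
            }
        });
        // Write worker: runs until `write_tx` is dropped.
        s.spawn(move || {
            for b in write_rx.iter() {
                *data_ref.lock().unwrap() = b;
                log_ref.lock().unwrap().push(format!("write {}", b));
                done_tx_w.send(()).unwrap();
            }
        });

        // Dispatcher: send one request, then block until it has completed.
        for op in ops {
            match *op {
                Op::Read => read_tx.send(()).unwrap(),
                Op::Write(b) => write_tx.send(b).unwrap(),
            }
            done_rx.recv().unwrap();
        }
        // Closing the request channels lets both workers exit.
        drop(read_tx);
        drop(write_tx);
    });

    log.into_inner().unwrap()
}
```

The trade-off is that this fully serialises the work, so the two workers never run at the same time. If strict queue order is the requirement, a single worker draining a single channel would give the same result with less machinery.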
First, scope spawns OS threads, so the OS handles the scheduling, and each thread will execute a chunk of code before another thread gets to run.
Secondly, each worker loop locks the mutex, which means that when another thread checks the mutex, there is a high chance it is already locked, so that thread backs off and sleeps. Combined with the first reason, this lets the thread that holds the lock keep going.
Think of it like this: the mutex is effectively locked not only for the lifetime of the guard, but also for a few lines of code after the guard is dropped. The other thread that is waiting is probably sleeping, and the chance that it wakes up immediately after the release is low, while the current thread locks the mutex again shortly afterwards.
You can see this by increasing the size of the request queue: change the number of each Op type from 2 to 10000 and you will see that the tasks do execute in parallel, but a chunk at a time. playground
When you add a sleep in the main thread, it creates a pause between each message and forces the other two threads to wait for the next message, and thus to release the mutex.
If you want to make sure the mutex is handed over in FIFO order, you might want to try parking_lot::FairMutex.
The let data = data.clone(); lines in your threads are useless (and in fact you don't need an Arc at all, data could be a plain Mutex<u8>). – Jmb, commented Feb 12 at 15:46
Arc is unnecessary because you're using scoped threads, which can access local variables from the calling function. Arc is only required for regular threads that may live longer than the current function and so can only access 'static or owned data. – Jmb, commented Feb 14 at 13:12
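As a rough illustration of the point about scoped threads, the sketch below shares a plain Mutex<u8> between several scoped threads by reference, with no Arc and no clone. The function name sum_with_scoped_threads is invented for this example.

```rust
use std::sync::Mutex;
use std::thread;

/// Hypothetical helper: adds every value to a shared counter, one scoped
/// thread per value, and returns the (wrapping) total.
fn sum_with_scoped_threads(values: &[u8]) -> u8 {
    // A plain Mutex on the stack: no Arc is needed with scoped threads.
    let total = Mutex::new(0u8);

    thread::scope(|s| {
        for &v in values {
            // Each thread only needs a shared reference to the mutex.
            let total_ref = &total;
            s.spawn(move || {
                let mut guard = total_ref.lock().unwrap();
                *guard = guard.wrapping_add(v);
            });
        }
        // All spawned threads are joined when the scope ends.
    });

    // The borrows have ended, so the mutex can be consumed directly.
    total.into_inner().unwrap()
}
```

This compiles because thread::scope guarantees that every spawned thread finishes before the function returns, so borrowing the local mutex is sound.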